Module qfly.qtm
Expand source code
import asyncio
import math
import os
from threading import Thread
import xml.etree.cElementTree as ET
import qtm
from qfly import Pose
class QtmWrapper(Thread):
    """
    Wrapper for opening and running an asynchronous QTM connection
    that receives and responds to real time data packets.

    Designed for real time interactive applications, e.g. drone control.

    Each entity being tracked should:
    1) instantiate its own QtmWrapper,
    2) be defined as a rigid body in QTM,
    3) pass a callback function to its QtmWrapper which responds to pose data.

    The wrapper is a thread that starts itself on construction and drives
    its own asyncio event loop until close() is called.
    """

    def __init__(self, body, on_pose, qtm_ip="127.0.0.1"):
        """
        Construct QtmWrapper object and start its thread.

        Parameters
        ----------
        body : string
            Name of 6DOF rigid body being tracked.
        on_pose : function(Pose)
            Callback to trigger when pose packet is received.
        qtm_ip : string
            IP address of QTM instance.
        """
        super().__init__()

        self.body = body
        self.qtm_ip = qtm_ip
        self.on_pose = on_pose
        # Count of consecutive packets that yielded no valid pose
        self.tracking_loss = 0

        self._body_idx = None
        self._connection = None
        self._stay_open = True

        # The thread launches itself; callers only construct and close()
        self.start()

    def run(self):
        """
        Run QTM wrapper coroutine.
        """
        asyncio.run(self._life_cycle())

    async def _life_cycle(self):
        """
        QTM wrapper coroutine.

        Connects, idles until close() clears the stay-open flag,
        then disconnects.
        """
        await self._connect()
        # The flag is polled once per second
        while self._stay_open:
            await asyncio.sleep(1)
        await self._close()

    @staticmethod
    def _find_body_index(params_xml, name):
        """
        Look up the position of a rigid body in QTM 6D parameters.

        Parameters
        ----------
        params_xml : string
            XML document returned by QTM for the '6d' parameters.
        name : string
            Name of the rigid body to look for.

        Returns
        -------
        int or None
            Index of the last body whose stripped name equals `name`,
            or None if no body matches.
        """
        root = ET.fromstring(params_xml)
        found = None
        for index, name_el in enumerate(root.findall("*/Body/Name")):
            # An empty <Name/> element has text None; treat it as ""
            if (name_el.text or "").strip() == name:
                found = index
        return found

    async def _connect(self):
        """
        Connect to QTM machine.

        Terminates the whole process via os._exit(1) if QTM is
        unreachable, the rigid body is unknown, or the frame stream
        request times out.
        """
        # Establish connection
        print('[QTM] Connecting to QTM at ' + self.qtm_ip)
        self._connection = await qtm.connect(self.qtm_ip)

        # Quit if QTM unavailable
        if self._connection is None:
            print("[QTM] Could not connect to QTM! Terminating...")
            # os._exit ends the entire process, not only this thread
            os._exit(1)

        # Register index of body for 6D tracking
        params_xml = await self._connection.get_parameters(parameters=['6d'])
        self._body_idx = self._find_body_index(params_xml, self.body)

        # Quit if body not found
        if self._body_idx is None:
            print(f'[QTM] Rigid body "{self.body}" not found! Terminating...')
            os._exit(1)
        print(
            f'[QTM] Index for rigid body "{self.body}" is: {self._body_idx}')

        # Assign 6D streaming callback
        try:
            await self._connection.stream_frames(components=['6D'],
                                                 on_packet=self._on_packet)
        except asyncio.TimeoutError:
            print("[QTM] Frame stream TimeoutError! Terminating...")
            os._exit(1)

    def _on_packet(self, packet):
        """
        Process 6D packet stream into Pose object and pass on.

        Parameters
        ----------
        packet : QRTPacket
            Incoming packet from QTM
        """
        # Extract 6D component from packet (the header is not needed)
        _, component_6d = packet.get_6d()

        # Increment tracking loss if no component found
        if component_6d is None:
            print('[QTM] Packet without 6D component! Moving on...')
            self.tracking_loss += 1
            return

        # Extract relevant body data from 6D component
        body_6d = component_6d[self._body_idx]

        # Create Pose object from 6D data
        pose = Pose.from_qtm_6d(body_6d)

        # Check validity and pass on
        if pose.is_valid():
            self.on_pose(pose)
            self.tracking_loss = 0
        else:
            self.tracking_loss += 1

    async def _close(self):
        """
        End lifecycle by disconnecting from QTM machine.
        """
        try:
            await self._connection.stream_frames_stop()
        finally:
            # Always release the connection, even if stopping the stream fails
            self._connection.disconnect()

    def close(self):
        """
        Stop QTM wrapper thread.

        Clears the stay-open flag and blocks until the thread has finished.
        """
        self._stay_open = False
        self.join()
Classes
class QtmWrapper (body, on_pose, qtm_ip='127.0.0.1')
-
Wrapper for opening and running an asynchronous QTM connection that receives and responds to real time data packets.
Designed for real time interactive applications, e.g. drone control. Each entity being tracked should: 1) instantiate its own QtmWrapper, 2) be defined as a rigid body in QTM, 3) pass a callback function to its QtmWrapper which responds to pose data.
Construct QtmWrapper object
Parameters
body : string
- Name of 6DOF rigid body being tracked.
on_pose : function(Pose)
- Callback to trigger when pose packet is received.
qtm_ip : string
- IP address of QTM instance.
Expand source code
class QtmWrapper(Thread):
    """
    Wrapper for opening and running an asynchronous QTM connection
    that receives and responds to real time data packets.

    Designed for real time interactive applications, e.g. drone control.

    Each entity being tracked should:
    1) instantiate its own QtmWrapper,
    2) be defined as a rigid body in QTM,
    3) pass a callback function to its QtmWrapper which responds to pose data.

    The wrapper is a thread that starts itself on construction and drives
    its own asyncio event loop until close() is called.
    """

    def __init__(self, body, on_pose, qtm_ip="127.0.0.1"):
        """
        Construct QtmWrapper object and start its thread.

        Parameters
        ----------
        body : string
            Name of 6DOF rigid body being tracked.
        on_pose : function(Pose)
            Callback to trigger when pose packet is received.
        qtm_ip : string
            IP address of QTM instance.
        """
        super().__init__()

        self.body = body
        self.qtm_ip = qtm_ip
        self.on_pose = on_pose
        # Count of consecutive packets that yielded no valid pose
        self.tracking_loss = 0

        self._body_idx = None
        self._connection = None
        self._stay_open = True

        # The thread launches itself; callers only construct and close()
        self.start()

    def run(self):
        """
        Run QTM wrapper coroutine.
        """
        asyncio.run(self._life_cycle())

    async def _life_cycle(self):
        """
        QTM wrapper coroutine.

        Connects, idles until close() clears the stay-open flag,
        then disconnects.
        """
        await self._connect()
        # The flag is polled once per second
        while self._stay_open:
            await asyncio.sleep(1)
        await self._close()

    @staticmethod
    def _find_body_index(params_xml, name):
        """
        Look up the position of a rigid body in QTM 6D parameters.

        Parameters
        ----------
        params_xml : string
            XML document returned by QTM for the '6d' parameters.
        name : string
            Name of the rigid body to look for.

        Returns
        -------
        int or None
            Index of the last body whose stripped name equals `name`,
            or None if no body matches.
        """
        root = ET.fromstring(params_xml)
        found = None
        for index, name_el in enumerate(root.findall("*/Body/Name")):
            # An empty <Name/> element has text None; treat it as ""
            if (name_el.text or "").strip() == name:
                found = index
        return found

    async def _connect(self):
        """
        Connect to QTM machine.

        Terminates the whole process via os._exit(1) if QTM is
        unreachable, the rigid body is unknown, or the frame stream
        request times out.
        """
        # Establish connection
        print('[QTM] Connecting to QTM at ' + self.qtm_ip)
        self._connection = await qtm.connect(self.qtm_ip)

        # Quit if QTM unavailable
        if self._connection is None:
            print("[QTM] Could not connect to QTM! Terminating...")
            # os._exit ends the entire process, not only this thread
            os._exit(1)

        # Register index of body for 6D tracking
        params_xml = await self._connection.get_parameters(parameters=['6d'])
        self._body_idx = self._find_body_index(params_xml, self.body)

        # Quit if body not found
        if self._body_idx is None:
            print(f'[QTM] Rigid body "{self.body}" not found! Terminating...')
            os._exit(1)
        print(
            f'[QTM] Index for rigid body "{self.body}" is: {self._body_idx}')

        # Assign 6D streaming callback
        try:
            await self._connection.stream_frames(components=['6D'],
                                                 on_packet=self._on_packet)
        except asyncio.TimeoutError:
            print("[QTM] Frame stream TimeoutError! Terminating...")
            os._exit(1)

    def _on_packet(self, packet):
        """
        Process 6D packet stream into Pose object and pass on.

        Parameters
        ----------
        packet : QRTPacket
            Incoming packet from QTM
        """
        # Extract 6D component from packet (the header is not needed)
        _, component_6d = packet.get_6d()

        # Increment tracking loss if no component found
        if component_6d is None:
            print('[QTM] Packet without 6D component! Moving on...')
            self.tracking_loss += 1
            return

        # Extract relevant body data from 6D component
        body_6d = component_6d[self._body_idx]

        # Create Pose object from 6D data
        pose = Pose.from_qtm_6d(body_6d)

        # Check validity and pass on
        if pose.is_valid():
            self.on_pose(pose)
            self.tracking_loss = 0
        else:
            self.tracking_loss += 1

    async def _close(self):
        """
        End lifecycle by disconnecting from QTM machine.
        """
        try:
            await self._connection.stream_frames_stop()
        finally:
            # Always release the connection, even if stopping the stream fails
            self._connection.disconnect()

    def close(self):
        """
        Stop QTM wrapper thread.

        Clears the stay-open flag and blocks until the thread has finished.
        """
        self._stay_open = False
        self.join()
Ancestors
- threading.Thread
Methods
def close(self)
-
Stop QTM wrapper thread.
Expand source code
def close(self):
    """
    Stop QTM wrapper thread.

    Clears the stay-open flag, then waits for the thread to finish.
    """
    # Signal the life cycle loop to leave its wait
    self._stay_open = False
    # Block the caller until the thread has exited
    self.join()
def run(self)
-
Run QTM wrapper coroutine.
Expand source code
def run(self):
    """
    Run QTM wrapper coroutine.

    Drives the life cycle coroutine to completion on a fresh
    asyncio event loop.
    """
    life_cycle = self._life_cycle()
    asyncio.run(life_cycle)