Module qfly.crazyflie
Expand source code
from threading import Thread
import time
import traceback
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncLogger import SyncLogger
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
import qfly
class QualisysCrazyflie(Thread):
"""
Wrapper for Crazyflie drone to fly with Qualisys motion capture systems
Attributes
----------
cf_body_name : str
Name of Crazyflie's rigid body in QTM
cf_uri : str
Crazyflie radio address
pose : Pose
Pose object keeping track of whereabouts
world : World
World object defining airspace rules
"""
def __init__(self,
cf_body_name,
cf_uri,
world,
marker_ids=[1, 2, 3, 4],
qtm_ip="127.0.0.1"):
"""
Construct QualisysCrazyflie object.
Parameters
----------
cf_body_name : str
Name of Crazyflie's rigid body in QTM.
cf_uri : str
Crazyflie radio address.
world : World
World object defining airspace rules.
marker_ids : [int] (optional)
ID numbers to be assigned to active markers
in order of front, right, back, left.
qtm_ip : str (optional)
IP address of QTM host.
"""
print(f'[{cf_body_name}@{cf_uri}] Initializing...')
cflib.crtp.init_drivers()
self.cf_body_name = cf_body_name
self.cf_uri = cf_uri
self.world = world
self.marker_ids = marker_ids
self.pose = None
self.anchor = None
self.qtm = None
self.qtm_ip = qtm_ip
self.cf = Crazyflie(ro_cache=None, rw_cache=None)
self.scf = SyncCrazyflie(self.cf_uri, cf=self.cf)
print(f'[{self.cf_body_name}@{self.cf_uri}] Connecting...')
def __enter__(self):
"""
Enter QualisysCrazyflie context
"""
self.scf.open_link()
print(f'[{self.cf_body_name}@{self.cf_uri}] Connected...')
# Set active marker IDs
print(
f'[{self.cf_body_name}@{self.cf_uri}] Setting active marker IDs: {self.marker_ids}')
self.cf.param.set_value('activeMarker.front', self.marker_ids[0])
self.cf.param.set_value('activeMarker.right', self.marker_ids[1])
self.cf.param.set_value('activeMarker.back', self.marker_ids[2])
self.cf.param.set_value('activeMarker.left', self.marker_ids[3])
# Turn off LED to conserve battery
self.set_led_ring(0)
# Slow down
self.set_speed_limit(self.world.speed_limit)
self.qtm = qfly.QtmWrapper(
self.cf_body_name,
lambda pose: self._set_pose(pose),
qtm_ip=self.qtm_ip)
print(
f'[{self.cf_body_name}@{self.cf_uri}] Connecting to QTM at {self.qtm.qtm_ip}...')
self.setup()
return self
def __exit__(self, exc_type=None, exc_value=None, tb=None):
"""
Exit QualisysCrazyflie context
"""
print(
f'[{self.cf_body_name}@{self.cf_uri}] Exiting...')
if exc_type is not None:
print(
f'[{self.cf_body_name}@{self.cf_uri}] Encountered exception on exit...')
traceback.print_exception(exc_type, exc_value, tb)
self.qtm.close()
self.scf.close_link()
def is_safe(self, world=None):
"""
Perform safety checks, return False if unsafe
Parameters
----------
world : World (optional)
World object defining airspace rules.
Defaults to object's own world if not supplied.
"""
if world is None:
world = self.world
# Is the drone tracked properly?
if self.qtm.tracking_loss > world.tracking_tolerance:
# Respond
print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! SAFETY VIOLATION !!!
TRACKING LOST FOR {str(self.world.tracking_tolerance)} FRAMES!''')
return False
# Is the drone inside the safe volume?
if not (
# x direction
world.origin.x - world.expanse < self.pose.x < world.origin.x + world.expanse
# y direction
and world.origin.y - world.expanse < self.pose.y < world.origin.y + world.expanse
# z direction
and 0 < self.pose.z < world.origin.z + (2 * world.expanse)):
# Respond
print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! SAFETY VIOLATION !!!
DRONE OUTSIDE SAFE VOLUME AT ({str(self.pose)})!''')
return False
else:
return True
def ascend(self, z_ceiling=1, step=12.0):
"""
Execute one step of a gentle rising sequence directly upward from current position.
Parameters
----------
z_ceiling : float (optional)
Height to ascend to.
(Unit: m)
step : int (optional)
Distance between target keyframes.
(Unit: cm)
"""
init_pose = self.pose
_z_cm = int(init_pose.z * 100)
target = qfly.Pose(init_pose.x,
init_pose.y,
min(z_ceiling, float((_z_cm + step) / 100.0)))
# print(
# f'[{self.cf_body_name}@{self.cf_uri}] Ascending from {_z_cm} cm to {z_ceiling}...')
self.safe_position_setpoint(target)
def descend(self, z_floor=0.0, step=12.0):
"""
Execute one step of a gentle landing sequence directly downward from current position.
Parameters
----------
z_floor : float (optional)
Height to descend to.
(Unit: m)
step : int (optional)
Distance between target keyframes.
(Unit: cm)
"""
init_pose = self.pose
_z_cm = int(init_pose.z * 100)
target = qfly.Pose(init_pose.x,
init_pose.y,
float((_z_cm - step) / 100.0))
# Engage
if target.z < z_floor:
self.cf.commander.send_stop_setpoint()
else:
# print(
# f'[{self.cf_body_name}@{self.cf_uri}] Descending from {_z_cm} cm to {z_floor} cm...')
self.safe_position_setpoint(target)
def land_in_place(self):
"""
Land sequence directly downward from current position.
"""
if self.anchor is None or self.anchor.distance_to(self.pose) > self.world.padding:
self.anchor = self.pose
print(
f'[{self.cf_body_name}@{self.cf_uri}] Landing to ground...')
target = qfly.Pose(self.anchor.x, self.anchor.y, 0)
if self.pose.distance_to(target) < 0.1:
self.anchor = None
self.cf.commander.send_stop_setpoint()
else:
self.safe_position_setpoint(target)
def land_to_moving_target(self, target, z_offset=0.5, decrement=3, timestep=0.15):
"""
WARNING: UNDER DEVELOPMENT. DO NOT USE. DOES NOT SUPPORT SWARMS.
Executes a gentle landing sequence aiming at a live target.
Parameters
----------
target : object
An object that has a pose attribute
z_offset : float (optional)
Vertical offset between target and landing start pose.
(Unit: m)
(Default: 0.5 m)
decrement : int (optional)
Distance between target keyframes.
(Unit: cm)
(Default: 3 cm)
timestep : float (optional
Time between target keyframes.
(Unit: s)
(Default: 0.15 s)
"""
try:
init_pose = qfly.Pose(target.pose.x,
target.pose.y,
target.pose.z + z_offset)
except AttributeError:
print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! ERROR !!!
Landing target has no pose attribute! Landing in place instead!''')
self.land_in_place()
return
z_cm = int(init_pose.z * 100)
print(
f'[{self.cf_body_name}@{self.cf_uri}] Landing to live target from {z_cm} cm...')
# Linear interpolation between starting pose and target
while z_cm > target.pose.z * 100:
target = qfly.Pose(target.pose.x, target.pose.y,
float(z_cm / 100.0))
self.safe_position_setpoint(target)
time.sleep(timestep)
z_cm = z_cm - decrement
self.cf.commander.send_stop_setpoint()
def rise_in_place(self, z=1):
"""
Ascension sequence directly upwards from current position.
Parameters
----------
z_ceiling : float (optional)
Height to ascend to. (unit: m)
"""
if self.anchor is None or self.anchor.distance_to(self.pose) > self.world.padding:
self.anchor = self.pose
print(
f'[{self.cf_body_name}@{self.cf_uri}] Ascending...')
target = qfly.Pose(self.anchor.x, self.anchor.y, z)
self.safe_position_setpoint(target)
def safe_position_setpoint(self, target, world=None):
"""
Sets a geofenced absolute position setpoint
within safe airspace defined by world.
Parameters
----------
target : Pose
Pose object bearting target coordinate and yaw.
Yaw defaults to 0 if not supplied.
world : World (optional)
World object defining airspace rules.
Defaults to QualisysCrazyflie object's own world.
"""
# Sane defaults
if world is None:
world = self.world
if target.yaw == None:
target.yaw = 0
# Keep inside safe airspace
target.clamp(world)
# Engage
self.cf.commander.send_position_setpoint(
target.x, target.y, target.z, target.yaw)
def set_speed_limit(self, speed_limit):
"""
Sets speed in horizontal (xy) and vertical (z) dimensions.
Parameters
----------
speed_limit : float
Speed limit.
(Unit: m/s)
"""
print(f'[{self.cf_body_name}@{self.cf_uri}] Speed limit: {speed_limit} m/s')
self.cf.param.set_value('posCtlPid.xyVelMax', speed_limit)
self.cf.param.set_value('posCtlPid.zVelMax', speed_limit)
def setup(self):
"""
Executes engineering boilerplate to initialize drone.
Assumes most drone parameters at factory defaults.
If in doubt, inspect drone parameters
using Bitcraze client and documentation.
"""
print(f'[{self.cf_body_name}@{self.cf_uri}] Setting up drone...')
# Choose estimator
self.cf.param.set_value('stabilizer.estimator', '2')
# Black magic
self.cf.param.set_value('locSrv.extQuatStdDev', 0.2)
# Reset estimator
self.cf.param.set_value('kalman.resetEstimation', '1')
time.sleep(0.1)
self.cf.param.set_value('kalman.resetEstimation', '0')
# Stabilize
print(
f'[{self.cf_body_name}@{self.cf_uri}] Stabilizing...')
log_config = LogConfig(name='Kalman Variance', period_in_ms=500)
log_config.add_variable('kalman.varPX', 'float')
log_config.add_variable('kalman.varPY', 'float')
log_config.add_variable('kalman.varPZ', 'float')
var_y_history = [1000] * 10
var_x_history = [1000] * 10
var_z_history = [1000] * 10
threshold = 0.001
with SyncLogger(self.scf, log_config) as logger:
for log_entry in logger:
data = log_entry[1]
var_x_history.append(data['kalman.varPX'])
var_x_history.pop(0)
var_y_history.append(data['kalman.varPY'])
var_y_history.pop(0)
var_z_history.append(data['kalman.varPZ'])
var_z_history.pop(0)
min_x = min(var_x_history)
max_x = max(var_x_history)
min_y = min(var_y_history)
max_y = max(var_y_history)
min_z = min(var_z_history)
max_z = max(var_z_history)
print(f'[{self.cf_body_name}@{self.cf_uri}] ' +
"Kalman variance | X: {:8.4f} Y: {:8.4f} Z: {:8.4f}".format(
max_x - min_x, max_y - min_y, max_z - min_z))
if (max_x - min_x) < threshold and (
max_y - min_y) < threshold and (
max_z - min_z) < threshold:
break
def set_led_ring(self, val):
"""
Set LED ring effect.
Parameters
----------
val : int
LED ring effect ID. See Bitcraze documentation:
https://www.bitcraze.io/documentation/repository/crazyflie-firmware/master/api/params/#ring
"""
self.cf.param.set_value('ring.effect', val)
def set_speed_limit(self, speed_limit):
"""
Set speed limit.
Parameters
----------
speed_limit : float
Limit for horizontal (xy) and vertical (z) speed. (unit: m/s)
"""
print(f'[{self.cf_body_name}@{self.cf_uri}] Speed limit: {speed_limit} m/s')
self.cf.param.set_value('posCtlPid.xyVelMax', speed_limit)
self.cf.param.set_value('posCtlPid.zVelMax', speed_limit)
def _set_pose(self, pose):
"""
Sets internal Pose object and stream to drone
Parameters
----------
pose : Pose
Pose object containing coordinates
"""
self.pose = pose
# Send to Crazyflie
if self.cf is not None:
self.cf.extpos.send_extpos(pose.x, pose.y, pose.z)
# qw = qfly.utils.sqrt(
# 1 + pose.rotmatrix[0][0] + pose.rotmatrix[1][1] + pose.rotmatrix[2][2]) / 2
# qx = qfly.utils.sqrt(
# 1 + pose.rotmatrix[0][0] - pose.rotmatrix[1][1] - pose.rotmatrix[2][2]) / 2
# qy = qfly.utils.sqrt(
# 1 - pose.rotmatrix[0][0] + pose.rotmatrix[1][1] - pose.rotmatrix[2][2]) / 2
# qz = qfly.utils.sqrt(
# 1 - pose.rotmatrix[0][0] - pose.rotmatrix[1][1] + pose.rotmatrix[2][2]) / 2
# ql = qfly.utils.sqrt(qx ** 2 + qy ** 2 + qz ** 2 + qw ** 2)
# self.cf.extpos.send_extpose(
# pose.x, pose.y, pose.z, qx / ql, qy / ql, qz / ql, qw / ql)
Classes
class QualisysCrazyflie (cf_body_name, cf_uri, world, marker_ids=[1, 2, 3, 4], qtm_ip='127.0.0.1')
-
Wrapper for Crazyflie drone to fly with Qualisys motion capture systems
Attributes
cf_body_name
:str
- Name of Crazyflie's rigid body in QTM
cf_uri
:str
- Crazyflie radio address
pose
:Pose
- Pose object keeping track of whereabouts
world
:World
- World object defining airspace rules
Construct QualisysCrazyflie object.
Parameters
cf_body_name
:str
- Name of Crazyflie's rigid body in QTM.
cf_uri
:str
- Crazyflie radio address.
world
:World
- World object defining airspace rules.
marker_ids
:[int] (optional)
- ID numbers to be assigned to active markers in order of front, right, back, left.
qtm_ip
:str (optional)
- IP address of QTM host.
Expand source code
class QualisysCrazyflie(Thread): """ Wrapper for Crazyflie drone to fly with Qualisys motion capture systems Attributes ---------- cf_body_name : str Name of Crazyflie's rigid body in QTM cf_uri : str Crazyflie radio address pose : Pose Pose object keeping track of whereabouts world : World World object defining airspace rules """ def __init__(self, cf_body_name, cf_uri, world, marker_ids=[1, 2, 3, 4], qtm_ip="127.0.0.1"): """ Construct QualisysCrazyflie object. Parameters ---------- cf_body_name : str Name of Crazyflie's rigid body in QTM. cf_uri : str Crazyflie radio address. world : World World object defining airspace rules. marker_ids : [int] (optional) ID numbers to be assigned to active markers in order of front, right, back, left. qtm_ip : str (optional) IP address of QTM host. """ print(f'[{cf_body_name}@{cf_uri}] Initializing...') cflib.crtp.init_drivers() self.cf_body_name = cf_body_name self.cf_uri = cf_uri self.world = world self.marker_ids = marker_ids self.pose = None self.anchor = None self.qtm = None self.qtm_ip = qtm_ip self.cf = Crazyflie(ro_cache=None, rw_cache=None) self.scf = SyncCrazyflie(self.cf_uri, cf=self.cf) print(f'[{self.cf_body_name}@{self.cf_uri}] Connecting...') def __enter__(self): """ Enter QualisysCrazyflie context """ self.scf.open_link() print(f'[{self.cf_body_name}@{self.cf_uri}] Connected...') # Set active marker IDs print( f'[{self.cf_body_name}@{self.cf_uri}] Setting active marker IDs: {self.marker_ids}') self.cf.param.set_value('activeMarker.front', self.marker_ids[0]) self.cf.param.set_value('activeMarker.right', self.marker_ids[1]) self.cf.param.set_value('activeMarker.back', self.marker_ids[2]) self.cf.param.set_value('activeMarker.left', self.marker_ids[3]) # Turn off LED to conserve battery self.set_led_ring(0) # Slow down self.set_speed_limit(self.world.speed_limit) self.qtm = qfly.QtmWrapper( self.cf_body_name, lambda pose: self._set_pose(pose), qtm_ip=self.qtm_ip) print( f'[{self.cf_body_name}@{self.cf_uri}] Connecting to QTM at {self.qtm.qtm_ip}...') self.setup() return self def __exit__(self, exc_type=None, exc_value=None, tb=None): """ Exit QualisysCrazyflie context """ print( f'[{self.cf_body_name}@{self.cf_uri}] Exiting...') if exc_type is not None: print( f'[{self.cf_body_name}@{self.cf_uri}] Encountered exception on exit...') traceback.print_exception(exc_type, exc_value, tb) self.qtm.close() self.scf.close_link() def is_safe(self, world=None): """ Perform safety checks, return False if unsafe Parameters ---------- world : World (optional) World object defining airspace rules. Defaults to object's own world if not supplied. """ if world is None: world = self.world # Is the drone tracked properly? if self.qtm.tracking_loss > world.tracking_tolerance: # Respond print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! SAFETY VIOLATION !!! TRACKING LOST FOR {str(self.world.tracking_tolerance)} FRAMES!''') return False # Is the drone inside the safe volume? if not ( # x direction world.origin.x - world.expanse < self.pose.x < world.origin.x + world.expanse # y direction and world.origin.y - world.expanse < self.pose.y < world.origin.y + world.expanse # z direction and 0 < self.pose.z < world.origin.z + (2 * world.expanse)): # Respond print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! SAFETY VIOLATION !!! DRONE OUTSIDE SAFE VOLUME AT ({str(self.pose)})!''') return False else: return True def ascend(self, z_ceiling=1, step=12.0): """ Execute one step of a gentle rising sequence directly upward from current position. Parameters ---------- z_ceiling : float (optional) Height to ascend to. (Unit: m) step : int (optional) Distance between target keyframes. (Unit: cm) """ init_pose = self.pose _z_cm = int(init_pose.z * 100) target = qfly.Pose(init_pose.x, init_pose.y, min(z_ceiling, float((_z_cm + step) / 100.0))) # print( # f'[{self.cf_body_name}@{self.cf_uri}] Ascending from {_z_cm} cm to {z_ceiling}...') self.safe_position_setpoint(target) def descend(self, z_floor=0.0, step=12.0): """ Execute one step of a gentle landing sequence directly downward from current position. Parameters ---------- z_floor : float (optional) Height to descend to. (Unit: m) step : int (optional) Distance between target keyframes. (Unit: cm) """ init_pose = self.pose _z_cm = int(init_pose.z * 100) target = qfly.Pose(init_pose.x, init_pose.y, float((_z_cm - step) / 100.0)) # Engage if target.z < z_floor: self.cf.commander.send_stop_setpoint() else: # print( # f'[{self.cf_body_name}@{self.cf_uri}] Descending from {_z_cm} cm to {z_floor} cm...') self.safe_position_setpoint(target) def land_in_place(self): """ Land sequence directly downward from current position. """ if self.anchor is None or self.anchor.distance_to(self.pose) > self.world.padding: self.anchor = self.pose print( f'[{self.cf_body_name}@{self.cf_uri}] Landing to ground...') target = qfly.Pose(self.anchor.x, self.anchor.y, 0) if self.pose.distance_to(target) < 0.1: self.anchor = None self.cf.commander.send_stop_setpoint() else: self.safe_position_setpoint(target) def land_to_moving_target(self, target, z_offset=0.5, decrement=3, timestep=0.15): """ WARNING: UNDER DEVELOPMENT. DO NOT USE. DOES NOT SUPPORT SWARMS. Executes a gentle landing sequence aiming at a live target. Parameters ---------- target : object An object that has a pose attribute z_offset : float (optional) Vertical offset between target and landing start pose. (Unit: m) (Default: 0.5 m) decrement : int (optional) Distance between target keyframes. (Unit: cm) (Default: 3 cm) timestep : float (optional Time between target keyframes. (Unit: s) (Default: 0.15 s) """ try: init_pose = qfly.Pose(target.pose.x, target.pose.y, target.pose.z + z_offset) except AttributeError: print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! ERROR !!! Landing target has no pose attribute! Landing in place instead!''') self.land_in_place() return z_cm = int(init_pose.z * 100) print( f'[{self.cf_body_name}@{self.cf_uri}] Landing to live target from {z_cm} cm...') # Linear interpolation between starting pose and target while z_cm > target.pose.z * 100: target = qfly.Pose(target.pose.x, target.pose.y, float(z_cm / 100.0)) self.safe_position_setpoint(target) time.sleep(timestep) z_cm = z_cm - decrement self.cf.commander.send_stop_setpoint() def rise_in_place(self, z=1): """ Ascension sequence directly upwards from current position. Parameters ---------- z_ceiling : float (optional) Height to ascend to. (unit: m) """ if self.anchor is None or self.anchor.distance_to(self.pose) > self.world.padding: self.anchor = self.pose print( f'[{self.cf_body_name}@{self.cf_uri}] Ascending...') target = qfly.Pose(self.anchor.x, self.anchor.y, z) self.safe_position_setpoint(target) def safe_position_setpoint(self, target, world=None): """ Sets a geofenced absolute position setpoint within safe airspace defined by world. Parameters ---------- target : Pose Pose object bearting target coordinate and yaw. Yaw defaults to 0 if not supplied. world : World (optional) World object defining airspace rules. Defaults to QualisysCrazyflie object's own world. """ # Sane defaults if world is None: world = self.world if target.yaw == None: target.yaw = 0 # Keep inside safe airspace target.clamp(world) # Engage self.cf.commander.send_position_setpoint( target.x, target.y, target.z, target.yaw) def set_speed_limit(self, speed_limit): """ Sets speed in horizontal (xy) and vertical (z) dimensions. Parameters ---------- speed_limit : float Speed limit. (Unit: m/s) """ print(f'[{self.cf_body_name}@{self.cf_uri}] Speed limit: {speed_limit} m/s') self.cf.param.set_value('posCtlPid.xyVelMax', speed_limit) self.cf.param.set_value('posCtlPid.zVelMax', speed_limit) def setup(self): """ Executes engineering boilerplate to initialize drone. Assumes most drone parameters at factory defaults. If in doubt, inspect drone parameters using Bitcraze client and documentation. """ print(f'[{self.cf_body_name}@{self.cf_uri}] Setting up drone...') # Choose estimator self.cf.param.set_value('stabilizer.estimator', '2') # Black magic self.cf.param.set_value('locSrv.extQuatStdDev', 0.2) # Reset estimator self.cf.param.set_value('kalman.resetEstimation', '1') time.sleep(0.1) self.cf.param.set_value('kalman.resetEstimation', '0') # Stabilize print( f'[{self.cf_body_name}@{self.cf_uri}] Stabilizing...') log_config = LogConfig(name='Kalman Variance', period_in_ms=500) log_config.add_variable('kalman.varPX', 'float') log_config.add_variable('kalman.varPY', 'float') log_config.add_variable('kalman.varPZ', 'float') var_y_history = [1000] * 10 var_x_history = [1000] * 10 var_z_history = [1000] * 10 threshold = 0.001 with SyncLogger(self.scf, log_config) as logger: for log_entry in logger: data = log_entry[1] var_x_history.append(data['kalman.varPX']) var_x_history.pop(0) var_y_history.append(data['kalman.varPY']) var_y_history.pop(0) var_z_history.append(data['kalman.varPZ']) var_z_history.pop(0) min_x = min(var_x_history) max_x = max(var_x_history) min_y = min(var_y_history) max_y = max(var_y_history) min_z = min(var_z_history) max_z = max(var_z_history) print(f'[{self.cf_body_name}@{self.cf_uri}] ' + "Kalman variance | X: {:8.4f} Y: {:8.4f} Z: {:8.4f}".format( max_x - min_x, max_y - min_y, max_z - min_z)) if (max_x - min_x) < threshold and ( max_y - min_y) < threshold and ( max_z - min_z) < threshold: break def set_led_ring(self, val): """ Set LED ring effect. Parameters ---------- val : int LED ring effect ID. See Bitcraze documentation: https://www.bitcraze.io/documentation/repository/crazyflie-firmware/master/api/params/#ring """ self.cf.param.set_value('ring.effect', val) def set_speed_limit(self, speed_limit): """ Set speed limit. Parameters ---------- speed_limit : float Limit for horizontal (xy) and vertical (z) speed. (unit: m/s) """ print(f'[{self.cf_body_name}@{self.cf_uri}] Speed limit: {speed_limit} m/s') self.cf.param.set_value('posCtlPid.xyVelMax', speed_limit) self.cf.param.set_value('posCtlPid.zVelMax', speed_limit) def _set_pose(self, pose): """ Sets internal Pose object and stream to drone Parameters ---------- pose : Pose Pose object containing coordinates """ self.pose = pose # Send to Crazyflie if self.cf is not None: self.cf.extpos.send_extpos(pose.x, pose.y, pose.z) # qw = qfly.utils.sqrt( # 1 + pose.rotmatrix[0][0] + pose.rotmatrix[1][1] + pose.rotmatrix[2][2]) / 2 # qx = qfly.utils.sqrt( # 1 + pose.rotmatrix[0][0] - pose.rotmatrix[1][1] - pose.rotmatrix[2][2]) / 2 # qy = qfly.utils.sqrt( # 1 - pose.rotmatrix[0][0] + pose.rotmatrix[1][1] - pose.rotmatrix[2][2]) / 2 # qz = qfly.utils.sqrt( # 1 - pose.rotmatrix[0][0] - pose.rotmatrix[1][1] + pose.rotmatrix[2][2]) / 2 # ql = qfly.utils.sqrt(qx ** 2 + qy ** 2 + qz ** 2 + qw ** 2) # self.cf.extpos.send_extpose( # pose.x, pose.y, pose.z, qx / ql, qy / ql, qz / ql, qw / ql)
Ancestors
- threading.Thread
Methods
def ascend(self, z_ceiling=1, step=12.0)
-
Execute one step of a gentle rising sequence directly upward from current position.
Parameters
z_ceiling
:float (optional)
- Height to ascend to. (Unit: m)
step
:int (optional)
- Distance between target keyframes. (Unit: cm)
Expand source code
def ascend(self, z_ceiling=1, step=12.0): """ Execute one step of a gentle rising sequence directly upward from current position. Parameters ---------- z_ceiling : float (optional) Height to ascend to. (Unit: m) step : int (optional) Distance between target keyframes. (Unit: cm) """ init_pose = self.pose _z_cm = int(init_pose.z * 100) target = qfly.Pose(init_pose.x, init_pose.y, min(z_ceiling, float((_z_cm + step) / 100.0))) # print( # f'[{self.cf_body_name}@{self.cf_uri}] Ascending from {_z_cm} cm to {z_ceiling}...') self.safe_position_setpoint(target)
def descend(self, z_floor=0.0, step=12.0)
-
Execute one step of a gentle landing sequence directly downward from current position.
Parameters
z_floor
:float (optional)
- Height to descend to. (Unit: m)
step
:int (optional)
- Distance between target keyframes. (Unit: cm)
Expand source code
def descend(self, z_floor=0.0, step=12.0): """ Execute one step of a gentle landing sequence directly downward from current position. Parameters ---------- z_floor : float (optional) Height to descend to. (Unit: m) step : int (optional) Distance between target keyframes. (Unit: cm) """ init_pose = self.pose _z_cm = int(init_pose.z * 100) target = qfly.Pose(init_pose.x, init_pose.y, float((_z_cm - step) / 100.0)) # Engage if target.z < z_floor: self.cf.commander.send_stop_setpoint() else: # print( # f'[{self.cf_body_name}@{self.cf_uri}] Descending from {_z_cm} cm to {z_floor} cm...') self.safe_position_setpoint(target)
def is_safe(self, world=None)
-
Perform safety checks, return False if unsafe Parameters
world
:World (optional)
- World object defining airspace rules. Defaults to object's own world if not supplied.
Expand source code
def is_safe(self, world=None): """ Perform safety checks, return False if unsafe Parameters ---------- world : World (optional) World object defining airspace rules. Defaults to object's own world if not supplied. """ if world is None: world = self.world # Is the drone tracked properly? if self.qtm.tracking_loss > world.tracking_tolerance: # Respond print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! SAFETY VIOLATION !!! TRACKING LOST FOR {str(self.world.tracking_tolerance)} FRAMES!''') return False # Is the drone inside the safe volume? if not ( # x direction world.origin.x - world.expanse < self.pose.x < world.origin.x + world.expanse # y direction and world.origin.y - world.expanse < self.pose.y < world.origin.y + world.expanse # z direction and 0 < self.pose.z < world.origin.z + (2 * world.expanse)): # Respond print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! SAFETY VIOLATION !!! DRONE OUTSIDE SAFE VOLUME AT ({str(self.pose)})!''') return False else: return True
def land_in_place(self)
-
Land sequence directly downward from current position.
Expand source code
def land_in_place(self): """ Land sequence directly downward from current position. """ if self.anchor is None or self.anchor.distance_to(self.pose) > self.world.padding: self.anchor = self.pose print( f'[{self.cf_body_name}@{self.cf_uri}] Landing to ground...') target = qfly.Pose(self.anchor.x, self.anchor.y, 0) if self.pose.distance_to(target) < 0.1: self.anchor = None self.cf.commander.send_stop_setpoint() else: self.safe_position_setpoint(target)
def land_to_moving_target(self, target, z_offset=0.5, decrement=3, timestep=0.15)
-
WARNING: UNDER DEVELOPMENT. DO NOT USE. DOES NOT SUPPORT SWARMS. Executes a gentle landing sequence aiming at a live target.
Parameters
target
:object
- An object that has a pose attribute
z_offset
:float (optional)
- Vertical offset between target and landing start pose. (Unit: m) (Default: 0.5 m)
decrement
:int (optional)
- Distance between target keyframes. (Unit: cm) (Default: 3 cm)
timestep
:float (optional
- Time between target keyframes. (Unit: s) (Default: 0.15 s)
Expand source code
def land_to_moving_target(self, target, z_offset=0.5, decrement=3, timestep=0.15): """ WARNING: UNDER DEVELOPMENT. DO NOT USE. DOES NOT SUPPORT SWARMS. Executes a gentle landing sequence aiming at a live target. Parameters ---------- target : object An object that has a pose attribute z_offset : float (optional) Vertical offset between target and landing start pose. (Unit: m) (Default: 0.5 m) decrement : int (optional) Distance between target keyframes. (Unit: cm) (Default: 3 cm) timestep : float (optional Time between target keyframes. (Unit: s) (Default: 0.15 s) """ try: init_pose = qfly.Pose(target.pose.x, target.pose.y, target.pose.z + z_offset) except AttributeError: print(f'''[{self.cf_body_name}@{self.cf_uri}] !!! ERROR !!! Landing target has no pose attribute! Landing in place instead!''') self.land_in_place() return z_cm = int(init_pose.z * 100) print( f'[{self.cf_body_name}@{self.cf_uri}] Landing to live target from {z_cm} cm...') # Linear interpolation between starting pose and target while z_cm > target.pose.z * 100: target = qfly.Pose(target.pose.x, target.pose.y, float(z_cm / 100.0)) self.safe_position_setpoint(target) time.sleep(timestep) z_cm = z_cm - decrement self.cf.commander.send_stop_setpoint()
def rise_in_place(self, z=1)
-
Ascension sequence directly upwards from current position.
Parameters
z_ceiling
:float (optional)
- Height to ascend to. (unit: m)
Expand source code
def rise_in_place(self, z=1): """ Ascension sequence directly upwards from current position. Parameters ---------- z_ceiling : float (optional) Height to ascend to. (unit: m) """ if self.anchor is None or self.anchor.distance_to(self.pose) > self.world.padding: self.anchor = self.pose print( f'[{self.cf_body_name}@{self.cf_uri}] Ascending...') target = qfly.Pose(self.anchor.x, self.anchor.y, z) self.safe_position_setpoint(target)
def safe_position_setpoint(self, target, world=None)
-
Sets a geofenced absolute position setpoint within safe airspace defined by world.
Parameters
target
:Pose
- Pose object bearting target coordinate and yaw. Yaw defaults to 0 if not supplied.
world
:World (optional)
- World object defining airspace rules. Defaults to QualisysCrazyflie object's own world.
Expand source code
def safe_position_setpoint(self, target, world=None): """ Sets a geofenced absolute position setpoint within safe airspace defined by world. Parameters ---------- target : Pose Pose object bearting target coordinate and yaw. Yaw defaults to 0 if not supplied. world : World (optional) World object defining airspace rules. Defaults to QualisysCrazyflie object's own world. """ # Sane defaults if world is None: world = self.world if target.yaw == None: target.yaw = 0 # Keep inside safe airspace target.clamp(world) # Engage self.cf.commander.send_position_setpoint( target.x, target.y, target.z, target.yaw)
def set_led_ring(self, val)
-
Set LED ring effect.
Parameters
val
:int
- LED ring effect ID. See Bitcraze documentation: https://www.bitcraze.io/documentation/repository/crazyflie-firmware/master/api/params/#ring
Expand source code
def set_led_ring(self, val): """ Set LED ring effect. Parameters ---------- val : int LED ring effect ID. See Bitcraze documentation: https://www.bitcraze.io/documentation/repository/crazyflie-firmware/master/api/params/#ring """ self.cf.param.set_value('ring.effect', val)
def set_speed_limit(self, speed_limit)
-
Set speed limit.
Parameters
speed_limit
:float
- Limit for horizontal (xy) and vertical (z) speed. (unit: m/s)
Expand source code
def set_speed_limit(self, speed_limit): """ Set speed limit. Parameters ---------- speed_limit : float Limit for horizontal (xy) and vertical (z) speed. (unit: m/s) """ print(f'[{self.cf_body_name}@{self.cf_uri}] Speed limit: {speed_limit} m/s') self.cf.param.set_value('posCtlPid.xyVelMax', speed_limit) self.cf.param.set_value('posCtlPid.zVelMax', speed_limit)
def setup(self)
-
Executes engineering boilerplate to initialize drone. Assumes most drone parameters at factory defaults. If in doubt, inspect drone parameters using Bitcraze client and documentation.
Expand source code
def setup(self): """ Executes engineering boilerplate to initialize drone. Assumes most drone parameters at factory defaults. If in doubt, inspect drone parameters using Bitcraze client and documentation. """ print(f'[{self.cf_body_name}@{self.cf_uri}] Setting up drone...') # Choose estimator self.cf.param.set_value('stabilizer.estimator', '2') # Black magic self.cf.param.set_value('locSrv.extQuatStdDev', 0.2) # Reset estimator self.cf.param.set_value('kalman.resetEstimation', '1') time.sleep(0.1) self.cf.param.set_value('kalman.resetEstimation', '0') # Stabilize print( f'[{self.cf_body_name}@{self.cf_uri}] Stabilizing...') log_config = LogConfig(name='Kalman Variance', period_in_ms=500) log_config.add_variable('kalman.varPX', 'float') log_config.add_variable('kalman.varPY', 'float') log_config.add_variable('kalman.varPZ', 'float') var_y_history = [1000] * 10 var_x_history = [1000] * 10 var_z_history = [1000] * 10 threshold = 0.001 with SyncLogger(self.scf, log_config) as logger: for log_entry in logger: data = log_entry[1] var_x_history.append(data['kalman.varPX']) var_x_history.pop(0) var_y_history.append(data['kalman.varPY']) var_y_history.pop(0) var_z_history.append(data['kalman.varPZ']) var_z_history.pop(0) min_x = min(var_x_history) max_x = max(var_x_history) min_y = min(var_y_history) max_y = max(var_y_history) min_z = min(var_z_history) max_z = max(var_z_history) print(f'[{self.cf_body_name}@{self.cf_uri}] ' + "Kalman variance | X: {:8.4f} Y: {:8.4f} Z: {:8.4f}".format( max_x - min_x, max_y - min_y, max_z - min_z)) if (max_x - min_x) < threshold and ( max_y - min_y) < threshold and ( max_z - min_z) < threshold: break