Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions modules/drone_command/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Drone command module for translating tracked object positions into
MAVLink flight commands (move-to and face-toward).
"""

from .drone_command import DroneCommander
from .drone_command_worker import drone_command_worker_run
255 changes: 255 additions & 0 deletions modules/drone_command/drone_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
"""
Drone command interface using MAVLink.

Translates tracked-object positions (camera-relative meters) into
MAVLink commands that move the drone toward a target or yaw to face it.

Coordinate frames
-----------------
Camera (OAK-D): x = right, y = down, z = forward
Drone body: x = forward, y = right, z = down
NED (world): x = north, y = east, z = down

The camera is assumed to be forward-facing and aligned with the body
frame, so the conversion is:
body_x = camera_z (forward)
body_y = camera_x (right)
body_z = camera_y (down)

Body-to-NED requires the drone's current yaw heading.

MAVLink messages used
---------------------
- SET_POSITION_TARGET_LOCAL_NED (mavutil #84)
Move to an offset in the local NED frame.
- COMMAND_LONG MAV_CMD_CONDITION_YAW (#115)
Rotate to face a heading.
"""

import math
import logging
from typing import Optional

from pymavlink import mavutil
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pymavlink dependency is imported on line 33 but is not listed in requirements.txt or pyproject.toml. This will cause an ImportError when the module is loaded. Add pymavlink to the project dependencies.

Copilot uses AI. Check for mistakes.

logger = logging.getLogger(__name__)


class DroneCommander:
"""
Sends MAVLink commands to the flight controller.

Usage::

commander = DroneCommander("tcp:localhost:5762")
commander.connect()
commander.move_to_target(forward=2.0, right=0.5, down=0.3)
commander.face_target(forward=2.0, right=0.5)
"""

def __init__(
self,
connection_string: str,
baud_rate: int = 57600,
) -> None:
self._connection_string = connection_string
self._baud_rate = baud_rate
self._conn: Optional[mavutil.mavlink_connection] = None

# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------

def connect(self) -> None:
"""
Establish MAVLink connection and wait for a heartbeat.
"""
logger.info(
"Connecting to flight controller at %s …",
self._connection_string,
)
self._conn = mavutil.mavlink_connection(
self._connection_string,
baud=self._baud_rate,
)
self._conn.wait_heartbeat()
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connect() method calls wait_heartbeat() on line 75 without a timeout parameter. If the flight controller is not available or not sending heartbeats, this call will block indefinitely. Consider adding a timeout parameter to wait_heartbeat() or wrapping the call in a timeout mechanism to prevent the application from hanging during connection.

Suggested change
self._conn.wait_heartbeat()
self._conn.wait_heartbeat(timeout=30)

Copilot uses AI. Check for mistakes.
logger.info(
"Connected (system %d, component %d).",
self._conn.target_system,
self._conn.target_component,
)

# ------------------------------------------------------------------
# Coordinate helpers
# ------------------------------------------------------------------

@staticmethod
def camera_to_body(
cam_x: float,
cam_y: float,
cam_z: float,
) -> tuple:
"""
Convert camera-frame coords to drone body-frame coords.

Camera: x=right, y=down, z=forward
Body: x=forward, y=right, z=down

Returns:
(body_x, body_y, body_z) in metres.
"""
return cam_z, cam_x, cam_y

@staticmethod
def body_to_ned(
body_x: float,
body_y: float,
body_z: float,
yaw_rad: float,
) -> tuple:
"""
Rotate body-frame offset into NED using the drone's current yaw.

Args:
body_x: Forward (m).
body_y: Right (m).
body_z: Down (m).
yaw_rad: Current heading in radians (0 = north, CW positive).

Returns:
(north, east, down) in metres.
Comment on lines +99 to +120
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent spelling: the file uses "meters" on line 4 but "metres" on lines 99 and 120. Use consistent spelling throughout. The American spelling "meters" is more common in technical documentation.

Copilot uses AI. Check for mistakes.
"""
cos_yaw = math.cos(yaw_rad)
sin_yaw = math.sin(yaw_rad)
north = body_x * cos_yaw - body_y * sin_yaw
east = body_x * sin_yaw + body_y * cos_yaw
down = body_z
return north, east, down
Comment on lines +117 to +127
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment on line 117 states "yaw_rad: Current heading in radians (0 = north, CW positive)", but the rotation matrix implementation on lines 124-125 uses standard CCW (counter-clockwise) rotation convention. The rotation matrix "north = body_x * cos_yaw - body_y * sin_yaw; east = body_x * sin_yaw + body_y * cos_yaw" rotates CCW as yaw increases. This inconsistency between documentation and implementation will cause confusion. Verify the correct convention and update either the comment or the rotation matrix accordingly.

Copilot uses AI. Check for mistakes.

# ------------------------------------------------------------------
# Movement commands
# ------------------------------------------------------------------

def move_to_target(
self,
forward: float,
right: float,
down: float,
yaw_rad: float = 0.0,
) -> None:
"""
Send a position-offset command in the local NED frame.

The offsets are body-frame distances to the target; they are
rotated into NED before sending.

Args:
forward: Body-frame forward offset (m).
right: Body-frame right offset (m).
down: Body-frame down offset (m).
yaw_rad: Drone's current yaw (rad) for NED conversion.
"""
if self._conn is None:
raise RuntimeError("Not connected. Call connect() first.")

north, east, d = self.body_to_ned(forward, right, down, yaw_rad)

# type_mask: ignore velocity, acceleration, yaw, yaw_rate
# bits 0-2 = position (use), 3-5 = velocity (ignore),
# 6-8 = acceleration (ignore), 10 = yaw (ignore),
# 11 = yaw_rate (ignore)
type_mask = 0b0000_1111_1111_1000 # 0x0FF8 — only position used

self._conn.mav.set_position_target_local_ned_send(
0, # time_boot_ms
self._conn.target_system,
self._conn.target_component,
mavutil.mavlink.MAV_FRAME_BODY_OFFSET_NED, # offset from current pos
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Frame mismatch: the code manually rotates body coordinates to NED using body_to_ned() on line 155, then sends these NED coordinates using MAV_FRAME_BODY_OFFSET_NED on line 167. MAV_FRAME_BODY_OFFSET_NED expects body-frame coordinates as input and will perform the rotation to NED internally, causing a double rotation. Either use MAV_FRAME_LOCAL_OFFSET_NED with the NED coordinates, or use MAV_FRAME_BODY_OFFSET_NED with the body-frame coordinates (forward, right, down) directly without the body_to_ned conversion.

Suggested change
mavutil.mavlink.MAV_FRAME_BODY_OFFSET_NED, # offset from current pos
mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, # offset from current pos in local NED

Copilot uses AI. Check for mistakes.
type_mask,
north, # x (north)
east, # y (east)
d, # z (down)
0, 0, 0, # vx, vy, vz (ignored)
0, 0, 0, # afx, afy, afz (ignored)
0, 0, # yaw, yaw_rate (ignored)
)

logger.debug(
"move_to_target body=(%.2f, %.2f, %.2f) NED=(%.2f, %.2f, %.2f)",
forward, right, down, north, east, d,
)

def face_target(
self,
forward: float,
right: float,
) -> None:
"""
Yaw the drone to face a target given its body-frame offset.

Computes the required heading change from the body-frame
forward/right offset and sends MAV_CMD_CONDITION_YAW.

Args:
forward: Body-frame forward offset (m) to target.
right: Body-frame right offset (m) to target.
"""
if self._conn is None:
raise RuntimeError("Not connected. Call connect() first.")

# angle from drone nose to target (positive = clockwise)
Comment on lines +196 to +200
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The face_target method will call math.atan2(right, forward). When both forward and right are zero (target directly above/below the drone), atan2(0, 0) returns 0, which is technically correct but may not represent the intended behavior. Consider adding validation to handle the edge case where the target is at the same horizontal position as the drone, or document this behavior.

Suggested change
"""
if self._conn is None:
raise RuntimeError("Not connected. Call connect() first.")
# angle from drone nose to target (positive = clockwise)
Note:
If both ``forward`` and ``right`` are zero, the target is
directly above or below the drone in the vertical axis and
has no horizontal bearing. In this implementation,
``atan2(0, 0)`` is allowed and interpreted as a 0° yaw
change (i.e. no change in heading).
"""
if self._conn is None:
raise RuntimeError("Not connected. Call connect() first.")
# Angle from drone nose to target (positive = clockwise).
# Note: when forward == 0 and right == 0 (target directly above/below),
# atan2(0, 0) returns 0, which we treat as "no yaw change".

Copilot uses AI. Check for mistakes.
angle_deg = math.degrees(math.atan2(right, forward))

# direction: 1 = CW, -1 = CCW
direction = 1 if angle_deg >= 0 else -1
angle_deg = abs(angle_deg)

self._conn.mav.command_long_send(
self._conn.target_system,
self._conn.target_component,
mavutil.mavlink.MAV_CMD_CONDITION_YAW,
0, # confirmation
angle_deg, # param1: target angle (degrees)
0, # param2: angular speed (deg/s, 0 = default)
direction, # param3: 1=CW, -1=CCW
1, # param4: 1=relative, 0=absolute
0, 0, 0, # params 5-7 (unused)
)

logger.debug(
"face_target body_offset=(%.2f, %.2f) yaw_change=%.1f° %s",
forward, right, angle_deg, "CW" if direction == 1 else "CCW",
)

# ------------------------------------------------------------------
# High-level helper
# ------------------------------------------------------------------

def track_target(
self,
cam_x: float,
cam_y: float,
cam_z: float,
yaw_rad: float = 0.0,
move: bool = True,
face: bool = True,
) -> None:
"""
Convenience method: convert camera coords and send both
move + face commands for a single tracked object.

Args:
cam_x: Camera-frame x (right, m).
cam_y: Camera-frame y (down, m).
cam_z: Camera-frame z (forward/depth, m).
yaw_rad: Drone's current heading (rad).
move: Whether to send a position command.
face: Whether to send a yaw command.
"""
forward, right, down = self.camera_to_body(cam_x, cam_y, cam_z)

if face:
self.face_target(forward, right)

if move:
self.move_to_target(forward, right, down, yaw_rad)
99 changes: 99 additions & 0 deletions modules/drone_command/drone_command_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Worker that reads TrackedObject lists from the ObjectTracker output queue
and commands the drone to move toward / face the selected target.

Target selection policy (simple):
- Pick the TRACKED object closest to the camera (smallest z).
- Ignore NEW objects until they become TRACKED (avoids false positives).
- If all objects are LOST, hold position and do nothing.
"""

import logging
from typing import List, Optional

from modules.object_tracker.tracked_object import TrackedObject, TrackingStatus
Comment on lines +12 to +14
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import references a non-existent module 'modules.object_tracker.tracked_object'. This module does not exist in the codebase. The code will fail at runtime with an ImportError. Consider either creating the missing module or updating the import path to reference an existing module.

Suggested change
from typing import List, Optional
from modules.object_tracker.tracked_object import TrackedObject, TrackingStatus
from enum import Enum
from typing import List, Optional, Protocol
try:
from modules.object_tracker.tracked_object import TrackedObject, TrackingStatus
except ImportError: # Fallback when object tracker module is unavailable
class TrackingStatus(Enum):
NEW = "NEW"
TRACKED = "TRACKED"
LOST = "LOST"
class TrackedObject(Protocol):
object_id: int
label: str
x: float
y: float
z: float
status: TrackingStatus

Copilot uses AI. Check for mistakes.
from .drone_command import DroneCommander

logger = logging.getLogger(__name__)


def _select_target(
tracked_objects: List[TrackedObject],
) -> Optional[TrackedObject]:
"""
Choose which tracked object to pursue.

Policy: closest TRACKED object by depth (z).

Args:
tracked_objects: All objects from the current frame.

Returns:
The selected TrackedObject, or None if nothing to pursue.
"""
candidates = [
obj
for obj in tracked_objects
if obj.status == TrackingStatus.TRACKED
]

if not candidates:
return None

# smallest z = closest to camera
return min(candidates, key=lambda obj: obj.z)


def drone_command_worker_run(
input_queue, # multiprocessing.Queue[List[TrackedObject]]
connection_string: str,
baud_rate: int = 57600,
move: bool = True,
face: bool = True,
) -> None:
"""
Main loop: read tracked objects, pick a target, command the drone.

Args:
input_queue: Queue of List[TrackedObject] from ObjectTracker.
connection_string: MAVLink address (e.g. "tcp:localhost:5762").
baud_rate: Serial baud rate.
move: Send position commands.
face: Send yaw commands.
"""
commander = DroneCommander(
connection_string=connection_string,
baud_rate=baud_rate,
)
commander.connect()

logger.info("Drone command worker started. Waiting for tracked objects …")

while True:
tracked_objects: List[TrackedObject] = input_queue.get()

target = _select_target(tracked_objects)

if target is None:
logger.debug("No TRACKED target this frame — holding position.")
continue

logger.info(
"Pursuing target id=%d label=%s at (%.2f, %.2f, %.2f)",
target.object_id,
target.label,
target.x,
target.y,
target.z,
)

# cam coords → body → MAVLink
# yaw_rad=0.0 for now; in production read from flight controller
commander.track_target(
cam_x=target.x,
cam_y=target.y,
cam_z=target.z,
yaw_rad=0.0,
Comment on lines +92 to +96
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a hardcoded yaw_rad=0.0 will produce incorrect position commands when the drone is not facing north. The comment on line 91 acknowledges this ("in production read from flight controller"), but the code as written will send incorrect movement commands. Consider either reading the actual yaw from the flight controller using an ATTITUDE message, or documenting this as a critical limitation that must be addressed before deployment.

Copilot uses AI. Check for mistakes.
move=move,
face=face,
)
Comment on lines +73 to +99
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The infinite loop lacks error handling. If any exception occurs during queue.get(), target selection, or command sending, the worker will crash without cleanup. Consider wrapping the loop body in a try-except block to handle and log exceptions, allowing the worker to continue processing or shut down gracefully.

Suggested change
tracked_objects: List[TrackedObject] = input_queue.get()
target = _select_target(tracked_objects)
if target is None:
logger.debug("No TRACKED target this frame — holding position.")
continue
logger.info(
"Pursuing target id=%d label=%s at (%.2f, %.2f, %.2f)",
target.object_id,
target.label,
target.x,
target.y,
target.z,
)
# cam coords → body → MAVLink
# yaw_rad=0.0 for now; in production read from flight controller
commander.track_target(
cam_x=target.x,
cam_y=target.y,
cam_z=target.z,
yaw_rad=0.0,
move=move,
face=face,
)
try:
tracked_objects: List[TrackedObject] = input_queue.get()
target = _select_target(tracked_objects)
if target is None:
logger.debug("No TRACKED target this frame — holding position.")
continue
logger.info(
"Pursuing target id=%d label=%s at (%.2f, %.2f, %.2f)",
target.object_id,
target.label,
target.x,
target.y,
target.z,
)
# cam coords → body → MAVLink
# yaw_rad=0.0 for now; in production read from flight controller
commander.track_target(
cam_x=target.x,
cam_y=target.y,
cam_z=target.z,
yaw_rad=0.0,
move=move,
face=face,
)
except KeyboardInterrupt:
logger.info("Drone command worker received KeyboardInterrupt, shutting down.")
break
except Exception:
logger.exception(
"Unhandled exception in drone command worker loop; continuing."
)
continue

Copilot uses AI. Check for mistakes.
Comment on lines +64 to +99
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker function establishes a MAVLink connection on line 68 but provides no mechanism for graceful shutdown or connection cleanup. The infinite loop on line 72 has no exit condition. Consider adding a shutdown signal mechanism (e.g., poison pill in queue, threading.Event, or signal handler) to allow clean termination and proper resource cleanup.

Copilot uses AI. Check for mistakes.
Comment on lines +92 to +99
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker sends MAVLink commands every time it receives tracked objects from the queue, with no rate limiting. If the upstream tracker sends data at high frequency (e.g., 30 Hz from video frames), this could flood the flight controller with rapid position and yaw commands, potentially causing instability or overwhelming the autopilot's command buffer. Consider implementing rate limiting, command throttling, or a minimum movement threshold before sending new commands.

Copilot uses AI. Check for mistakes.
4 changes: 4 additions & 0 deletions requirements-pytorch.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Runtime dependencies for target tracking pipeline

depthai>=2.24.0
pymavlink>=2.4.40
Loading