Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions modules/object_tracker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
ObjectTracker module for persistent tracking of detected objects across frames.

Two modes available:
1. On-device (DepthAI): Use configure_tracker_node() + parse_tracklets()
2. Software (host): Use SoftwareTracker.update(detections)
"""

from .tracked_object import TrackedObject, TrackingStatus
from .detection import Detection

# On-device DepthAI tracker
from .object_tracker import configure_tracker_node, parse_tracklets

# Software tracker (accepts Detection objects)
from .software_tracker import SoftwareTracker

# Workers
from .object_tracker_worker import object_tracker_run, object_tracker_read_loop
26 changes: 26 additions & 0 deletions modules/object_tracker/detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""
Detection input class - interface contract with detection team.

This matches the Detection class from the SpatialDetectionNetwork team.
"""

from dataclasses import dataclass


@dataclass
class Detection:
    """
    Standardized detection result from SpatialDetectionNetwork.

    This is the input format we receive from the detection team; it is
    consumed by SoftwareTracker.update() on the host side.
    """

    # Class name of the detected object (e.g. "person").
    label: str
    # Detection confidence score; presumably in [0, 1] — TODO confirm with detection team.
    confidence: float
    x: float  # spatial X (meters, camera frame)
    y: float  # spatial Y (meters, camera frame)
    z: float  # spatial Z / depth (meters, camera frame)
    xmin: float  # bbox left (pixels or normalized)
    ymin: float  # bbox top (pixels or normalized)
    xmax: float  # bbox right (pixels or normalized)
    ymax: float  # bbox bottom (pixels or normalized)
172 changes: 172 additions & 0 deletions modules/object_tracker/object_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""
ObjectTracker module using DepthAI's built-in ObjectTracker node.

Configures the ObjectTracker node within a DepthAI pipeline and parses
tracklet output into TrackedObject data classes.

The ObjectTracker node is part of the on-device pipeline:
SpatialDetectionNetwork.out ──► ObjectTracker ──► XLinkOut("tracklets")

This module provides:
- configure_tracker_node(): sets up the node in a shared pipeline
- parse_tracklets(): converts raw DepthAI tracklets into TrackedObject list

Reference: https://docs.luxonis.com/software/depthai/depthai-components/nodes/objecttracker/
"""

from typing import List, Optional

import depthai as dai

from .tracked_object import TrackedObject, TrackingStatus


# Map DepthAI tracklet status to our TrackingStatus enum.
# REMOVED is collapsed into LOST as a defensive fallback; parse_tracklets()
# drops REMOVED tracklets entirely, so callers never see them.
_STATUS_MAP = {
    dai.Tracklet.TrackingStatus.NEW: TrackingStatus.NEW,
    dai.Tracklet.TrackingStatus.TRACKED: TrackingStatus.TRACKED,
    dai.Tracklet.TrackingStatus.LOST: TrackingStatus.LOST,
    dai.Tracklet.TrackingStatus.REMOVED: TrackingStatus.LOST,
}

# Tracker algorithms selectable by name in configure_tracker_node().
TRACKER_TYPES = {
    "ZERO_TERM_COLOR_HISTOGRAM": dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM,
    "ZERO_TERM_IMAGELESS": dai.TrackerType.ZERO_TERM_IMAGELESS,
    "SHORT_TERM_IMAGELESS": dai.TrackerType.SHORT_TERM_IMAGELESS,
    "SHORT_TERM_KCF": dai.TrackerType.SHORT_TERM_KCF,
}


def configure_tracker_node(
    pipeline: dai.Pipeline,
    spatial_detection_network: dai.node.SpatialDetectionNetwork,
    tracker_type: str = "SHORT_TERM_IMAGELESS",
    labels_to_track: Optional[List[int]] = None,
) -> dai.node.ObjectTracker:
    """
    Create and configure an ObjectTracker node in the DepthAI pipeline.

    This wires the tracker to the SpatialDetectionNetwork outputs.
    Teammates provide the pipeline and spatial_detection_network node;
    this function adds the tracker on top.

    Args:
        pipeline: The shared DepthAI pipeline (created by teammates).
        spatial_detection_network: The detection network node whose
            outputs we consume.
        tracker_type: Algorithm name. One of:
            ZERO_TERM_COLOR_HISTOGRAM, ZERO_TERM_IMAGELESS,
            SHORT_TERM_IMAGELESS, SHORT_TERM_KCF.
        labels_to_track: List of class label indices to track.
            If None, tracks all detected labels.

    Returns:
        The configured ObjectTracker node (already linked to inputs
        and to an XLinkOut named "tracklets").

    Raises:
        ValueError: If tracker_type is not a key of TRACKER_TYPES.
    """
    if tracker_type not in TRACKER_TYPES:
        raise ValueError(
            f"Unknown tracker_type '{tracker_type}'. "
            f"Options: {list(TRACKER_TYPES.keys())}"
        )

    # --- create tracker node ---
    tracker = pipeline.create(dai.node.ObjectTracker)
    tracker.setTrackerType(TRACKER_TYPES[tracker_type])
    # UNIQUE_ID: every new object gets a never-reused ID for its lifetime.
    tracker.setTrackerIdAssignmentPolicy(
        dai.TrackerIdAssignmentPolicy.UNIQUE_ID,
    )

    if labels_to_track is not None:
        tracker.setDetectionLabelsToTrack(labels_to_track)

    # --- link detection network outputs into tracker inputs ---
    # passthrough frame (RGB preview used for detection)
    spatial_detection_network.passthrough.link(tracker.inputTrackerFrame)
    # detection frame (same frame, used for re-identification)
    spatial_detection_network.passthrough.link(tracker.inputDetectionFrame)
    # detection results (bounding boxes + spatial coords)
    spatial_detection_network.out.link(tracker.inputDetections)

    # --- create XLinkOut so host can read tracklets ---
    tracker_out = pipeline.create(dai.node.XLinkOut)
    tracker_out.setStreamName("tracklets")
    tracker.out.link(tracker_out.input)

    return tracker


def parse_tracklets(
    tracklets_data: dai.Tracklets,
    label_map: List[str],
    frame_width: int,
    frame_height: int,
) -> List[TrackedObject]:
    """
    Convert raw DepthAI Tracklets output into a list of TrackedObject.

    Called each frame after reading from the device output queue.
    Tracklets with status REMOVED are dropped.

    Args:
        tracklets_data: Raw tracklets from device.getOutputQueue("tracklets").get()
        label_map: Ordered list of class names matching model label indices
            (e.g. ["person", "car", "landing_pad"]).
        frame_width: Original frame width in pixels (for denormalizing bbox).
        frame_height: Original frame height in pixels.

    Returns:
        List of TrackedObject with persistent IDs, status, and smoothed
        spatial coordinates.
    """
    tracked_objects: List[TrackedObject] = []

    for tracklet in tracklets_data.tracklets:
        # Skip fully-removed objects up front; no further work needed.
        if tracklet.status == dai.Tracklet.TrackingStatus.REMOVED:
            continue

        # --- status ---
        status = _STATUS_MAP.get(tracklet.status, TrackingStatus.LOST)

        # --- label ---
        # Guard both bounds: a negative index would otherwise silently
        # index from the END of label_map and return the wrong name.
        # Out-of-range (or empty label_map) falls back to the raw index
        # as a string.
        label_index = tracklet.label
        if 0 <= label_index < len(label_map):
            label = label_map[label_index]
        else:
            label = str(label_index)

        # --- confidence (taken from the source image detection) ---
        confidence = tracklet.srcImgDetection.confidence

        # --- smoothed spatial coordinates (device reports millimeters) ---
        spatial = tracklet.spatialCoordinates
        x = spatial.x / 1000.0  # mm -> m
        y = spatial.y / 1000.0
        z = spatial.z / 1000.0

        # --- bounding box (denormalize from 0-1 to pixels) ---
        roi = tracklet.roi.denormalize(frame_width, frame_height)
        bbox_x = int(roi.topLeft().x)
        bbox_y = int(roi.topLeft().y)
        bbox_width = int(roi.bottomRight().x - roi.topLeft().x)
        bbox_height = int(roi.bottomRight().y - roi.topLeft().y)

        tracked_objects.append(
            TrackedObject(
                object_id=tracklet.id,
                status=status,
                label=label,
                confidence=confidence,
                x=x,
                y=y,
                z=z,
                bbox_x=bbox_x,
                bbox_y=bbox_y,
                bbox_width=bbox_width,
                bbox_height=bbox_height,
            )
        )

    return tracked_objects
116 changes: 116 additions & 0 deletions modules/object_tracker/object_tracker_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
Worker process for ObjectTracker.

Reads tracklet output from the OAK-D device queue, converts it into
TrackedObject data classes, and pushes them to the next pipeline stage.

Follows the existing worker pattern (producer-consumer via queues).
"""

import logging
from typing import List, Optional

import depthai as dai

from .object_tracker import configure_tracker_node, parse_tracklets
from .tracked_object import TrackedObject

logger = logging.getLogger(__name__)


def object_tracker_run(
    pipeline: dai.Pipeline,
    spatial_detection_network: dai.node.SpatialDetectionNetwork,
    label_map: List[str],
    frame_width: int,
    frame_height: int,
    output_queue,  # multiprocessing.Queue[List[TrackedObject]]
    tracker_type: str = "SHORT_TERM_IMAGELESS",
    labels_to_track: Optional[List[int]] = None,
) -> None:
    """
    Main worker entry point for the ObjectTracker.

    Configures the tracker node inside the given pipeline. The actual
    read loop lives in object_tracker_read_loop(), which the caller
    runs after starting the device.

    In the full system the pipeline is started externally (because
    StereoDepth and SpatialDetectionNetwork share the same device
    pipeline). This function is called *before* pipeline start so it
    can wire the tracker node.

    Args:
        pipeline: The shared DepthAI pipeline.
        spatial_detection_network: Detection node to wire into.
        label_map: Ordered class names matching model label indices.
        frame_width: Frame width in pixels.
        frame_height: Frame height in pixels.
        output_queue: Queue for downstream consumers.
            NOTE: label_map, frame dims, and output_queue are not used
            here; they are accepted so the caller can pass one uniform
            argument set to this function and the read loop.
        tracker_type: Tracker algorithm name.
        labels_to_track: Label indices to track (None = all).
    """
    configure_tracker_node(
        pipeline=pipeline,
        spatial_detection_network=spatial_detection_network,
        tracker_type=tracker_type,
        labels_to_track=labels_to_track,
    )

    logger.info(
        "ObjectTracker node configured (type=%s). "
        "Waiting for pipeline to start on device.",
        tracker_type,
    )


def object_tracker_read_loop(
    device: dai.Device,
    label_map: List[str],
    frame_width: int,
    frame_height: int,
    output_queue,  # multiprocessing.Queue[List[TrackedObject]]
) -> None:
    """
    Blocking loop that reads tracklets from the device and pushes
    TrackedObject lists to output_queue.

    Call this after the device has been started with the pipeline.
    Returns (instead of crashing the worker) when the device queue
    read raises RuntimeError, e.g. on device disconnect or shutdown.

    Args:
        device: Running OAK-D device.
        label_map: Ordered class names.
        frame_width: Frame width in pixels.
        frame_height: Frame height in pixels.
        output_queue: Queue for downstream consumers.
    """
    # blocking=False controls overflow behavior of the host-side queue
    # (oldest messages are dropped when more than maxSize accumulate);
    # it does NOT make get() non-blocking.
    tracklet_queue = device.getOutputQueue(
        name="tracklets",
        maxSize=4,
        blocking=False,
    )

    logger.info("ObjectTracker read loop started.")

    while True:
        try:
            # get() blocks until the next message arrives, regardless of
            # the queue's blocking flag above.
            tracklets_data = tracklet_queue.get()
        except RuntimeError:
            # depthai raises RuntimeError when the device is closed or
            # the link drops; log and exit the loop so the worker can
            # shut down cleanly instead of crashing.
            logger.exception("Tracklet queue read failed; stopping read loop.")
            break

        tracked_objects = parse_tracklets(
            tracklets_data=tracklets_data,
            label_map=label_map,
            frame_width=frame_width,
            frame_height=frame_height,
        )

        if tracked_objects:
            logger.debug(
                "Frame produced %d tracked objects: %s",
                len(tracked_objects),
                [
                    f"id={t.object_id} status={t.status.value}"
                    for t in tracked_objects
                ],
            )

        output_queue.put(tracked_objects)
Comment on lines +96 to +116
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The infinite loop in object_tracker_read_loop has no error handling or exit mechanism. If an exception occurs (e.g., device disconnection, queue errors), the worker will crash without cleanup. Consider adding try-except blocks and a mechanism to gracefully exit the loop (e.g., checking a stop event or catching specific exceptions).

Copilot uses AI. Check for mistakes.
Loading
Loading