Skip to content

refactor(trackers): decouple DeepSORTKalmanBoxTracker from SORTKalmanBoxTracker #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 135 additions & 4 deletions trackers/core/deepsort/kalman_box_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,154 @@

import numpy as np

from trackers.core.sort.kalman_box_tracker import SORTKalmanBoxTracker


class DeepSORTKalmanBoxTracker(SORTKalmanBoxTracker):
class DeepSORTKalmanBoxTracker:
"""
The `DeepSORTKalmanBoxTracker` class represents the internals of a single
tracked object (bounding box), with a Kalman filter to predict and update
its position. It also maintains a feature vector for the object, which is
used to identify the object across frames.

Attributes:
tracker_id (int): Unique identifier for the tracker.
number_of_successful_updates (int): Number of times the object has been
updated successfully.
time_since_update (int): Number of frames since the last update.
state (np.ndarray): State vector of the bounding box.
F (np.ndarray): State transition matrix.
H (np.ndarray): Measurement matrix.
Q (np.ndarray): Process noise covariance matrix.
R (np.ndarray): Measurement noise covariance matrix.
P (np.ndarray): Error covariance matrix.
features (list[np.ndarray]): List of feature vectors.
count_id (int): Class variable to assign unique IDs to each tracker.

Args:
bbox (np.ndarray): Initial bounding box in the form [x1, y1, x2, y2].
feature (Optional[np.ndarray]): Optional initial feature vector.
"""

count_id = 0

@classmethod
def get_next_tracker_id(cls) -> int:
"""
Class method that returns the next available tracker ID.

Returns:
int: The next available tracker ID.
"""
next_id = cls.count_id
cls.count_id += 1
return next_id

def __init__(self, bbox: np.ndarray, feature: Optional[np.ndarray] = None):
super().__init__(bbox)
# Initialize with a temporary ID of -1
# Will be assigned a real ID when the track is considered mature
self.tracker_id = -1

# Number of hits indicates how many times the object has been
# updated successfully
self.number_of_successful_updates = 1
# Number of frames since the last update
self.time_since_update = 0

# For simplicity, we keep a small state vector:
# (x, y, x2, y2, vx, vy, vx2, vy2).
# We'll store the bounding box in "self.state"
self.state = np.zeros((8, 1), dtype=np.float32)

# Initialize state directly from the first detection
self.state[0] = bbox[0]
self.state[1] = bbox[1]
self.state[2] = bbox[2]
self.state[3] = bbox[3]

# Basic constant velocity model
self._initialize_kalman_filter()

# Initialize features list
self.features: list[np.ndarray] = []
if feature is not None:
self.features.append(feature)

def _initialize_kalman_filter(self) -> None:
"""
Sets up the matrices for the Kalman filter.
"""
# State transition matrix (F): 8x8
# We assume a constant velocity model. Positions are incremented by
# velocity each step.
self.F = np.eye(8, dtype=np.float32)
for i in range(4):
self.F[i, i + 4] = 1.0

# Measurement matrix (H): we directly measure x1, y1, x2, y2
self.H = np.eye(4, 8, dtype=np.float32) # 4x8

# Process covariance matrix (Q)
self.Q = np.eye(8, dtype=np.float32) * 0.01

# Measurement covariance (R): noise in detection
self.R = np.eye(4, dtype=np.float32) * 0.1

# Error covariance matrix (P)
self.P = np.eye(8, dtype=np.float32)

def predict(self) -> None:
"""
Predict the next state of the bounding box (applies the state transition).
"""
# Predict state
self.state = self.F @ self.state
# Predict error covariance
self.P = self.F @ self.P @ self.F.T + self.Q

# Increase time since update
self.time_since_update += 1

def update(self, bbox: np.ndarray) -> None:
"""
Updates the state with a new detected bounding box.

Args:
bbox (np.ndarray): Detected bounding box in the form [x1, y1, x2, y2].
"""
self.time_since_update = 0
self.number_of_successful_updates += 1

# Kalman Gain
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)

# Residual
measurement = bbox.reshape((4, 1))
y = measurement - self.H @ self.state

# Update state
self.state = self.state + K @ y

# Update covariance
identity_matrix = np.eye(8, dtype=np.float32)
self.P = (identity_matrix - K @ self.H) @ self.P

def get_state_bbox(self) -> np.ndarray:
"""
Returns the current bounding box estimate from the state vector.

Returns:
np.ndarray: The bounding box [x1, y1, x2, y2].
"""
return np.array(
[
self.state[0], # x1
self.state[1], # y1
self.state[2], # x2
self.state[3], # y2
],
dtype=float,
).reshape(-1)

def update_feature(self, feature: np.ndarray):
self.features.append(feature)

Expand Down
16 changes: 11 additions & 5 deletions trackers/core/deepsort/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ class DeepSORTTracker(BaseTrackerWithFeatures):
from rfdetr import RFDETRBase
from rfdetr.util.coco_classes import COCO_CLASSES

from trackers.core.deepsort.tracker import DeepSORTTracker
from trackers import DeepSORTFeatureExtractor, DeepSORTTracker

model = RFDETRBase(device="mps")
tracker = DeepSORTTracker()
feature_extractor = DeepSORTFeatureExtractor.from_timm(
model_name="mobilenetv4_conv_small.e1200_r224_in1k"
)
tracker = DeepSORTTracker(feature_extractor=feature_extractor)
box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()

Expand Down Expand Up @@ -292,7 +295,8 @@ def _get_associated_indices(
if combined_dist.size > 0:
row_indices, col_indices = np.where(combined_dist < 1.0)
sorted_pairs = sorted(
zip(row_indices, col_indices), key=lambda x: combined_dist[x[0], x[1]]
zip(map(int, row_indices), map(int, col_indices)),
key=lambda x: combined_dist[x[0], x[1]],
)

used_rows = set()
Expand All @@ -303,8 +307,10 @@ def _get_associated_indices(
used_cols.add(col)
matched_indices.append((row, col))

unmatched_trackers = unmatched_trackers - used_rows
unmatched_detections = unmatched_detections - used_cols
unmatched_trackers = unmatched_trackers - {int(row) for row in used_rows}
unmatched_detections = unmatched_detections - {
int(col) for col in used_cols
}

return matched_indices, unmatched_trackers, unmatched_detections

Expand Down
11 changes: 7 additions & 4 deletions trackers/utils/sort_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from copy import deepcopy
from typing import List, Sequence, TypeVar
from typing import List, Sequence, TypeVar, Union

import numpy as np
import supervision as sv
from supervision.detection.utils import box_iou_batch

from trackers.core.deepsort.kalman_box_tracker import DeepSORTKalmanBoxTracker
from trackers.core.sort.kalman_box_tracker import SORTKalmanBoxTracker

KalmanBoxTrackerType = TypeVar("KalmanBoxTrackerType", bound=SORTKalmanBoxTracker)
KalmanBoxTrackerType = TypeVar(
"KalmanBoxTrackerType", bound=Union[SORTKalmanBoxTracker, DeepSORTKalmanBoxTracker]
)


def get_alive_trackers(
Expand Down Expand Up @@ -42,7 +45,7 @@ def get_alive_trackers(


def get_iou_matrix(
trackers: Sequence[SORTKalmanBoxTracker], detection_boxes: np.ndarray
trackers: Sequence[KalmanBoxTrackerType], detection_boxes: np.ndarray
) -> np.ndarray:
"""
Build IOU cost matrix between detections and predicted bounding boxes
Expand All @@ -68,7 +71,7 @@ def get_iou_matrix(


def update_detections_with_track_ids(
trackers: Sequence[SORTKalmanBoxTracker],
trackers: Sequence[KalmanBoxTrackerType],
detections: sv.Detections,
detection_boxes: np.ndarray,
minimum_iou_threshold: float,
Expand Down