-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathtime_chunking.py
More file actions
209 lines (166 loc) · 9.3 KB
/
time_chunking.py
File metadata and controls
209 lines (166 loc) · 9.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# SPDX-FileCopyrightText: (C) 2025 - 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""
Time-chunked tracker implementation for performance optimization.
OVERVIEW:
Performance enhancement that reduces tracking load by processing only the most recent
detection frame from each camera+category combination within time windows. Instead of
processing every incoming message immediately, buffers them and dispatches only the
latest data every 50ms (default interval, configurable).
IMPLEMENTATION:
- TimeChunkedIntelLabsTracking: Inherits from IntelLabsTracking, overrides trackObjects()
- TimeChunkProcessor: Timer thread that manages buffering and periodic dispatch
- TimeChunkBuffer: Thread-safe storage that keeps only latest frame per camera+category
FEATURES:
- Object Batching: Currently disabled (ENABLE_OBJECT_BATCHING=False). When enabled,
batches objects from all cameras per category into a single tracker call for improved performance
USAGE:
TimeChunkedIntelLabsTracking is configurable via tracker-config.json:
- Set "time_chunking_enabled": true to enable time-chunked tracking
- Set "time_chunking_interval_milliseconds": 50 to set processing interval (optional, defaults to 50ms if not present)
The Scene class will automatically select TimeChunkedIntelLabsTracking when enabled, otherwise uses standard IntelLabsTracking.
Example tracker-config.json:
{
"max_unreliable_frames": 10,
"non_measurement_frames_dynamic": 8,
"non_measurement_frames_static": 16,
"baseline_frame_rate": 30,
"time_chunking_enabled": true,
"time_chunking_interval_milliseconds": 50
}
"""
import threading
import time
from typing import Any, List
from scene_common import log
from controller.ilabs_tracking import IntelLabsTracking
from controller.tracking import BATCHED_MODE, STREAMING_MODE, DEFAULT_SUSPENDED_TRACK_TIMEOUT_SECS
from controller.observability import metrics
# Default processing interval for the time-chunk timer thread, in milliseconds.
DEFAULT_CHUNKING_INTERVAL_MS = 50
# TODO: object batching is not working yet, needs fixing tracker matching logic first
# Must stay False until the tracker matching logic supports batched input; the
# module docstring documents this feature as currently disabled. When enabled,
# objects from all cameras are batched per category into a single tracker call.
ENABLE_OBJECT_BATCHING = False
class TimeChunkBuffer:
    """Thread-safe latest-frame store keyed by category, then by camera.

    Only the most recent frame per (category, camera) pair is retained;
    an incoming frame silently replaces any unprocessed predecessor.
    """

    def __init__(self):
        # {category: {camera_id: (objects, when, already_tracked)}}
        self._data = {}
        self._lock = threading.Lock()

    def add(self, camera_id: str, category: str, objects: Any, when: float, already_tracked: List[Any]):
        """Record the newest frame for category->camera, overwriting any prior one."""
        with self._lock:
            self._data.setdefault(category, {})[camera_id] = (objects, when, already_tracked)

    def pop_all(self):
        """Atomically hand back everything buffered (category->camera) and reset the buffer."""
        with self._lock:
            snapshot = self._data
            self._data = {}
            return snapshot
class TimeChunkProcessor(threading.Thread):
    """Daemon timer thread that drains the frame buffer on a fixed cadence.

    Every interval it pops all buffered frames and enqueues work onto the
    per-category trackers owned by ``tracker_manager``.
    """

    def __init__(self, tracker_manager, interval_ms=DEFAULT_CHUNKING_INTERVAL_MS):
        super().__init__(daemon=True)
        self.buffer = TimeChunkBuffer()
        self.tracker_manager = tracker_manager
        # Interval is configured in milliseconds; Event.wait expects seconds.
        self.interval = interval_ms / 1000.0
        # Event rather than a boolean flag, so wait() doubles as the sleep.
        self._stop_event = threading.Event()

    def add_message(self, camera_id: str, category: str, objects: Any, when: float, already_tracked: List[Any]):
        """Buffer the newest frame for camera+category; older unprocessed frames are replaced."""
        self.buffer.add(camera_id, category, objects, when, already_tracked)

    def shutdown(self):
        """Signal the run loop to terminate."""
        self._stop_event.set()

    def run(self):
        """Wake every interval, drain the buffer, and dispatch each category's frames."""
        while not self._stop_event.is_set():
            if self._stop_event.wait(timeout=self.interval):
                break  # shutdown requested during the sleep
            # {category: {camera_id: (objects, when, already_tracked)}}
            for category, camera_dict in self.buffer.pop_all().items():
                # Frames for categories with no tracker yet are simply discarded.
                if category in self.tracker_manager.trackers:
                    self._dispatch_category(category, camera_dict)
        log.info("TimeChunkProcessor thread exiting")

    def _dispatch_category(self, category, camera_dict):
        """Enqueue one category's buffered camera frames, unless its tracker is busy."""
        tracker = self.tracker_manager.trackers[category]
        # A non-empty work queue means the previous batch is still being
        # processed; drop this window's frames rather than let the queue grow.
        if not tracker.queue.empty():
            log.warning(
                f"Tracker work queue is not empty ({tracker.queue.qsize()}). Dropping {len(camera_dict)} messages for category: {category}")
            metrics.inc_dropped({
                "category": category,
                "reason": "tracker_busy"
            })
            return
        if ENABLE_OBJECT_BATCHING:
            # One inner list of objects per camera, ordered oldest-first by
            # timestamp so earlier detections are matched first.
            ordered = sorted(camera_dict.items(), key=lambda item: item[1][1])
            objects_per_camera = [objs for _, (objs, _when, _trk) in ordered]
            latest_when = max((when for _, (_objs, when, _trk) in ordered), default=0)
            all_already_tracked = [t for _, (_objs, _when, tracked) in ordered for t in tracked]
            # Single enqueue for the aggregated camera data in this category
            if objects_per_camera:
                tracker.queue.put((objects_per_camera, latest_when, all_already_tracked, BATCHED_MODE))
        else:
            # Default behavior: one queue entry per camera frame.
            for _camera_id, (objects, when, already_tracked) in camera_dict.items():
                tracker.queue.put((objects, when, already_tracked, STREAMING_MODE))
class TimeChunkedIntelLabsTracking(IntelLabsTracking):
    """IntelLabsTracking variant that defers detections to a time-chunk buffer.

    Instead of tracking inline, ``trackObjects`` hands each frame to a
    ``TimeChunkProcessor`` which dispatches only the latest frame per
    camera+category at the configured interval.
    """

    def __init__(self, max_unreliable_time, non_measurement_time_dynamic, non_measurement_time_static, time_chunking_interval_milliseconds, suspended_track_timeout_secs=DEFAULT_SUSPENDED_TRACK_TIMEOUT_SECS):
        """Initialize the underlying IntelLabsTracking and record the chunking interval."""
        super().__init__(max_unreliable_time, non_measurement_time_dynamic, non_measurement_time_static, suspended_track_timeout_secs)
        self.time_chunking_interval_milliseconds = time_chunking_interval_milliseconds
        self.suspended_track_timeout_secs = suspended_track_timeout_secs
        log.info(f"Initialized TimeChunkedIntelLabsTracking {self.__str__()} with chunking interval: {self.time_chunking_interval_milliseconds} ms")

    def trackObjects(self, objects, already_tracked_objects, when, categories,
                     ref_camera_frame_rate, max_unreliable_time,
                     non_measurement_time_dynamic, non_measurement_time_static,
                     use_tracker=True):
        """Buffer detections for time-chunked dispatch instead of tracking them inline.

        Raises NotImplementedError when use_tracker is False; returns early
        (with a warning) when no camera ID can be read from the objects.
        """
        if not use_tracker:
            raise NotImplementedError(
                "Non-tracker mode is not supported in TimeChunkedIntelLabsTracking")
        # Lazily create the chunk-processor thread and per-category trackers.
        self._createIlabsTrackers(categories, max_unreliable_time, non_measurement_time_dynamic, non_measurement_time_static)
        if not categories:
            categories = self.trackers.keys()
        # Time chunking keys frames by camera; without an ID we cannot buffer.
        try:
            camera_id = objects[0].camera.cameraID
        except (AttributeError, IndexError):
            log.warning("No camera ID found in objects, skipping time chunking processing")
            return
        for category in categories:
            self._updateRefCameraFrameRate(ref_camera_frame_rate, category)
            self.time_chunk_processor.add_message(
                camera_id, category, objects, when, already_tracked_objects)

    def _createIlabsTrackers(self, categories, max_unreliable_time, non_measurement_time_dynamic, non_measurement_time_static):
        """Ensure the chunk processor thread and one IntelLabsTracking per category exist."""
        if not hasattr(self, 'time_chunk_processor'):
            self.time_chunk_processor = TimeChunkProcessor(self, self.time_chunking_interval_milliseconds)
            self.time_chunk_processor.start()
        # Actual tracking is delegated to one plain IntelLabsTracking per category.
        for category in categories:
            if category in self.trackers:
                continue
            tracker = IntelLabsTracking(max_unreliable_time, non_measurement_time_dynamic, non_measurement_time_static, self.suspended_track_timeout_secs)
            self.trackers[category] = tracker
            tracker.start()
            log.info(f"Started IntelLabs tracker {tracker.__str__()} thread for category {category}")

    def join(self):
        """Shut down the chunk processor thread first, then join the base tracker."""
        if hasattr(self, 'time_chunk_processor'):
            self.time_chunk_processor.shutdown()
            self.time_chunk_processor.join()
        super().join()
return