-
Notifications
You must be signed in to change notification settings - Fork 38
ITEP-89520: Sensor tagging for high performance tracker #1308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 11 commits
a911fe6
01dc283
2a3a581
6ae6831
977fe7b
fa746cd
94de8df
6a20088
2d5bdbf
a118505
2b07ae1
2e178ba
c92671f
3ea470b
bf67241
ca5ec0a
f32cffe
baf8166
54a89f1
7d5d701
c27bcc0
8127c84
e1edda7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,6 +63,7 @@ def __init__(self, name, map_file, scale=None, | |
| self.trackerType = None | ||
| self.persist_attributes = {} | ||
| self.time_chunking_rate_fps = time_chunking_rate_fps | ||
| self._analytics_objects = {} | ||
|
|
||
| if not ControllerMode.isAnalyticsOnly(): | ||
| self._setTracker("time_chunked_intel_labs" if time_chunking_enabled else self.DEFAULT_TRACKER) | ||
|
|
@@ -312,22 +313,24 @@ def processSensorData(self, jdata, when): | |
| timestamp_str = get_iso_time(when) | ||
| timestamp_epoch = when | ||
|
|
||
| # Skip processing if no tracker (analytics-only mode) | ||
| if self.tracker is None: | ||
| return True | ||
|
|
||
| # Find all objects currently in the sensor region across ALL detection types | ||
| # Optimization: check if scene-wide to avoid redundant isPointWithin calls | ||
| # TODO: Further optimize for scenes with many objects: spatial indexing (R-tree), | ||
| # bounding box pre-filtering, or tracking only recently-moved objects | ||
| is_scene_wide = sensor.area == Region.REGION_SCENE | ||
| objects_in_sensor = [] | ||
| for detectionType in self.tracker.trackers.keys(): | ||
| for obj in self.tracker.currentObjects(detectionType): | ||
| # When tracking is disabled, do not rely on obj.frameCount being initialized | ||
| if (not self.use_tracker or obj.frameCount > 3) and (is_scene_wide or sensor.isPointWithin(obj.sceneLoc)): | ||
|
|
||
| if self.use_tracker: | ||
| for detectionType in self.tracker.trackers.keys(): | ||
| for obj in self.tracker.currentObjects(detectionType): | ||
| # When tracking is disabled, do not rely on obj.frameCount being initialized | ||
| if (not self.use_tracker or obj.frameCount > 3) and (is_scene_wide or sensor.isPointWithin(obj.sceneLoc)): | ||
| objects_in_sensor.append(obj) | ||
| obj.chain_data.active_sensors.add(sensor_id) | ||
| else: | ||
| for obj in self._analytics_objects.values(): | ||
| if is_scene_wide or sensor.isPointWithin(obj.sceneLoc): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not for this PR: I think it is odd that all of region, sensor, tripwires are just handled and operated on openly in our scene class. Think about creating a base class called "Analytic" and Region, Tripwire and Sensor derive from it and the process method is overridden in each.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree |
||
| objects_in_sensor.append(obj) | ||
| # Ensure active_sensors is updated (handles scene-wide sensors or objects existing before sensor creation) | ||
| obj.chain_data.active_sensors.add(sensor_id) | ||
|
||
|
|
||
| log.debug("SENSOR OBJECTS FOUND", sensor_id, len(objects_in_sensor), "type:", sensor.singleton_type) | ||
|
|
@@ -398,7 +401,7 @@ def _updateAttributeSensorEvents(self, objects_in_sensor, sensor_id, cur_value, | |
|
|
||
| return | ||
|
|
||
| def updateTrackedObjects(self, detection_type, objects): | ||
| def updateTrackedObjects(self, detection_type, tracked_objects): | ||
| """ | ||
| Update the cache of tracked objects from MQTT. | ||
| This is used by Analytics to consume tracked objects published by the Tracker service. | ||
|
|
@@ -407,7 +410,24 @@ def updateTrackedObjects(self, detection_type, objects): | |
| detection_type: The type of detection (e.g., 'person', 'vehicle') | ||
| objects: List of tracked objects for this detection type | ||
| """ | ||
| self.tracked_objects_cache[detection_type] = objects | ||
| self.tracked_objects_cache[detection_type] = tracked_objects | ||
|
|
||
| if ControllerMode.isAnalyticsOnly(): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please verify if use_tracer and isAnalyticsOnly are redundant.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks to me like |
||
| current_ids = {obj['id'] for obj in tracked_objects if 'id' in obj} | ||
|
|
||
| # Only remove stale objects belonging to THIS detection type | ||
| stale = [ | ||
| oid for oid, wrapper in self._analytics_objects.items() | ||
| if wrapper.category == detection_type and oid not in current_ids | ||
| ] | ||
| for oid in stale: | ||
| del self._analytics_objects[oid] | ||
|
|
||
| # Create or update wrappers — deserialize as a batch for efficiency, | ||
| # then index results by id so _analytics_objects stays consistent. | ||
| deserialized = self._deserializeTrackedObjects(tracked_objects) | ||
| for wrapper in deserialized: | ||
| self._analytics_objects[wrapper.gid] = wrapper | ||
| return | ||
|
|
||
| def getTrackedObjects(self, detection_type): | ||
|
|
@@ -437,7 +457,8 @@ def getTrackedObjects(self, detection_type): | |
| def _deserializeTrackedObjects(self, serialized_objects): | ||
| """ | ||
| Convert serialized tracked objects to a format usable by Analytics. | ||
| This creates lightweight wrappers that mimic MovingObject interface. | ||
| Reuses existing objects from _analytics_objects cache to preserve chain_data | ||
| state (sensor readings, region history) across frames. | ||
| If objects are already deserialized, returns them as-is. | ||
|
|
||
| Args: | ||
|
|
@@ -446,7 +467,6 @@ def _deserializeTrackedObjects(self, serialized_objects): | |
| Returns: | ||
| List of object-like structures with necessary attributes | ||
| """ | ||
|
|
||
| if not serialized_objects or not isinstance(serialized_objects, list): | ||
| return serialized_objects if serialized_objects else [] | ||
|
|
||
|
|
@@ -457,62 +477,64 @@ def _deserializeTrackedObjects(self, serialized_objects): | |
| for obj_data in serialized_objects: | ||
| if not isinstance(obj_data, dict): | ||
| continue | ||
| obj = SimpleNamespace() | ||
| obj.gid = obj_data.get('id') | ||
|
|
||
| obj_id = obj_data.get('id') | ||
|
|
||
| # Reuse existing object to preserve chain_data, or create a new one | ||
| if obj_id in self._analytics_objects: | ||
| obj = self._analytics_objects[obj_id] | ||
| else: | ||
| obj = SimpleNamespace() | ||
| obj.chain_data = ChainData(regions={}, publishedLocations=[], persist={}) | ||
| self._analytics_objects[obj_id] = obj | ||
|
|
||
| # Update position and all fields every frame | ||
| obj.gid = obj_id | ||
| obj.category = obj_data.get('type', obj_data.get('category')) | ||
| obj.sceneLoc = Point(obj_data.get('translation', [0, 0, 0])) | ||
| obj.velocity = Point(obj_data.get('velocity', [0, 0, 0])) if obj_data.get('velocity') else None | ||
| obj.size = obj_data.get('size') | ||
| obj.confidence = obj_data.get('confidence') | ||
| obj.frameCount = obj_data.get('frame_count', 0) | ||
| obj.frameCount = obj_data.get('frame_count', 4) # > 3 so sensor/region checks pass | ||
|
daddo-intel marked this conversation as resolved.
Outdated
|
||
| obj.rotation = obj_data.get('rotation') | ||
| obj.reid = {} | ||
| obj.metadata = {} | ||
| obj.vectors = [] | ||
| obj.boundingBox = None | ||
| obj.boundingBoxPixels = None | ||
| obj.intersected = False | ||
| obj.visibility = obj_data.get('visibility', []) | ||
| obj.info = {'category': obj.category, 'confidence': obj.confidence} | ||
|
|
||
| # Extract reid from metadata if present | ||
| metadata = obj_data.get('metadata', {}) | ||
| obj.reid = metadata.get('reid') if metadata else None | ||
| obj.reid = metadata.get('reid') if metadata else {} | ||
| obj.similarity = obj_data.get('similarity') | ||
| obj.vectors = [] # Empty list - tracked objects from MQTT don't have detection vectors | ||
| obj.boundingBoxPixels = None # Will use camera_bounds from obj_data if available | ||
|
|
||
| obj_id = obj.gid | ||
| if 'first_seen' in obj_data: | ||
| obj.when = get_epoch_time(obj_data.get('first_seen')) | ||
| obj.first_seen = obj.when | ||
| # Cache the first_seen from MQTT data | ||
| if obj_id not in self.object_history_cache: | ||
| self.object_history_cache[obj_id] = {} | ||
| self.object_history_cache[obj_id]['first_seen'] = obj.when | ||
| else: | ||
| # Check if we have a cached first_seen timestamp | ||
| if obj_id in self.object_history_cache and 'first_seen' in self.object_history_cache[obj_id]: | ||
| obj.when = self.object_history_cache[obj_id]['first_seen'] | ||
| obj.first_seen = obj.when | ||
| else: | ||
| # First time seeing this object, record current time | ||
| current_time = get_epoch_time() | ||
| obj.when = current_time | ||
| obj.first_seen = current_time | ||
| if obj_id not in self.object_history_cache: | ||
| self.object_history_cache[obj_id] = {} | ||
| self.object_history_cache[obj_id]['first_seen'] = current_time | ||
| log.debug(f"First time seeing object id {obj_data.get('id')} from MQTT; setting first_seen to current time: {current_time}") | ||
| obj.visibility = obj_data.get('visibility', []) | ||
|
|
||
| obj.info = { | ||
| 'category': obj.category, | ||
| 'confidence': obj.confidence, | ||
| } | ||
|
|
||
| if 'camera_bounds' in obj_data and obj_data['camera_bounds']: | ||
| obj._camera_bounds = obj_data['camera_bounds'] | ||
| else: | ||
| obj._camera_bounds = None | ||
|
|
||
| # Deserialize chain_data: convert sensors into env_sensor_state and attr_sensor_events | ||
| obj.chain_data = ChainData( | ||
| regions=obj_data.get('regions', {}), | ||
| publishedLocations=[], | ||
| persist=obj_data.get('persistent_data', {}), | ||
| ) | ||
| # Timestamps | ||
| if 'first_seen' in obj_data: | ||
| obj.first_seen = get_epoch_time(obj_data['first_seen']) | ||
| obj.when = obj.first_seen | ||
| # Cache the first_seen from MQTT data | ||
| self.object_history_cache.setdefault(obj_id, {})['first_seen'] = obj.when | ||
| elif obj_id in self.object_history_cache and 'first_seen' in self.object_history_cache[obj_id]: | ||
| obj.first_seen = self.object_history_cache[obj_id]['first_seen'] | ||
| obj.when = obj.first_seen | ||
| else: | ||
| current_time = get_epoch_time() | ||
| obj.first_seen = current_time | ||
| obj.when = current_time | ||
| self.object_history_cache.setdefault(obj_id, {})['first_seen'] = current_time | ||
| log.debug(f"First time seeing object id {obj_id} from MQTT; setting first_seen to current time: {current_time}") | ||
|
|
||
| # Update chain_data regions and persist from latest frame data | ||
| obj.chain_data.regions = obj_data.get('regions', obj.chain_data.regions) | ||
| obj.chain_data.persist = obj_data.get('persistent_data', obj.chain_data.persist) | ||
|
|
||
| # Convert serialized sensors into env_sensor_state and attr_sensor_events | ||
| sensors_data = obj_data.get('sensors', {}) | ||
|
|
@@ -528,16 +550,11 @@ def _deserializeTrackedObjects(self, serialized_objects): | |
| else: | ||
| obj.chain_data.attr_sensor_events[sensor_id] = values | ||
|
|
||
| obj_id = obj.gid | ||
| # Restore published locations from history cache | ||
| if obj_id in self.object_history_cache: | ||
| obj.chain_data.publishedLocations = self.object_history_cache[obj_id].get('publishedLocations', []) | ||
| else: | ||
| obj.chain_data.publishedLocations = [] | ||
| self.object_history_cache[obj_id] = {} | ||
|
|
||
| # Store current object data for next frame | ||
| self.object_history_cache[obj_id]['publishedLocations'] = obj.chain_data.publishedLocations | ||
| self.object_history_cache[obj_id]['last_seen'] = obj.sceneLoc | ||
| self.object_history_cache.setdefault(obj_id, {})['publishedLocations'] = obj.chain_data.publishedLocations | ||
| self.object_history_cache.setdefault(obj_id, {})['last_seen'] = obj.sceneLoc | ||
|
|
||
| objects.append(obj) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -309,8 +309,13 @@ sensors-send-events: # NEX-T10456 | |
| $(eval override BASE_IMAGE := $(IMAGE)-controller-test) | ||
| $(eval OLDSECRETSDIR := $(SECRETSDIR)) | ||
| $(eval SECRETSDIR := $(PWD)/manager/secrets) | ||
| $(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/scene.yml:$(COMPOSE)/web.yml) | ||
| $(call common-recipe, $(COMPOSE_FILES), tests/functional/tc_sensors_send_mqtt_messages.py, 'pgserver web scene', true, /run/secrets/controller.auth) | ||
| $(if $(ANALYTICS),\ | ||
| $(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/tracker.yml:$(COMPOSE)/controller_analytics.yml:$(COMPOSE)/web.yml) \ | ||
| $(eval SERVICES := 'pgserver web tracker controller-analytics'),\ | ||
| $(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/scene.yml:$(COMPOSE)/web.yml) \ | ||
| $(eval SERVICES := 'pgserver web scene')\ | ||
| ) | ||
|
Comment on lines
+315
to
+320
|
||
| $(call common-recipe, $(COMPOSE_FILES), tests/functional/tc_sensors_send_mqtt_messages.py, $(SERVICES), true, /run/secrets/controller.auth) | ||
| $(eval SECRETSDIR := $(OLDSECRETSDIR)) | ||
| $(eval override BASE_IMAGE := $(IMAGE_OLD)) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| # SPDX-FileCopyrightText: (C) 2025 - 2026 Intel Corporation | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| networks: | ||
| scenescape-test: | ||
|
|
||
| secrets: | ||
| root-cert: | ||
| file: ${SECRETSDIR}/certs/scenescape-ca.pem | ||
| django: | ||
| file: ${SECRETSDIR}/django | ||
| controller.auth: | ||
| environment: CONTROLLER_AUTH | ||
|
|
||
| services: | ||
| controller-analytics: | ||
| image: scenescape-controller:${VERSION:-latest} | ||
| init: true | ||
| networks: | ||
| scenescape-test: | ||
| depends_on: | ||
| web: | ||
| condition: service_healthy | ||
| broker: | ||
| condition: service_started | ||
| ntpserv: | ||
| condition: service_started | ||
| environment: | ||
| VISIBILITY_TOPIC: ${VISIBILITY:-regulated} | ||
| CONTROLLER_ENABLE_ANALYTICS_ONLY: "true" | ||
| command: --restauth /run/secrets/controller.auth | ||
| --brokerauth /run/secrets/controller.auth | ||
| --broker broker.scenescape.intel.com | ||
| --ntp ntpserv | ||
| --visibility_topic "$VISIBILITY_TOPIC" | ||
| volumes: | ||
| - vol-media:/home/scenescape/SceneScape/media | ||
| - ./controller/config/tracker-config.json:/home/scenescape/SceneScape/tracker-config.json | ||
| secrets: | ||
| - source: root-cert | ||
| target: certs/scenescape-ca.pem | ||
| - django | ||
| - controller.auth | ||
| restart: on-failure | ||
|
|
||
| volumes: | ||
| vol-media: |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,62 @@ | ||||||
| # SPDX-FileCopyrightText: (C) 2025 - 2026 Intel Corporation | ||||||
| # SPDX-License-Identifier: Apache-2.0 | ||||||
|
|
||||||
| networks: | ||||||
| scenescape-test: | ||||||
|
|
||||||
| secrets: | ||||||
| root-cert: | ||||||
| file: ${SECRETSDIR}/certs/scenescape-ca.pem | ||||||
| django: | ||||||
| file: ${SECRETSDIR}/django | ||||||
| controller-auth-file: | ||||||
| file: manager/secrets/controller.auth | ||||||
|
||||||
| file: manager/secrets/controller.auth | |
| file: ${SECRETSDIR}/controller.auth |
Uh oh!
There was an error while loading. Please reload this page.