diff --git a/controller/src/controller/scene.py b/controller/src/controller/scene.py index e68c7fcd6..bbae2dc4f 100644 --- a/controller/src/controller/scene.py +++ b/controller/src/controller/scene.py @@ -63,6 +63,7 @@ def __init__(self, name, map_file, scale=None, self.trackerType = None self.persist_attributes = {} self.time_chunking_rate_fps = time_chunking_rate_fps + self._analytics_objects = {} if not ControllerMode.isAnalyticsOnly(): self._setTracker("time_chunked_intel_labs" if time_chunking_enabled else self.DEFAULT_TRACKER) @@ -312,22 +313,24 @@ def processSensorData(self, jdata, when): timestamp_str = get_iso_time(when) timestamp_epoch = when - # Skip processing if no tracker (analytics-only mode) - if self.tracker is None: - return True - # Find all objects currently in the sensor region across ALL detection types # Optimization: check if scene-wide to avoid redundant isPointWithin calls # TODO: Further optimize for scenes with many objects: spatial indexing (R-tree), # bounding box pre-filtering, or tracking only recently-moved objects is_scene_wide = sensor.area == Region.REGION_SCENE objects_in_sensor = [] - for detectionType in self.tracker.trackers.keys(): - for obj in self.tracker.currentObjects(detectionType): - # When tracking is disabled, do not rely on obj.frameCount being initialized - if (not self.use_tracker or obj.frameCount > 3) and (is_scene_wide or sensor.isPointWithin(obj.sceneLoc)): + + if self.use_tracker and self.tracker is not None: + for detectionType in self.tracker.trackers.keys(): + for obj in self.tracker.currentObjects(detectionType): + # When tracking is disabled, do not rely on obj.frameCount being initialized + if (not self.use_tracker or obj.frameCount > 3) and (is_scene_wide or sensor.isPointWithin(obj.sceneLoc)): + objects_in_sensor.append(obj) + obj.chain_data.active_sensors.add(sensor_id) + else: + for obj in self._analytics_objects.values(): + if is_scene_wide or sensor.isPointWithin(obj.sceneLoc): objects_in_sensor.append(obj) - # Ensure active_sensors is updated (handles scene-wide sensors or objects existing before sensor creation) obj.chain_data.active_sensors.add(sensor_id) log.debug("SENSOR OBJECTS FOUND", sensor_id, len(objects_in_sensor), "type:", sensor.singleton_type) @@ -398,7 +401,7 @@ def _updateAttributeSensorEvents(self, objects_in_sensor, sensor_id, cur_value, return - def updateTrackedObjects(self, detection_type, objects): + def updateTrackedObjects(self, detection_type, tracked_objects): """ Update the cache of tracked objects from MQTT. This is used by Analytics to consume tracked objects published by the Tracker service. @@ -407,7 +410,24 @@ def updateTrackedObjects(self, detection_type, objects): detection_type: The type of detection (e.g., 'person', 'vehicle') objects: List of tracked objects for this detection type """ - self.tracked_objects_cache[detection_type] = objects + self.tracked_objects_cache[detection_type] = tracked_objects + + if ControllerMode.isAnalyticsOnly(): + current_ids = {obj['id'] for obj in tracked_objects if 'id' in obj} + + # Only remove stale objects belonging to THIS detection type + stale = [ + oid for oid, wrapper in self._analytics_objects.items() + if wrapper.category == detection_type and oid not in current_ids + ] + for oid in stale: + del self._analytics_objects[oid] + + # Create or update wrappers — deserialize as a batch for efficiency, + # then index results by id so _analytics_objects stays consistent. + deserialized = self._deserializeTrackedObjects(tracked_objects) + for wrapper in deserialized: + self._analytics_objects[wrapper.gid] = wrapper return def getTrackedObjects(self, detection_type): @@ -422,22 +442,17 @@ def getTrackedObjects(self, detection_type): """ # If analytics-only mode is enabled, only use MQTT cache (from separate Tracker service) if ControllerMode.isAnalyticsOnly(): - if detection_type in self.tracked_objects_cache: - cached_objects = self.tracked_objects_cache[detection_type] - return self._deserializeTrackedObjects(cached_objects) - return [] - - # If tracker is enabled, use direct tracker call (traditional mode) + return [obj for obj in self._analytics_objects.values() + if obj.category == detection_type] if self.tracker is not None: - log.debug(f"Using direct tracker call for detection type: {detection_type}") return self.tracker.currentObjects(detection_type) - return [] def _deserializeTrackedObjects(self, serialized_objects): """ Convert serialized tracked objects to a format usable by Analytics. - This creates lightweight wrappers that mimic MovingObject interface. + Reuses existing objects from _analytics_objects cache to preserve chain_data + state (sensor readings, region history) across frames. If objects are already deserialized, returns them as-is. Args: @@ -446,7 +461,6 @@ def _deserializeTrackedObjects(self, serialized_objects): Returns: List of object-like structures with necessary attributes """ - if not serialized_objects or not isinstance(serialized_objects, list): return serialized_objects if serialized_objects else [] @@ -457,8 +471,19 @@ def _deserializeTrackedObjects(self, serialized_objects): for obj_data in serialized_objects: if not isinstance(obj_data, dict): continue - obj = SimpleNamespace() - obj.gid = obj_data.get('id') + + obj_id = obj_data.get('id') + + # Reuse existing object to preserve chain_data, or create a new one + if obj_id in self._analytics_objects: + obj = self._analytics_objects[obj_id] + else: + obj = SimpleNamespace() + obj.chain_data = ChainData(regions={}, publishedLocations=[], persist={}) + self._analytics_objects[obj_id] = obj + + # Update position and all fields every frame + obj.gid = obj_id obj.category = obj_data.get('type', obj_data.get('category')) obj.sceneLoc = Point(obj_data.get('translation', [0, 0, 0])) obj.velocity = Point(obj_data.get('velocity', [0, 0, 0])) if obj_data.get('velocity') else None @@ -466,53 +491,44 @@ def _deserializeTrackedObjects(self, serialized_objects): obj.confidence = obj_data.get('confidence') obj.frameCount = obj_data.get('frame_count', 0) obj.rotation = obj_data.get('rotation') + obj.reid = {} + obj.metadata = {} + obj.vectors = [] + obj.boundingBox = None + obj.boundingBoxPixels = None + obj.intersected = False + obj.visibility = obj_data.get('visibility', []) + obj.info = {'category': obj.category, 'confidence': obj.confidence} + # Extract reid from metadata if present metadata = obj_data.get('metadata', {}) - obj.reid = metadata.get('reid') if metadata else None + obj.reid = metadata.get('reid') if metadata else {} obj.similarity = obj_data.get('similarity') - obj.vectors = [] # Empty list - tracked objects from MQTT don't have detection vectors - obj.boundingBoxPixels = None # Will use camera_bounds from obj_data if available - - obj_id = obj.gid - if 'first_seen' in obj_data: - obj.when = get_epoch_time(obj_data.get('first_seen')) - obj.first_seen = obj.when - # Cache the first_seen from MQTT data - if obj_id not in self.object_history_cache: - self.object_history_cache[obj_id] = {} - self.object_history_cache[obj_id]['first_seen'] = obj.when - else: - # Check if we have a cached first_seen timestamp - if obj_id in self.object_history_cache and 'first_seen' in self.object_history_cache[obj_id]: - obj.when = self.object_history_cache[obj_id]['first_seen'] - obj.first_seen = obj.when - else: - # First time seeing this object, record current time - current_time = get_epoch_time() - obj.when = current_time - obj.first_seen = current_time - if obj_id not in self.object_history_cache: - self.object_history_cache[obj_id] = {} - self.object_history_cache[obj_id]['first_seen'] = current_time - log.debug(f"First time seeing object id {obj_data.get('id')} from MQTT; setting first_seen to current time: {current_time}") - obj.visibility = obj_data.get('visibility', []) - - obj.info = { - 'category': obj.category, - 'confidence': obj.confidence, - } if 'camera_bounds' in obj_data and obj_data['camera_bounds']: obj._camera_bounds = obj_data['camera_bounds'] else: obj._camera_bounds = None - # Deserialize chain_data: convert sensors into env_sensor_state and attr_sensor_events - obj.chain_data = ChainData( - regions=obj_data.get('regions', {}), - publishedLocations=[], - persist=obj_data.get('persistent_data', {}), - ) + # Timestamps + if 'first_seen' in obj_data: + obj.first_seen = get_epoch_time(obj_data['first_seen']) + obj.when = obj.first_seen + # Cache the first_seen from MQTT data + self.object_history_cache.setdefault(obj_id, {})['first_seen'] = obj.when + elif obj_id in self.object_history_cache and 'first_seen' in self.object_history_cache[obj_id]: + obj.first_seen = self.object_history_cache[obj_id]['first_seen'] + obj.when = obj.first_seen + else: + current_time = get_epoch_time() + obj.first_seen = current_time + obj.when = current_time + self.object_history_cache.setdefault(obj_id, {})['first_seen'] = current_time + log.debug(f"First time seeing object id {obj_id} from MQTT; setting first_seen to current time: {current_time}") + + # Update chain_data regions and persist from latest frame data + obj.chain_data.regions = obj_data.get('regions', obj.chain_data.regions) + obj.chain_data.persist = obj_data.get('persistent_data', obj.chain_data.persist) # Convert serialized sensors into env_sensor_state and attr_sensor_events sensors_data = obj_data.get('sensors', {}) @@ -528,16 +544,11 @@ def _deserializeTrackedObjects(self, serialized_objects): else: obj.chain_data.attr_sensor_events[sensor_id] = values - obj_id = obj.gid + # Restore published locations from history cache if obj_id in self.object_history_cache: obj.chain_data.publishedLocations = self.object_history_cache[obj_id].get('publishedLocations', []) - else: - obj.chain_data.publishedLocations = [] - self.object_history_cache[obj_id] = {} - - # Store current object data for next frame - self.object_history_cache[obj_id]['publishedLocations'] = obj.chain_data.publishedLocations - self.object_history_cache[obj_id]['last_seen'] = obj.sceneLoc + self.object_history_cache.setdefault(obj_id, {})['publishedLocations'] = obj.chain_data.publishedLocations + self.object_history_cache.setdefault(obj_id, {})['last_seen'] = obj.sceneLoc objects.append(obj) diff --git a/tests/Makefile b/tests/Makefile index 6668539b9..9eeb97ef7 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -69,6 +69,7 @@ define common-recipe = WAITFORCONTAINERS=$(CONTAINERS) \ NO_PROXY=$(NO_PROXY),.scenescape.intel.com,.scenescape \ no_proxy=$(no_proxy),.scenescape.intel.com,.scenescape \ + ANALYTICS=$(ANALYTICS) \ $(RUNTEST) $(YML) pytest -s $(GENERATE_JUNITXML) $(TEST_SCRIPT) \ $${EXTRAS} 2>&1 | tee -ia $(LOGFILE) \ ; echo "MAKE_TARGET: $@" | tee -ia $(LOGFILE) \ diff --git a/tests/Makefile.functional b/tests/Makefile.functional index e0f0b3153..a7ad12644 100644 --- a/tests/Makefile.functional +++ b/tests/Makefile.functional @@ -312,8 +312,13 @@ sensors-send-events: # NEX-T10456 $(eval override BASE_IMAGE := $(IMAGE)-controller-test) $(eval OLDSECRETSDIR := $(SECRETSDIR)) $(eval SECRETSDIR := $(PWD)/manager/secrets) - $(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/scene.yml:$(COMPOSE)/web.yml) - $(call common-recipe, $(COMPOSE_FILES), tests/functional/tc_sensors_send_mqtt_messages.py, 'pgserver web scene', true, /run/secrets/controller.auth) + $(if $(ANALYTICS),\ + $(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/tracker.yml:$(COMPOSE)/controller_analytics.yml:$(COMPOSE)/web.yml) \ + $(eval SERVICES := 'pgserver web tracker controller-analytics'),\ + $(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/scene.yml:$(COMPOSE)/web.yml) \ + $(eval SERVICES := 'pgserver web scene')\ + ) + $(call common-recipe, $(COMPOSE_FILES), tests/functional/tc_sensors_send_mqtt_messages.py, $(SERVICES), true, /run/secrets/controller.auth) $(eval SECRETSDIR := $(OLDSECRETSDIR)) $(eval override BASE_IMAGE := $(IMAGE_OLD)) diff --git a/tests/compose/controller_analytics.yml b/tests/compose/controller_analytics.yml new file mode 100644 index 000000000..d93654077 --- /dev/null +++ b/tests/compose/controller_analytics.yml @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: (C) 2025 - 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +networks: + scenescape-test: + +secrets: + root-cert: + file: ${SECRETSDIR}/certs/scenescape-ca.pem + django: + file: ${SECRETSDIR}/django + controller.auth: + environment: CONTROLLER_AUTH + +services: + controller-analytics: + image: scenescape-controller:${VERSION:-latest} + init: true + networks: + scenescape-test: + depends_on: + web: + condition: service_healthy + broker: + condition: service_started + ntpserv: + condition: service_started + environment: + VISIBILITY_TOPIC: ${VISIBILITY:-regulated} + CONTROLLER_ENABLE_ANALYTICS_ONLY: "true" + command: --restauth /run/secrets/controller.auth + --brokerauth /run/secrets/controller.auth + --broker broker.scenescape.intel.com + --ntp ntpserv + --visibility_topic "$VISIBILITY_TOPIC" + volumes: + - vol-media:/home/scenescape/SceneScape/media + - ./controller/config/tracker-config.json:/home/scenescape/SceneScape/tracker-config.json + secrets: + - source: root-cert + target: certs/scenescape-ca.pem + - django + - controller.auth + restart: on-failure + +volumes: + vol-media: diff --git a/tests/compose/tracker.yml b/tests/compose/tracker.yml new file mode 100644 index 000000000..a831b9e8b --- /dev/null +++ b/tests/compose/tracker.yml @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: (C) 2025 - 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +networks: + scenescape-test: + +secrets: + root-cert: + file: ${SECRETSDIR}/certs/scenescape-ca.pem + django: + file: ${SECRETSDIR}/django + controller-auth-file: + file: manager/secrets/controller.auth + +services: + tracker: + image: scenescape-tracker:${VERSION:-latest} + networks: + scenescape-test: + aliases: + - tracker.scenescape.intel.com + user: "10001:10001" + depends_on: + broker: + condition: service_started + web: + condition: service_healthy + environment: + - TRACKER_LOG_LEVEL=info + - TRACKER_MQTT_HOST=broker.scenescape.intel.com + - TRACKER_MQTT_PORT=1883 + - TRACKER_MQTT_INSECURE=false + - TRACKER_MQTT_TLS_CA_CERT=/run/secrets/certs/scenescape-ca.pem + - TRACKER_MQTT_TLS_VERIFY_SERVER=true + - TRACKER_MANAGER_URL=https://web.scenescape.intel.com + - TRACKER_MANAGER_AUTH_PATH=/run/secrets/controller.auth + - TRACKER_MANAGER_CA_CERT_PATH=/run/secrets/certs/scenescape-ca.pem + - TRACKER_SCENES_SOURCE=api + # Override host proxy settings - Paho MQTT dont respect no_proxy var, so as a WA + # tracker code detects empty vars and unsets them (see proxy_utils.cpp clearEmptyProxyEnvVars) + - http_proxy= + - https_proxy= + - HTTP_PROXY= + - HTTPS_PROXY= + secrets: + - source: root-cert + target: certs/scenescape-ca.pem + - source: controller-auth-file + target: /run/secrets/controller.auth + read_only: true + cap_drop: + - ALL + security_opt: + - no-new-privileges:true + # Exit 0: graceful stop or non-retryable error (bad auth) — stay stopped + # Exit 1: retryable error (broker unavailable) — restart + # Exit 99: scene update received — restart to reload config + restart: on-failure + mem_limit: ${TRACKER_MEM_LIMIT:-512m} + # Scale: ~1 CPU per 100 tracked objects. Increase TRACKER_CPUS for larger deployments. + cpus: ${TRACKER_CPUS:-2.0} + pids_limit: 1000 diff --git a/tests/functional/tc_sensors_send_mqtt_messages.py b/tests/functional/tc_sensors_send_mqtt_messages.py index 9df4c0089..055a91c8b 100755 --- a/tests/functional/tc_sensors_send_mqtt_messages.py +++ b/tests/functional/tc_sensors_send_mqtt_messages.py @@ -26,6 +26,9 @@ FRAMES_PER_SECOND = 10 THING_TYPES = ["person", "chair", "table", "couch"] MAX_CONTROLLER_WAIT = 30 # seconds +IMG_W = 640 +IMG_H = 480 +ANALYTICS = os.environ.get('ANALYTICS', 'false').lower() == 'true' class SensorMqttMessageFlowTest(FunctionalTest): def __init__(self, testName, request, recordXMLAttribute): @@ -159,62 +162,61 @@ def plotCourse(self): course = np.dstack(course) return course[0] + def to_px(self, box): + x = box['x'] + y = box['y'] + w = box['width'] + h = box['height'] + + cx = (x + 1) / 2 * IMG_W + cy = (y + 1) / 2 * IMG_H + + w_px = w * IMG_W + h_px = h * IMG_H + + return { + "x": cx - w_px / 2, + "y": cy - h_px / 2, + "width": w_px, + "height": h_px + } + def createDetection(self, positionNow): + def make_obj(obj_id, category, x, y, w, h): + bbox = { + 'x': x, + 'y': y, + 'width': w, + 'height': h, + } + + return { + 'id': obj_id, + 'category': category, + 'bounding_box': bbox, + 'bounding_box_px': self.to_px(bbox), + } + detection = { - 'id': self.cameraId, - 'timestamp': get_iso_time(get_epoch_time()), - 'objects': { - 'person': [ - { - 'id': 1, - 'category': 'person', - 'bounding_box': { - 'x': 0.56, - 'y': positionNow.y, - 'width': 0.24, - 'height': 0.49, - }, - }, - ], - 'chair': [ - { - 'id': 2, - 'category': 'chair', - 'bounding_box': { - 'x': 0.68, - 'y': positionNow.y, - 'width': 0.24, - 'height': 0.49, - }, - }, - ], - 'table': [ - { - 'id': 3, - 'category': 'table', - 'bounding_box': { - 'x': 0.44, - 'y': positionNow.y, - 'width': 0.30, - 'height': 0.20, - }, - }, - ], - 'couch': [ - { - 'id': 4, - 'category': 'couch', - 'bounding_box': { - 'x': 0.80, - 'y': positionNow.y, - 'width': 0.36, - 'height': 0.28, - }, - }, - ], - }, - 'rate': 9.8, + 'id': self.cameraId, + 'timestamp': get_iso_time(get_epoch_time()), + 'objects': { + 'person': [ + make_obj(1, 'person', 0.56, positionNow.y, 0.24, 0.49) + ], + 'chair': [ + make_obj(2, 'chair', 0.68, positionNow.y, 0.24, 0.49) + ], + 'table': [ + make_obj(3, 'table', 0.44, positionNow.y, 0.30, 0.20) + ], + 'couch': [ + make_obj(4, 'couch', 0.80, positionNow.y, 0.36, 0.28) + ], + }, + 'rate': 9.8, } + return detection def _publish_scheduled_sensor_value(self, idx, now, schedule, sensor_name): @@ -407,7 +409,9 @@ def _find_sensor_values_in_messages(self, messages, sensor_name): def _verify_sensor_payloads(self): assert self.regulatedMessages, "No regulated messages received" - assert self.externalMessages, "No external messages received" + + if not ANALYTICS: + assert self.externalMessages, "No external messages received" seen_types = set() scene_env_types = set() diff --git a/tests/sscape_tests/scene_pytest/test_scene.py b/tests/sscape_tests/scene_pytest/test_scene.py index c8f8748e9..8637f304f 100644 --- a/tests/sscape_tests/scene_pytest/test_scene.py +++ b/tests/sscape_tests/scene_pytest/test_scene.py @@ -278,7 +278,7 @@ def test_deserialize_tracked_objects_uses_configured_attribute_singleton_type(): 'weight-sensor': SimpleNamespace(singleton_type='attribute') } scene.object_history_cache = {} - + scene._analytics_objects = {} objects = scene._deserializeTrackedObjects([ { 'id': 'object-1', @@ -303,7 +303,7 @@ def test_deserialize_tracked_objects_defaults_unknown_sensor_to_environmental(): scene = scene_module.Scene.__new__(scene_module.Scene) scene.sensors = {} scene.object_history_cache = {} - + scene._analytics_objects = {} objects = scene._deserializeTrackedObjects([ { 'id': 'object-1', @@ -330,7 +330,7 @@ def test_deserialize_tracked_objects_defaults_missing_singleton_type_to_environm 'sensor-without-type': SimpleNamespace(singleton_type=None) } scene.object_history_cache = {} - + scene._analytics_objects = {} objects = scene._deserializeTrackedObjects([ { 'id': 'object-1', diff --git a/tools/scenescape-start b/tools/scenescape-start index 8dd9a96d9..08d206201 100755 --- a/tools/scenescape-start +++ b/tools/scenescape-start @@ -134,7 +134,7 @@ if [[ "${IMAGE}" == *"scenescape-manager-test"* ]] ; then DCKARGS+=(-u $(id -u) --userns host -w ${APPDIR} -v ${HOSTDIR}/manager/secrets:/home/scenescape/SceneScape/manager/secrets -v ${HOSTDIR}/manager/secrets/django/secrets.py:/home/scenescape/SceneScape/manager/secrets.py ) fi docker run ${DCKARGS[@]} -v "${HOSTDIR}":${APPDIR} \ - -e http_proxy -e https_proxy -e no_proxy -e PROJECT="${DCKPROJECT}"\ + -e http_proxy -e https_proxy -e no_proxy -e PROJECT="${DCKPROJECT}" -e ANALYTICS \ -v ${DCKPROJECT}_vol-models:${MODELDIR} -v ${DCKPROJECT}_vol-sample-data:/home/scenescape/SceneScape/sample_data -e PYTHONPATH="${APPDIR}"\ -v ${HOSTDIR}/manager/secrets:/run/secrets \ ${IMAGE} ${ARGS[@]} $@