Skip to content

Commit ea230a7

Browse files
authored
ITEP-89829 - Automate tests for scene retrack functionality (#1278)
1 parent 86eec27 commit ea230a7

5 files changed

Lines changed: 946 additions & 1 deletion

File tree

manager/src/manager/serializers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ def validate(self, attrs):
560560
raise serializers.ValidationError({'body': ['Request body is required.']})
561561

562562
name = attrs.get('name', None)
563-
if not name or not name.strip():
563+
if (name is None and not self.partial) or (name is not None and not name.strip()):
564564
raise serializers.ValidationError({'name': ['This field is required.']})
565565

566566
allowed = set(self.fields.keys()) | {

tests/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ _functional-tests: \
168168
reid-semantic-unique-count \
169169
reid-unique-count \
170170
rest-test \
171+
retrack-object \
171172
scene-details \
172173
scene-import \
173174
scene-import-json \

tests/Makefile.functional

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,3 +410,13 @@ child-event-propagation: # NEX-T10530
410410
$(call common-recipe, $(COMPOSE_FILES), tests/functional/tc_child_scene_event_propagation.py, 'pgserver web scene', true, /run/secrets/controller.auth)
411411
$(eval SECRETSDIR := $(OLDSECRETSDIR))
412412
$(eval override BASE_IMAGE := $(IMAGE_OLD))
413+
414+
retrack-object:
415+
$(eval IMAGE_OLD := $(BASE_IMAGE))
416+
$(eval override BASE_IMAGE := $(IMAGE)-controller-test)
417+
$(eval COMPOSE_FILES := $(COMPOSE)/dlstreamer/broker.yml:$(COMPOSE)/ntp.yml:$(COMPOSE)/pgserver.yml:$(COMPOSE)/scene.yml:$(COMPOSE)/web.yml)
418+
$(eval OLDSECRETSDIR := $(SECRETSDIR))
419+
$(eval SECRETSDIR := $(PWD)/manager/secrets)
420+
$(call common-recipe, $(COMPOSE_FILES), tests/functional/tc_scene_retrack.py, 'pgserver web scene', true, /run/secrets/controller.auth)
421+
$(eval override BASE_IMAGE := $(IMAGE_OLD))
422+
$(eval SECRETSDIR := $(OLDSECRETSDIR))

tests/functional/common_retrack.py

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
#!/usr/bin/env python3
2+
3+
# SPDX-FileCopyrightText: (C) 2026 Intel Corporation
4+
# SPDX-License-Identifier: Apache-2.0
5+
6+
import copy
7+
import json
8+
import math
9+
import threading
10+
import time
11+
12+
from scene_common.mqtt import PubSub
13+
from scene_common import log
14+
from scene_common.timestamp import get_iso_time
15+
16+
17+
class RetrackTest:
18+
19+
FRAME_RATE = 10
20+
MAX_WAIT = 5
21+
NUM_PUBLISH_ITERATIONS = 5
22+
23+
def __init__(self, params):
24+
"""! Initialise an empty helper bound to the given connection parameters.
25+
26+
@param params Dict of functional-test connection parameters from
27+
the conftest fixture.
28+
"""
29+
self.params = params
30+
self.parent_id = None
31+
self.child_id = None
32+
self._lock = threading.Lock()
33+
self.parent_received = []
34+
self.child_received = []
35+
36+
def on_message(self, mqttc, obj, msg):
37+
"""! Default onMessage callback, routes regulated messages into
38+
parent_received or child_received based on scene_id.
39+
40+
@param mqttc MQTT client object.
41+
@param obj Private user data (unused).
42+
@param msg MQTTMessage instance.
43+
"""
44+
topic = PubSub.parseTopic(msg.topic)
45+
if topic is None:
46+
return
47+
try:
48+
data = json.loads(msg.payload.decode("utf-8"))
49+
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
50+
log.warning(f"Failed to decode MQTT payload on {msg.topic}: {exc}")
51+
return
52+
obj_count = len(data.get('objects', []))
53+
if obj_count == 0:
54+
return
55+
with self._lock:
56+
if topic.get('scene_id') == self.parent_id:
57+
log.info(f"Parent regulated: {obj_count} objects")
58+
self.parent_received.append(data)
59+
elif topic.get('scene_id') == self.child_id:
60+
log.info(f"Child regulated: {obj_count} objects")
61+
self.child_received.append(data)
62+
63+
def setup_scenes(self, rest_client):
64+
"""! Create a fresh parent scene and link the existing Demo scene as
65+
child with retrack=True (default).
66+
67+
@param rest_client An authenticated RESTClient instance.
68+
"""
69+
parent_scene = rest_client.createScene({'name': "retrack_parent"})
70+
assert parent_scene.statusCode == 201, \
71+
f"Failed to create parent scene: {parent_scene.statusCode}"
72+
self.parent_id = parent_scene['uid']
73+
log.info(f"Created parent scene: {self.parent_id}")
74+
75+
scenes = rest_client.getScenes({'name': 'Demo'})
76+
assert scenes['count'] > 0, "Demo scene not found – required for retrack tests"
77+
child_scene = scenes['results'][0]
78+
self.child_id = child_scene['uid']
79+
log.info(f"Using Demo as child scene: {self.child_id}")
80+
81+
res = rest_client.updateScene(self.child_id, {'parent': self.parent_id})
82+
assert res.statusCode == 200, \
83+
f"Failed to link child to parent: {res.statusCode}"
84+
85+
child_links = rest_client.getChildScene({'parent': self.parent_id})
86+
assert child_links.statusCode == 200 and child_links['count'] == 1, \
87+
"Child-parent link not found after linking"
88+
89+
def teardown_scenes(self, rest_client):
90+
"""! Unlink the child scene and delete the parent scene created for
91+
the test. The Demo child scene is a fixture and is never deleted.
92+
93+
@param rest_client An authenticated RESTClient instance.
94+
"""
95+
if self.child_id and self.parent_id:
96+
try:
97+
res = rest_client.deleteChildSceneLink(self.child_id)
98+
errors = getattr(res, 'errors', None)
99+
if res.statusCode in (200, 204):
100+
log.info(f"[TEARDOWN] Unlinked child uid={self.child_id}: {res.statusCode}")
101+
else:
102+
log.error(
103+
f"[TEARDOWN] Failed to unlink child uid={self.child_id}: "
104+
f"status={res.statusCode}, errors={errors}"
105+
)
106+
except Exception as exc:
107+
log.error(f"[TEARDOWN] Exception unlinking child uid={self.child_id}: {exc}")
108+
if self.parent_id:
109+
try:
110+
res = rest_client.deleteScene(self.parent_id)
111+
errors = getattr(res, 'errors', None)
112+
if res.statusCode in (200, 204):
113+
log.info(f"[TEARDOWN] Deleted parent scene uid={self.parent_id}: {res.statusCode}")
114+
else:
115+
log.error(
116+
f"[TEARDOWN] Failed to delete parent scene uid={self.parent_id}: "
117+
f"status={res.statusCode}, errors={errors}"
118+
)
119+
except Exception as exc:
120+
log.error(f"[TEARDOWN] Exception deleting parent scene uid={self.parent_id}: {exc}")
121+
122+
def _await_db_notification(self, rest_fn):
123+
"""! Subscribe to CMD_DATABASE, call rest_fn(), then assert the
124+
notification arrives confirming the controller loaded the change.
125+
126+
@param rest_fn Zero-argument callable that performs the REST update.
127+
"""
128+
db_received = threading.Event()
129+
subscribed = threading.Event()
130+
db_topic = PubSub.formatTopic(PubSub.CMD_DATABASE)
131+
132+
def _on_db(mqttc, obj, msg):
133+
db_received.set()
134+
135+
def _on_connected(mqttc, obj, flags, rc):
136+
if rc == 0:
137+
mqttc.addCallback(db_topic, _on_db)
138+
139+
def _on_subscribed(mqttc, obj, mid, granted_qos):
140+
subscribed.set()
141+
142+
tmp = PubSub(self.params["auth"], None, self.params["rootcert"],
143+
self.params["broker_url"], self.params["broker_port"])
144+
tmp.onConnect = _on_connected
145+
tmp.onSubscribe = _on_subscribed
146+
tmp.connect()
147+
tmp.loopStart()
148+
assert subscribed.wait(self.MAX_WAIT), \
149+
"Temporary MQTT client failed to subscribe to CMD_DATABASE within timeout"
150+
try:
151+
rest_fn()
152+
assert db_received.wait(self.MAX_WAIT), \
153+
"Timed out waiting for CMD_DATABASE notification"
154+
finally:
155+
tmp.loopStop()
156+
157+
def set_retrack(self, rest_client, value):
158+
"""! Update the retrack flag on the child scene link and wait for the
159+
CMD_DATABASE notification confirming the controller has loaded the change.
160+
161+
@param rest_client An authenticated RESTClient instance.
162+
@param value Boolean value for the retrack field.
163+
"""
164+
def _update():
165+
res = rest_client.updateChildScene(self.child_id, {'retrack': value})
166+
assert res.statusCode == 200, \
167+
f"Failed to set retrack={value}: {res.statusCode}"
168+
log.info(f"Set retrack={value} on child scene {self.child_id}")
169+
verify = rest_client.getChildScene({'parent': self.parent_id})
170+
assert verify.statusCode == 200, \
171+
f"Failed to read back child scene link after setting retrack={value}"
172+
assert verify['count'] > 0, \
173+
f"No child scene link found when verifying retrack={value} (parent={self.parent_id})"
174+
actual = verify['results'][0]['retrack']
175+
log.info(f"Verify child link retrack value: {actual}")
176+
assert actual == value, \
177+
f"retrack mismatch: expected {value}, got {actual}"
178+
self._await_db_notification(_update)
179+
180+
def set_external_rate(self, rest_client, rate):
181+
"""! Update external_update_rate on the child scene and wait for the
182+
CMD_DATABASE notification confirming the controller has loaded the change.
183+
184+
@param rest_client An authenticated RESTClient instance.
185+
@param rate Float Hz value for external_update_rate.
186+
"""
187+
def _update():
188+
res = rest_client.updateScene(self.child_id, {'external_update_rate': rate})
189+
assert res.statusCode == 200, \
190+
f"Failed to set external_update_rate={rate}: {res.statusCode}"
191+
log.info(f"Set external_update_rate={rate} on scene {self.child_id}")
192+
self._await_db_notification(_update)
193+
194+
def make_client(self, topics=None, on_msg=None):
195+
"""! Create and start an MQTT PubSub client, subscribe to *topics* on
196+
connect, and block until the broker confirms connection.
197+
198+
Defaults to subscribing to DATA_REGULATED for both parent and child
199+
scenes with self.on_message as the callback when omitted.
200+
201+
@param topics List of MQTT topic strings. Defaults to
202+
DATA_REGULATED for parent_id and child_id.
203+
@param on_msg onMessage callback. Defaults to self.on_message.
204+
@return Connected PubSub instance.
205+
"""
206+
if topics is None:
207+
topics = [
208+
PubSub.formatTopic(PubSub.DATA_REGULATED, scene_id=self.parent_id),
209+
PubSub.formatTopic(PubSub.DATA_REGULATED, scene_id=self.child_id),
210+
]
211+
if on_msg is None:
212+
on_msg = self.on_message
213+
connected_event = threading.Event()
214+
215+
def _on_connect(mqttc, obj, flags, rc):
216+
if rc == 0:
217+
for t in topics:
218+
mqttc.subscribe(t)
219+
log.info(f"Subscribed: {t}")
220+
connected_event.set()
221+
222+
client = PubSub(self.params["auth"], None, self.params["rootcert"],
223+
self.params["broker_url"], self.params["broker_port"])
224+
client.onConnect = _on_connect
225+
client.onMessage = on_msg
226+
client.connect()
227+
client.loopStart()
228+
assert connected_event.wait(self.MAX_WAIT), \
229+
"MQTT client failed to connect within timeout"
230+
return client
231+
232+
def wait_for_messages(self, timeout=None, require_parent=True, require_child=True):
233+
"""! Block until at least one message with objects has arrived on the
234+
expected topics, or timeout expires.
235+
236+
@param timeout Maximum seconds to wait. Defaults to MAX_WAIT.
237+
@param require_parent Assert that parent received objects if True.
238+
@param require_child Assert that child received objects if True.
239+
"""
240+
if timeout is None:
241+
timeout = self.MAX_WAIT
242+
start = time.time()
243+
while time.time() - start < timeout:
244+
with self._lock:
245+
parent_ok = (not require_parent) or len(self.parent_received) > 0
246+
child_ok = (not require_child) or len(self.child_received) > 0
247+
if parent_ok and child_ok:
248+
return
249+
time.sleep(0.5)
250+
with self._lock:
251+
parent_count = len(self.parent_received)
252+
child_count = len(self.child_received)
253+
if require_parent:
254+
assert parent_count > 0, \
255+
f"Timed out after {timeout}s: no objects on parent regulated topic"
256+
if require_child:
257+
assert child_count > 0, \
258+
f"Timed out after {timeout}s: no objects on child regulated topic"
259+
260+
def reset(self):
261+
"""! Clear both accumulators atomically under the lock."""
262+
with self._lock:
263+
self.parent_received.clear()
264+
self.child_received.clear()
265+
266+
def snapshot_received(self):
267+
"""! Return a frozen (parent_list, child_list) copy under the lock.
268+
269+
@return Tuple of (list, list) – point-in-time snapshots of
270+
parent_received and child_received.
271+
"""
272+
with self._lock:
273+
return list(self.parent_received), list(self.child_received)
274+
275+
@staticmethod
276+
def collect_object_ids(messages):
277+
"""! Return the set of object id values from a list of regulated messages.
278+
279+
@param messages List of decoded regulated-data message dicts.
280+
@return Set of id strings found in 'objects' lists.
281+
"""
282+
ids = set()
283+
for msg in messages:
284+
for obj in msg.get('objects', []):
285+
if 'id' in obj:
286+
ids.add(obj['id'])
287+
return ids
288+
289+
@staticmethod
290+
def publish_data(obj_data, client, obj_category="person"):
291+
"""! Publish simulated object detection data to a camera's MQTT topic.
292+
293+
@param obj_data The object data fixture containing camera id and objects.
294+
@param client The MQTT PubSub client.
295+
@param obj_category The object category to publish (default: "person").
296+
"""
297+
obj_data = copy.deepcopy(obj_data)
298+
cam_id = obj_data["id"]
299+
topic = PubSub.formatTopic(PubSub.DATA_CAMERA, camera_id=cam_id)
300+
for iteration in range(RetrackTest.NUM_PUBLISH_ITERATIONS):
301+
for i in range(5):
302+
obj_data["timestamp"] = get_iso_time()
303+
obj_data["objects"][obj_category][0]["bounding_box"]["y"] = 100 + (i * 20)
304+
obj_data["objects"][obj_category][0]["category"] = obj_category
305+
client.publish(topic, json.dumps(obj_data))
306+
log.info(
307+
f"Published object via camera {cam_id}: y={100 + (i * 20)} "
308+
f"(iter {iteration})")
309+
time.sleep(1.0 / RetrackTest.FRAME_RATE)
310+
311+
@staticmethod
312+
def publish_timed(obj_data, client, rate, duration):
313+
"""! Publish camera detections at *rate* Hz for *duration* seconds.
314+
315+
@param obj_data The object data fixture containing camera id and objects.
316+
@param client The MQTT PubSub client.
317+
@param rate Publish rate in Hz.
318+
@param duration Duration in seconds.
319+
"""
320+
obj_data = copy.deepcopy(obj_data)
321+
cam_id = obj_data["id"]
322+
topic = PubSub.formatTopic(PubSub.DATA_CAMERA, camera_id=cam_id)
323+
end = time.time() + duration
324+
i = 0
325+
while time.time() < end:
326+
obj_data["timestamp"] = get_iso_time()
327+
obj_data["objects"]["person"][0]["bounding_box"]["y"] = 100 + (i % 5) * 20
328+
obj_data["objects"]["person"][0]["category"] = "person"
329+
client.publish(topic, json.dumps(obj_data))
330+
time.sleep(1.0 / rate)
331+
i += 1
332+
333+
@staticmethod
334+
def assert_valid_translation(tr, label):
335+
"""! Assert that *tr* is a list of exactly three finite numeric values.
336+
337+
@param tr The translation value to validate.
338+
@param label Human-readable label used in assertion messages.
339+
"""
340+
assert isinstance(tr, (list, tuple)), \
341+
f"{label} 'translation' must be a list or tuple, got {type(tr).__name__}: {tr!r}"
342+
assert len(tr) == 3, \
343+
f"{label} 'translation' must have 3 elements, got {len(tr)}"
344+
for v in tr:
345+
assert isinstance(v, (int, float)), \
346+
f"{label} translation element not numeric: {v}"
347+
assert math.isfinite(v), \
348+
f"{label} translation element is not finite (NaN/Inf): {v}"

0 commit comments

Comments
 (0)