-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathtc_mqtt_sensor_roi.py
More file actions
322 lines (277 loc) · 10.6 KB
/
tc_mqtt_sensor_roi.py
File metadata and controls
322 lines (277 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#!/usr/bin/env python3
# SPDX-FileCopyrightText: (C) 2022 - 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import json
import time
import os
from http import HTTPStatus
from scene_common.mqtt import PubSub
from scene_common.timestamp import get_iso_time, get_epoch_time
from tests.functional.common_scene_obj import SceneObjectMqtt
TEST_NAME = "NEX-T10460"
SENSOR_DELAY = 0.5
SENSOR_PROC_DELAY = 0.001
SENSOR_NAME = "TestSensor1"
PERSON = "person"
REGION = "region"
FRAME_RATE = 10
MAX_DELAYS = 100
class SensorMqttRoi(SceneObjectMqtt):
def __init__(self, testName, request, sensor_delay, recordXMLAttribute):
super().__init__(testName, request, recordXMLAttribute)
self.sensorHistory = []
self.sensorDelay = sensor_delay
self.foundValid = 0
self.sensorValue = 100
self.missedValues = 0
self.errorInSensor = False
self.checkedValues = 0
self.checkedEntered = 0
self.checkedExited = 0
self.enteredDetected = True
self.exitedDetected = True
self.exitedTimestamp = None
self.enteredTimestamp = None
return
def createSensor(self, sensorData):
res = self.rest.createSensor(sensorData)
assert res.statusCode == HTTPStatus.CREATED, (res.statusCode, res.errors)
return
def runSceneObjMqttPrepareExtra(self):
topic = PubSub.formatTopic(PubSub.DATA_SENSOR, sensor_id=self.roiName)
self.pubsub.addCallback(topic, self.sensorDataReceived)
sensor = {
'scene': self.sceneUID,
'name': self.roiName,
'area': "poly",
'points': self.roiPoints
}
self.createSensor(sensor)
time.sleep(1)
assert self.pushSensorValue(self.roiName, self.sensorValue)
time.sleep(3)
return
def runSceneObjMqttVerifyPassedExtra(self):
print("Verifying test parameters")
assert not self.errorInSensor
assert self.enteredDetected
assert self.exitedDetected
assert self.foundValid > 0
assert self.checkedEntered > 0
assert self.checkedExited > 0
assert self.checkedValues > 0
# Verify dwell window calculation is sensible
self.verifyDwellWindowExists()
return True
def verifyDwellWindowExists(self):
"""Verify dwell window metadata exists for integration validation.
This validates integration aspects only - that entry/exit times were properly
captured. The actual dwell calculation formulas are already tested by unit tests.
"""
if self.enteredTimestamp is None:
return # No entry detected, nothing to verify
if self.exitedTimestamp is not None:
dwell_window = self.exitedTimestamp - self.enteredTimestamp
assert dwell_window >= 0, f"Invalid dwell window: exit before entry"
print(f"Dwell window validated: {dwell_window:.2f}s from entry to exit")
else:
print("Object still in region - no exit timestamp to validate")
return
def sendDetections(self, objLocation, frame_rate):
jdata = self.objData()
start_time = get_epoch_time()
for location in objLocation:
now = time.time()
camera_id = jdata['id']
jdata['timestamp'] = get_iso_time(now)
jdata['objects'][PERSON][0]['bounding_box']['y'] = location
detection = json.dumps(jdata)
self.pubsub.publish(
PubSub.formatTopic(PubSub.DATA_CAMERA, camera_id=camera_id),
detection
)
time.sleep(1 / frame_rate)
if now - start_time > self.sensorDelay:
start_time = now
self.sensorValue += 1
assert self.pushSensorValue(self.roiName, self.sensorValue)
time.sleep(SENSOR_PROC_DELAY)
return
def eventReceived(self, pahoClient, userdata, message):
region_data = json.loads(message.payload.decode("utf-8"))
if len(region_data['objects']):
self.handleRegionData(region_data)
return
def regulatedReceived(self, pahoClient, userdata, message):
if self.entered:
scene_data = json.loads(message.payload.decode("utf-8"))
for obj in scene_data['objects']:
current_point = obj['translation']
scene_message_ts = get_epoch_time(scene_data['timestamp'])
if not self.isWithinRectangle(
self.roiPoints[1], self.roiPoints[3],
(current_point[0], current_point[1])
):
self.exited = True
self.entered = False
self.exitedDetected = True
self.exitedTimestamp = scene_message_ts
print('object exited region')
self.handleSceneSensorData(obj, scene_message_ts, self.exitedTimestamp)
if self.exited:
self.exitedTimestamp = None
self.enteredTimestamp = None
if self.errorInSensor:
break
return
def sensorDataReceived(self, pahoClient, userdata, message):
sensor_data = json.loads(message.payload.decode("utf-8"))
self.sensorHistory.append(sensor_data)
return
def pushSensorValue(self, sensor_name, value):
message_dict = {
'timestamp': get_iso_time(),
'id': sensor_name,
'value': value
}
# Publish the message to the sensor topic
result = self.pubsub.publish(
PubSub.formatTopic(PubSub.DATA_SENSOR, sensor_id=sensor_name),
json.dumps(message_dict)
)
error_code = result[0]
if error_code != 0:
print(f"Failed to send sensor {sensor_name} value!")
print(result.is_published())
return error_code == 0
def runROIMqtt(self):
self.exitCode = 1
self.runSceneObjMqttInitialize()
try:
self.runSceneObjMqttPrepare()
self.runSceneObjMqttPrepareExtra()
self.runROIMqttExecute()
passed = self.runROIMqttVerifyPassed()
passed_extra = self.runSceneObjMqttVerifyPassedExtra()
if (passed and passed_extra):
self.exitCode = 0
finally:
self.runSceneObjMqttFinally()
return
def handleEnteredExitedObjects(self, object_list, sensor_history_list):
found_error = False
for obj in object_list:
if not self.findAllSensorsInRange(obj, sensor_history_list):
found_error = True
break
return found_error is False
def handleRegionData(self, region_data):
if not 'objects' in region_data:
print("No objects in region!")
current_point = region_data['objects'][0]['translation']
region_message_ts = get_epoch_time(region_data['timestamp'])
if self.isWithinRectangle(self.roiPoints[1], self.roiPoints[3], (current_point[0], current_point[1])):
self.entered = True
self.enteredDetected = True
self.object_in_region = True # Critical: update flag for regionDataReceived callback
print('object entered region')
if self.enteredTimestamp is None:
self.enteredTimestamp = region_message_ts
# Check for exit events to clear object_in_region flag
if 'exited' in region_data and len(region_data['exited']) > 0:
self.object_in_region = False # Critical: update flag when object exits
print('object exited region')
if self.entered and len(self.sensorHistory) > 0:
start_idx, end_idx = self.findSensorIndexes(
self.enteredTimestamp, region_message_ts, self.exitedTimestamp)
if not self.handleEnteredExitedObjects(region_data['entered'],
self.sensorHistory[start_idx:end_idx]):
print("Found error in 'entered' objects!")
self.errorInSensor = True
else:
self.checkedEntered += 1
if not self.handleEnteredExitedObjects(region_data['exited'],
self.sensorHistory[start_idx:end_idx]):
print("Found error in 'exited' objects!")
self.errorInSensor = True
else:
self.checkedExited += 1
return
def findAllSensorsInRange(self, obj, sensor_list):
found_all = True
for cur_sensor in sensor_list:
found = self.findSensorInObj(obj, cur_sensor, self.roiName)
if not found:
print("Warning: failed to find expected sensor value {} (TS {})".format(
cur_sensor['value'], cur_sensor['timestamp']))
found_all = False
else:
self.foundValid += 1
return found_all
def findSensorInObj(self, obj, sensor_entry, sensor_name):
found = False
expected_sensor_ts = get_epoch_time(sensor_entry['timestamp'])
expected_sensor_value = sensor_entry['value']
if not 'sensors' in obj:
print("Object missing sensor data {}".format(obj))
return False
sensor_payload = obj['sensors'].get(sensor_name)
if sensor_payload is None:
print("Object missing expected sensor '{}' data {}".format(sensor_name, obj))
return False
if isinstance(sensor_payload, dict):
sensor_values = sensor_payload.get('values', [])
else:
sensor_values = sensor_payload
for sensor_info in sensor_values:
if get_epoch_time(sensor_info[0]) == expected_sensor_ts \
and sensor_info[1] == expected_sensor_value:
found = True
break
return found
def handleSceneSensorData(self, obj, scene_message_ts, exited_timestamp):
if self.enteredTimestamp is None:
return
if self.entered and len(self.sensorHistory) > 0:
start_idx, end_idx = self.findSensorIndexes(
self.enteredTimestamp, scene_message_ts, exited_timestamp)
found_all = self.findAllSensorsInRange(obj, self.sensorHistory[start_idx:end_idx])
if found_all:
self.missedValues = 0
else:
# Sometimes the scene controller hasn't updated the last sensor value,
# but it will on subsequent messages, so allow to check it later
if self.missedValues:
self.errorInSensor = True
print("Had previously Failed to find some expected sensor values!")
else:
self.missedValues += 1
self.checkedValues += end_idx - start_idx
return
def findSensorIndexes(self, entered_ts, cur_scene_ts, exited_ts):
global SENSOR_PROC_DELAY
start_idx = 0
end_idx = len(self.sensorHistory) - 1
for cur_idx, sensor in enumerate(self.sensorHistory):
cur_sensor_ts = get_epoch_time(sensor['timestamp'])
if (cur_sensor_ts - SENSOR_PROC_DELAY) <= entered_ts:
start_idx = cur_idx
if exited_ts is not None:
# Give the scene controller a grace period to process the sensor sensor data.
if (cur_sensor_ts - SENSOR_PROC_DELAY) < exited_ts:
end_idx = max(0, cur_idx - 1)
else:
if (cur_sensor_ts - SENSOR_PROC_DELAY) < cur_scene_ts:
end_idx = cur_idx
if end_idx == start_idx:
end_idx += 1
return start_idx, end_idx
def test_sensor_roi_mqtt(request, record_xml_attribute):
test = SensorMqttRoi(TEST_NAME, request, SENSOR_DELAY, record_xml_attribute)
test.runROIMqtt()
assert test.exitCode == 0
return test.exitCode
def main():
return test_sensor_roi_mqtt(None, None)
if __name__ == '__main__':
os._exit(main() or 0)