Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 75 additions & 67 deletions controller/src/controller/vdms_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def sendQuery(self, query, blob=None):
for (item, response) in zip(query, query_response[0]):
query_type = next(iter(item))
response_data = response.get(query_type, {})
if not isinstance(response_data, dict):
log.debug(f"sendQuery: Non-dict payload for {query_type}: {response_data!r}")
response_data = {}
responses.append(response_data)
else:
log.warning(f"Failed to send query to VDMS container: {query}")
Expand All @@ -102,34 +105,11 @@ def connect(self, hostname=DEFAULT_HOSTNAME):
try:
self.db.connect(hostname)
if self.dimensions is not None:
expected_dimensions = int(self.dimensions)
expected_metric = str(self.similarity_metric).strip().upper()
with self._schema_lock:
schema_exists, schema_dimensions, schema_metric = self.findSchemaMetadata(self.set_name)
if schema_exists:
if schema_dimensions is None:
raise RuntimeError(
f"connect: VDMS descriptor set '{self.set_name}' exists but returned no dimensions. "
"Refusing to proceed; recreate the descriptor set to continue.")
if schema_metric is None:
raise RuntimeError(
f"connect: VDMS descriptor set '{self.set_name}' exists but returned no metric. "
"Refusing to proceed; recreate the descriptor set to continue.")
if str(schema_metric).strip().upper() != expected_metric:
raise RuntimeError(
f"connect: VDMS descriptor set '{self.set_name}' uses metric {schema_metric}, "
f"but controller is configured for {expected_metric}. "
"Refusing to proceed; recreate the descriptor set with matching metric.")
if schema_dimensions != expected_dimensions:
raise RuntimeError(
f"connect: VDMS descriptor set '{self.set_name}' uses {schema_dimensions} dimensions, "
f"but controller is configured for {expected_dimensions}. "
"Refusing to proceed; recreate the descriptor set with matching dimensions.")
else:
if not self.addSchema(self.set_name, self.similarity_metric, expected_dimensions):
log.warning("connect: Schema creation failed; _schema_ready left False")
return
self.dimensions = expected_dimensions
self.ensureSchemaInner(
int(self.dimensions),
str(self.similarity_metric).strip().upper(),
"connect")
self._schema_ready = True
except RuntimeError as e:
log.error(f"Failed to initialize VDMS schema: {e}")
Expand All @@ -155,55 +135,83 @@ def addSchema(self, set_name, similarity_metric, dimensions):
return False
return True

def ensureSchema(self, dimensions):
def ensureSchemaInner(self, requested_dimensions, expected_metric, caller):
    """
    Core attempt-first schema setup shared by connect() and ensureSchema().

    Avoids FindDescriptorSet on a missing set (triggers a VDMS v2.12 bug):
    attempt AddDescriptorSet first; only probe with FindDescriptorSet when
    AddDescriptorSet reports the set already exists.

    @param requested_dimensions Number of dimensions for the descriptor set
    @param expected_metric Similarity metric (e.g. 'L2', 'IP')
    @param caller Name of the calling method for log messages
    @raises RuntimeError If VDMS gives no response, the existing set's metric or
                         dimensions mismatch the requested ones, or its metadata
                         cannot be retrieved
    @return None; sets self.dimensions once the set is created or verified
    """
    response, _ = self.sendQuery([{
        "AddDescriptorSet": {
            "name": self.set_name,
            "metric": expected_metric,
            "dimensions": requested_dimensions
        }
    }])

    if not response:
        raise RuntimeError(
            f"{caller}: No response from VDMS for descriptor set '{self.set_name}'.")

    if response[0].get('status') == 0:
        log.info(f"{caller}: Created descriptor set '{self.set_name}' "
                 f"({requested_dimensions}D, {expected_metric})")
        self.dimensions = requested_dimensions
        return

    # Non-zero: set likely already exists — now safe to probe with FindDescriptorSet
    log.debug(f"{caller}: AddDescriptorSet status={response[0].get('status')}; "
              f"set may already exist, probing metadata.")
    schema_exists, schema_dimensions, schema_metric = self.findSchemaMetadata(self.set_name)

    if not schema_exists:
        raise RuntimeError(
            f"{caller}: AddDescriptorSet failed and set not found. "
            f"Response: {response[0]}")
    if schema_dimensions is None:
        raise RuntimeError(
            f"{caller}: '{self.set_name}' exists but returned no dimensions. "
            "Recreate the descriptor set to continue.")
    if schema_metric is None:
        raise RuntimeError(
            f"{caller}: '{self.set_name}' exists but returned no metric. "
            "Recreate the descriptor set to continue.")
    if str(schema_metric).strip().upper() != expected_metric:
        raise RuntimeError(
            f"{caller}: '{self.set_name}' uses metric {schema_metric}, "
            f"expected {expected_metric}. "
            "Recreate the descriptor set with matching metric.")
    if schema_dimensions != requested_dimensions:
        raise RuntimeError(
            f"{caller}: '{self.set_name}' has {schema_dimensions} dimensions, "
            f"expected {requested_dimensions}. "
            "Recreate the descriptor set with matching dimensions.")

    log.info(f"{caller}: Verified existing descriptor set '{self.set_name}' "
             f"({schema_dimensions}D, {schema_metric})")
    self.dimensions = requested_dimensions

def ensureSchema(self, dimensions):
with self._schema_lock:
requested_dimensions = int(dimensions)
expected_metric = str(self.similarity_metric).strip().upper()
if self._schema_ready:
if int(self.dimensions) != requested_dimensions:
raise ValueError(
f"ReID schema already initialized with {self.dimensions} dimensions; "
f"incoming vector has {requested_dimensions} dimensions. "
f"Restart the controller and flush the VDMS descriptor set to change dimensions.")
"Restart the controller and flush the VDMS descriptor set to change dimensions.")
return
schema_exists, schema_dimensions, schema_metric = self.findSchemaMetadata(self.set_name)
if schema_exists:
if schema_dimensions is None:
raise RuntimeError(
f"ensureSchema: VDMS descriptor set '{self.set_name}' exists but dimensions were not returned. "
"Refusing to proceed; recreate the descriptor set to continue.")
if schema_metric is None:
raise RuntimeError(
f"ensureSchema: VDMS descriptor set '{self.set_name}' exists but metric was not returned. "
"Refusing to proceed; recreate the descriptor set to continue.")
if str(schema_metric).strip().upper() != expected_metric:
raise RuntimeError(
f"ensureSchema: VDMS descriptor set '{self.set_name}' uses metric {schema_metric}, "
f"but controller is configured for {expected_metric}. "
"Refusing to proceed; recreate the descriptor set with matching metric.")
if schema_dimensions != requested_dimensions:
raise RuntimeError(
f"ensureSchema: VDMS descriptor set '{self.set_name}' uses {schema_dimensions} dimensions, "
f"but incoming vectors use {requested_dimensions}. "
"Refusing to proceed; recreate the descriptor set with matching dimensions.")
else:
if not self.addSchema(self.set_name, self.similarity_metric, requested_dimensions):
raise RuntimeError(
f"ensureSchema: Failed to create VDMS descriptor set '{self.set_name}'; "
"schema not confirmed. ReID writes will be skipped until schema is available.")
self.dimensions = requested_dimensions
self.ensureSchemaInner(
requested_dimensions,
str(self.similarity_metric).strip().upper(),
"ensureSchema")
self._schema_ready = True

def addEntry(self, uuid, rvid, object_type, reid_vectors, set_name=SCHEMA_NAME, **metadata):
Expand Down Expand Up @@ -304,7 +312,7 @@ def findSchemaMetadata(self, set_name):
if not response:
return False, None, None
first_response = response[0]
if first_response.get('status') != 0 or first_response.get('returned', 0) <= 0:
if not first_response or first_response.get('status') != 0 or first_response.get('returned', 0) <= 0:
return False, None, None

schema_dimensions = self._extractSchemaDimensions(first_response)
Expand Down
48 changes: 45 additions & 3 deletions tests/functional/common_scene_obj.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ def eventReceived(self, pahoClient, userdata, message):
self.verifyRegionEvent(regionData)
return

def regionDataReceived(self, pahoClient, userdata, message):
    """MQTT callback for DATA_REGION messages.

    Counts non-empty object payloads and, while an object is known to be
    inside the region, records the message timestamps so the test can later
    verify that in-region updates keep arriving and advance in time.
    """
    payload = json.loads(message.payload.decode("utf-8"))

    # Any data arriving after the ROI was deleted is unexpected; flag it for
    # the post-delete verification step instead of processing it.
    if getattr(self, "roi_deleted", False):
        self.message_received_after_delete = True
        print("Region data received after ROI deletion (unexpected)")
        return

    detected = payload.get('objects', [])
    if not detected:
        return

    self.regionDataNonEmptyCount += 1
    if self.object_in_region:
        self.regionDataTimestampsWhileInside.append(
            get_epoch_time(payload['timestamp']))
    return

def verifyRegionEvent(self, regionEvent):
self.entered = False
self.exited = False
Expand All @@ -70,6 +88,7 @@ def verifyRegionEvent(self, regionEvent):
self.expectedExit.append(event['id'])
self.expectedEnter.remove(event['id'])
self.entered = True
self.object_in_region = True
# Track entry time for dwell verification
if event['id'] not in self.objectEntryTimes:
self.objectEntryTimes[event['id']] = get_epoch_time()
Expand All @@ -81,9 +100,19 @@ def verifyRegionEvent(self, regionEvent):
if event['object']['id'] in self.expectedExit:
self.expectedExit.remove(event['object']['id'])
self.exited = True
self.object_in_region = False
print("object with id {} exited region\n".format(event['object']['id']))
return

def verifyRegionDataUpdates(self):
    """Verify that DATA_REGION updates continue while object remains in region."""
    timestamps = self.regionDataTimestampsWhileInside
    assert self.regionDataNonEmptyCount > 0, "Expected at least one non-empty DATA_REGION message"
    assert len(timestamps) >= 2, \
        "Expected repeated DATA_REGION updates while object is inside region"
    assert timestamps[-1] > timestamps[0], \
        "Expected DATA_REGION timestamps to advance across in-region updates"
    return
Comment thread
saratpoluri marked this conversation as resolved.

def isWithinRectangle(self, bl, tr, curr_point):
if (curr_point[0] > bl[0] and curr_point[0] < tr[0] and \
curr_point[1] > bl[1] and curr_point[1] < tr[1]):
Expand All @@ -100,6 +129,10 @@ def setupROI(self, roiData):
region_id=self.roi_uid, region_type=REGION)
self.pubsub.addCallback(topic, self.eventReceived)

data_topic = PubSub.formatTopic(PubSub.DATA_REGION, scene_id=self.sceneUID,
region_id=self.roi_uid, thing_type=PERSON)
self.pubsub.addCallback(data_topic, self.regionDataReceived)


assert res['points']
return res['points']
Expand Down Expand Up @@ -132,6 +165,9 @@ def runSceneObjMqttInitialize(self):
self.roiPoints = ((0.9, 4.0), (0.9, 2.4),
(8.1, 2.4), (8.1, 4.0))
self.message_received_after_delete = False
self.object_in_region = False
self.regionDataNonEmptyCount = 0
self.regionDataTimestampsWhileInside = []
self.objectEntryTimes = {} # Track when each object entered region for dwell verification
self.previousDwellTimes = {} # Track previous dwell times to verify monotonic increase
if self.testName and self.recordXMLAttribute:
Expand Down Expand Up @@ -234,9 +270,15 @@ def runROIMqttDelete(self):
return

def runROIMqttVerifyPassed(self):
return self.exited and self.entered == False \
and len(self.expectedExit) == 0 \
and len(self.expectedEnter) == 0
if not (
self.exited and self.entered == False
and len(self.expectedExit) == 0
and len(self.expectedEnter) == 0
):
return False

self.verifyRegionDataUpdates()
return True

def runROIMqttVerifyNoEventsAfterDelete(self):

Expand Down
6 changes: 6 additions & 0 deletions tests/functional/tc_mqtt_sensor_roi.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,16 @@ def handleRegionData(self, region_data):
if self.isWithinRectangle(self.roiPoints[1], self.roiPoints[3], (current_point[0], current_point[1])):
self.entered = True
self.enteredDetected = True
self.object_in_region = True # Critical: update flag for regionDataReceived callback
print('object entered region')
if self.enteredTimestamp is None:
self.enteredTimestamp = region_message_ts

# Check for exit events to clear object_in_region flag
if 'exited' in region_data and len(region_data['exited']) > 0:
self.object_in_region = False # Critical: update flag when object exits
print('object exited region')

Comment thread
saratpoluri marked this conversation as resolved.
Outdated
if self.entered and len(self.sensorHistory) > 0:
start_idx, end_idx = self.findSensorIndexes(
self.enteredTimestamp, region_message_ts, self.exitedTimestamp)
Expand Down
Loading
Loading