Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 47 additions & 29 deletions controller/src/controller/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from scene_common import log
from scene_common.camera import Camera
from scene_common.earth_lla import convertLLAToECEF, calculateTRSLocal2LLAFromSurfacePoints
from scene_common.geometry import Line, Point, Region, Tripwire
from scene_common.geometry import Line, Point, Region, Tripwire, get_region_events, get_tripwire_events
from scene_common.scene_model import SceneModel
from scene_common.timestamp import get_epoch_time, get_iso_time
from scene_common.transform import CameraPose
Expand Down Expand Up @@ -570,26 +570,33 @@ def _updateEvents(self, detectionType, now, curObjects=None):
return

def _updateTripwireEvents(self, detectionType, now, curObjects):
for key in self.tripwires:
tripwire = self.tripwires[key]
tripwireObjects = tripwire.objects.get(detectionType, [])
objects = []
for obj in curObjects:
age = now - obj.when
# When tracker is disabled, skip the frameCount check and consider all objects;
# otherwise, only consider objects with frameCount > 3 as reliable.
if (obj.frameCount > 3 or ControllerMode.isAnalyticsOnly()) \
and len(obj.chain_data.publishedLocations) > 1:
d = tripwire.lineCrosses(Line(obj.chain_data.publishedLocations[0].as2Dxy,
obj.chain_data.publishedLocations[1].as2Dxy))
if d != 0:
event = TripwireEvent(obj, -d)
objects.append(event)

if len(tripwireObjects) != len(objects) \
# Filter to reliable objects with enough location history for crossing detection.
# When tracker is disabled, skip the frameCount check and consider all objects;
# otherwise, only consider objects with frameCount > 3 as reliable.
reliable_objects = [
obj for obj in curObjects
if (obj.frameCount > 3 or ControllerMode.isAnalyticsOnly())
and len(obj.chain_data.publishedLocations) > 1
]

object_locations = [
obj.chain_data.publishedLocations[:2] for obj in reliable_objects
]

crossing_events = get_tripwire_events(self.tripwires, object_locations)

for key, tripwire in self.tripwires.items():
event_matches = crossing_events.get(key, [])
previous_objects = tripwire.objects.get(detectionType, [])
crossed_objects = [
TripwireEvent(reliable_objects[obj_idx], direction)
for obj_idx, direction in event_matches
]

if len(previous_objects) != len(crossed_objects) \
and now - tripwire.when > DEBOUNCE_DELAY:
log.debug("TRIPWIRE EVENT", tripwireObjects, len(objects))
tripwire.objects[detectionType] = objects
log.debug("TRIPWIRE EVENT", previous_objects, len(crossed_objects))
tripwire.objects[detectionType] = crossed_objects
tripwire.when = now
if 'objects' not in self.events:
self.events['objects'] = []
Expand All @@ -598,16 +605,27 @@ def _updateTripwireEvents(self, detectionType, now, curObjects):

def _updateRegionEvents(self, detectionType, regions, now, now_str, curObjects):
updated = set()
for key in regions:
region = regions[key]

# Filter to reliable objects.
# When tracker is disabled, skip the frameCount check and consider all objects;
# otherwise, only consider objects with frameCount > 3 as reliable.
reliable_objects = [
obj for obj in curObjects
if obj.frameCount > 3 or ControllerMode.isAnalyticsOnly()
]

object_locations = [obj.sceneLoc for obj in reliable_objects]
objects_within_region = get_region_events(regions, object_locations)

for key, region in regions.items():
matched_indices = set(objects_within_region.get(key, []))
# Also include objects matched by mesh intersection (requires self)
for obj_idx, obj in enumerate(reliable_objects):
if obj_idx not in matched_indices and self.isIntersecting(obj, region):
matched_indices.add(obj_idx)

objects = [reliable_objects[i] for i in sorted(matched_indices)]
Comment thread
tdorauintc marked this conversation as resolved.
regionObjects = region.objects.get(detectionType, [])
objects = []
for obj in curObjects:
# When tracker is disabled, skip the frameCount check and consider all objects;
# otherwise, only consider objects with frameCount > 3 as reliable.
if (obj.frameCount > 3 or ControllerMode.isAnalyticsOnly()) \
and (region.isPointWithin(obj.sceneLoc) or self.isIntersecting(obj, region)):
objects.append(obj)

cur = set(x.gid for x in objects)
prev = set(x.gid for x in regionObjects)
Expand Down
34 changes: 34 additions & 0 deletions scene_common/src/scene_common/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,37 @@ def serialize(self):
'uuid': self.uuid,
}
return data

def get_tripwire_events(tripwires, object_locations):
"""Detect line crossings between object movement segments and tripwires.

@param tripwires Dict of {key: Tripwire} to check against
@param object_locations List of location pairs (current, previous) per object
@return Dict mapping tripwire key to list of (object_index, direction) tuples
"""
tripwire_events = {}
for key, tripwire in tripwires.items():
event_matches = []
for obj_idx, obj_locations in enumerate(object_locations):
d = tripwire.lineCrosses(Line(obj_locations[0].as2Dxy,
obj_locations[1].as2Dxy))
if d != 0:
event_matches.append((obj_idx, -d))
tripwire_events[key] = event_matches
return tripwire_events

def get_region_events(regions, object_locations):
"""Determine which objects are within each region using point containment.

@param regions Dict of {key: Region} to check against
@param object_locations List of object positions (Point) to test
@return Dict mapping region key to list of object indices within that region
"""
region_events = {}
for key, region in regions.items():
region_objects = []
for obj_idx, obj_location in enumerate(object_locations):
if region.isPointWithin(obj_location):
region_objects.append(obj_idx)
region_events[key] = region_objects
return region_events
98 changes: 98 additions & 0 deletions tests/sscape_tests/geometry/test_region_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# SPDX-FileCopyrightText: (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import math

import pytest

from scene_common.geometry import get_region_events, Region, Point

UUID = "39bd9698-8603-43fb-9cb9-06d9a14e6a24"


def _regular_polygon(n, cx=10, cy=10, r=10):
"""! Generate vertices for a regular n-gon centered at (cx, cy) with radius r. """
return [
[cx + r * math.cos(2 * math.pi * i / n),
cy + r * math.sin(2 * math.pi * i / n)]
for i in range(n)
]


# --- Positive tests: object inside various region types ---

@pytest.mark.parametrize("n_vertices", [3, 4, 5, 10],
ids=["triangle", "quad", "pentagon", "decagon"])
def test_object_inside_polygon(n_vertices):
"""! Verify object at polygon center is detected inside for varying vertex counts. """
pts = _regular_polygon(n_vertices)
region = Region(UUID, "poly", {"points": pts})
result = get_region_events({"r": region}, [Point(10, 10)])
assert result["r"] == [0]


def test_object_inside_circle():
"""! Verify object at circle center is detected inside. """
region = Region(UUID, "circle", {"area": "circle", "center": [5, 5], "radius": 3})
result = get_region_events({"r": region}, [Point(5, 5)])
assert result["r"] == [0]


def test_object_inside_scene():
"""! Verify any object is inside a scene-wide region. """
region = Region(UUID, "scene", {"area": "scene"})
result = get_region_events({"r": region}, [Point(100, 200)])
assert result["r"] == [0]


# --- No-match tests: object outside all regions ---

def test_no_match_outside_polygon():
"""! Verify object outside polygon is not detected. """
region = Region(UUID, "poly", {"points": [[0, 0], [10, 0], [10, 10], [0, 10]]})
result = get_region_events({"r": region}, [Point(50, 50)])
assert result["r"] == []


def test_no_match_outside_circle():
"""! Verify object outside circle is not detected. """
region = Region(UUID, "circle", {"area": "circle", "center": [5, 5], "radius": 3})
result = get_region_events({"r": region}, [Point(50, 50)])
assert result["r"] == []


# --- Length variations ---

def test_single_region_single_object():
"""! Verify single region with one matching object. """
region = Region(UUID, "poly", {"points": [[0, 0], [10, 0], [10, 10], [0, 10]]})
result = get_region_events({"r": region}, [Point(5, 5)])
assert result["r"] == [0]


def test_multiple_regions_multiple_objects():
"""! Verify correct mapping with overlapping regions and multiple objects. """
region_a = Region(UUID, "a", {"points": [[0, 0], [10, 0], [10, 10], [0, 10]]})
region_b = Region(UUID, "b", {"points": [[5, 5], [15, 5], [15, 15], [5, 15]]})
region_c = Region(UUID, "c", {"area": "circle", "center": [20, 20], "radius": 2})
regions = {"a": region_a, "b": region_b, "c": region_c}
objects = [Point(7, 7), Point(2, 2), Point(20, 20), Point(50, 50)]
result = get_region_events(regions, objects)
assert sorted(result["a"]) == [0, 1]
assert result["b"] == [0]
assert result["c"] == [2]


# --- Empty inputs ---

def test_empty_object_list():
"""! Verify regions with no objects returns empty lists per key. """
region = Region(UUID, "poly", {"points": [[0, 0], [10, 0], [10, 10], [0, 10]]})
result = get_region_events({"r": region}, [])
assert result["r"] == []


def test_empty_region_dict():
"""! Verify empty region dict returns empty result. """
result = get_region_events({}, [Point(5, 5)])
assert result == {}
113 changes: 113 additions & 0 deletions tests/sscape_tests/geometry/test_tripwire_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# SPDX-FileCopyrightText: (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import pytest

from scene_common.geometry import get_tripwire_events, Tripwire, Point

UUID = "39bd9698-8603-43fb-9cb9-06d9a14e6a24"


# --- Positive tests: crossing in both directions for four orientations ---

@pytest.mark.parametrize(
"tripwire_pts, current, previous, expected_dir",
[
# Horizontal tripwire [(0,0) -> (10,0)]
([[0, 0], [10, 0]], Point(5, 1), Point(5, -1), -1),
([[0, 0], [10, 0]], Point(5, -1), Point(5, 1), 1),
# Vertical tripwire [(5,0) -> (5,10)]
([[5, 0], [5, 10]], Point(6, 5), Point(4, 5), 1),
([[5, 0], [5, 10]], Point(4, 5), Point(6, 5), -1),
# Acute angle (~45 deg) tripwire [(0,0) -> (10,10)]
([[0, 0], [10, 10]], Point(10, 0), Point(0, 10), 1),
([[0, 0], [10, 10]], Point(0, 10), Point(10, 0), -1),
# Obtuse angle (~135 deg) tripwire [(10,0) -> (0,10)]
([[10, 0], [0, 10]], Point(0, 0), Point(10, 10), -1),
([[10, 0], [0, 10]], Point(10, 10), Point(0, 0), 1),
],
ids=[
"horizontal-south-to-north",
"horizontal-north-to-south",
"vertical-left-to-right",
"vertical-right-to-left",
"acute-cross-a",
"acute-cross-b",
"obtuse-cross-a",
"obtuse-cross-b",
])
def test_tripwire_crossing(tripwire_pts, current, previous, expected_dir):
"""! Verify tripwire crossing detection for various orientations and directions. """
tripwire = Tripwire(UUID, "tw", {"points": tripwire_pts})
result = get_tripwire_events({"tw": tripwire}, [(current, previous)])
assert len(result["tw"]) == 1
assert result["tw"][0] == (0, expected_dir)


# --- No-match tests ---

def test_no_crossing_parallel_offset():
"""! Verify no crossing when movement is parallel to tripwire but offset. """
tripwire = Tripwire(UUID, "tw", {"points": [[0, 0], [10, 0]]})
result = get_tripwire_events({"tw": tripwire}, [(Point(15, 2), Point(5, 2))])
assert result["tw"] == []


def test_no_crossing_collinear():
"""! Verify no crossing when movement is along the tripwire (collinear). """
tripwire = Tripwire(UUID, "tw", {"points": [[0, 0], [10, 0]]})
result = get_tripwire_events({"tw": tripwire}, [(Point(8, 0), Point(2, 0))])
assert result["tw"] == []




# --- Positive tests: endpoint on tripwire ---

def test_crossing_when_endpoint_on_tripwire():
"""! Verify that endpoint landing exactly on the tripwire counts as a crossing. """
tripwire = Tripwire(UUID, "tw", {"points": [[0, 0], [10, 0]]})
result = get_tripwire_events({"tw": tripwire}, [(Point(5, 0), Point(5, -5))])
Comment thread
tdorauintc marked this conversation as resolved.
assert len(result["tw"]) == 1
assert result["tw"][0][0] == 0


# --- Length variations ---

def test_single_tripwire_single_object():
"""! Verify single tripwire with one crossing object. """
tripwire = Tripwire(UUID, "tw", {"points": [[0, 0], [10, 0]]})
result = get_tripwire_events({"tw": tripwire}, [(Point(5, 1), Point(5, -1))])
assert len(result["tw"]) == 1
assert result["tw"][0][0] == 0


def test_multiple_tripwires_multiple_objects():
"""! Verify detection with multiple tripwires and objects, one crossing both. """
tw_h = Tripwire(UUID, "h", {"points": [[0, 0], [10, 0]]})
tw_v = Tripwire(UUID, "v", {"points": [[5, -5], [5, 5]]})
tripwires = {"h": tw_h, "v": tw_v}
objects = [
(Point(6, 1), Point(4, -1)), # crosses both
(Point(1, 5), Point(1, 4)), # crosses neither
]
result = get_tripwire_events(tripwires, objects)
assert len(result["h"]) == 1
assert result["h"][0][0] == 0
assert len(result["v"]) == 1
assert result["v"][0][0] == 0


# --- Empty inputs ---

def test_empty_object_list():
"""! Verify tripwires with no objects returns empty lists per key. """
tripwire = Tripwire(UUID, "tw", {"points": [[0, 0], [10, 0]]})
result = get_tripwire_events({"tw": tripwire}, [])
assert result["tw"] == []


def test_empty_tripwire_dict():
"""! Verify empty tripwire dict returns empty result. """
result = get_tripwire_events({}, [(Point(5, 1), Point(5, -1))])
assert result == {}
Loading