Skip to content

Commit 06856cd

Browse files
committed
tests: debug uds header consistency test race conditions by adding a frame cache
1 parent 3aba9fe commit 06856cd

File tree

4 files changed

+169
-125
lines changed

4 files changed

+169
-125
lines changed

tests/ci_tests/helper_uds_client.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,58 @@
55
import asyncio
66
import time
77

8+
class UdsFrameReader:
    """Reads framed messages from the snapshot.c client-facing UDS stream.

    Wire format per frame (as parsed by read_frame):
        2-byte big-endian module id,
        JSON header terminated by b"\\n\\n",
        b"*" sentinel,
        image payload (read as a single recv of at most 4096 bytes).

    Holds one persistent connection so successive read_frame() calls see
    consecutive frames from the same stream.  Usable as a context manager.
    """

    def __init__(self, socket_path, timeout_s=5.0):
        self.socket_path = socket_path
        self.timeout_s = timeout_s
        self.sock = None  # created lazily by connect()

    def connect(self):
        """Connect to the UDS socket if not already connected (idempotent).

        Raises:
            FileNotFoundError: socket_path does not exist.
            RuntimeError: socket_path exists but is not a socket.
            OSError: connect refused / timed out.
        """
        if self.sock:
            return

        # Sanity checks
        if not os.path.exists(self.socket_path):
            raise FileNotFoundError(f"{self.socket_path} does not exist")
        st = os.stat(self.socket_path)
        if not stat.S_ISSOCK(st.st_mode):
            raise RuntimeError(f"{self.socket_path} exists but is not a socket")

        # Only publish the socket on self AFTER a successful connect().
        # Previously self.sock was assigned first, so a failed connect()
        # left self.sock set and every later connect() became a silent
        # no-op on a dead, never-connected socket.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout_s)
            sock.connect(self.socket_path)
        except Exception:
            sock.close()
            raise
        self.sock = sock

    def read_frame(self):
        """Read and parse one frame.

        Returns:
            (module_id: int, header: dict, img: bytes)

        Raises:
            RuntimeError: the '*' sentinel before the image payload is missing.
        """
        self.connect()

        # Read one frame using existing socket
        mbytes = _recv_exact(self.sock, 2)
        module_id = int.from_bytes(mbytes, "big")

        header_bytes = _recv_until(self.sock, b"\n\n")
        header_json = header_bytes[:-2].decode("utf-8")
        header = json.loads(header_json)

        star = _recv_exact(self.sock, 1)
        if star != b"*":
            raise RuntimeError(f"Expected '*' before image payload, got {star!r}")

        # NOTE(review): _recv_at_most may return fewer bytes than the full
        # image if the payload exceeds one recv() -- confirm against sender.
        img = _recv_at_most(self.sock, 4096)
        return module_id, header, img

    def close(self):
        """Close the connection; safe to call multiple times."""
        if self.sock:
            self.sock.close()
            self.sock = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
58+
59+
860
def read_one_frame_from_uds(socket_path, header_size_hint=None, timeout_s=5.0):
961
"""
1062
Reads exactly one frame from the snapshot.c client-facing UDS format:

tests/ci_tests/test_pff_header_consistency.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import sys
1212
sys.path.append('/app/tests/')
1313
sys.path.append('/app/util/')
14-
import pff
14+
import pff # type: ignore
1515

1616
@pytest.mark.usefixtures("daq_env")
1717
class TestPffHeaderConsistency:

tests/ci_tests/test_uds_data_path.py

Lines changed: 68 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from control_util import is_hashpipe_running
1616
from uds_server import UdsServer
17-
from helper_uds_client import read_one_frame_from_uds
17+
from helper_uds_client import read_one_frame_from_uds, UdsFrameReader
1818

1919
# Focus on ph256 since that's what we have test data for
2020
UDS_TEMPLATE = "/tmp/hashpipe_grpc.dp_{dp}.sock"
@@ -71,26 +71,6 @@ class TestUdsDataPath:
7171
and protocol violations don't crash Hashpipe.
7272
"""
7373

74-
def test_data_integrity_ph256(self, daq_env):
75-
"""
76-
Test that ph256 UDS data path maintains data integrity and doesn't crash
77-
when receiving real ph256 frames.
78-
"""
79-
assert _ensure_uds_available(daq_env), "Could not establish UDS connection"
80-
81-
uds_mgr = daq_env["uds_manager"]
82-
srv = uds_mgr.servers["ph256"]
83-
84-
# Wait for multiple frames to verify sustained data flow
85-
start = time.time()
86-
while time.time() - start < 15:
87-
if srv.frames_received >= 10:
88-
break
89-
time.sleep(0.5)
90-
91-
assert srv.frames_received >= 10, f"Expected >=10 frames, got {srv.frames_received}"
92-
assert is_hashpipe_running(), "Hashpipe should remain running during data flow"
93-
9474
def test_rapid_connect_disconnect_ph256(self, daq_env):
9575
"""
9676
Test rapid connect/disconnect cycles don't crash Hashpipe.
@@ -324,6 +304,24 @@ async def _stop():
324304

325305
mgr.stop()
326306

307+
def test_data_integrity_ph256(self, daq_env):
    """
    Test that ph256 UDS data path maintains data integrity.
    """
    assert _ensure_uds_available(daq_env), "Could not establish UDS connection"
    srv = daq_env["uds_manager"].servers["ph256"]

    # Poll (up to 15 s) until enough frames arrive to show sustained data flow
    deadline = time.time() + 15
    while srv.frames_received < 10 and time.time() < deadline:
        time.sleep(0.5)

    assert srv.frames_received >= 10, f"Expected >=10 frames, got {srv.frames_received}"
    assert is_hashpipe_running(), "Hashpipe should remain running during data flow"
327325
def test_concurrent_multiple_clients_ph256(self, daq_env):
328326
"""
329327
Test behavior when multiple clients try to connect to the same UDS socket.
@@ -362,10 +360,9 @@ def test_concurrent_multiple_clients_ph256(self, daq_env):
362360

363361
def test_data_validation_ph256(self, daq_env):
364362
"""
365-
Test that we can actually read and validate ph256 frame data.
363+
Test that we can validate the content of received ph256 frames.
366364
"""
367365
assert _ensure_uds_available(daq_env), "Could not establish UDS connection"
368-
369366
uds_mgr = daq_env["uds_manager"]
370367
srv = uds_mgr.servers["ph256"]
371368

@@ -375,31 +372,26 @@ def test_data_validation_ph256(self, daq_env):
375372
if srv.connected.is_set() and srv.frames_received >= 5:
376373
break
377374
time.sleep(0.5)
378-
375+
379376
assert srv.connected.is_set(), "Should have ph256 connection"
380377
assert srv.frames_received >= 5, "Should have received frames"
381378

382-
# Try to read a frame using the helper client
383-
ph_path = uds_path("ph256")
384-
try:
385-
# Read one frame
386-
module_id, header, image_data = read_one_frame_from_uds(ph_path, timeout_s=5.0)
387-
388-
# Validate frame structure
389-
assert isinstance(module_id, int), "Module ID should be integer"
390-
assert module_id in [1, 254], f"Module ID {module_id} should be in test module set"
391-
assert isinstance(header, dict), "Header should be dict"
392-
assert 'quabo_num' in header, "Header should contain quabo_num"
393-
assert 'pkt_num' in header, "Header should contain pkt_num"
394-
assert 'pkt_tai' in header, "Header should contain pkt_tai"
395-
assert 'pkt_nsec' in header, "Header should contain pkt_nsec"
396-
assert isinstance(image_data, bytes), "Image data should be bytes"
397-
assert len(image_data) > 0, "Image data should not be empty"
398-
399-
except Exception as e:
400-
# If we can't read, it might be due to timing, but Hashpipe should still be running
401-
print(f"Frame read failed (may be expected): {e}")
402-
379+
# Use stored frames from the server for validation
380+
stored_frames = srv.recent_frames
381+
assert len(stored_frames) > 0, "UDS server should have stored some frames"
382+
383+
# Validate the structure of the first stored frame
384+
frame_data = stored_frames[0]
385+
module_id = frame_data['module_id']
386+
header = frame_data['header']
387+
image_data = frame_data['image_data']
388+
389+
assert isinstance(module_id, int), "Module ID should be an integer"
390+
assert module_id in daq_env["module_ids"], f"Module ID {module_id} not in test set"
391+
assert isinstance(header, dict), "Header should be a dict"
392+
assert 'quabo_num' in header, "Header should contain 'quabo_num'"
393+
assert 'pkt_num' in header, "Header should contain 'pkt_num'"
394+
assert isinstance(image_data, bytes) and len(image_data) > 0, "Image data should be non-empty bytes"
403395
assert is_hashpipe_running(), "Hashpipe should remain running during data validation"
404396

405397
def test_sustained_high_rate_ph256(self, daq_env):
@@ -413,8 +405,8 @@ def test_sustained_high_rate_ph256(self, daq_env):
413405

414406
initial_frames = srv.frames_received
415407

416-
# Let it run for sustained period (reduced to 20 seconds for faster tests)
417-
test_duration = 20
408+
# Let it run for sustained period
409+
test_duration = 60
418410
start = time.time()
419411
last_check = start
420412
last_frames = initial_frames
@@ -452,78 +444,47 @@ def test_sustained_high_rate_ph256(self, daq_env):
452444

453445
def test_uds_header_consistency_ph256(self, daq_env):
454446
"""
455-
Test that UDS-delivered ph256 frames have consistent JSON header formatting.
456-
This ensures UDS path maintains PFF format requirements.
447+
Test that UDS-delivered ph256 frames have consistent JSON headers
448+
by checking the raw byte representations captured by the server.
457449
"""
458450
assert _ensure_uds_available(daq_env), "Could not establish UDS connection"
459-
460451
uds_mgr = daq_env["uds_manager"]
461452
srv = uds_mgr.servers["ph256"]
453+
nframes = 100
462454

463-
# Wait for connection and multiple frames
455+
# Wait for the server to receive a good number of frames
464456
start = time.time()
465-
while time.time() - start < 15:
466-
if srv.connected.is_set() and srv.frames_received >= 10:
457+
while time.time() - start < 20:
458+
if srv.frames_received >= nframes:
467459
break
468460
time.sleep(0.5)
469461

470-
assert srv.connected.is_set(), "ph256 UDS connection required"
471-
assert srv.frames_received >= 10, f"Need >=10 frames for consistency test, got {srv.frames_received}"
472-
473-
# Collect multiple frames via UDS client
474-
ph_path = uds_path("ph256")
475-
header_sizes = []
476-
json_structures = []
477-
478-
for frame_num in range(5): # Test 5 frames
479-
try:
480-
# Verify socket exists before attempting read
481-
if not _uds_exists(ph_path):
482-
print(f"Frame {frame_num}: UDS socket does not exist, skipping")
483-
continue
484-
485-
module_id, header, image_data = read_one_frame_from_uds(ph_path, timeout_s=3.0)
486-
487-
# Convert header back to JSON string to measure size
488-
import json
489-
header_json = json.dumps(header, separators=(',', ':'))
490-
header_sizes.append(len(header_json))
462+
assert srv.frames_received >= nframes, f"Need >= {nframes} frames for consistency test, got {srv.frames_received}"
491463

492-
# Track JSON structure (field names and types)
493-
structure = {k: type(v).__name__ for k, v in header.items()}
494-
json_structures.append(structure)
464+
# Retrieve the raw JSON byte strings from the server's stored frames
465+
stored_frames = srv.recent_frames[-nframes:] # Analyze the last nframes frames
466+
json_headers = [frame['json_bytes'] for frame in stored_frames]
495467

496-
except Exception as e:
497-
print(f"Frame {frame_num} read failed: {e}")
498-
# Continue trying other frames
468+
# 1. Primary Test: All JSON headers must have the exact same byte length.
469+
# This is the most critical requirement for predictable frame seeking.
470+
header_lengths = {len(h) for h in json_headers}
471+
assert len(header_lengths) == 1, \
472+
f"UDS headers have inconsistent byte lengths: {header_lengths}. " \
473+
"All headers from hashpipe must be identical in size."
499474

500-
if len(header_sizes) < 3:
501-
pytest.skip(f"Could only read {len(header_sizes)} frames from UDS, need at least 3")
475+
print(f"✓ All UDS headers have a consistent length: {header_lengths.pop()} bytes")
502476

503-
# All headers should be similar size (within small variance for number formatting)
504-
min_size = min(header_sizes)
505-
max_size = max(header_sizes)
506-
size_variance = max_size - min_size
507-
508-
print(f"UDS header sizes: {header_sizes}, variance: {size_variance}")
509-
510-
# Allow small variance for different number lengths, but ensure consistency
511-
assert size_variance <= 20, f"UDS header size variance too large: {size_variance} bytes"
512-
513-
# All JSON structures should be identical
514-
first_structure = json_structures[0]
515-
for i, structure in enumerate(json_structures[1:], 1):
516-
assert structure == first_structure, (
517-
f"Frame {i} has different JSON structure: {structure} vs {first_structure}"
518-
)
519-
520-
# Verify required fields are present
521-
required_fields = {'quabo_num', 'pkt_num', 'pkt_tai', 'pkt_nsec', 'tv_sec', 'tv_usec'}
522-
actual_fields = set(first_structure.keys())
477+
# 2. Sanity Check: Ensure the header is valid JSON and contains required fields.
478+
first_header_bytes = json_headers[0]
479+
try:
480+
header_content = json.loads(first_header_bytes)
481+
required_fields = {'quabo_num', 'pkt_num', 'pkt_tai', 'pkt_nsec', 'tv_sec', 'tv_usec'}
482+
actual_fields = set(header_content.keys())
483+
assert required_fields.issubset(actual_fields), \
484+
f"Missing required fields. Expected: {required_fields}, Got: {actual_fields}"
485+
except json.JSONDecodeError:
486+
pytest.fail("The received JSON header is not valid JSON.")
487+
488+
print(f"UDS header consistency test passed for {len(json_headers)} frames.")
523489

524-
assert required_fields.issubset(actual_fields), (
525-
f"Missing required fields. Expected: {required_fields}, Got: {actual_fields}"
526-
)
527490

528-
print(f"UDS header consistency test passed: {len(header_sizes)} frames, "
529-
f"size range: {min_size}-{max_size} bytes")

0 commit comments

Comments
 (0)