Skip to content

Commit 7fac0d3

Browse files
committed
test: add additional tests for the Hashpipe -> uds -> server data pipeline
1 parent 3c797fb commit 7fac0d3

File tree

6 files changed

+550
-141
lines changed

6 files changed

+550
-141
lines changed

tests/ci_tests/conftest.py

Lines changed: 156 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,158 +1,186 @@
# tests/ci_tests/conftest.py
import asyncio
import os
import shutil
import stat
import subprocess
import sys
import threading
import time
from pathlib import Path

import pytest

from control_util import is_hashpipe_running
from uds_server import UdsServer
def is_utility_available(name):
    """Return True if the command-line utility *name* is on the system PATH.

    Uses shutil.which rather than spawning the external `which` binary:
    no subprocess per check, and it works on systems where `which` itself
    is not installed.
    """
    return shutil.which(name) is not None
1518

19+
# Shared configuration for the Hashpipe -> UDS -> server CI pipeline tests.
BASE_DIR = Path("/tmp/ci_run_dir")  # CWD for the hashpipe process, matching start_daq.py behavior
RUN_NAME = "obs_ci_run"  # must start with "obs_" for the server to find the run
MODULE_IDS = [1, 254]  # module IDs written to module.config for hashpipe to listen on
PCAP_FILE = "/app/test_data.pcapng"  # packet capture replayed onto loopback by tcpreplay
UDS_TEMPLATE = "/tmp/hashpipe_grpc.dp_{dp_name}.sock"  # per-data-product UDS socket path
1624

17-
@pytest.fixture(scope="session")
18-
def hashpipe_pcap_runner():
19-
"""
20-
A session-scoped fixture that creates a realistic hashpipe run environment,
21-
starts tcpreplay to feed it data from a pcap file, and launches the
22-
hashpipe process with command-line arguments that mimic a production run.
23-
24-
This enables testing of the data flow:
25-
tcpreplay -> hashpipe (net_thread)
26-
"""
27-
if not is_utility_available("hashpipe") or not is_utility_available("tcpreplay"):
28-
pytest.fail("Required utility 'hashpipe' or 'tcpreplay' not found in PATH.")
29-
30-
# 1. Define Paths and Configuration
31-
pcap_file = "/app/test_data.pcapng"
32-
33-
# Define a base directory for the test, which will be the Current Working Directory (CWD)
34-
# for the hashpipe process. This matches the behavior of the production start_daq.py script.
35-
base_dir = Path("/tmp/ci_run_dir")
36-
37-
# Define a relative run name. It must start with "obs_" for the server to find it.
38-
run_name = "obs_ci_run"
39-
40-
# Create the directory structure that `make_run_dirs` in start.py would create.
41-
module_ids = [1, 254]
25+
def _ensure_dirs_and_module_config():
    """Create the per-module run directories and write module.config.

    Mirrors the directory layout that `make_run_dirs` in start.py produces
    in production. Returns the path of the module.config file that tells
    hashpipe which modules to listen for.
    """
    config_lines = []
    for mid in MODULE_IDS:
        (BASE_DIR / f"module_{mid}" / RUN_NAME).mkdir(parents=True, exist_ok=True)
        config_lines.append(f"{mid}\n")

    config_dir = BASE_DIR / RUN_NAME
    config_dir.mkdir(exist_ok=True)

    module_config_path = config_dir / "module.config"
    module_config_path.write_text("".join(config_lines))
    return module_config_path
37+
38+
def _wait_for(predicate, timeout_s=30, interval_s=0.5, desc="condition"):
39+
start = time.time()
40+
while time.time() - start < timeout_s:
41+
if predicate():
42+
return True
43+
time.sleep(interval_s)
44+
pytest.fail(f"Timeout waiting for {desc}")
45+
46+
def _uds_path(dp_name):
    """Return the UDS socket path used for data product *dp_name*."""
    path = UDS_TEMPLATE.format(dp_name=dp_name)
    return path
48+
49+
class UdsServerManager:
    """Run a set of UdsServer instances on a dedicated asyncio loop thread.

    In this pipeline the tests act as the *servers*: hashpipe connects to
    these sockets as a client. One manager owns one background event loop
    shared by all sockets.
    """

    def __init__(self, socket_paths):
        # Mapping of data-product name -> filesystem path of its UDS socket.
        self.socket_paths = socket_paths
        self.loop = None      # event loop owned by the background thread
        self.thread = None    # thread running the loop
        self.servers = {}     # data-product name -> started UdsServer
        self.started = threading.Event()  # set once every server is listening

    def start(self):
        """Start the loop thread and bring up every server.

        Blocks until all servers are listening so that hashpipe can connect
        immediately after this returns. Raises RuntimeError on timeout.
        """
        def runner():
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

            async def start_all():
                for name, path in self.socket_paths.items():
                    srv = UdsServer(str(path))
                    await srv.start()
                    self.servers[name] = srv
                self.started.set()

            self.loop.run_until_complete(start_all())
            self.loop.run_forever()

        self.thread = threading.Thread(target=runner, daemon=True)
        self.thread.start()
        # Event.wait returns False on timeout, so a separate is_set() check
        # is unnecessary.
        if not self.started.wait(timeout=5):
            raise RuntimeError("Failed to start UDS servers")

    def stop(self):
        """Stop all servers, halt the loop thread, and release the loop.

        Safe to call when start() was never called (no-op) and best-effort
        otherwise: server shutdown errors are swallowed so teardown always
        proceeds.
        """
        if self.loop is None:
            return

        async def stop_all():
            for srv in self.servers.values():
                await srv.stop()

        try:
            fut = asyncio.run_coroutine_threadsafe(stop_all(), self.loop)
            fut.result(timeout=5)
        except Exception:
            # Best-effort teardown; the loop may already be shutting down.
            pass
        self.loop.call_soon_threadsafe(self.loop.stop)
        if self.thread is not None:
            self.thread.join(timeout=5)
        # Close the loop to release its selector and file descriptors.
        # The original left the loop open, leaking fds for the session.
        if not self.loop.is_running():
            self.loop.close()
5690

57-
# 2. Build commands
58-
# Command to loop the pcap file to the loopback interface, simulating network traffic.
91+
@pytest.fixture(scope="session")
def daq_env():
    """Session-scoped DAQ environment for the Hashpipe -> UDS -> server pipeline.

    Brings the data path up in dependency order:
      0) UDS servers first (the tests are the servers hashpipe connects to),
      1) tcpreplay looping the test pcap onto the loopback interface,
      2) hashpipe, launched via start_daq.py to mimic production.
    Yields a dict describing the running environment, then tears everything
    down in reverse order in the finally block.
    """
    if not is_utility_available("hashpipe"):
        pytest.fail("hashpipe not found in PATH")
    if not is_utility_available("tcpreplay"):
        pytest.fail("tcpreplay not found in PATH")

    # Prepare the filesystem layout the way production does.
    _ensure_dirs_and_module_config()

    # 0) Start UDS servers FIRST so hashpipe has something to connect to.
    uds_paths = {dp: Path(_uds_path(dp)) for dp in ["ph256", "ph1024", "img16", "img8"]}
    uds_mgr = UdsServerManager(uds_paths)
    uds_mgr.start()

    # 1) Start tcpreplay (loop indefinitely at 1 Mbps onto loopback).
    tcpreplay_cmd = [
        "tcpreplay",
        "--mbps=1",
        "--loop=0",
        "--intf1=lo",
        PCAP_FILE,
    ]
    tcpreplay_proc = subprocess.Popen(tcpreplay_cmd)

    # 2) Start hashpipe via start_daq.py (ensure start_daq.py uses the
    # psutil-based PID discovery).
    start_daq = [
        sys.executable,
        "/app/tests/ci_tests/start_daq.py",
        "--run_dir", str(BASE_DIR),
        "--max_file_size_mb", "1",
        "--bindhost", "lo",
    ]
    for mid in MODULE_IDS:
        start_daq.extend(["--module_id", str(mid)])
    hashpipe_launcher = subprocess.Popen(start_daq, cwd=BASE_DIR)

    # Fail fast if tcpreplay died immediately (bad pcap, missing interface).
    if tcpreplay_proc.poll() is not None:
        pytest.fail(f"tcpreplay exited immediately with code {tcpreplay_proc.returncode}")

    # 3) Wait for hashpipe to be running.
    try:
        _wait_for(is_hashpipe_running, timeout_s=30, desc="hashpipe to be running")
    except Exception:
        # Dump whatever diagnostic state exists before propagating.
        pid_file = BASE_DIR / "daq_hashpipe_pid"
        if pid_file.exists():
            print(f"Found PID file: {pid_file.read_text().strip()}")
        else:
            print("No PID file was created by start_daq.py")
        raise

    env = {
        "base_dir": BASE_DIR,
        "run_name": RUN_NAME,
        "module_ids": MODULE_IDS,
        "uds_paths": uds_paths,
        "uds_manager": uds_mgr,
        "tcpreplay_proc": tcpreplay_proc,
        "hashpipe_launcher": hashpipe_launcher,
    }

    try:
        yield env
    finally:
        print("\n-- Tearing down DAQ environment --")
        # Stop tcpreplay first so it stops feeding data.
        try:
            tcpreplay_proc.terminate()
            tcpreplay_proc.wait(timeout=10)
        except Exception:
            try:
                tcpreplay_proc.kill()
            except Exception:
                pass

        # Stop hashpipe via stop_daq.py.
        stop_daq = [
            sys.executable,
            "/app/tests/ci_tests/stop_daq.py",
        ]
        try:
            cp = subprocess.run(stop_daq, cwd=BASE_DIR, capture_output=True, text=True, timeout=15)
            print("stop_daq.py stdout:\n", cp.stdout)
            print("stop_daq.py stderr:\n", cp.stderr)
        except subprocess.TimeoutExpired:
            print("stop_daq.py timed out; sending SIGINT to hashpipe directly.")
            try:
                subprocess.run(["pkill", "-2", "hashpipe"])
            except Exception:
                pass

        # Reap the launcher process so it does not linger as a zombie
        # (it was previously never waited on).
        try:
            hashpipe_launcher.wait(timeout=10)
        except Exception:
            try:
                hashpipe_launcher.kill()
            except Exception:
                pass

        # Stop UDS servers and clean up their sockets.
        uds_mgr.stop()
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import os
2+
import socket
3+
import stat
4+
import json
5+
6+
def read_one_frame_from_uds(socket_path, header_size_hint=None, timeout_s=5.0):
    """Read a single frame from the snapshot.c client-facing UDS stream.

    Wire format:
      [2-byte big-endian module_id]
      [JSON header terminated by b'\\n\\n'] (when header_size_hint is given,
        exactly that many bytes are read first, then the terminator is
        scanned for if the hint fell short)
      [b'*']
      [binary image bytes] -- length unknown to the client; a bounded read
        is used here, sufficient for CI frame sizes.

    Returns a (module_id, header_dict, raw_image_bytes) tuple.
    """
    # Sanity-check that the path exists and is actually a UNIX socket.
    if not os.path.exists(socket_path):
        raise FileNotFoundError(f"{socket_path} does not exist")
    if not stat.S_ISSOCK(os.stat(socket_path).st_mode):
        raise RuntimeError(f"{socket_path} exists but is not a socket")

    # The context manager closes the socket on all exit paths.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout_s)
        sock.connect(socket_path)

        # A 2-byte big-endian module id precedes every frame.
        module_id = int.from_bytes(_recv_exact(sock, 2), "big")

        # Header: scan for the terminator, or consume the hinted size and
        # fall back to scanning when the hint did not include it.
        if header_size_hint is None:
            header_bytes = _recv_until(sock, b"\n\n")
        else:
            header_bytes = _recv_exact(sock, header_size_hint)
            if not header_bytes.endswith(b"\n\n"):
                header_bytes += _recv_until(sock, b"\n\n")
        header = json.loads(header_bytes[:-2].decode("utf-8"))

        # A literal '*' separates the header from the image payload.
        star = _recv_exact(sock, 1)
        if star != b"*":
            raise RuntimeError(f"Expected '*' before image payload, got {star!r}")

        # The dp's exact bytes_per_image is unknown to this client; for CI
        # a single bounded recv is enough for ph256/img16 frame sizes.
        img = _recv_at_most(sock, 4096)
        return module_id, header, img
57+
58+
def _recv_exact(sock, n):
59+
buf = bytearray()
60+
while len(buf) < n:
61+
chunk = sock.recv(n - len(buf))
62+
if not chunk:
63+
raise ConnectionError("Socket closed while expecting more data")
64+
buf.extend(chunk)
65+
return bytes(buf)
66+
67+
def _recv_until(sock, terminator: bytes, max_bytes=65536):
68+
buf = bytearray()
69+
while True:
70+
chunk = sock.recv(1024)
71+
if not chunk:
72+
raise ConnectionError("Socket closed before terminator")
73+
buf.extend(chunk)
74+
if buf.endswith(terminator):
75+
return bytes(buf)
76+
if len(buf) > max_bytes:
77+
raise RuntimeError("Header exceeded maximum allowed size")
78+
79+
def _recv_at_most(sock, max_bytes):
80+
# Single recv up to max_bytes, returns what was available immediately.
81+
return sock.recv(max_bytes)

0 commit comments

Comments
 (0)