11# tests/ci_tests/conftest.py
2+
23import sys
34import os
45import time
78import asyncio
89from pathlib import Path
910import stat
10-
1111import pytest
12- from control_util import is_hashpipe_running
1312
13+ from control_util import is_hashpipe_running
1414from uds_server import UdsServer
1515
1616def is_utility_available (name ):
@@ -28,6 +28,7 @@ def _ensure_dirs_and_module_config():
2828 module_dir = BASE_DIR / f"module_{ mid } " / RUN_NAME
2929 module_dir .mkdir (parents = True , exist_ok = True )
3030 cfg_str += f"{ mid } \n "
31+
3132 config_dir = BASE_DIR / RUN_NAME
3233 config_dir .mkdir (exist_ok = True )
3334 module_config_path = config_dir / "module.config"
@@ -53,45 +54,69 @@ def __init__(self, socket_paths):
5354 self .thread = None
5455 self .servers = {}
5556 self .started = threading .Event ()
57+ self ._stop_future = None
58+ self ._is_stopped = False
5659
5760 def start (self ):
61+ if self ._is_stopped :
62+ return False # Don't restart if explicitly stopped
63+
5864 def runner ():
5965 self .loop = asyncio .new_event_loop ()
6066 asyncio .set_event_loop (self .loop )
67+
6168 async def start_all ():
6269 for name , path in self .socket_paths .items ():
6370 srv = UdsServer (str (path ))
6471 await srv .start ()
6572 self .servers [name ] = srv
6673 self .started .set ()
74+
6775 self .loop .run_until_complete (start_all ())
6876 self .loop .run_forever ()
6977
7078 self .thread = threading .Thread (target = runner , daemon = True )
7179 self .thread .start ()
80+
7281 # Wait until servers started
73- self .started .wait (timeout = 5 )
82+ self .started .wait (timeout = 10 )
7483 if not self .started .is_set ():
7584 raise RuntimeError ("Failed to start UDS servers" )
85+
86+ return True
7687
7788 def stop (self ):
78- if self .loop is None :
89+ self ._is_stopped = True
90+
91+ if self .loop is None or not self .loop .is_running ():
7992 return
93+
94+ # Schedule the stop coroutine in the event loop
8095 async def stop_all ():
8196 for srv in self .servers .values ():
8297 await srv .stop ()
83- fut = asyncio .run_coroutine_threadsafe (stop_all (), self .loop )
98+
99+ # Run the stop coroutine in the event loop
100+ self ._stop_future = asyncio .run_coroutine_threadsafe (stop_all (), self .loop )
101+
102+ # Wait for completion
84103 try :
85- fut . result (timeout = 5 )
104+ self . _stop_future . result (timeout = 10 )
86105 except Exception :
87106 pass
107+
108+ # Stop the event loop
88109 self .loop .call_soon_threadsafe (self .loop .stop )
89- self .thread .join (timeout = 5 )
110+
111+ # Wait for thread to finish
112+ if self .thread :
113+ self .thread .join (timeout = 10 )
90114
91115@pytest .fixture (scope = "session" )
92116def daq_env ():
93117 if not is_utility_available ("hashpipe" ):
94118 pytest .fail ("hashpipe not found in PATH" )
119+
95120 if not is_utility_available ("tcpreplay" ):
96121 pytest .fail ("tcpreplay not found in PATH" )
97122
@@ -107,7 +132,7 @@ def daq_env():
107132 tcpreplay_cmd = [
108133 "tcpreplay" ,
109134 "--mbps=5" ,
110- "--loop=0" ,
135+ "--loop=0" ,
111136 "--intf1=lo" ,
112137 PCAP_FILE ,
113138 ]
@@ -121,8 +146,10 @@ def daq_env():
121146 "--max_file_size_mb" , "5" ,
122147 "--bindhost" , "lo" ,
123148 ]
149+
124150 for mid in MODULE_IDS :
125151 start_daq .extend (["--module_id" , str (mid )])
152+
126153 hashpipe_launcher = subprocess .Popen (start_daq , cwd = BASE_DIR )
127154
128155 # 3) Wait for hashpipe to be running
@@ -139,9 +166,10 @@ def daq_env():
139166 # 4) Optional: verify at least one server saw a connection within 10s
140167 start = time .time ()
141168 while time .time () - start < 10 :
142- # If any server’ s connected event is set, we know hashpipe connected to at least one DP
169+ # If any server' s connected event is set, we know hashpipe connected to at least one DP
143170 # Skip strict requirement; filesystem checks will also verify pipeline
144171 break
172+
145173 env = {
146174 "base_dir" : BASE_DIR ,
147175 "run_name" : RUN_NAME ,
@@ -154,8 +182,10 @@ def daq_env():
154182
155183 try :
156184 yield env
185+
157186 finally :
158187 print ("\n -- Tearing down DAQ environment --" )
188+
159189 # Stop tcpreplay first
160190 try :
161191 tcpreplay_proc .terminate ()
@@ -171,6 +201,7 @@ def daq_env():
171201 sys .executable ,
172202 "/app/tests/ci_tests/stop_daq.py" ,
173203 ]
204+
174205 try :
175206 cp = subprocess .run (stop_daq , cwd = BASE_DIR , capture_output = True , text = True , timeout = 15 )
176207 print ("stop_daq.py stdout:\n " , cp .stdout )
0 commit comments