99from pathlib import Path
1010import stat
1111import pytest
12-
1312from control_util import is_hashpipe_running
1413from uds_server import UdsServer
1514
@@ -59,64 +58,89 @@ def __init__(self, socket_paths):
5958
6059 def start (self ):
6160 if self ._is_stopped :
62- return False # Don't restart if explicitly stopped
63-
61+ # Reset state for restart
62+ self ._is_stopped = False
63+ self .started .clear ()
64+ self .servers .clear ()
65+
6466 def runner ():
6567 self .loop = asyncio .new_event_loop ()
6668 asyncio .set_event_loop (self .loop )
67-
69+
6870 async def start_all ():
69- for name , path in self .socket_paths .items ():
70- srv = UdsServer (str (path ))
71- await srv .start ()
72- self .servers [name ] = srv
73- self .started .set ()
74-
75- self .loop .run_until_complete (start_all ())
76- self .loop .run_forever ()
71+ try :
72+ for name , path in self .socket_paths .items ():
73+ srv = UdsServer (str (path ))
74+ await srv .start ()
75+ self .servers [name ] = srv
76+ print (f"Started UDS server for { name } at { path } " )
77+ self .started .set ()
78+ except Exception as e :
79+ print (f"Failed to start UDS servers: { e } " )
80+ raise
81+
82+ try :
83+ self .loop .run_until_complete (start_all ())
84+ self .loop .run_forever ()
85+ except Exception as e :
86+ print (f"UDS server loop failed: { e } " )
7787
7888 self .thread = threading .Thread (target = runner , daemon = True )
7989 self .thread .start ()
8090
81- # Wait until servers started
82- self .started .wait (timeout = 10 )
83- if not self .started .is_set ():
84- raise RuntimeError ("Failed to start UDS servers" )
85-
91+ # Wait for startup with extended timeout
92+ if not self .started .wait (timeout = 15 ):
93+ raise RuntimeError ("Failed to start UDS servers within timeout" )
94+
95+ # Give hashpipe time to detect and connect to new sockets
96+ time .sleep (2 )
8697 return True
8798
8899 def stop (self ):
89100 self ._is_stopped = True
90-
91101 if self .loop is None or not self .loop .is_running ():
92102 return
93103
94- # Schedule the stop coroutine in the event loop
95104 async def stop_all ():
105+ tasks = []
96106 for srv in self .servers .values ():
97- await srv .stop ()
107+ tasks .append (srv .stop ())
108+
109+ if tasks :
110+ await asyncio .gather (* tasks , return_exceptions = True )
98111
99- # Run the stop coroutine in the event loop
112+ # Schedule the stop coroutine
100113 self ._stop_future = asyncio .run_coroutine_threadsafe (stop_all (), self .loop )
101114
102- # Wait for completion
115+ # Wait for completion with extended timeout
103116 try :
104- self ._stop_future .result (timeout = 10 )
105- except Exception :
106- pass
117+ self ._stop_future .result (timeout = 20 )
118+ except Exception as e :
119+ print ( f"Error during UDS server shutdown: { e } " )
107120
108121 # Stop the event loop
109122 self .loop .call_soon_threadsafe (self .loop .stop )
110123
111124 # Wait for thread to finish
112- if self .thread :
113- self .thread .join (timeout = 10 )
125+ if self .thread and self .thread .is_alive ():
126+ self .thread .join (timeout = 20 )
127+
128+ # Ensure all socket files are cleaned up
129+ for path in self .socket_paths .values ():
130+ try :
131+ if os .path .exists (str (path )):
132+ os .unlink (str (path ))
133+ except Exception :
134+ pass
135+
136+ # Clear state
137+ self .servers .clear ()
138+ self .started .clear ()
114139
115140@pytest .fixture (scope = "session" )
116141def daq_env ():
117142 if not is_utility_available ("hashpipe" ):
118143 pytest .fail ("hashpipe not found in PATH" )
119-
120144 if not is_utility_available ("tcpreplay" ):
121145 pytest .fail ("tcpreplay not found in PATH" )
122146
@@ -132,21 +156,20 @@ def daq_env():
132156 tcpreplay_cmd = [
133157 "tcpreplay" ,
134158 "--mbps=5" ,
135- "--loop=0" ,
159+ "--loop=0" ,
136160 "--intf1=lo" ,
137161 PCAP_FILE ,
138162 ]
139163 tcpreplay_proc = subprocess .Popen (tcpreplay_cmd )
140164
141- # 2) Start hashpipe via start_daq.py (ensure your start_daq.py uses psutil-based PID find)
165+ # 2) Start hashpipe via start_daq.py
142166 start_daq = [
143167 sys .executable ,
144168 "/app/tests/ci_tests/start_daq.py" ,
145169 "--run_dir" , str (BASE_DIR ),
146170 "--max_file_size_mb" , "5" ,
147171 "--bindhost" , "lo" ,
148172 ]
149-
150173 for mid in MODULE_IDS :
151174 start_daq .extend (["--module_id" , str (mid )])
152175
@@ -163,13 +186,6 @@ def daq_env():
163186 print ("No PID file was created by start_daq.py" )
164187 raise
165188
166- # 4) Optional: verify at least one server saw a connection within 10s
167- start = time .time ()
168- while time .time () - start < 10 :
169- # If any server's connected event is set, we know hashpipe connected to at least one DP
170- # Skip strict requirement; filesystem checks will also verify pipeline
171- break
172-
173189 env = {
174190 "base_dir" : BASE_DIR ,
175191 "run_name" : RUN_NAME ,
@@ -182,7 +198,6 @@ def daq_env():
182198
183199 try :
184200 yield env
185-
186201 finally :
187202 print ("\n -- Tearing down DAQ environment --" )
188203
@@ -201,7 +216,6 @@ def daq_env():
201216 sys .executable ,
202217 "/app/tests/ci_tests/stop_daq.py" ,
203218 ]
204-
205219 try :
206220 cp = subprocess .run (stop_daq , cwd = BASE_DIR , capture_output = True , text = True , timeout = 15 )
207221 print ("stop_daq.py stdout:\n " , cp .stdout )
0 commit comments