88import argparse
99import os
1010import signal
11+ import socket
1112import subprocess
1213import sys
1314import threading
@@ -91,20 +92,13 @@ def _set(k, v):
9192 except Exception as e :
9293 print (f"[launcher] Error loading config: { e } " )
9394
94- def _split_services (values : List [str ]) -> List [str ]:
95- flat = []
96- for v in values :
97- flat .extend (p .strip ().lower () for p in v .split ("," ) if p .strip ())
98- return list (dict .fromkeys (flat ))
99-
100- def _build_env (extra : Optional [Dict [str , str ]] = None ,
101- extra_pythonpath : Optional [List [str ]] = None ) -> Dict [str , str ]:
95+ def _build_env (extra : Optional [Dict [str , str ]] = None ) -> Dict [str , str ]:
10296 env = os .environ .copy ()
10397 env ["PYTHONUNBUFFERED" ] = "1"
10498 env ["PYTHONIOENCODING" ] = "utf-8"
10599 env ["PYTHONUTF8" ] = "1"
106100
107- paths = [str (CONTENT_SEARCH_DIR ), str (REPO_ROOT )] + [ str ( p ) for p in ( extra_pythonpath or [])]
101+ paths = [str (CONTENT_SEARCH_DIR ), str (REPO_ROOT )]
108102 if env .get ("PYTHONPATH" ):
109103 paths .append (env ["PYTHONPATH" ])
110104 env ["PYTHONPATH" ] = os .pathsep .join (paths )
@@ -120,7 +114,7 @@ def _build_env(extra: Optional[Dict[str, str]] = None,
120114
121115def _spawn (
122116 name : str , cmd : List [str ], cwd : Path , logs_dir : Path , procs : Dict , log_files : Dict ,
123- extra_env : Optional [Dict [str , str ]] = None , extra_pythonpath : Optional [ List [ str ]] = None ,
117+ extra_env : Optional [Dict [str , str ]] = None ,
124118) -> None :
125119 log_path = logs_dir / name / f"{ name } _{ time .strftime ('%Y%m%d_%H%M%S' )} .log"
126120 log_path .parent .mkdir (parents = True , exist_ok = True )
@@ -132,7 +126,7 @@ def _spawn(
132126 stdout = subprocess .PIPE , stderr = subprocess .STDOUT ,
133127 text = True , bufsize = 1 ,
134128 encoding = "utf-8" , errors = "replace" ,
135- env = _build_env (extra_env , extra_pythonpath ),
129+ env = _build_env (extra_env ),
136130 start_new_session = True ,
137131 )
138132 procs [name ] = p
@@ -150,82 +144,126 @@ def _tee(pipe, lf) -> None:
150144 threading .Thread (target = _tee , args = (p .stdout , log_file ), daemon = True ).start ()
151145 print (f"[launcher] Started { name } : pid={ p .pid } logs: { log_path } " )
152146
147+ def _check_health (host : str , port : int , path : str = "" ) -> bool :
148+ """Check service health. If path is given, do HTTP GET; otherwise just TCP connect."""
149+ try :
150+ s = socket .create_connection ((host , port ), timeout = 5 )
151+ if not path :
152+ s .close ()
153+ return True
154+ s .sendall (f"GET { path } HTTP/1.1\r \n Host: { host } :{ port } \r \n Connection: close\r \n \r \n " .encode ())
155+ data = b""
156+ while True :
157+ chunk = s .recv (4096 )
158+ if not chunk :
159+ break
160+ data += chunk
161+ if b"\r \n " in data :
162+ break
163+ s .close ()
164+ text = data .decode ("utf-8" , errors = "replace" )
165+ return text .startswith ("HTTP/" ) and int (text .split ()[1 ]) < 400
166+ except Exception :
167+ return False
168+
169+ def _env (key : str , default : str ) -> str :
170+ return os .environ .get (key , default )
171+
153172def main () -> None :
154173 _load_config_to_env ()
155174
156175 parser = argparse .ArgumentParser (description = "Start services via Environment Variables." )
157176 parser .add_argument ("--services" , nargs = "+" , default = ["chromadb" , "minio" , "vlm" , "preprocess" , "ingest" , "main_app" ])
158177 args = parser .parse_args ()
159178
160- requested = _split_services (args .services )
179+ requested = []
180+ for v in args .services :
181+ requested .extend (p .strip ().lower () for p in v .split ("," ) if p .strip ())
182+ requested = list (dict .fromkeys (requested ))
183+
161184 logs_dir = CONTENT_SEARCH_DIR / "logs"
162185 logs_dir .mkdir (parents = True , exist_ok = True )
163186
164- chroma_exe = os . environ . get ("CHROMA_EXE" )
187+ chroma_exe = _env ("CHROMA_EXE" , " " )
165188 if not chroma_exe :
166189 venv_exe = CONTENT_SEARCH_DIR / "venv_content_search" / "Scripts" / "chroma.exe"
167190 chroma_exe = str (venv_exe ) if venv_exe .exists () else "chroma"
168191 provider_minio = CONTENT_SEARCH_DIR / "providers" / "minio_wrapper" / "minio.exe"
169192 minio_exe = str (provider_minio ) if provider_minio .exists () else "minio"
170193
194+ # Each service: cmd, cwd, extra_env, health check (host, port, path), timeout
195+ # health path="" means TCP-only check
171196 services_meta = {
172197 "chromadb" : {
173- "cmd" : [chroma_exe , "run" ,
174- "--host" , os . environ . get ("CHROMA_HOST" , "127.0.0.1" ),
175- "--port" , os . environ . get ("CHROMA_PORT" , "9090" ),
176- "--path" , os . environ . get ("CHROMA_DATA_DIR" , "./chroma_data" )],
198+ "cmd" : [chroma_exe , "run" ,
199+ "--host" , _env ("CHROMA_HOST" , "127.0.0.1" ),
200+ "--port" , _env ("CHROMA_PORT" , "9090" ),
201+ "--path" , _env ("CHROMA_DATA_DIR" , "./chroma_data" )],
177202 "cwd" : CONTENT_SEARCH_DIR ,
203+ "health" : (_env ("CHROMA_HOST" , "127.0.0.1" ), int (_env ("CHROMA_PORT" , "9090" )), "" ),
204+ "health_timeout" : 60 ,
178205 },
179206 "minio" : {
180- "cmd" : [minio_exe , "server" , os . environ . get ("MINIO_DATA_DIR" , "./minio_data" ),
181- "--address" , os . environ . get ("MINIO_ADDRESS" , ":9000" ),
182- "--console-address" , os . environ . get ("MINIO_CONSOLE_ADDRESS" , ":9001" )],
207+ "cmd" : [minio_exe , "server" , _env ("MINIO_DATA_DIR" , "./minio_data" ),
208+ "--address" , _env ("MINIO_ADDRESS" , ":9000" ),
209+ "--console-address" , _env ("MINIO_CONSOLE_ADDRESS" , ":9001" )],
183210 "cwd" : CONTENT_SEARCH_DIR ,
184211 "extra_env" : {
185- "MINIO_ROOT_USER" : os . environ . get ("MINIO_ROOT_USER" , "minioadmin" ),
186- "MINIO_ROOT_PASSWORD" : os . environ . get ("MINIO_ROOT_PASSWORD" , "minioadmin" )
212+ "MINIO_ROOT_USER" : _env ("MINIO_ROOT_USER" , "minioadmin" ),
213+ "MINIO_ROOT_PASSWORD" : _env ("MINIO_ROOT_PASSWORD" , "minioadmin" ),
187214 },
215+ "health" : ("127.0.0.1" , int (_env ("MINIO_ADDRESS" , ":9000" ).lstrip (":" )), "/minio/health/live" ),
216+ "health_timeout" : 60 ,
188217 },
189218 "vlm" : {
190- "cmd" : [sys .executable , "-m" , "uvicorn" , "providers.vlm_openvino_serving.app:app" ,
191- "--host" , os . environ . get ("VLM_HOST" , "127.0.0.1" ),
192- "--port" , os . environ . get ("VLM_PORT" , "9900" )],
219+ "cmd" : [sys .executable , "-m" , "uvicorn" , "providers.vlm_openvino_serving.app:app" ,
220+ "--host" , _env ("VLM_HOST" , "127.0.0.1" ),
221+ "--port" , _env ("VLM_PORT" , "9900" )],
193222 "cwd" : CONTENT_SEARCH_DIR ,
194223 "extra_env" : {
195- "VLM_MODEL_NAME" : os . environ . get ("VLM_MODEL_NAME" , "Qwen/Qwen2.5-VL-3B-Instruct" ),
196- "VLM_DEVICE" : os . environ . get ("VLM_DEVICE" , "CPU" ),
224+ "VLM_MODEL_NAME" : _env ("VLM_MODEL_NAME" , "Qwen/Qwen2.5-VL-3B-Instruct" ),
225+ "VLM_DEVICE" : _env ("VLM_DEVICE" , "CPU" ),
197226 },
227+ "health" : (_env ("VLM_HOST" , "127.0.0.1" ), int (_env ("VLM_PORT" , "9900" )), "/health" ),
228+ "health_timeout" : 600 ,
198229 },
199230 "preprocess" : {
200- "cmd" : [sys .executable , "-m" , "uvicorn" , "providers.video_preprocess.server:app" ,
201- "--host" , os . environ . get ("PREPROCESS_HOST" , "127.0.0.1" ),
202- "--port" , os . environ . get ("PREPROCESS_PORT" , "8001" )],
231+ "cmd" : [sys .executable , "-m" , "uvicorn" , "providers.video_preprocess.server:app" ,
232+ "--host" , _env ("PREPROCESS_HOST" , "127.0.0.1" ),
233+ "--port" , _env ("PREPROCESS_PORT" , "8001" )],
203234 "cwd" : CONTENT_SEARCH_DIR ,
235+ "health" : (_env ("PREPROCESS_HOST" , "127.0.0.1" ), int (_env ("PREPROCESS_PORT" , "8001" )), "/health" ),
236+ "health_timeout" : 120 ,
204237 },
205238 "ingest" : {
206- "cmd" : [sys .executable , "-m" , "uvicorn" , "providers.file_ingest_and_retrieve.server:app" ,
207- "--host" , os . environ . get ("INGEST_HOST" , "127.0.0.1" ),
208- "--port" , os . environ . get ("INGEST_PORT" , "9990" )],
239+ "cmd" : [sys .executable , "-m" , "uvicorn" , "providers.file_ingest_and_retrieve.server:app" ,
240+ "--host" , _env ("INGEST_HOST" , "127.0.0.1" ),
241+ "--port" , _env ("INGEST_PORT" , "9990" )],
209242 "cwd" : CONTENT_SEARCH_DIR ,
243+ "health" : (_env ("INGEST_HOST" , "127.0.0.1" ), int (_env ("INGEST_PORT" , "9990" )), "/v1/dataprep/health" ),
244+ "health_timeout" : 300 ,
210245 },
211246 "main_app" : {
212- "cmd" : [sys .executable , "-m" , "uvicorn" , "main:app" ,
213- "--host" , os .environ .get ("CS_HOST" , "127.0.0.1" ),
214- "--port" , os .environ .get ("CS_PORT" , "9011" )],
215- "cwd" : CONTENT_SEARCH_DIR ,
247+ "cmd" : [sys .executable , "-m" , "uvicorn" , "main:app" ,
248+ "--host" , _env ("CS_HOST" , "127.0.0.1" ),
249+ "--port" , _env ("CS_PORT" , "9011" )],
250+ "cwd" : CONTENT_SEARCH_DIR ,
251+ "health" : (_env ("CS_HOST" , "127.0.0.1" ), int (_env ("CS_PORT" , "9011" )), "/api/v1/system/health" ),
252+ "health_timeout" : 120 ,
216253 },
217254 }
218255
256+ start_time = time .monotonic ()
219257 print (f"[launcher] Starting services from: { CONTENT_SEARCH_DIR } " )
220258 procs : Dict = {}
221259 log_files : Dict = {}
222260
223261 for sname in requested :
224262 if sname in services_meta :
225263 meta = services_meta [sname ]
226- _spawn (sname , meta ["cmd" ], meta ["cwd" ], logs_dir , procs , log_files ,
227- meta .get ("extra_env" ), meta .get ("extra_pythonpath" ))
264+ _spawn (sname , meta ["cmd" ], meta ["cwd" ], logs_dir , procs , log_files , meta .get ("extra_env" ))
228265 time .sleep (0.5 )
266+
229267 def _terminate_all () -> None :
230268 for name , p in procs .items ():
231269 if p .poll () is None :
@@ -241,7 +279,42 @@ def _handle_sig(signum, frame) -> None:
241279 signal .signal (signal .SIGINT , _handle_sig )
242280 signal .signal (signal .SIGTERM , _handle_sig )
243281
244- print ("[launcher] All services started. Press Ctrl+C to stop." )
282+ # --- Health check: poll each service in parallel ---
283+ print ("[launcher] Waiting for services to become ready..." )
284+ results : Dict [str , bool ] = {}
285+
286+ def _wait_healthy (name : str ) -> None :
287+ meta = services_meta [name ]
288+ host , port , path = meta ["health" ]
289+ timeout = meta .get ("health_timeout" , 60 )
290+ deadline = time .monotonic () + timeout
291+ while time .monotonic () < deadline :
292+ if procs [name ].poll () is not None :
293+ break
294+ if _check_health (host , port , path ):
295+ results [name ] = True
296+ return
297+ time .sleep (3 )
298+ rc = procs [name ].poll ()
299+ results [name ] = f"exited (code { rc } )" if rc is not None else f"not ready after { timeout } s"
300+
301+ threads = [threading .Thread (target = _wait_healthy , args = (s ,), daemon = True ) for s in procs ]
302+ for t in threads :
303+ t .start ()
304+ for t in threads :
305+ t .join ()
306+
307+ elapsed = time .monotonic () - start_time
308+ failed = {s : reason for s , reason in results .items () if reason is not True }
309+ print ()
310+ if failed :
311+ details = ", " .join (f"{ s } ({ reason } )" for s , reason in failed .items ())
312+ print (f"[launcher] WARNING: { len (failed )} service(s) failed: { details } " )
313+ print (f"[launcher] Check logs in: { logs_dir } /" )
314+ else :
315+ print (f"[launcher] All { len (results )} services are ready. (startup took { elapsed :.1f} s)" )
316+ print (f"[launcher] You can use Ctrl+C to stop all services.\n " )
317+
245318 try :
246319 while True :
247320 time .sleep (1.0 )
@@ -258,4 +331,4 @@ def _handle_sig(signum, frame) -> None:
258331 except : pass
259332
260333if __name__ == "__main__" :
261- main ()
334+ main ()
0 commit comments