Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions education-ai-suite/smart-classroom/content_search/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,21 @@ Once the environment is configured, activate the virtual environment and launch
# Start all microservices
python .\start_services.py
```
> **Note**: For the first-time execution, the service may take several minutes to fully start. This is because the system needs to download pre-trained AI models.
> **Note**: On the first run, the services may take several minutes to fully start because the system needs to download pre-trained AI models (such as CLIP, BGE, and Qwen VLM).

> **Note**: Upon a successful launch, the console output should not contain any "ERROR" logs.
The launcher automatically performs health checks on all services. When all services are ready, you will see:
```
[launcher] All 6 services are ready. (startup took XXs)
[launcher] You can use Ctrl+C to stop all services.
```

If any service fails to start, the launcher will report which service(s) failed:
```
[launcher] WARNING: 1 service(s) failed: vlm (not ready after 600s)
[launcher] Check logs in: <path>/content_search/logs/
```

To verify that the Content Search service is running correctly, execute the following command:
You can also manually verify the service status:
```PowerShell
Invoke-RestMethod -Uri "http://127.0.0.1:9011/api/v1/system/health"
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def format(self, record):
logger.info(f"Loaded environment variables from {env_path}")
else:
logger.info(
f".env file not found at {env_path}. Using environment variables from docker-compose."
f".env file not found at {env_path}. Using environment variables."
)

logger.debug(f"Settings: {settings.model_dump()}")
155 changes: 114 additions & 41 deletions education-ai-suite/smart-classroom/content_search/start_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import argparse
import os
import signal
import socket
import subprocess
import sys
import threading
Expand Down Expand Up @@ -91,20 +92,13 @@ def _set(k, v):
except Exception as e:
print(f"[launcher] Error loading config: {e}")

def _split_services(values: List[str]) -> List[str]:
flat = []
for v in values:
flat.extend(p.strip().lower() for p in v.split(",") if p.strip())
return list(dict.fromkeys(flat))

def _build_env(extra: Optional[Dict[str, str]] = None,
extra_pythonpath: Optional[List[str]] = None) -> Dict[str, str]:
def _build_env(extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env["PYTHONIOENCODING"] = "utf-8"
env["PYTHONUTF8"] = "1"

paths = [str(CONTENT_SEARCH_DIR), str(REPO_ROOT)] + [str(p) for p in (extra_pythonpath or [])]
paths = [str(CONTENT_SEARCH_DIR), str(REPO_ROOT)]
if env.get("PYTHONPATH"):
paths.append(env["PYTHONPATH"])
env["PYTHONPATH"] = os.pathsep.join(paths)
Expand All @@ -120,7 +114,7 @@ def _build_env(extra: Optional[Dict[str, str]] = None,

def _spawn(
name: str, cmd: List[str], cwd: Path, logs_dir: Path, procs: Dict, log_files: Dict,
extra_env: Optional[Dict[str, str]] = None, extra_pythonpath: Optional[List[str]] = None,
extra_env: Optional[Dict[str, str]] = None,
) -> None:
log_path = logs_dir / name / f"{name}_{time.strftime('%Y%m%d_%H%M%S')}.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -132,7 +126,7 @@ def _spawn(
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True, bufsize=1,
encoding="utf-8", errors="replace",
env=_build_env(extra_env, extra_pythonpath),
env=_build_env(extra_env),
start_new_session=True,
)
procs[name] = p
Expand All @@ -150,82 +144,126 @@ def _tee(pipe, lf) -> None:
threading.Thread(target=_tee, args=(p.stdout, log_file), daemon=True).start()
print(f"[launcher] Started {name}: pid={p.pid} logs: {log_path}")

def _check_health(host: str, port: int, path: str = "") -> bool:
"""Check service health. If path is given, do HTTP GET; otherwise just TCP connect."""
try:
s = socket.create_connection((host, port), timeout=5)
if not path:
s.close()
return True
s.sendall(f"GET {path} HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n".encode())
data = b""
while True:
chunk = s.recv(4096)
if not chunk:
break
data += chunk
if b"\r\n" in data:
break
s.close()
text = data.decode("utf-8", errors="replace")
return text.startswith("HTTP/") and int(text.split()[1]) < 400
except Exception:
return False

def _env(key: str, default: str) -> str:
return os.environ.get(key, default)

def main() -> None:
_load_config_to_env()

parser = argparse.ArgumentParser(description="Start services via Environment Variables.")
parser.add_argument("--services", nargs="+", default=["chromadb", "minio", "vlm", "preprocess", "ingest", "main_app"])
args = parser.parse_args()

requested = _split_services(args.services)
requested = []
for v in args.services:
requested.extend(p.strip().lower() for p in v.split(",") if p.strip())
requested = list(dict.fromkeys(requested))

logs_dir = CONTENT_SEARCH_DIR / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)

chroma_exe = os.environ.get("CHROMA_EXE")
chroma_exe = _env("CHROMA_EXE", "")
if not chroma_exe:
venv_exe = CONTENT_SEARCH_DIR / "venv_content_search" / "Scripts" / "chroma.exe"
chroma_exe = str(venv_exe) if venv_exe.exists() else "chroma"
provider_minio = CONTENT_SEARCH_DIR / "providers" / "minio_wrapper" / "minio.exe"
minio_exe = str(provider_minio) if provider_minio.exists() else "minio"

# Each service: cmd, cwd, extra_env, health check (host, port, path), timeout
# health path="" means TCP-only check
services_meta = {
"chromadb": {
"cmd": [chroma_exe, "run",
"--host", os.environ.get("CHROMA_HOST", "127.0.0.1"),
"--port", os.environ.get("CHROMA_PORT", "9090"),
"--path", os.environ.get("CHROMA_DATA_DIR", "./chroma_data")],
"cmd": [chroma_exe, "run",
"--host", _env("CHROMA_HOST", "127.0.0.1"),
"--port", _env("CHROMA_PORT", "9090"),
"--path", _env("CHROMA_DATA_DIR", "./chroma_data")],
"cwd": CONTENT_SEARCH_DIR,
"health": (_env("CHROMA_HOST", "127.0.0.1"), int(_env("CHROMA_PORT", "9090")), ""),
"health_timeout": 60,
},
"minio": {
"cmd": [minio_exe, "server", os.environ.get("MINIO_DATA_DIR", "./minio_data"),
"--address", os.environ.get("MINIO_ADDRESS", ":9000"),
"--console-address", os.environ.get("MINIO_CONSOLE_ADDRESS", ":9001")],
"cmd": [minio_exe, "server", _env("MINIO_DATA_DIR", "./minio_data"),
"--address", _env("MINIO_ADDRESS", ":9000"),
"--console-address", _env("MINIO_CONSOLE_ADDRESS", ":9001")],
"cwd": CONTENT_SEARCH_DIR,
"extra_env": {
"MINIO_ROOT_USER": os.environ.get("MINIO_ROOT_USER", "minioadmin"),
"MINIO_ROOT_PASSWORD": os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin")
"MINIO_ROOT_USER": _env("MINIO_ROOT_USER", "minioadmin"),
"MINIO_ROOT_PASSWORD": _env("MINIO_ROOT_PASSWORD", "minioadmin"),
},
"health": ("127.0.0.1", int(_env("MINIO_ADDRESS", ":9000").lstrip(":")), "/minio/health/live"),
"health_timeout": 60,
},
"vlm": {
"cmd": [sys.executable, "-m", "uvicorn", "providers.vlm_openvino_serving.app:app",
"--host", os.environ.get("VLM_HOST", "127.0.0.1"),
"--port", os.environ.get("VLM_PORT", "9900")],
"cmd": [sys.executable, "-m", "uvicorn", "providers.vlm_openvino_serving.app:app",
"--host", _env("VLM_HOST", "127.0.0.1"),
"--port", _env("VLM_PORT", "9900")],
"cwd": CONTENT_SEARCH_DIR,
"extra_env": {
"VLM_MODEL_NAME": os.environ.get("VLM_MODEL_NAME", "Qwen/Qwen2.5-VL-3B-Instruct"),
"VLM_DEVICE": os.environ.get("VLM_DEVICE", "CPU"),
"VLM_MODEL_NAME": _env("VLM_MODEL_NAME", "Qwen/Qwen2.5-VL-3B-Instruct"),
"VLM_DEVICE": _env("VLM_DEVICE", "CPU"),
},
"health": (_env("VLM_HOST", "127.0.0.1"), int(_env("VLM_PORT", "9900")), "/health"),
"health_timeout": 600,
},
"preprocess": {
"cmd": [sys.executable, "-m", "uvicorn", "providers.video_preprocess.server:app",
"--host", os.environ.get("PREPROCESS_HOST", "127.0.0.1"),
"--port", os.environ.get("PREPROCESS_PORT", "8001")],
"cmd": [sys.executable, "-m", "uvicorn", "providers.video_preprocess.server:app",
"--host", _env("PREPROCESS_HOST", "127.0.0.1"),
"--port", _env("PREPROCESS_PORT", "8001")],
"cwd": CONTENT_SEARCH_DIR,
"health": (_env("PREPROCESS_HOST", "127.0.0.1"), int(_env("PREPROCESS_PORT", "8001")), "/health"),
"health_timeout": 120,
},
"ingest": {
"cmd": [sys.executable, "-m", "uvicorn", "providers.file_ingest_and_retrieve.server:app",
"--host", os.environ.get("INGEST_HOST", "127.0.0.1"),
"--port", os.environ.get("INGEST_PORT", "9990")],
"cmd": [sys.executable, "-m", "uvicorn", "providers.file_ingest_and_retrieve.server:app",
"--host", _env("INGEST_HOST", "127.0.0.1"),
"--port", _env("INGEST_PORT", "9990")],
"cwd": CONTENT_SEARCH_DIR,
"health": (_env("INGEST_HOST", "127.0.0.1"), int(_env("INGEST_PORT", "9990")), "/v1/dataprep/health"),
"health_timeout": 300,
},
"main_app": {
"cmd": [sys.executable, "-m", "uvicorn", "main:app",
"--host", os.environ.get("CS_HOST", "127.0.0.1"),
"--port", os.environ.get("CS_PORT", "9011")],
"cwd": CONTENT_SEARCH_DIR,
"cmd": [sys.executable, "-m", "uvicorn", "main:app",
"--host", _env("CS_HOST", "127.0.0.1"),
"--port", _env("CS_PORT", "9011")],
"cwd": CONTENT_SEARCH_DIR,
"health": (_env("CS_HOST", "127.0.0.1"), int(_env("CS_PORT", "9011")), "/api/v1/system/health"),
"health_timeout": 120,
},
}

start_time = time.monotonic()
print(f"[launcher] Starting services from: {CONTENT_SEARCH_DIR}")
procs: Dict = {}
log_files: Dict = {}

for sname in requested:
if sname in services_meta:
meta = services_meta[sname]
_spawn(sname, meta["cmd"], meta["cwd"], logs_dir, procs, log_files,
meta.get("extra_env"), meta.get("extra_pythonpath"))
_spawn(sname, meta["cmd"], meta["cwd"], logs_dir, procs, log_files, meta.get("extra_env"))
time.sleep(0.5)

def _terminate_all() -> None:
for name, p in procs.items():
if p.poll() is None:
Expand All @@ -241,7 +279,42 @@ def _handle_sig(signum, frame) -> None:
signal.signal(signal.SIGINT, _handle_sig)
signal.signal(signal.SIGTERM, _handle_sig)

print("[launcher] All services started. Press Ctrl+C to stop.")
# --- Health check: poll each service in parallel ---
print("[launcher] Waiting for services to become ready...")
results: Dict[str, bool] = {}

def _wait_healthy(name: str) -> None:
    """Poll one service until it is healthy, has exited, or times out.

    Stores the outcome in ``results[name]``: ``True`` when a health check
    succeeded, otherwise a short human-readable failure reason.
    """
    spec = services_meta[name]
    proc = procs[name]
    host, port, path = spec["health"]
    timeout = spec.get("health_timeout", 60)
    give_up_at = time.monotonic() + timeout

    # Stop early if the child process has already exited; it cannot recover.
    while time.monotonic() < give_up_at and proc.poll() is None:
        if _check_health(host, port, path):
            results[name] = True
            return
        time.sleep(3)

    exit_code = proc.poll()
    if exit_code is None:
        results[name] = f"not ready after {timeout}s"
    else:
        results[name] = f"exited (code {exit_code})"

threads = [threading.Thread(target=_wait_healthy, args=(s,), daemon=True) for s in procs]
for t in threads:
t.start()
for t in threads:
t.join()

elapsed = time.monotonic() - start_time
failed = {s: reason for s, reason in results.items() if reason is not True}
print()
if failed:
details = ", ".join(f"{s} ({reason})" for s, reason in failed.items())
print(f"[launcher] WARNING: {len(failed)} service(s) failed: {details}")
print(f"[launcher] Check logs in: {logs_dir}/")
else:
print(f"[launcher] All {len(results)} services are ready. (startup took {elapsed:.1f}s)")
print(f"[launcher] You can use Ctrl+C to stop all services.\n")

try:
while True:
time.sleep(1.0)
Expand All @@ -258,4 +331,4 @@ def _handle_sig(signum, frame) -> None:
except: pass

if __name__ == "__main__":
main()
main()
Loading