Skip to content

Commit 2bf9e40

Browse files
HevagogdJaniga
andauthored
Real-Time worker logs (#60)
Co-authored-by: Damian Janiga <[email protected]>
1 parent f53ae73 commit 2bf9e40

File tree

9 files changed

+371
-101
lines changed

9 files changed

+371
-101
lines changed

src/logger/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from logger.numeric_logger import get_csv_logger
22
from logger.orchestration_logger import log_docker_logs
3+
from logger.stream_reader import stream_reader
34
from logger.u_logger import configure_logger, get_logger
45

56
__all__ = [
67
"get_csv_logger",
78
"configure_logger",
89
"get_logger",
910
"log_docker_logs",
11+
"stream_reader",
1012
]

src/logger/orchestration_logger.py

Lines changed: 62 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,12 @@
11
import subprocess
22
from logging import Logger
33
from pathlib import Path
4-
from typing import List
54

6-
from logger.utils import get_external_console_logging
5+
from logger.utils import get_external_console_logging, get_services, get_services_id
76

87
TAIL_LINES = 100
9-
WORKER_PREFIX = "worker"
10-
SIM_SERVICE_NAME = "simulation_server"
11-
12-
13-
def get_services() -> List[str]:
14-
"""Return a list of all docker compose service names."""
15-
try:
16-
container_ids = subprocess.check_output(
17-
["docker", "ps", "-q"], text=True
18-
).splitlines()
19-
20-
services = set()
21-
for cid in container_ids:
22-
inspect_output = subprocess.check_output(
23-
[
24-
"docker",
25-
"inspect",
26-
"--format",
27-
'{{ index .Config.Labels "com.docker.compose.service" }}',
28-
cid,
29-
],
30-
text=True,
31-
).strip()
32-
if inspect_output and inspect_output != "<no value>":
33-
services.add(inspect_output)
34-
return list(services)
35-
except Exception as e:
36-
raise RuntimeError(
37-
"Failed to get Docker services. Ensure Docker is running and you have access to it."
38-
) from e
8+
WORKER_PREFIX = "simulation-worker"
9+
SIM_SERVICE_NAME = "simulation-server"
3910

4011

4112
def _start_external_xterm_log_terminal(title: str, command: str) -> None:
@@ -57,6 +28,8 @@ def _start_external_log_terminal(title: str, command: str) -> None:
5728
title,
5829
command,
5930
)
31+
except Exception as e:
32+
raise RuntimeError(f"Failed to open external terminal for logging: {e}") from e
6033

6134

6235
def _start_file_logging(command: str) -> None:
@@ -69,7 +42,7 @@ def _start_file_logging(command: str) -> None:
6942
)
7043

7144

72-
def log_docker_logs(logger: Logger) -> None:
45+
def log_docker_logs(logger: Logger, disperse_worker_logs: bool = True) -> None:
7346
"""Fetch and log docker logs for workers and simulation service."""
7447
assert logger is not None, "Logger must be initialized before logging Docker logs."
7548

@@ -78,7 +51,7 @@ def log_docker_logs(logger: Logger) -> None:
7851
log_dir.mkdir(exist_ok=True)
7952

8053
try:
81-
services = get_services()
54+
services = get_services(logger)
8255
except RuntimeError as e:
8356
logger.error(f"Error fetching Docker services: {e}")
8457
return
@@ -89,35 +62,69 @@ def log_docker_logs(logger: Logger) -> None:
8962
return
9063

9164
worker_services = sorted([s for s in services if s.startswith(WORKER_PREFIX)])
92-
93-
worker_log_path = (log_dir / "docker_workers.log").resolve()
9465
sim_log_path = (log_dir / "docker_simulation_server.log").resolve()
9566

67+
logger.info(f"Logging Docker worker services: {worker_services}")
68+
9669
if worker_services:
97-
worker_cmd = f"docker compose logs --tail {TAIL_LINES} --follow {' '.join(worker_services)}"
70+
if disperse_worker_logs:
71+
for worker_service in worker_services:
72+
worker_log_path = (
73+
log_dir / f'docker_{worker_service.replace("-", "_")}.log'
74+
).resolve()
75+
worker_cmd = (
76+
f"docker logs --tail {TAIL_LINES} --follow {worker_service}"
77+
)
78+
79+
if get_external_console_logging():
80+
_start_external_log_terminal(
81+
f"Docker Logs: {worker_service}",
82+
f"{worker_cmd} | stdbuf -oL -eL tee '{worker_log_path}'",
83+
)
84+
logger.info(
85+
f"Opened external terminal for {worker_service} logs (also logging to {worker_log_path})."
86+
)
87+
88+
_start_file_logging(f"{worker_cmd} > '{worker_log_path}'")
89+
logger.info(f"Logging {worker_service} logs to file: {worker_log_path}")
90+
else:
91+
worker_log_path = (log_dir / "docker_workers.log").resolve()
92+
worker_services = sorted(
93+
[s for s in get_services_id() if s.startswith(WORKER_PREFIX)]
94+
)
95+
96+
logger.info(f"Aggregating logs for worker services: {worker_services}")
97+
worker_cmd = f"docker compose logs --tail {TAIL_LINES} --follow {' '.join(worker_services)}"
98+
99+
if get_external_console_logging():
100+
_start_external_log_terminal(
101+
"Docker Worker Logs",
102+
f"{worker_cmd} | stdbuf -oL -eL tee '{worker_log_path}'",
103+
)
104+
logger.info(
105+
f"Opened external terminal for aggregated worker logs (also logging to {worker_log_path})."
106+
)
107+
108+
_start_file_logging(f"{worker_cmd} > '{worker_log_path}'")
109+
logger.info(f"Logging aggregated worker logs to file: {worker_log_path}")
110+
111+
if SIM_SERVICE_NAME in services:
112+
sim_cmd = f"docker logs --tail {TAIL_LINES} --follow {SIM_SERVICE_NAME}"
98113

99114
if get_external_console_logging():
100115
_start_external_log_terminal(
101-
"Docker Worker Logs",
102-
f"{worker_cmd} | stdbuf -oL -eL tee '{worker_log_path}'",
116+
"Docker Simulation Server Logs",
117+
f"{sim_cmd} | stdbuf -oL -eL tee '{sim_log_path}'",
103118
)
104119
logger.info(
105-
f"Opened external terminal for worker logs (also logging to {worker_log_path})."
120+
f"Opened external terminal for simulation logs (also logging to {sim_log_path})."
106121
)
107122

108-
_start_file_logging(f"{worker_cmd} > '{worker_log_path}'")
109-
logger.info(f"Logging worker logs to file: {worker_log_path}")
110-
111-
sim_cmd = f"docker logs --tail {TAIL_LINES} --follow {SIM_SERVICE_NAME}"
112-
113-
if get_external_console_logging():
114-
_start_external_log_terminal(
115-
"Docker Simulation Server Logs",
116-
f"{sim_cmd} | stdbuf -oL -eL tee '{sim_log_path}'",
123+
_start_file_logging(f"{sim_cmd} > '{sim_log_path}'")
124+
logger.info(f"Logging simulation logs to file: {sim_log_path}")
125+
elif SIM_SERVICE_NAME not in services and any(
126+
s.startswith(WORKER_PREFIX) for s in services
127+
):
128+
logger.warning(
129+
f"Simulation service '{SIM_SERVICE_NAME}' not found. Skipping simulation server logs."
117130
)
118-
logger.info(
119-
f"Opened external terminal for simulation logs (also logging to {sim_log_path})."
120-
)
121-
122-
_start_file_logging(f"{sim_cmd} > '{sim_log_path}'")
123-
logger.info(f"Logging simulation logs to file: {sim_log_path}")

src/logger/stream_reader.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import io
2+
from typing import Callable
3+
4+
5+
def stream_reader(
6+
stream: io.TextIOWrapper | None,
7+
output_list: list[str],
8+
log_with_level: Callable,
9+
) -> None:
10+
if stream:
11+
try:
12+
for line in iter(stream.readline, ""):
13+
line_content = line.strip()
14+
log_with_level(line_content)
15+
output_list.append(line_content)
16+
finally:
17+
stream.close()

src/logger/utils.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import os
2-
from typing import Any, Dict
2+
import subprocess
3+
from logging import Logger
4+
from typing import Any, Dict, List
35

46

57
def get_logger_profile() -> str:
@@ -87,3 +89,43 @@ def get_external_console_logging() -> bool:
8789
The value of external_docker_log_console.
8890
"""
8991
return bool(get_logging_output()["external_docker_log_console"])
92+
93+
94+
def get_services(logger: Logger) -> List[str]:
95+
"""Return a list of all docker compose service names, including numbered workers."""
96+
try:
97+
compose_services = subprocess.check_output(
98+
["docker", "ps", "--format", "'{{.Names}}'"], text=True
99+
).splitlines()
100+
return [service.strip("'") for service in compose_services if service.strip()]
101+
except Exception as e:
102+
raise RuntimeError(
103+
"Failed to get Docker services. Ensure Docker is running and you have access to it."
104+
) from e
105+
106+
107+
def get_services_id() -> List[str]:
108+
try:
109+
container_ids = subprocess.check_output(
110+
["docker", "ps", "-q"], text=True
111+
).splitlines()
112+
113+
services = set()
114+
for cid in container_ids:
115+
inspect_output = subprocess.check_output(
116+
[
117+
"docker",
118+
"inspect",
119+
"--format",
120+
'{{ index .Config.Labels "com.docker.compose.service" }}',
121+
cid,
122+
],
123+
text=True,
124+
).strip()
125+
if inspect_output and inspect_output != "<no value>":
126+
services.add(inspect_output)
127+
return list(services)
128+
except Exception as e:
129+
raise RuntimeError(
130+
"Failed to get Docker services. Ensure Docker is running and you have access to it."
131+
) from e

src/services/simulation_service/core/connectors/conn_utils/__init__.py

Whitespace-only changes.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import subprocess
2+
import threading
3+
from typing import Callable
4+
5+
6+
class ManagedSubprocess:
7+
def __init__(
8+
self,
9+
command_args: list[str],
10+
stream_reader_func: Callable,
11+
logger_info_func: Callable,
12+
logger_error_func: Callable,
13+
text: bool = True,
14+
):
15+
self.command_args = command_args
16+
self.text = text
17+
self.stream_reader_func = stream_reader_func
18+
self.logger_info_func = logger_info_func
19+
self.logger_error_func = logger_error_func
20+
21+
self.process: subprocess.Popen | None = None
22+
self.stdout_thread: threading.Thread | None = None
23+
self.stderr_thread: threading.Thread | None = None
24+
self.stdout_lines: list[str] = []
25+
self.stderr_lines: list[str] = []
26+
27+
def __enter__(self) -> subprocess.Popen:
28+
self.logger_info_func(f"Starting subprocess: {' '.join(self.command_args[:3])}")
29+
try:
30+
self.process = subprocess.Popen(
31+
self.command_args,
32+
stdout=subprocess.PIPE,
33+
stderr=subprocess.PIPE,
34+
text=self.text,
35+
)
36+
except Exception:
37+
self.logger_error_func(
38+
f"Failed to start subprocess with command: {' '.join(self.command_args[:3])}"
39+
)
40+
self.process = None
41+
raise # Re-raise the exception from Popen
42+
43+
if self.process.stdout is None or self.process.stderr is None:
44+
if self.process:
45+
self.process.kill()
46+
self.process.wait()
47+
raise RuntimeError("Subprocess stdout/stderr streams are not available.")
48+
49+
self.stdout_thread = threading.Thread(
50+
target=self.stream_reader_func,
51+
args=(self.process.stdout, self.stdout_lines, self.logger_info_func),
52+
daemon=True,
53+
)
54+
self.stderr_thread = threading.Thread(
55+
target=self.stream_reader_func,
56+
args=(self.process.stderr, self.stderr_lines, self.logger_error_func),
57+
daemon=True,
58+
)
59+
60+
self.stdout_thread.start()
61+
self.stderr_thread.start()
62+
63+
return self.process
64+
65+
def __exit__(self, exc_type, exc_val, exc_tb):
66+
if self.process and self.process.poll() is None:
67+
self.logger_warning_func = getattr(
68+
self, "logger_warning_func", self.logger_error_func
69+
)
70+
self.logger_warning_func(
71+
f"Subprocess (PID: {self.process.pid}) still running upon exiting context. Exception: {exc_type}. Terminating."
72+
)
73+
self.process.terminate()
74+
try:
75+
self.process.wait(timeout=5)
76+
except subprocess.TimeoutExpired:
77+
self.logger_warning_func(
78+
f"Subprocess (PID: {self.process.pid}) did not terminate gracefully. Killing."
79+
)
80+
self.process.kill()
81+
self.process.wait()
82+
83+
# Join threads to ensure all output is processed and resources are released
84+
if self.stdout_thread and self.stdout_thread.is_alive():
85+
self.stdout_thread.join(timeout=2)
86+
if self.stderr_thread and self.stderr_thread.is_alive():
87+
self.stderr_thread.join(timeout=2)
88+
89+
return False

0 commit comments

Comments
 (0)