Skip to content

Commit 3a2b56c

Browse files
core: Use new service-manager
Signed-off-by: Patrick José Pereira <patrickelectric@gmail.com>
1 parent 077ffae commit 3a2b56c

File tree

18 files changed

+2778
-86
lines changed

18 files changed

+2778
-86
lines changed

core/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ wifi = { workspace = true }
6565

6666
[tool.uv.workspace]
6767
members = ["services/*", "libs/*"]
68-
exclude = ["services/versionchooser", "services/kraken", "services/helper"]
68+
exclude = ["services/helper", "services/kraken", "services/service_manager", "services/versionchooser"]
6969

7070
# ------------------ Tooling Configurations ------------------
7171

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""Service Manager - Linux service supervisor with cgroups v2 resource control."""

# Package version string; the daemon entry point prints it for -V/--version.
__version__ = "0.1.0"
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Entry point for service-manager daemon."""
2+
3+
import argparse
4+
import asyncio
5+
import sys
6+
from pathlib import Path
7+
8+
from service_manager import __version__
9+
from service_manager.config import AgentConfig
10+
from service_manager.daemon import AgentDaemon, PidLock, daemonize
11+
12+
13+
def _build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for the service-manager daemon."""
    parser = argparse.ArgumentParser(
        prog="service-manager",
        description="Linux service supervisor with cgroups v2 resource control",
    )
    parser.add_argument("-c", "--config", type=Path, help="Path to config file")
    parser.add_argument(
        "-f",
        "--foreground",
        action="store_true",
        help="Run in foreground (don't daemonize)",
    )
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    return parser


def main() -> int:
    """Parse the command line, optionally daemonize, and run the agent.

    Returns:
        1 if another instance is already running or the PID lock cannot be
        acquired after daemonizing; 0 once the daemon's run loop finishes
        (including when it is interrupted with Ctrl-C).
    """
    options = _build_parser().parse_args()

    # Detaching is only done on Linux; everywhere else, or when -f is given,
    # the process stays attached to the terminal.
    stay_attached = options.foreground or sys.platform != "linux"

    if not stay_attached:
        # The fork must happen BEFORE asyncio starts its event loop.
        config = AgentConfig.load_or_default(options.config)
        pid_lock = PidLock(config.pid_file)

        # Bail out early, while stderr is still the caller's terminal.
        if PidLock.is_running(config.pid_file):
            print("Another instance is already running", file=sys.stderr)
            return 1

        daemonize()

        # The lock is taken only now because daemonizing changes our PID.
        if not pid_lock.acquire():
            return 1

    daemon = AgentDaemon(config_path=options.config, foreground=stay_attached)

    try:
        asyncio.run(daemon.run())
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop a foreground run.
        pass

    return 0


if __name__ == "__main__":
    sys.exit(main())
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
"""cgroup v2 controller for resource limits and metrics."""
2+
3+
from __future__ import annotations

import asyncio
import logging
import os
import signal
from dataclasses import dataclass
from pathlib import Path

from service_manager.config import ResourceLimits
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
@dataclass
class CgroupMetrics:
    """Snapshot of the counters read from one service's cgroup.

    Every field defaults to 0, which is also the value a field keeps when the
    corresponding cgroup file is missing or cannot be parsed.
    """

    # "usage_usec" entry of cpu.stat
    cpu_usage_usec: int = 0
    # Contents of memory.current
    memory_current: int = 0
    # Contents of memory.peak
    memory_peak: int = 0
    # Sum of the "rbytes=" fields over all lines of io.stat
    io_read_bytes: int = 0
    # Sum of the "wbytes=" fields over all lines of io.stat
    io_write_bytes: int = 0
    # Contents of pids.current
    pids_current: int = 0
25+
26+
27+
class CgroupController:
    """Manages the cgroup v2 hierarchy for supervised services.

    Each service gets a child directory ``svc-<name>`` beneath ``root``.
    Filesystem errors are logged and reported through return values
    (``None`` / ``False`` / empty results) instead of being raised, so the
    supervisor keeps working when cgroups are unavailable.
    """

    # Directory-name prefix that marks a cgroup as belonging to a service.
    _PREFIX = "svc-"

    # cpu.max period in microseconds (100 ms).
    _CPU_PERIOD_USEC = 100000

    # destroy() polls for killed processes to leave the cgroup before removing
    # it: at most _REAP_ATTEMPTS polls, _REAP_INTERVAL seconds apart (~1 s).
    _REAP_ATTEMPTS = 20
    _REAP_INTERVAL = 0.05

    def __init__(self, root: Path):
        """Remember ``root``, the directory under which service cgroups live."""
        self.root = root

    def _cgroup_path(self, name: str) -> Path:
        """Get cgroup directory path for a service."""
        return self.root / f"{self._PREFIX}{name}"

    async def ensure_root(self) -> None:
        """Ensure the root cgroup directory exists.

        Failure to create it is logged, not raised; later calls then detect
        the missing root and skip cgroup handling.
        """
        if self.root.exists():
            return

        log.info(f"Creating cgroup root: {self.root}")
        try:
            self.root.mkdir(parents=True, exist_ok=True)
        except PermissionError:
            log.warning(
                f"Cannot create cgroup root {self.root}. "
                "Running without cgroup support. "
                "Run as root or configure systemd delegation."
            )
        except OSError as e:
            # e.g. read-only filesystem: degrade the same way as for a
            # permission problem instead of crashing the caller.
            log.error(f"Failed to create cgroup root {self.root}: {e}")

    async def create(self, name: str, limits: ResourceLimits) -> Path | None:
        """Create cgroup for a service and apply limits.

        Returns:
            The cgroup directory, or ``None`` when the root is unavailable or
            the directory could not be created.
        """
        path = self._cgroup_path(name)

        if not self.root.exists():
            log.debug(f"cgroup root not available, skipping cgroup for {name}")
            return None

        try:
            path.mkdir(exist_ok=True)
            await self._apply_limits(path, limits)
            log.debug(f"Created cgroup: {path}")
            return path
        except PermissionError as e:
            log.warning(f"Cannot create cgroup for {name}: {e}")
            return None
        except OSError as e:
            log.error(f"Failed to create cgroup for {name}: {e}")
            return None

    async def _apply_limits(self, path: Path, limits: ResourceLimits) -> None:
        """Apply resource limits to a cgroup.

        Limits whose value is ``None`` are left untouched. Write failures are
        logged by ``_write_file`` and do not abort the remaining limits.
        """
        # CPU limit: cpu.max format is "quota period" in microseconds
        # e.g., "150000 100000" means 1.5 cores (150ms per 100ms period)
        if limits.cpu_cores is not None:
            period = self._CPU_PERIOD_USEC
            quota = int(limits.cpu_cores * period)
            self._write_file(path / "cpu.max", f"{quota} {period}")

        # Memory limit: must be written as a plain integer byte count, so
        # coerce in case memory_mb is fractional ("512.0" MB -> "536870912").
        if limits.memory_mb is not None:
            memory_bytes = int(limits.memory_mb * 1024 * 1024)
            self._write_file(path / "memory.max", str(memory_bytes))

        # PID limit
        if limits.max_pids is not None:
            self._write_file(path / "pids.max", str(limits.max_pids))

        # IO limits (requires knowing the device major:minor)
        if limits.io_read_mbps is not None or limits.io_write_mbps is not None:
            log.debug("IO bandwidth limits require device specification, skipping")

    def _write_file(self, path: Path, value: str) -> None:
        """Write a value to a cgroup control file, logging any failure."""
        try:
            path.write_text(value)
        except PermissionError:
            log.warning(f"Cannot write to {path}: permission denied")
        except OSError as e:
            log.warning(f"Cannot write to {path}: {e}")

    def _read_file(self, path: Path) -> str:
        """Read a cgroup control file; return "" if it cannot be read."""
        try:
            return path.read_text().strip()
        except OSError:  # includes FileNotFoundError
            return ""

    @staticmethod
    def _parse_int(text: str) -> int | None:
        """Convert ``text`` to an int, or return ``None`` if it is not one."""
        try:
            return int(text)
        except ValueError:
            return None

    def _read_int(self, path: Path) -> int | None:
        """Read a single-integer control file; ``None`` if missing or invalid."""
        return self._parse_int(self._read_file(path))

    async def add_pid(self, name: str, pid: int) -> bool:
        """Add a process to a service's cgroup.

        Returns:
            ``True`` if the PID was written to ``cgroup.procs``; ``False`` if
            the cgroup does not exist or the write failed.
        """
        procs_file = self._cgroup_path(name) / "cgroup.procs"

        if not procs_file.exists():
            return False

        try:
            procs_file.write_text(str(pid))
            log.debug(f"Added PID {pid} to cgroup {name}")
            return True
        except OSError as e:
            log.warning(f"Cannot add PID {pid} to cgroup {name}: {e}")
            return False

    async def destroy(self, name: str) -> None:
        """Kill whatever still runs in a service's cgroup, then remove it.

        SIGKILL delivery is asynchronous and a cgroup directory cannot be
        removed while it still contains processes, so this polls
        ``cgroup.procs`` (re-sending SIGKILL each round) for a bounded time
        before attempting the ``rmdir``. A removal failure is logged.
        """
        path = self._cgroup_path(name)

        if not path.exists():
            return

        procs_file = path / "cgroup.procs"
        for _ in range(self._REAP_ATTEMPTS):
            remaining = self._read_file(procs_file).split()
            if not remaining:
                break
            for pid_str in remaining:
                try:
                    os.kill(int(pid_str), signal.SIGKILL)
                except (OSError, ValueError):
                    # Best effort: the process already exited, or the entry
                    # was not a PID.
                    pass
            await asyncio.sleep(self._REAP_INTERVAL)

        # Remove the cgroup directory
        try:
            path.rmdir()
            log.debug(f"Destroyed cgroup: {path}")
        except OSError as e:
            log.warning(f"Cannot remove cgroup {path}: {e}")

    async def read_metrics(self, name: str) -> CgroupMetrics | None:
        """Read current metrics from a service's cgroup.

        Returns:
            ``None`` if the cgroup does not exist; otherwise a
            ``CgroupMetrics`` in which any value that is missing or cannot be
            parsed keeps its default of 0.
        """
        path = self._cgroup_path(name)

        if not path.exists():
            return None

        metrics = CgroupMetrics()

        # CPU usage: cpu.stat holds "key value" lines; take usage_usec.
        for line in self._read_file(path / "cpu.stat").splitlines():
            fields = line.split()
            if len(fields) >= 2 and fields[0] == "usage_usec":
                usage = self._parse_int(fields[1])
                if usage is not None:
                    metrics.cpu_usage_usec = usage

        # Single-value files map one-to-one onto metric fields.
        for attr, filename in (
            ("memory_current", "memory.current"),
            ("memory_peak", "memory.peak"),
            ("pids_current", "pids.current"),
        ):
            value = self._read_int(path / filename)
            if value is not None:
                setattr(metrics, attr, value)

        # IO stats: io.stat has one line per device made of key=value
        # fields; totals are summed across all lines.
        for line in self._read_file(path / "io.stat").splitlines():
            for field in line.split():
                key, _, raw = field.partition("=")
                if key not in ("rbytes", "wbytes"):
                    continue
                count = self._parse_int(raw)
                if count is None:
                    continue
                if key == "rbytes":
                    metrics.io_read_bytes += count
                else:
                    metrics.io_write_bytes += count

        return metrics

    async def list_existing(self) -> list[str]:
        """List existing service cgroups (for orphan detection).

        Returns:
            Service names (directory names with the ``svc-`` prefix removed);
            empty if the root does not exist. If listing fails part-way, the
            names collected so far are returned.
        """
        if not self.root.exists():
            return []

        names = []
        try:
            for entry in self.root.iterdir():
                if entry.is_dir() and entry.name.startswith(self._PREFIX):
                    names.append(entry.name[len(self._PREFIX):])
        except OSError:
            pass
        return names

    async def get_pids(self, name: str) -> list[int]:
        """Get PIDs in a service's cgroup (empty if it does not exist)."""
        procs_file = self._cgroup_path(name) / "cgroup.procs"

        if not procs_file.exists():
            return []

        parsed = (self._parse_int(token) for token in self._read_file(procs_file).split())
        return [pid for pid in parsed if pid is not None]

    def cgroup_exists(self, name: str) -> bool:
        """Check if a cgroup exists for a service."""
        return self._cgroup_path(name).exists()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""CLI client for service-manager."""

0 commit comments

Comments
 (0)