Skip to content

Commit 0fd736d

Browse files
core: Use new service-manager
Signed-off-by: Patrick José Pereira <patrickelectric@gmail.com>
1 parent 077ffae commit 0fd736d

File tree

18 files changed

+2758
-86
lines changed

18 files changed

+2758
-86
lines changed

core/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ wifi = { workspace = true }
6565

6666
[tool.uv.workspace]
6767
members = ["services/*", "libs/*"]
68-
exclude = ["services/versionchooser", "services/kraken", "services/helper"]
68+
exclude = ["services/helper", "services/kraken", "services/service_manager", "services/versionchooser"]
6969

7070
# ------------------ Tooling Configurations ------------------
7171

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""Service Manager - Linux service supervisor with cgroups v2 resource control."""
2+
3+
__version__ = "0.1.0"
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Entry point for service-manager daemon."""
2+
3+
import argparse
4+
import asyncio
5+
import sys
6+
from pathlib import Path
7+
8+
from service_manager import __version__
9+
from service_manager.config import AgentConfig
10+
from service_manager.daemon import AgentDaemon, PidLock, daemonize
11+
12+
13+
def main() -> int:
14+
"""Main entry point."""
15+
parser = argparse.ArgumentParser(
16+
prog="service-manager",
17+
description="Linux service supervisor with cgroups v2 resource control",
18+
)
19+
parser.add_argument(
20+
"-c",
21+
"--config",
22+
type=Path,
23+
help="Path to config file",
24+
)
25+
parser.add_argument(
26+
"-f",
27+
"--foreground",
28+
action="store_true",
29+
help="Run in foreground (don't daemonize)",
30+
)
31+
parser.add_argument(
32+
"-V",
33+
"--version",
34+
action="version",
35+
version=f"%(prog)s {__version__}",
36+
)
37+
38+
args = parser.parse_args()
39+
40+
# Daemonize BEFORE starting asyncio (only on Linux, and only if not foreground)
41+
if not args.foreground and sys.platform == "linux":
42+
config = AgentConfig.load_or_default(args.config)
43+
pid_lock = PidLock(config.pid_file)
44+
45+
# Check if already running before forking
46+
if PidLock.is_running(config.pid_file):
47+
print("Another instance is already running", file=sys.stderr)
48+
return 1
49+
50+
daemonize()
51+
52+
# Acquire lock after daemonizing (we have a new PID now)
53+
if not pid_lock.acquire():
54+
return 1
55+
56+
daemon = AgentDaemon(
57+
config_path=args.config,
58+
foreground=args.foreground or sys.platform != "linux",
59+
)
60+
61+
try:
62+
asyncio.run(daemon.run())
63+
except KeyboardInterrupt:
64+
pass
65+
66+
return 0
67+
68+
69+
if __name__ == "__main__":
70+
sys.exit(main())
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
"""cgroup v2 controller for resource limits and metrics."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
import os
7+
from dataclasses import dataclass
8+
from pathlib import Path
9+
10+
from service_manager.config import ResourceLimits
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
@dataclass
16+
class CgroupMetrics:
17+
"""Metrics read from cgroup."""
18+
19+
cpu_usage_usec: int = 0
20+
memory_current: int = 0
21+
memory_peak: int = 0
22+
io_read_bytes: int = 0
23+
io_write_bytes: int = 0
24+
pids_current: int = 0
25+
26+
27+
class CgroupController:
28+
"""Manages cgroup v2 hierarchy for services."""
29+
30+
def __init__(self, root: Path):
31+
self.root = root
32+
33+
def _cgroup_path(self, name: str) -> Path:
34+
"""Get cgroup directory path for a service."""
35+
return self.root / f"svc-{name}"
36+
37+
async def ensure_root(self) -> None:
38+
"""Ensure the root cgroup directory exists."""
39+
if not self.root.exists():
40+
log.info(f"Creating cgroup root: {self.root}")
41+
try:
42+
self.root.mkdir(parents=True, exist_ok=True)
43+
except PermissionError:
44+
log.warning(
45+
f"Cannot create cgroup root {self.root}. "
46+
"Running without cgroup support. "
47+
"Run as root or configure systemd delegation."
48+
)
49+
50+
async def create(self, name: str, limits: ResourceLimits) -> Path | None:
51+
"""Create cgroup for a service and apply limits."""
52+
path = self._cgroup_path(name)
53+
54+
if not self.root.exists():
55+
log.debug(f"cgroup root not available, skipping cgroup for {name}")
56+
return None
57+
58+
try:
59+
path.mkdir(exist_ok=True)
60+
await self._apply_limits(path, limits)
61+
log.debug(f"Created cgroup: {path}")
62+
return path
63+
except PermissionError as e:
64+
log.warning(f"Cannot create cgroup for {name}: {e}")
65+
return None
66+
except OSError as e:
67+
log.error(f"Failed to create cgroup for {name}: {e}")
68+
return None
69+
70+
async def _apply_limits(self, path: Path, limits: ResourceLimits) -> None:
71+
"""Apply resource limits to a cgroup."""
72+
# CPU limit: cpu.max format is "quota period" in microseconds
73+
# e.g., "150000 100000" means 1.5 cores (150ms per 100ms period)
74+
if limits.cpu_cores is not None:
75+
quota = int(limits.cpu_cores * 100000)
76+
period = 100000
77+
self._write_file(path / "cpu.max", f"{quota} {period}")
78+
79+
# Memory limit
80+
if limits.memory_mb is not None:
81+
memory_bytes = limits.memory_mb * 1024 * 1024
82+
self._write_file(path / "memory.max", str(memory_bytes))
83+
84+
# PID limit
85+
if limits.max_pids is not None:
86+
self._write_file(path / "pids.max", str(limits.max_pids))
87+
88+
# IO limits (requires knowing the device major:minor)
89+
if limits.io_read_mbps is not None or limits.io_write_mbps is not None:
90+
log.debug("IO bandwidth limits require device specification, skipping")
91+
92+
def _write_file(self, path: Path, value: str) -> None:
93+
"""Write a value to a cgroup control file."""
94+
try:
95+
path.write_text(value)
96+
except PermissionError:
97+
log.warning(f"Cannot write to {path}: permission denied")
98+
except OSError as e:
99+
log.warning(f"Cannot write to {path}: {e}")
100+
101+
def _read_file(self, path: Path) -> str:
102+
"""Read a cgroup control file."""
103+
try:
104+
return path.read_text().strip()
105+
except (OSError, FileNotFoundError):
106+
return ""
107+
108+
async def add_pid(self, name: str, pid: int) -> bool:
109+
"""Add a process to a service's cgroup."""
110+
path = self._cgroup_path(name)
111+
procs_file = path / "cgroup.procs"
112+
113+
if not procs_file.exists():
114+
return False
115+
116+
try:
117+
procs_file.write_text(str(pid))
118+
log.debug(f"Added PID {pid} to cgroup {name}")
119+
return True
120+
except OSError as e:
121+
log.warning(f"Cannot add PID {pid} to cgroup {name}: {e}")
122+
return False
123+
124+
async def destroy(self, name: str) -> None:
125+
"""Remove a service's cgroup."""
126+
path = self._cgroup_path(name)
127+
128+
if not path.exists():
129+
return
130+
131+
# Kill any remaining processes
132+
procs_file = path / "cgroup.procs"
133+
if procs_file.exists():
134+
pids = self._read_file(procs_file).split()
135+
for pid_str in pids:
136+
try:
137+
os.kill(int(pid_str), 9)
138+
except (OSError, ValueError):
139+
pass
140+
141+
# Remove the cgroup directory
142+
try:
143+
path.rmdir()
144+
log.debug(f"Destroyed cgroup: {path}")
145+
except OSError as e:
146+
log.warning(f"Cannot remove cgroup {path}: {e}")
147+
148+
async def read_metrics(self, name: str) -> CgroupMetrics | None:
149+
"""Read current metrics from a service's cgroup."""
150+
path = self._cgroup_path(name)
151+
152+
if not path.exists():
153+
return None
154+
155+
metrics = CgroupMetrics()
156+
157+
# CPU usage from cpu.stat
158+
cpu_stat = self._read_file(path / "cpu.stat")
159+
for line in cpu_stat.splitlines():
160+
if line.startswith("usage_usec"):
161+
try:
162+
metrics.cpu_usage_usec = int(line.split()[1])
163+
except (IndexError, ValueError):
164+
pass
165+
166+
# Memory
167+
memory_current = self._read_file(path / "memory.current")
168+
if memory_current:
169+
try:
170+
metrics.memory_current = int(memory_current)
171+
except ValueError:
172+
pass
173+
174+
memory_peak = self._read_file(path / "memory.peak")
175+
if memory_peak:
176+
try:
177+
metrics.memory_peak = int(memory_peak)
178+
except ValueError:
179+
pass
180+
181+
# IO stats from io.stat
182+
io_stat = self._read_file(path / "io.stat")
183+
for line in io_stat.splitlines():
184+
parts = line.split()
185+
for part in parts:
186+
if part.startswith("rbytes="):
187+
try:
188+
metrics.io_read_bytes += int(part.split("=")[1])
189+
except (IndexError, ValueError):
190+
pass
191+
elif part.startswith("wbytes="):
192+
try:
193+
metrics.io_write_bytes += int(part.split("=")[1])
194+
except (IndexError, ValueError):
195+
pass
196+
197+
# PIDs
198+
pids_current = self._read_file(path / "pids.current")
199+
if pids_current:
200+
try:
201+
metrics.pids_current = int(pids_current)
202+
except ValueError:
203+
pass
204+
205+
return metrics
206+
207+
async def list_existing(self) -> list[str]:
208+
"""List existing service cgroups (for orphan detection)."""
209+
if not self.root.exists():
210+
return []
211+
212+
result = []
213+
try:
214+
for entry in self.root.iterdir():
215+
if entry.is_dir() and entry.name.startswith("svc-"):
216+
result.append(entry.name[4:]) # Strip "svc-" prefix
217+
except OSError:
218+
pass
219+
return result
220+
221+
async def get_pids(self, name: str) -> list[int]:
222+
"""Get PIDs in a service's cgroup."""
223+
path = self._cgroup_path(name)
224+
procs_file = path / "cgroup.procs"
225+
226+
if not procs_file.exists():
227+
return []
228+
229+
pids = []
230+
content = self._read_file(procs_file)
231+
for pid_str in content.split():
232+
try:
233+
pids.append(int(pid_str))
234+
except ValueError:
235+
pass
236+
return pids
237+
238+
def cgroup_exists(self, name: str) -> bool:
239+
"""Check if a cgroup exists for a service."""
240+
return self._cgroup_path(name).exists()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""CLI client for service-manager."""

0 commit comments

Comments
 (0)