diff --git a/backend/src/zimfarm_backend/common/schemas/orms.py b/backend/src/zimfarm_backend/common/schemas/orms.py
index 039ccd2c5..a1b641cf1 100644
--- a/backend/src/zimfarm_backend/common/schemas/orms.py
+++ b/backend/src/zimfarm_backend/common/schemas/orms.py
@@ -113,14 +113,18 @@ class TaskContainerProgressSchema(BaseModel):
     total: int | None = None


-class TaskContainerMemoryStatsSchema(BaseModel):
-    max_usage: int | None = None
+class TaskResourceUsageSchema(BaseModel):
+    max_usage: int | float | None = Field(default=None, alias="max")


-class TaskContainerStatsSchema(BaseModel):
-    memory: TaskContainerMemoryStatsSchema = Field(
-        default_factory=TaskContainerMemoryStatsSchema
-    )
+class TaskCPUUsageSchema(TaskResourceUsageSchema):
+    avg_usage: float | None = Field(default=None, alias="avg")
+
+
+class TaskStatsSchema(BaseModel):
+    memory: TaskResourceUsageSchema = Field(default_factory=TaskResourceUsageSchema)
+    cpu: TaskCPUUsageSchema = Field(default_factory=TaskCPUUsageSchema)
+    disk: TaskResourceUsageSchema = Field(default_factory=TaskResourceUsageSchema)


 class TaskContainerSchema(BaseModel):
@@ -130,7 +134,7 @@ class TaskContainerSchema(BaseModel):
     log: str | None = None
     image: str | None = None

-    stats: dict[str, Any] | None = None
+    stats: TaskStatsSchema | None = None
     artifacts: str | None = None
     stderr: str | None = None
     stdout: str | None = None
diff --git a/frontend-ui/src/types/tasks.ts b/frontend-ui/src/types/tasks.ts
index e4b6af05f..670144fb5 100644
--- a/frontend-ui/src/types/tasks.ts
+++ b/frontend-ui/src/types/tasks.ts
@@ -32,6 +32,15 @@ export interface TaskFile {
   info?: TaskFileInfo
 }

+export interface TaskResourceUsage {
+  max?: number | null
+}
+
+export interface TaskCPUUsage {
+  max?: number
+  avg?: number
+}
+
 export interface TaskContainer {
   command: string[]
   exit_code?: number
@@ -46,9 +55,9 @@ export interface TaskContainer {
     total: number
   } | null
   stats?: {
-    memory?: {
-      max_usage?: number | null
-    }
+    memory?: TaskResourceUsage
+    disk?: TaskResourceUsage
+    cpu?: TaskCPUUsage
   }
 }
diff --git a/frontend-ui/src/views/TaskDetailView.vue b/frontend-ui/src/views/TaskDetailView.vue
index e509725b3..9724540b1 100644
--- a/frontend-ui/src/views/TaskDetailView.vue
+++ b/frontend-ui/src/views/TaskDetailView.vue
@@ -324,13 +324,55 @@
               {{ taskContainer.exit_code }}
             </td>
           </tr>
-          <tr v-if="maxMemory">
+          <tr v-if="hasStats">
             <th>Stats</th>
-            <td>
-              <v-icon>mdi-memory</v-icon>
-              {{ maxMemory }} (max)
-            </td>
+            <td>
+              <div>
+                <div v-if="maxMemory">
+                  <v-icon>mdi-memory</v-icon>
+                  {{ maxMemory }} (max)
+                </div>
+                <div v-if="maxDisk">
+                  <v-icon>mdi-harddisk</v-icon>
+                  {{ maxDisk }} (max)
+                </div>
+                <div v-if="hasCpuStats">
+                  <v-icon>mdi-cpu-64-bit</v-icon>
+                  {{ cpuStats?.max }}% (max), {{ cpuStats?.avg }}% (avg)
+                </div>
+              </div>
+            </td>
@@ -578,12 +620,37 @@ const canCancel = computed(() => {

 const maxMemory = computed(() => {
   try {
-    return formattedBytesSize(taskContainer.value?.stats?.memory?.max_usage || 0)
+    return formattedBytesSize(taskContainer.value?.stats?.memory?.max || 0)
+  } catch {
+    return null
+  }
+})
+
+const maxDisk = computed(() => {
+  try {
+    return formattedBytesSize(taskContainer.value?.stats?.disk?.max || 0)
   } catch {
     return null
   }
 })

+const cpuStats = computed(() => {
+  const stats = taskContainer.value?.stats?.cpu
+  if (!stats) return null
+  return {
+    max: stats.max ?? null,
+    avg: stats.avg ?? null,
+  }
+})
+
+const hasCpuStats = computed(() => {
+  return cpuStats.value && (cpuStats.value.max !== null || cpuStats.value.avg !== null)
+})
+
+const hasStats = computed(() => {
+  return maxMemory.value || maxDisk.value || hasCpuStats.value
+})
+
 const monitoringUrl = computed(() => {
   return `http://monitoring.openzim.org/host/${scheduleName.value}_${shortId.value}.${
     task.value?.worker_name
diff --git a/worker/src/zimfarm_worker/task/worker.py b/worker/src/zimfarm_worker/task/worker.py
index 360fdafbf..a6ec8518d 100644
--- a/worker/src/zimfarm_worker/task/worker.py
+++ b/worker/src/zimfarm_worker/task/worker.py
@@ -40,6 +40,9 @@ from zimfarm_worker.task.zim import get_zim_info

 SLEEP_INTERVAL = 60  # nb of seconds to sleep before watching
+CPU_EWMA_ALPHA = 0.01  # EWMA smoothing factor for CPU percentage samples (0..1)
+
+
 PENDING = "pending"
 DOING = "doing"
 DONE = "done"
@@ -132,6 +135,9 @@ def __init__(
         self.scraper_succeeded: bool | None = None  # whether scraper succeeded

         self.max_memory_usage: int = 0  # maximum memory used by scraper
+        self.max_disk_usage: int = 0  # maximum disk usage (bytes) of scraper workdir
+        self.avg_cpu_usage: float = 0.0  # exponentially weighted moving average of cpu usage
+        self.max_cpu_usage: float = 0.0  # maximum cpu percentage used by scraper

         # register stop/^C
         self.register_signals()
@@ -192,8 +198,62 @@ def mark_scraper_completed(self, exit_code: int, stdout: str, stderr: str):
             }
         )

+    def _get_scraper_disk_usage(self) -> int:
+        """
+        Get disk usage of the scraper's task workdir in bytes.
+
+        Calculates the actual disk space used by files in the scraper's
+        task workdir (where ZIM files and other outputs are written).
+        """
+        if not self.task_workdir:
+            return 0
+
+        try:
+            if self.task_workdir.exists() and self.task_workdir.is_dir():
+                return sum(
+                    f.stat().st_size
+                    for f in self.task_workdir.rglob("*")
+                    if f.is_file()
+                )
+            return 0
+        except Exception:
+            logger.exception("Failed to get scraper disk usage")
+            return 0
+
+    def _compute_scraper_cpu_stats(self, scraper_stats: dict[str, Any]) -> float:
+        """
+        Compute CPU usage statistics from scraper container stats.
+
+        Calculates the instantaneous CPU percentage from the container's
+        cpu_stats/precpu_stats counters and folds it into the EWMA running
+        average (self.avg_cpu_usage) to reduce the effect of short spikes.
+        Returns the raw, unsmoothed sample.
+        """
+        cpu_sample = 0.0
+        cpu_stats = scraper_stats.get("cpu_stats", {})
+        precpu_stats = scraper_stats.get("precpu_stats", {})
+        prev_total = precpu_stats.get("cpu_usage", {}).get("total_usage", 0)
+        curr_total = cpu_stats.get("cpu_usage", {}).get("total_usage", 0)
+        prev_system = precpu_stats.get("system_cpu_usage", 0)
+        curr_system = cpu_stats.get("system_cpu_usage", 0)
+
+        delta_cpu = curr_total - prev_total
+        delta_system = curr_system - prev_system
+        online_cpus = cpu_stats.get("online_cpus", 0)
+
+        if delta_system > 0 and delta_cpu >= 0 and online_cpus > 0:
+            cpu_sample = (delta_cpu / float(delta_system)) * float(online_cpus) * 100.0
+
+        # apply EWMA smoothing to reduce effect of short spikes
+        if self.avg_cpu_usage == 0.0:
+            # seed the average with the first sample
+            self.avg_cpu_usage = cpu_sample
+        else:
+            self.avg_cpu_usage = (
+                CPU_EWMA_ALPHA * cpu_sample
+                + (1.0 - CPU_EWMA_ALPHA) * self.avg_cpu_usage
+            )
+        return cpu_sample
+
     def submit_scraper_progress(self):
-        """report last lines of scraper to the API"""
+        """report scraper statistics and logs to the API"""
         if not self.scraper:
             logger.error("No scraper to update")
             return
@@ -204,6 +264,7 @@
             stream=False
         )
         scraper_stats = cast(dict[str, Any], scraper_stats)
+
         # update statistics
         self.max_memory_usage = max(
             [
                 scraper_stats.get("memory_stats", {}).get("max_usage", 0),
                 self.max_memory_usage,
             ]
         )
+
+        cpu_sample = self._compute_scraper_cpu_stats(scraper_stats)
+        self.max_cpu_usage = max([cpu_sample, self.max_cpu_usage])
+
+        disk_usage = self._get_scraper_disk_usage()
+        self.max_disk_usage = max([disk_usage, self.max_disk_usage])
+
         stats: dict[str, Any] = {
             "memory": {
                 "max_usage": self.max_memory_usage,
-            }
+            },
+            "cpu": {
+                "max_usage": self.max_cpu_usage,
+                "avg_usage": round(self.avg_cpu_usage, 2),
+            },
+            "disk": {
+                "max_usage": self.max_disk_usage,
+            },
         }

         # fetch and compute progression from progress file
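
A note on field naming across the three layers: the worker reports `max_usage`/`avg_usage`, the backend schema declares `max_usage` with `alias="max"`, and the frontend types read `max`/`avg`. The sketch below is a minimal illustration of that round trip, not the project's actual `BaseModel`; it assumes `populate_by_name=True` is enabled (without it, pydantic v2 validation would only accept the alias keys).

```python
# Hypothetical, self-contained sketch of the alias round trip.
from pydantic import BaseModel, ConfigDict, Field


class TaskResourceUsageSchema(BaseModel):
    # assumption: the project's real BaseModel enables population by field name
    model_config = ConfigDict(populate_by_name=True)

    max_usage: int | float | None = Field(default=None, alias="max")


# the worker-side payload uses the field name...
usage = TaskResourceUsageSchema.model_validate({"max_usage": 2048})
# ...while serializing by alias yields the key the frontend reads
print(usage.model_dump(by_alias=True))  # {'max': 2048}
```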
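The EWMA arithmetic in `_compute_scraper_cpu_stats` is easy to sanity-check in isolation. Here is a self-contained sketch of the same calculation (the helper names `cpu_percent` and `ewma` are illustrative, not part of the codebase); with `CPU_EWMA_ALPHA = 0.01`, a single sample spiking to 400% moves a steady 100% average by only 3 points, which is the intended smoothing.

```python
# Self-contained sketch mirroring the worker's CPU math; helper names
# are illustrative and not part of the zimfarm codebase.
from typing import Any

CPU_EWMA_ALPHA = 0.01  # same smoothing factor as the worker


def cpu_percent(stats: dict[str, Any]) -> float:
    """Instantaneous CPU percentage from one Docker stats snapshot."""
    cpu = stats.get("cpu_stats", {})
    precpu = stats.get("precpu_stats", {})
    delta_cpu = cpu.get("cpu_usage", {}).get("total_usage", 0) - precpu.get(
        "cpu_usage", {}
    ).get("total_usage", 0)
    delta_system = cpu.get("system_cpu_usage", 0) - precpu.get("system_cpu_usage", 0)
    online_cpus = cpu.get("online_cpus", 0)
    if delta_system > 0 and delta_cpu >= 0 and online_cpus > 0:
        return (delta_cpu / float(delta_system)) * float(online_cpus) * 100.0
    return 0.0


def ewma(prev: float, sample: float, alpha: float = CPU_EWMA_ALPHA) -> float:
    """Fold a sample into the running average; the first sample seeds it."""
    return sample if prev == 0.0 else alpha * sample + (1.0 - alpha) * prev


avg = 100.0             # steady-state average
avg = ewma(avg, 400.0)  # one-off spike to 400%
print(round(avg, 2))    # 103.0 -- the spike barely moves the average
```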