Skip to content

Commit e099e7a

Browse files
committed
[Iris] Store the task stats in sqlite
Store the task stats in sqlite. This mirrors the task resource history table, but for items and bytes processed.
1 parent 4ca687c commit e099e7a

5 files changed

Lines changed: 383 additions & 0 deletions

File tree

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,10 @@ def _run_prune_loop(self, stop_event: threading.Event) -> None:
13831383
self._transitions.prune_task_resource_history()
13841384
except Exception:
13851385
logger.exception("Task resource history cleanup failed")
1386+
try:
1387+
self._transitions.prune_task_stats_history()
1388+
except Exception:
1389+
logger.exception("Task stats history cleanup failed")
13861390

13871391
if wal_checkpoint_limiter.should_run():
13881392
try:
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import sqlite3
5+
6+
7+
def migrate(conn: sqlite3.Connection) -> None:
    """Add throughput-history storage and the tasks.status_message column.

    Creates the task_stats_history table (one row per reported sample of
    items/bytes processed, cascading on task deletion) plus an index for
    newest-first per-task lookups, then adds the status_message column to
    tasks when it is not already present.

    Safe to run repeatedly: the DDL uses IF NOT EXISTS and the ALTER is
    guarded by a PRAGMA table_info check.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS task_stats_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL REFERENCES tasks(task_id) ON DELETE CASCADE,
            items_processed INTEGER NOT NULL DEFAULT 0,
            bytes_processed INTEGER NOT NULL DEFAULT 0,
            timestamp_ms INTEGER NOT NULL
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_task_stats_history_task"
        " ON task_stats_history(task_id, id DESC)"
    )

    # SQLite's ALTER TABLE ADD COLUMN has no IF NOT EXISTS; guard manually.
    column_names = [info[1] for info in conn.execute("PRAGMA table_info(tasks)")]
    if "status_message" not in column_names:
        conn.execute("ALTER TABLE tasks ADD COLUMN status_message TEXT NOT NULL DEFAULT ''")

lib/iris/src/iris/cluster/controller/schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,8 @@ def generate_full_ddl(tables: Sequence[Table]) -> str:
751751
default=None,
752752
),
753753
Column("current_worker_address", "TEXT", "", python_type=str | None, decoder=_nullable(str), default=None),
754+
# Migration 0039
755+
Column("status_message", "TEXT", "NOT NULL DEFAULT ''", python_type=str, decoder=str, default=""),
754756
),
755757
table_constraints=("UNIQUE(job_id, task_index)",),
756758
indexes=(
@@ -1045,6 +1047,25 @@ def generate_full_ddl(tables: Sequence[Table]) -> str:
10451047
),
10461048
)
10471049

1050+
# Per-task throughput samples (items/bytes processed), mirroring the shape of
# the task resource history table. Rows are appended over time and pruned by
# prune_task_stats_history; the FK cascade removes history when a task row is
# deleted. "tsh" is the table alias used in joins.
TASK_STATS_HISTORY = Table(
    "task_stats_history",
    "tsh",
    columns=(
        # Monotonic rowid; also used as the ordering key for "newest first".
        Column("id", "INTEGER", "PRIMARY KEY AUTOINCREMENT"),
        Column(
            "task_id",
            "TEXT",
            "NOT NULL REFERENCES tasks(task_id) ON DELETE CASCADE",
            python_type=JobName,
            decoder=JobName.from_wire,
        ),
        Column("items_processed", "INTEGER", "NOT NULL DEFAULT 0"),
        Column("bytes_processed", "INTEGER", "NOT NULL DEFAULT 0"),
        # Sample time in epoch milliseconds.
        Column("timestamp_ms", "INTEGER", "NOT NULL", python_type=Timestamp, decoder=decode_timestamp_ms),
    ),
    # Covering index for per-task, newest-first scans (task_id, id DESC).
    indexes=("CREATE INDEX IF NOT EXISTS idx_task_stats_history_task" " ON task_stats_history(task_id, id DESC)",),
)
1068+
10481069
ENDPOINTS = Table(
10491070
"endpoints",
10501071
"e",
@@ -1312,6 +1333,7 @@ def generate_full_ddl(tables: Sequence[Table]) -> str:
13121333
WORKER_TASK_HISTORY,
13131334
WORKER_RESOURCE_HISTORY,
13141335
TASK_RESOURCE_HISTORY,
1336+
TASK_STATS_HISTORY,
13151337
ENDPOINTS,
13161338
DISPATCH_QUEUE,
13171339
SCALING_GROUPS,
@@ -1474,6 +1496,7 @@ class TaskDetailRow:
14741496
current_worker_id: WorkerId | None
14751497
current_worker_address: str | None
14761498
container_id: str | None = None
1499+
status_message: str = ""
14771500
attempts: tuple = dataclasses.field(default_factory=tuple)
14781501

14791502

@@ -1806,6 +1829,7 @@ def _job_columns(*names: str) -> tuple[tuple[Column, ...], tuple[str, ...]]:
18061829
"current_worker_id",
18071830
"current_worker_address",
18081831
"container_id",
1832+
"status_message",
18091833
extra_fields=(ExtraField("attempts", tuple, default_factory=tuple),),
18101834
row_cls=TaskDetailRow,
18111835
)

lib/iris/src/iris/cluster/controller/transitions.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ class ReservationClaim:
156156
runs every 10 min so total wall-time is irrelevant — bounding worst-case
157157
writer hold is what matters for concurrent RPCs."""
158158

159+
TASK_STATS_HISTORY_RETENTION = 50
"""Maximum task_stats_history rows retained per task_id.
Logarithmic downsampling triggers at 2x this value."""

TASK_STATS_HISTORY_TERMINAL_TTL = Duration.from_hours(1)
"""After a task reaches a terminal state, its stats history is fully evicted
this long after the finish timestamp."""

# NOTE(review): 1000 placeholders per DELETE exceeds SQLite's historical
# default SQLITE_MAX_VARIABLE_NUMBER of 999 (raised to 32766 in SQLite 3.32) —
# confirm the bundled SQLite is >= 3.32, or lower this to <= 999.
TASK_STATS_HISTORY_DELETE_CHUNK = 1000
"""Maximum task_ids per DELETE in prune_task_stats_history."""
169+
159170
DIRECT_PROVIDER_PROMOTION_RATE = 128
160171
"""Token bucket capacity for task promotion (pods per minute).
161172
@@ -2670,6 +2681,69 @@ def prune_task_resource_history(self) -> int:
26702681
logger.info("Pruned %d task_resource_history rows (log downsampling)", total_deleted)
26712682
return evicted_terminal + total_deleted
26722683

2684+
    def prune_task_stats_history(self) -> int:
        """Two-pass prune for task_stats_history, mirroring prune_task_resource_history.

        1. Evict all history for tasks that have been in a terminal state
           longer than TASK_STATS_HISTORY_TERMINAL_TTL.
        2. Logarithmic downsampling for anything that remains: when a task_id
           exceeds 2*N rows, thin the older half by deleting every other row.

        Returns:
            Total number of task_stats_history rows deleted across both passes.
        """
        now_ms = Timestamp.now().epoch_ms()
        # Tasks finished before this cutoff lose their stats history entirely.
        ttl_cutoff_ms = now_ms - TASK_STATS_HISTORY_TERMINAL_TTL.to_ms()
        terminal_placeholders = ",".join("?" for _ in TERMINAL_TASK_STATES)

        # Pass 1 candidate scan: done on a read snapshot so the (potentially
        # long) SELECT does not hold the writer lock.
        with self._db.read_snapshot() as snap:
            terminal_ids = [
                str(r["task_id"])
                for r in snap.fetchall(
                    f"SELECT task_id FROM tasks "
                    f"WHERE state IN ({terminal_placeholders}) "
                    f"AND finished_at_ms IS NOT NULL AND finished_at_ms < ?",
                    (*TERMINAL_TASK_STATES, ttl_cutoff_ms),
                )
            ]

        # Pass 1 delete: chunked, one short transaction per chunk, so
        # concurrent RPC writers are never blocked for long (see the module
        # constants' rationale).
        evicted_terminal = 0
        for chunk_start in range(0, len(terminal_ids), TASK_STATS_HISTORY_DELETE_CHUNK):
            chunk = terminal_ids[chunk_start : chunk_start + TASK_STATS_HISTORY_DELETE_CHUNK]
            ph = ",".join("?" * len(chunk))
            with self._db.transaction() as cur:
                cur.execute(f"DELETE FROM task_stats_history WHERE task_id IN ({ph})", tuple(chunk))
                evicted_terminal += cur.rowcount

        # Pass 2: logarithmic downsampling. Only triggers once a task has
        # accumulated more than 2*RETENTION rows, then deletes every other row
        # of the "older" prefix so older history gets progressively sparser
        # while the newest RETENTION rows are always kept intact.
        threshold = TASK_STATS_HISTORY_RETENTION * 2
        with self._db.transaction() as cur:
            overflows = cur.execute(
                "SELECT task_id, COUNT(*) as cnt FROM task_stats_history GROUP BY task_id HAVING cnt > ?",
                (threshold,),
            ).fetchall()
            ids_to_delete: list[int] = []
            for row in overflows:
                tid = row["task_id"]
                # Ascending id order == insertion order (AUTOINCREMENT pk).
                all_ids = [
                    r["id"]
                    for r in cur.execute(
                        "SELECT id FROM task_stats_history WHERE task_id = ? ORDER BY id ASC",
                        (tid,),
                    ).fetchall()
                ]
                # Everything except the newest RETENTION rows is eligible;
                # [1::2] keeps the first of each pair so sampling stays anchored.
                older = all_ids[: len(all_ids) - TASK_STATS_HISTORY_RETENTION]
                ids_to_delete.extend(older[1::2])

            total_deleted = 0
            # 900 keeps each IN (...) comfortably under SQLite's default
            # 999 host-parameter limit (presumably why it differs from
            # TASK_STATS_HISTORY_DELETE_CHUNK — confirm).
            for chunk_start in range(0, len(ids_to_delete), 900):
                chunk = ids_to_delete[chunk_start : chunk_start + 900]
                ph = ",".join("?" * len(chunk))
                cur.execute(f"DELETE FROM task_stats_history WHERE id IN ({ph})", tuple(chunk))
                total_deleted += cur.rowcount

        if evicted_terminal > 0:
            logger.info("Evicted %d task_stats_history rows (terminal TTL)", evicted_terminal)
        if total_deleted > 0:
            logger.info("Pruned %d task_stats_history rows (log downsampling)", total_deleted)
        return evicted_terminal + total_deleted
2746+
26732747
def _batch_delete(
26742748
self,
26752749
sql: str,
@@ -2925,6 +2999,18 @@ def record_task_stats(self, task_id: JobName, items_processed: int, bytes_proces
29252999
bytes_processed,
29263000
status,
29273001
)
3002+
now_ms = int(time.time() * 1000)
3003+
with self._db.transaction() as cur:
3004+
cur.execute(
3005+
"INSERT INTO task_stats_history"
3006+
" (task_id, items_processed, bytes_processed, timestamp_ms)"
3007+
" VALUES (?, ?, ?, ?)",
3008+
(task_id.to_wire(), items_processed, bytes_processed, now_ms),
3009+
)
3010+
cur.execute(
3011+
"UPDATE tasks SET status_message = ? WHERE task_id = ?",
3012+
(status, task_id.to_wire()),
3013+
)
29283014

29293015
# --- Endpoint Management ---
29303016

0 commit comments

Comments
 (0)