
Commit edcc891

github-actions[bot], ravwojdyla, and claude committed
Move counters from Iris to Zephyr per Russell's feedback
Counters are now a Zephyr-only concept. Instead of file-based I/O through the Iris heartbeat/DB path, counters accumulate in-memory on each Zephyr worker and flow to the coordinator via the existing heartbeat RPC.

- Remove all counter code from Iris: proto fields, DB column, migration, transitions, service aggregation, worker monitor, task_attempt
- Add zephyr/counters.py with increment() / get_counters() API backed by WorkerContext (pure in-memory, zero I/O per increment)
- Extend WorkerContext protocol with increment_counter/get_counter_snapshot
- Wire counters through ZephyrWorker heartbeat → ZephyrCoordinator state
- Add counters to JobStatus dataclass and get_status() aggregation
- Log counters in coordinator periodic status lines for agent visibility
- Only send counters when values change (avoid steady-state DB/RPC churn)
- Reset counters per-task in _execute_shard to prevent cross-task leakage
- Update babysit-zephyr and babysit-job skills with counter monitoring docs

Co-authored-by: Rafal Wojdyla <ravwojdyla@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d21d09f commit edcc891

16 files changed: 263 additions & 422 deletions
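Based on the commit message, the new `zephyr/counters.py` surface is an `increment()` / `get_counters()` pair backed by in-memory state on each worker. A minimal sketch of that shape (hypothetical; the real module delegates to `WorkerContext`, which is not reproduced here):

```python
# Sketch of a pure in-memory counter API. Hypothetical: the actual
# implementation routes through WorkerContext rather than module globals.
import threading
from collections import defaultdict

_lock = threading.Lock()
_counters: dict[str, int] = defaultdict(int)


def increment(name: str, value: int = 1) -> None:
    """Accumulate a counter in memory; zero I/O per increment."""
    with _lock:
        _counters[name] += value


def get_counters() -> dict[str, int]:
    """Return a snapshot of all counters, e.g. for the heartbeat RPC."""
    with _lock:
        return dict(_counters)
```

The snapshot copies the dict under the lock so the heartbeat sender never observes a half-updated view.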


.agents/skills/babysit-job/SKILL.md

Lines changed: 7 additions & 0 deletions
```diff
@@ -103,6 +103,13 @@ data loading issues, unclear multi-file stack traces.
   or unsatisfied resources -> mark `degraded` and notify user.
 - If same error repeats after one fix attempt, do not retry blindly; report to user.
 
+### Zephyr Counters
+
+Zephyr pipelines support user-defined counters (e.g. `documents_processed`, `bytes_written`).
+Counters appear in coordinator progress logs and in `JobStatus.counters`. When babysitting
+a Zephyr job, monitor counter advancement as an additional throughput signal. See
+**babysit-zephyr** for details.
+
 ### When to Escalate
 
 - Debug Zephyr pipeline issues -> **debug-zephyr-job**
```

.agents/skills/babysit-zephyr/SKILL.md

Lines changed: 20 additions & 1 deletion
````diff
@@ -65,9 +65,23 @@ A healthy zephyr job has:
 
 The coordinator logs a progress line every 5s:
 ```
-[stage0-Map → Scatter] 347/1964 complete, 1617 in-flight, 0 queued, 1828/1891 workers alive, 63 dead
+[stage0-Map → Scatter] 347/1964 complete, 1617 in-flight, 0 queued, 1828/1891 workers alive, 63 dead, counters: bytes_written=4831838208 documents_processed=1200000
 ```
 
+### User-Defined Counters
+
+Zephyr pipelines can report user-defined counters via `zephyr.counters.increment()`. Counters are aggregated across all workers and appear in:
+- **Coordinator progress logs**: appended to the periodic status line (grep for `counters:`)
+- **`get_status()` RPC**: `JobStatus.counters` dict, accessible programmatically
+
+To report a counter from task code, use:
+```python
+from zephyr import counters
+counters.increment("documents_processed", batch_size)
+```
+
+Counters are sent to the coordinator via the worker heartbeat (every 5s) and only transmitted when values change, so idle workers incur no overhead.
+
 Fetch via the Iris CLI:
 ```bash
 uv run iris --config lib/iris/examples/marin.yaml rpc controller get-task-logs \
@@ -115,10 +129,15 @@ After submitting, monitor in escalating stages:
 3. Get the run command (or reuse the previous one).
 4. Submit and resume monitoring.
 
+## Monitoring Counters
+
+When babysitting a Zephyr job, check coordinator logs for counter lines. Counters give you insight into pipeline throughput (e.g. `documents_processed`, `bytes_written`, `validation_errors`). If counters stop advancing while shards are still in-flight, this may indicate a straggler or stuck worker; escalate to debug-zephyr-job.
+
 ## When to Escalate
 
 Escalate to **debug-zephyr-job** when:
 - A stage is stuck (no shard progress for an extended period)
 - Stragglers are holding up a stage (few in-flight, 0 queued, most workers idle)
 - Workers are failing repeatedly with the same error
+- Counters stop advancing while tasks remain in-flight
 - For controller issues (e.g., RPCs timing out), use the **debug-iris-controller** skill
````
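The aggregation this skill describes (per-worker counter snapshots summed into `JobStatus.counters`) can be sketched as follows. This is an illustration only: the `worker_counters` mapping shape and the function name are assumptions, not the coordinator's actual internals.

```python
def aggregate_counters(worker_counters: dict[str, dict[str, int]]) -> dict[str, int]:
    """Sum per-worker counter snapshots into a job-level view,
    as surfaced in JobStatus.counters (sketch, not the real code)."""
    totals: dict[str, int] = {}
    for counters in worker_counters.values():
        for name, value in counters.items():
            totals[name] = totals.get(name, 0) + value
    return totals


# Example: two workers both reporting documents_processed.
# aggregate_counters({"w1": {"documents_processed": 2},
#                     "w2": {"documents_processed": 3, "bytes_written": 1}})
# -> {"documents_processed": 5, "bytes_written": 1}
```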

lib/iris/src/iris/cluster/controller/db.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -624,7 +624,6 @@ class Task:
     current_worker_id: WorkerId | None = db_field("current_worker_id", _nullable(_decode_worker_id), default=None)
     current_worker_address: str | None = db_field("current_worker_address", _nullable(_decode_str), default=None)
     container_id: str | None = db_field("container_id", _nullable(_decode_str), default=None)
-    counters: dict[str, int] = db_field("counters_json", _decode_json_dict, default_factory=dict)
     attempts: tuple[Attempt, ...] = field(default_factory=tuple)
 
     def is_finished(self) -> bool:
```

lib/iris/src/iris/cluster/controller/migrations/0013_task_counters.py

Lines changed: 0 additions & 10 deletions
This file was deleted.

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 0 additions & 6 deletions
```diff
@@ -173,8 +173,6 @@ def task_to_proto(task: Task, worker_address: str = "") -> cluster_pb2.TaskStatu
     proto.resource_usage.CopyFrom(task.resource_usage)
     if task.container_id:
         proto.container_id = task.container_id
-    if task.counters:
-        proto.counters.update(task.counters)
     # For pending tasks with prior terminal attempts, surface retry context.
     if task.state == cluster_pb2.TASK_STATE_PENDING and task.attempts and task.attempts[-1].is_terminal:
         last = task.attempts[-1]
@@ -843,12 +841,9 @@ def get_job_status(
     task_statuses = []
     total_failure_count = 0
     total_preemption_count = 0
-    total_counters: dict[str, int] = {}
     for task in tasks:
         total_failure_count += task.failure_count
         total_preemption_count += task.preemption_count
-        for name, value in task.counters.items():
-            total_counters[name] = total_counters.get(name, 0) + value
 
         task_statuses.append(task_to_proto(task, worker_address=worker_addr_by_id.get(task.worker_id, "")))
 
@@ -874,7 +869,6 @@ def get_job_status(
         tasks=task_statuses,
         name=job.request.name if job.request else "",
         pending_reason=pending_reason,
-        counters=total_counters,
     )
     if job.request:
         proto_job_status.resources.CopyFrom(job.request.resources)
```

lib/iris/src/iris/cluster/controller/transitions.py

Lines changed: 0 additions & 31 deletions
```diff
@@ -141,7 +141,6 @@ class TaskUpdate:
     resource_usage: cluster_pb2.ResourceUsage | None = None
     log_entries: list[logging_pb2.LogEntry] = field(default_factory=list)
     container_id: str | None = None
-    counters: dict[str, int] = field(default_factory=dict)
 
 
 @dataclass(frozen=True)
@@ -1024,7 +1023,6 @@ def _apply_single_heartbeat(
             or update.exit_code is not None
             or update.resource_usage is not None
            or update.log_entries
-            or update.counters
         )
         if update.new_state == prior_state and not has_new_data:
             continue
@@ -1049,12 +1047,6 @@ def _apply_single_heartbeat(
                 "UPDATE tasks SET resource_usage_proto = ? WHERE task_id = ?",
                 (usage_payload, update.task_id.to_wire()),
             )
-            if update.counters:
-                cur.execute(
-                    "UPDATE tasks SET counters_json = ? WHERE task_id = ?",
-                    (json.dumps(update.counters), update.task_id.to_wire()),
-                )
-
         terminal_ms: int | None = None
         started_ms: int | None = None
         task_state = prior_state
@@ -1104,15 +1096,6 @@ def _apply_single_heartbeat(
             task_state = cluster_pb2.TASK_STATE_PENDING
             terminal_ms = None
 
-            # Clear stale counters when the task is retried so that
-            # get_job_status() does not double-count values from the
-            # previous attempt.
-            if task_state == cluster_pb2.TASK_STATE_PENDING:
-                cur.execute(
-                    "UPDATE tasks SET counters_json = NULL WHERE task_id = ?",
-                    (update.task_id.to_wire(),),
-                )
-
         cur.execute(
             "UPDATE task_attempts SET state = ?, started_at_ms = COALESCE(started_at_ms, ?), "
             "finished_at_ms = COALESCE(finished_at_ms, ?), exit_code = COALESCE(?, exit_code), "
@@ -2138,11 +2121,6 @@ def apply_direct_provider_updates(self, updates: list[TaskUpdate]) -> TxResult:
                 "UPDATE tasks SET resource_usage_proto = ? WHERE task_id = ?",
                 (usage_payload, update.task_id.to_wire()),
             )
-            if update.counters:
-                cur.execute(
-                    "UPDATE tasks SET counters_json = ? WHERE task_id = ?",
-                    (json.dumps(update.counters), update.task_id.to_wire()),
-                )
             if update.container_id is not None:
                 cur.execute(
                     "UPDATE tasks SET container_id = ? WHERE task_id = ?",
@@ -2202,15 +2180,6 @@ def apply_direct_provider_updates(self, updates: list[TaskUpdate]) -> TxResult:
             task_state = cluster_pb2.TASK_STATE_PENDING
             terminal_ms = None
 
-            # Clear stale counters when the task is retried so that
-            # get_job_status() does not double-count values from the
-            # previous attempt.
-            if task_state == cluster_pb2.TASK_STATE_PENDING:
-                cur.execute(
-                    "UPDATE tasks SET counters_json = NULL WHERE task_id = ?",
-                    (update.task_id.to_wire(),),
-                )
-
         cur.execute(
             "UPDATE task_attempts SET state = ?, started_at_ms = COALESCE(started_at_ms, ?), "
             "finished_at_ms = COALESCE(finished_at_ms, ?), exit_code = COALESCE(?, exit_code), "
```

lib/iris/src/iris/cluster/controller/worker_provider.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -87,7 +87,6 @@ def _apply_request_from_response(
                 resource_usage=entry.resource_usage if entry.resource_usage.ByteSize() > 0 else None,
                 log_entries=list(entry.log_entries),
                 container_id=entry.container_id or None,
-                counters=dict(entry.counters) if entry.counters else {},
             )
         )
     return HeartbeatApplyRequest(
```

lib/iris/src/iris/cluster/worker/task_attempt.py

Lines changed: 0 additions & 26 deletions
```diff
@@ -7,7 +7,6 @@
 bundle download -> image build -> container run -> monitor -> cleanup.
 """
 
-import json
 import logging
 import shutil
 import socket
@@ -159,10 +158,6 @@ def build_iris_env(
     for name, port in task.ports.items():
        env[f"IRIS_PORT_{name.upper()}"] = str(port)
 
-    # Counter file is written by task code and read by the monitor loop.
-    # /app is the container-side mount point for the workdir.
-    env["IRIS_COUNTER_FILE"] = "/app/iris_counters.json"
-
     return env
 
 
@@ -257,9 +252,6 @@ def __init__(
         self.process_count: int = 0
         self.disk_mb: int = 0
 
-        # User-defined counters (read from iris_counters.json in workdir)
-        self.counters: dict[str, int] = {}
-
         # Build tracking
         self.build_started: Timestamp | None = None
         self.build_finished: Timestamp | None = None
@@ -452,9 +444,6 @@ def to_proto(self) -> cluster_pb2.TaskStatus:
             proto.build_metrics.build_started.CopyFrom(self.build_started.to_proto())
         if self.build_finished is not None:
             proto.build_metrics.build_finished.CopyFrom(self.build_finished.to_proto())
-        if self.counters:
-            proto.counters.update(self.counters)
-
         return proto
 
     def _check_cancelled(self) -> None:
@@ -754,7 +743,6 @@ def _monitor_loop(
                 )
                 # Final log fetch before container stops
                 self._stream_logs(log_reader)
-                self._read_counters()
 
                 # Container has stopped
                 if status.error:
@@ -802,8 +790,6 @@ def _monitor_loop(
             except Exception:
                 logger.debug("Stats collection failed for task %s", self.task_id, exc_info=True)
 
-            self._read_counters()
-
             # Sleep before next poll
             time.sleep(self._poll_interval_seconds)
@@ -823,18 +809,6 @@ def _stream_logs(self, reader: RuntimeLogReader) -> None:
         except Exception:
             logger.debug("Log streaming failed for task %s", self.task_id, exc_info=True)
 
-    def _read_counters(self) -> None:
-        """Read user-defined counters from iris_counters.json in the workdir."""
-        if self.workdir is None:
-            return
-        counter_file = self.workdir / "iris_counters.json"
-        if not counter_file.exists():
-            return
-        try:
-            self.counters = json.loads(counter_file.read_text())
-        except (json.JSONDecodeError, OSError):
-            logger.debug("Counter file read failed for task %s", self.task_id, exc_info=True)
-
     def _cleanup(self) -> None:
         """Clean up task resources: container, ports, image protection, workdir.
```

lib/iris/src/iris/cluster/worker/worker.py

Lines changed: 0 additions & 2 deletions
```diff
@@ -625,8 +625,6 @@ def handle_heartbeat(self, request: cluster_pb2.HeartbeatRequest) -> cluster_pb2
                 entry.finished_at.CopyFrom(task_proto.finished_at)
             if task_proto.resource_usage.ByteSize() > 0:
                 entry.resource_usage.CopyFrom(task_proto.resource_usage)
-            if task_proto.counters:
-                entry.counters.update(task_proto.counters)
             tasks.append(entry)
 
         # Kill tasks not in expected_tasks - the controller has decided these
```

lib/iris/src/iris/counters.py

Lines changed: 0 additions & 108 deletions
This file was deleted.
