Add stage-level throughput stats #5063
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 20ff8ade78
```python
if self._count % self.COUNTER_SET_INTERVAL == 0:
    self._set_counters()
```
Flush counters after generator exhaustion
StatisticsGenerator.wrap only updates the stage counters every 10 yielded items, so when a shard emits a non-multiple of 10 records (including any shard with fewer than 10), the final partial batch is never written to _counters. The subprocess then reports stale totals in report_result, causing persistent undercounting of zephyr/item_count/* and zephyr/bytes_processed/* in status logs and final aggregated counters.
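A minimal sketch of the suggested fix: flush in a `finally` so the final partial batch is always written. The class internals here are assumed from the snippets in this thread, not the actual implementation:

```python
from typing import Iterator, TypeVar

T = TypeVar("T")


class StatisticsGenerator:
    """Sketch only: internal layout assumed from the diff snippets."""

    COUNTER_SET_INTERVAL = 10

    def __init__(self) -> None:
        self._count = 0
        self._bytes = 0
        self._counters: dict[str, int] = {}

    def _set_counters(self) -> None:
        self._counters["item_count"] = self._count
        self._counters["bytes_processed"] = self._bytes

    def wrap(self, gen: Iterator[T]) -> Iterator[T]:
        try:
            for item in gen:
                self._count += 1
                self._bytes += len(repr(item))  # stand-in for real byte accounting
                if self._count % self.COUNTER_SET_INTERVAL == 0:
                    self._set_counters()
                yield item
        finally:
            # Flush whatever partial batch remains, even on early close.
            self._set_counters()
```

With the `finally`, a shard of 13 records reports 13 rather than the last multiple of 10.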
Hmm, this makes me think I should just do away with the % 10 and also the local variables and keep the state in the counters.
Would it be worth thinking of splitting counter acquisition from updates? I'm imagining something like this:
```python
stage_count = ctx.get_counter(f"...")
stage_size = ctx.get_counter(f"...")
counter.{value, set, increment}
```

then incrementing the counter wouldn't be any more expensive than maintaining the local state, and you wouldn't need a lock on it (assuming Python semantics around int tearing).
Working with the counters shouldn't take a lock as far as I can tell, although set could certainly expose races. I decided to remove it (see most recent change) and just keep the state in the counters, which I think should be fast enough (since it's just a dictionary lookup and then the same increment).
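The counter-handle idea could look something like this. Names like `Context.get_counter` come from the suggestion above, not a real API, and the increments rely on the GIL plus a single-writer-per-counter assumption rather than a lock:

```python
class Counter:
    """Hypothetical counter handle; single writer assumed, so no lock."""

    __slots__ = ("value",)

    def __init__(self) -> None:
        self.value = 0

    def increment(self, delta: int = 1) -> None:
        # Read-modify-write: not atomic in general, but safe with one writer.
        self.value += delta

    def set(self, value: int) -> None:
        self.value = value


class Context:
    def __init__(self) -> None:
        self._counters: dict[str, Counter] = {}

    def get_counter(self, name: str) -> Counter:
        # One dictionary lookup up front; later increments touch only the handle.
        return self._counters.setdefault(name, Counter())
```

Acquiring the handle once per stage keeps the hot path at a plain attribute increment.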
```python
items = totals.get(f"zephyr/item_count/{self._stage_idx}", 0)
bytes_processed = totals.get(f"zephyr/bytes_processed/{self._stage_idx}", 0)
```
If these counters are scatter-specific and feel a bit more "internal", should we use better names and/or a namespace here?
I wasn't imagining these would be scatter-specific stats; ideally every stage would implement them. I just wanted to start small and get feedback from you all on the approach (also, scatter is the simplest case).
The comment above from Codex made me rethink the naming though, and I switched it to stage_name, which should provide better namespacing.
Let me know what you think though, I'm happy to change them to something else if you think it's better.
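For illustration, reading the per-stage counters under the name-based namespace might look like this. The key format is taken from the diff, with `stage_name` substituted for the index; `stage_totals` is a hypothetical helper:

```python
def stage_totals(totals: dict[str, int], stage_name: str) -> tuple[int, int]:
    """Look up one stage's counters, keyed by stage name rather than index."""
    items = totals.get(f"zephyr/item_count/{stage_name}", 0)
    bytes_processed = totals.get(f"zephyr/bytes_processed/{stage_name}", 0)
    return items, bytes_processed
```

Keying by name means two pipelines with different stage orders still produce comparable counters.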
I think the per stage namespace is a lot better - thanks!
Force-pushed from 20ff8ad to e05b067
```python
stage_dir = f"{chunk_prefix}/{execution_id}/{task.stage_name}"
# Sanitize for use as a path component: replace non-alphanumeric runs with '-'
output_stage_name = re.sub(r"[^a-zA-Z0-9_.-]+", "-", task.stage_name).strip("-")
```
Moved this into the subprocess worker so I had access to the original stage name here. As far as I can tell, the stage name was previously only used here so I can just do this transformation here.
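The sanitization from the diff can be exercised on its own; the function name here is mine, but the regex is the one shown above:

```python
import re


def sanitize_stage_name(stage_name: str) -> str:
    # Replace runs of path-unsafe characters with '-' and trim dangling dashes.
    return re.sub(r"[^a-zA-Z0-9_.-]+", "-", stage_name).strip("-")
```

Names already made of alphanumerics, `_`, `.`, and `-` pass through unchanged.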
```python
def __init__(self, stage_name: str) -> None:
    self._stage_name = stage_name

def wrap(self, gen: Generator[T, None, None]) -> Generator[T, None, None]:
```
nit: it's more idiomatic and less verbose to use Iterator instead of Generator
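That is, for a yield-only generator the two annotations describe the same thing, but `Iterator` drops the unused send/return type slots:

```python
from typing import Generator, Iterator, TypeVar

T = TypeVar("T")


# Verbose: spells out the unused send (None) and return (None) types.
def wrap_verbose(gen: Generator[T, None, None]) -> Generator[T, None, None]:
    yield from gen


# Idiomatic: Iterator[T] says the same thing for a yield-only generator.
def wrap(gen: Iterator[T]) -> Iterator[T]:
    yield from gen
```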
```diff
  logger.info(
-     "[%s] [%s] %d/%d complete, %d in-flight, %d queued, %d/%d workers alive, %d dead",
+     "[%s] [%s] %d/%d complete, %d in-flight, %d queued, %d/%d workers alive, %d dead; "
```
I wonder if we should also log these in each worker subprocess?
I actually had this during testing, but removed it once I had it logged in the controller. Added it back in a separate commit so it's easy to pull back out if we want to.
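As a sketch, the extended controller log line might format like this. The diff only shows the prefix through the semicolon; the fields after it, and all the sample values, are my guesses at where the throughput stats land, not the actual format:

```python
fmt = (
    "[%s] [%s] %d/%d complete, %d in-flight, %d queued, "
    "%d/%d workers alive, %d dead; %d items, %d bytes processed"
)
# Hypothetical values for one stage of a run.
line = fmt % ("exec-1", "scatter", 8, 10, 1, 1, 4, 4, 0, 1234, 5_000_000)
print(line)
```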
Force-pushed from e87ab0b to ed0699e
Force-pushed from ed0699e to 4584c5d
Tracks items and bytes that are being saved to disk as part of a scatter operation. This won't properly capture non-scatter operations, but since scatter captures the bulk of the work, I'm starting here.
Adding these stats to the existing logs, next step would be to plumb them through to the Iris coordinator and display them in the UI.