Signed-off-by: xgui <xgui@anyscale.com>
Code Review
This pull request introduces a SpillMetricsMonitor actor to compute and report peak and average object store spilling rates, which is a valuable addition for performance monitoring in benchmarks. The implementation uses a detached Ray actor with a background polling thread, which is a suitable design. The integration into RayDataLoaderFactory is clean. My review includes a couple of suggestions to enhance the robustness of the metric calculation and improve code consistency.
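To make the design concrete, here is a minimal sketch of a background polling thread that turns a cumulative byte counter into peak and average rates. It is not the PR's code: the class name `PollingRateMonitor`, the injected `read_bytes` callable, and the `record_sample` and `summary` methods are all hypothetical, and the Ray actor wrapper is left out so the example runs with the standard library alone.

```python
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple

GIB = 1024**3


class PollingRateMonitor:
    """Hypothetical stand-in for the monitor described above (no Ray involved)."""

    def __init__(self, read_bytes: Callable[[], int], interval_s: float = 1.0):
        self._read_bytes = read_bytes          # assumed: returns a cumulative byte count
        self._interval_s = interval_s
        self._lock = threading.Lock()
        self._rates_gb_s: List[float] = []
        self._prev: Optional[Tuple[float, int]] = None
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join()

    def record_sample(self, now_s: float, total_bytes: int) -> None:
        """Convert two consecutive counter readings into one rate sample."""
        if self._prev is not None:
            prev_time, prev_bytes = self._prev
            delta_time = now_s - prev_time
            delta_bytes = total_bytes - prev_bytes
            # Skip intervals with no elapsed time or a counter that went backwards.
            if delta_time > 0 and delta_bytes >= 0:
                rate_gb_s = (delta_bytes / GIB) / delta_time
                with self._lock:
                    self._rates_gb_s.append(rate_gb_s)
        self._prev = (now_s, total_bytes)

    def _poll_loop(self) -> None:
        # Event.wait acts as an interruptible sleep; it returns True once stop() is called.
        while not self._stop.wait(self._interval_s):
            self.record_sample(time.monotonic(), self._read_bytes())

    def summary(self) -> Dict[str, float]:
        with self._lock:
            rates = list(self._rates_gb_s)
        if not rates:
            return {"peak_gb_s": 0.0, "avg_gb_s": 0.0}
        return {"peak_gb_s": max(rates), "avg_gb_s": sum(rates) / len(rates)}
```

Keeping the sampling logic in `record_sample`, separate from the thread loop, lets the rate arithmetic be exercised with fixed timestamps instead of real sleeps.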
        )
        return memory_info.store_stats.spilled_bytes_total

    def _poll_loop(self):
if delta_time > 0:
    rate_gb_s = (delta_bytes / (1024**3)) / delta_time
    with self._lock:
        self._spill_rates_gb_s.append(rate_gb_s)
The spilled_bytes_total counter could theoretically reset (e.g., on GCS restart), which would cause delta_bytes to be negative. This would result in a negative spill rate being recorded, skewing the average calculation. It's safer to only calculate the rate for non-negative delta_bytes.
- if delta_time > 0:
-     rate_gb_s = (delta_bytes / (1024**3)) / delta_time
-     with self._lock:
-         self._spill_rates_gb_s.append(rate_gb_s)
+ if delta_time > 0 and delta_bytes >= 0:
+     rate_gb_s = (delta_bytes / (1024**3)) / delta_time
+     with self._lock:
+         self._spill_rates_gb_s.append(rate_gb_s)
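A small worked example may help show how much a single counter reset can skew the average. The helper `spill_rates_gb_s` and the sample readings below are hypothetical, made up for illustration; only the rate formula and the `delta_bytes >= 0` guard come from the suggestion.

```python
from typing import List, Sequence, Tuple

GIB = 1024**3


def spill_rates_gb_s(
    samples: Sequence[Tuple[float, int]], skip_negative: bool = True
) -> List[float]:
    """Compute GiB/s rates from consecutive (timestamp_s, cumulative_bytes) samples."""
    rates = []
    for (t0, b0), (t1, b1) in zip(samples, samples[1:]):
        delta_time = t1 - t0
        delta_bytes = b1 - b0
        if delta_time <= 0:
            continue
        if skip_negative and delta_bytes < 0:
            # Counter went backwards (for example after a reset): drop this interval.
            continue
        rates.append((delta_bytes / GIB) / delta_time)
    return rates


# Made-up readings: a steady 2 GiB/s, with the counter resetting to 0 at t=3.
samples = [(0.0, 0), (1.0, 2 * GIB), (2.0, 4 * GIB), (3.0, 0), (4.0, 2 * GIB)]

guarded = spill_rates_gb_s(samples)
unguarded = spill_rates_gb_s(samples, skip_negative=False)

print(sum(guarded) / len(guarded))      # 2.0
print(sum(unguarded) / len(unguarded))  # 0.5
```

With these readings, the one negative interval drags the unguarded average from 2.0 down to 0.5 GiB/s, while the guarded version drops that interval and reports the steady rate.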