Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,21 @@
stack=False,
)

ITERATION_PREFETCHED_BLOCKS_COUNT_PANEL = Panel(
id=90,
title="Iteration Prefetched Blocks Count",
description="Current number of prefetched blocks in the iterator",
unit="blocks",
targets=[
Target(
expr="sum(ray_data_iter_prefetched_blocks_count{{{global_filters}}}) by (dataset)",
legend="Prefetched Blocks: {{dataset}}",
)
],
fill=0,
stack=False,
)

# Ray Data Metrics (Miscellaneous)
SCHEDULING_LOOP_DURATION_PANEL = Panel(
id=47,
Expand Down Expand Up @@ -1390,6 +1405,7 @@
ITERATION_BLOCKS_LOCAL_PANEL,
ITERATION_BLOCKS_REMOTE_PANEL,
ITERATION_BLOCKS_UNKNOWN_LOCATION_PANEL,
ITERATION_PREFETCHED_BLOCKS_COUNT_PANEL,
],
collapsed=True,
),
Expand Down
6 changes: 6 additions & 0 deletions python/ray/data/_internal/block_batching/iter_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ def get_next_ref_bundle() -> RefBundle:
current_window_size = 0

if num_batches_to_prefetch <= 0:
if stats:
stats.iter_prefetched_blocks_count = 0
for ref_bundle in ref_bundles:
for block_ref in ref_bundle.block_refs:
yield block_ref
Expand All @@ -398,6 +400,8 @@ def get_next_ref_bundle() -> RefBundle:
break

prefetcher.prefetch_blocks([block_ref for block_ref, _ in list(sliding_window)])
if stats:
stats.iter_prefetched_blocks_count = len(sliding_window)

while sliding_window:
block_ref, metadata = sliding_window.popleft()
Expand All @@ -413,6 +417,8 @@ def get_next_ref_bundle() -> RefBundle:
)
except StopIteration:
pass
if stats:
stats.iter_prefetched_blocks_count = len(sliding_window)
yield block_ref
trace_deallocation(block_ref, loc="iter_batches", free=eager_free)
prefetcher.stop()
Expand Down
15 changes: 15 additions & 0 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ def __init__(self, max_stats=1000):
description="Number of blocks that have unknown locations",
tag_keys=iter_tag_keys,
)
self.iter_prefetched_blocks_count = Gauge(
"data_iter_prefetched_blocks_count",
description="Current number of prefetched blocks in the iterator",
tag_keys=iter_tag_keys,
)

# === Dataset and Operator Metadata Metrics ===
dataset_tags = ("dataset", "job_id", "start_time")
Expand Down Expand Up @@ -568,6 +573,7 @@ def update_iteration_metrics(
self.iter_blocks_local.set(stats.iter_blocks_local, tags)
self.iter_blocks_remote.set(stats.iter_blocks_remote, tags)
self.iter_unknown_location.set(stats.iter_unknown_location, tags)
self.iter_prefetched_blocks_count.set(stats.iter_prefetched_blocks_count, tags)

self.iter_block_fetching_s.set(stats.iter_get_s.get(), tags)
self.iter_batch_shaping_s.set(stats.iter_next_batch_s.get(), tags)
Expand Down Expand Up @@ -976,6 +982,7 @@ def __init__(
self.iter_blocks_local: int = 0
self.iter_blocks_remote: int = 0
self.iter_unknown_location: int = 0
self.iter_prefetched_blocks_count: int = 0

# Memory usage stats
self.global_bytes_spilled: int = 0
Expand Down Expand Up @@ -1018,6 +1025,7 @@ def to_summary(self) -> "DatasetStatsSummary":
self.iter_blocks_local,
self.iter_blocks_remote,
self.iter_unknown_location,
self.iter_prefetched_blocks_count,
)

stats_summary_parents = []
Expand Down Expand Up @@ -1730,6 +1738,8 @@ class IterStatsSummary:
iter_blocks_remote: int
# Num of blocks with unknown locations
iter_unknown_location: int
# Current number of prefetched blocks in the iterator
iter_prefetched_blocks_count: int

def __str__(self) -> str:
return self.to_string()
Expand Down Expand Up @@ -1830,6 +1840,10 @@ def to_string(self) -> str:
out += " * Num blocks unknown location: {}\n".format(
self.iter_unknown_location
)
if self.iter_prefetched_blocks_count:
out += " * Num prefetched blocks: {}\n".format(
self.iter_prefetched_blocks_count
)
if self.streaming_split_coord_time.get() != 0:
out += "Streaming split coordinator overhead time: "
out += f"{fmt(self.streaming_split_coord_time.get())}\n"
Expand All @@ -1846,6 +1860,7 @@ def __repr__(self, level=0) -> str:
f"{indent} iter_blocks_local={self.iter_blocks_local or None},\n"
f"{indent} iter_blocks_remote={self.iter_blocks_remote or None},\n"
f"{indent} iter_unknown_location={self.iter_unknown_location or None},\n"
f"{indent} iter_prefetched_blocks_count={self.iter_prefetched_blocks_count or None},\n"
f"{indent} next_time={fmt(self.next_time.get()) or None},\n"
f"{indent} format_time={fmt(self.format_time.get()) or None},\n"
f"{indent} user_time={fmt(self.user_time.get()) or None},\n"
Expand Down
5 changes: 5 additions & 0 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
" iter_blocks_local=None,\n"
" iter_blocks_remote=None,\n"
" iter_unknown_location=None,\n"
" iter_prefetched_blocks_count=None,\n"
" next_time=T,\n"
" format_time=T,\n"
" user_time=T,\n"
Expand All @@ -813,6 +814,7 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
" iter_blocks_local=None,\n"
" iter_blocks_remote=None,\n"
" iter_unknown_location=None,\n"
" iter_prefetched_blocks_count=None,\n"
" next_time=T,\n"
" format_time=T,\n"
" user_time=T,\n"
Expand Down Expand Up @@ -933,6 +935,7 @@ def check_stats():
" iter_blocks_local=None,\n"
" iter_blocks_remote=None,\n"
" iter_unknown_location=N,\n"
" iter_prefetched_blocks_count=None,\n"
" next_time=T,\n"
" format_time=T,\n"
" user_time=T,\n"
Expand Down Expand Up @@ -1029,6 +1032,7 @@ def check_stats():
" iter_blocks_local=None,\n"
" iter_blocks_remote=None,\n"
" iter_unknown_location=None,\n"
" iter_prefetched_blocks_count=None,\n"
" next_time=T,\n"
" format_time=T,\n"
" user_time=T,\n"
Expand All @@ -1051,6 +1055,7 @@ def check_stats():
" iter_blocks_local=None,\n"
" iter_blocks_remote=None,\n"
" iter_unknown_location=None,\n"
" iter_prefetched_blocks_count=None,\n"
" next_time=T,\n"
" format_time=T,\n"
" user_time=T,\n"
Expand Down