
Commit 01cc84a

refactor(iceberg): move scan from batch to connector
Add function `scan_task_to_chunk`, so the code can be shared by the stream source.
1 parent 0b77efb commit 01cc84a
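
Based on the call site added to `iceberg_scan.rs` below, the shared helper's interface looks roughly like the following sketch. The exact signature, error type, and module layout inside `risingwave_connector::source::iceberg` are inferred from this diff, not shown in it:

```rust
// Sketch inferred from the batch call site below; the real definition lives in
// risingwave_connector::source::iceberg and may differ. Type and module paths
// here (Table, FileScanTask, ConnectorError) are assumptions.
use std::sync::Arc;

use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use iceberg::table::Table;
use risingwave_common::array::DataChunk;

use crate::error::ConnectorError;
use crate::source::iceberg::IcebergScanMetrics;

pub struct IcebergScanOpts {
    pub batch_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
}

/// Reads one data-file scan task and yields `DataChunk`s, optionally reporting
/// read bytes to `IcebergScanMetrics`.
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk(
    table: Table,
    data_file_scan_task: FileScanTask,
    opts: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    // body omitted; essentially the per-file read loop removed from
    // iceberg_scan.rs in this commit
}
```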

File tree

9 files changed: +180 -105 lines

Cargo.lock

+1-1

src/batch/executors/Cargo.toml

+1-2
@@ -35,7 +35,6 @@ risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 risingwave_storage = { workspace = true }
 rw_futures_util = { workspace = true }
-scopeguard = "1"
 thiserror-ext = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio", features = [
   "rt",
@@ -47,7 +46,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
   "fs",
 ] }
 tokio-postgres = "0.7"
-tokio-stream = { workspace = true }
 tracing = "0.1"
 uuid = { version = "1", features = ["v4"] }
 
@@ -61,6 +59,7 @@ risingwave_expr_impl = { workspace = true }
 risingwave_hummock_sdk = { workspace = true }
 tempfile = "3"
 tikv-jemallocator = { workspace = true }
+tokio-stream = { workspace = true }
 
 [[bench]]
 name = "filter"

src/batch/executors/src/executor/iceberg_scan.rs

+16-57
@@ -12,19 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use itertools::Itertools;
-use risingwave_common::array::arrow::IcebergArrowConvert;
-use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
+use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{
     Field, Schema, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
 };
 use risingwave_common::types::{DataType, ScalarImpl};
-use risingwave_common_estimate_size::EstimateSize;
-use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergProperties, IcebergSplit};
+use risingwave_connector::source::iceberg::{
+    scan_task_to_chunk, IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
+};
 use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
 use risingwave_connector::WithOptionsSecResolved;
 use risingwave_expr::expr::LiteralExpression;
@@ -92,7 +90,6 @@ impl IcebergScanExecutor {
     async fn do_execute(mut self: Box<Self>) {
         let table = self.iceberg_config.load_table().await?;
         let data_types = self.schema.data_types();
-        let table_name = table.identifier().name().to_owned();
 
         let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
             Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
@@ -110,58 +107,20 @@ impl IcebergScanExecutor {
             }
         };
 
-        let mut read_bytes = 0;
-        let _metrics_report_guard = scopeguard::guard(
-            (read_bytes, table_name, self.metrics.clone()),
-            |(read_bytes, table_name, metrics)| {
-                if let Some(metrics) = metrics {
-                    metrics
-                        .iceberg_scan_metrics()
-                        .iceberg_read_bytes
-                        .with_guarded_label_values(&[&table_name])
-                        .inc_by(read_bytes as _);
-                }
-            },
-        );
         for data_file_scan_task in data_file_scan_tasks {
-            let data_file_path = data_file_scan_task.data_file_path.clone();
-            let data_sequence_number = data_file_scan_task.sequence_number;
-
-            let reader = table
-                .reader_builder()
-                .with_batch_size(self.batch_size)
-                .build();
-            let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
-
-            let mut record_batch_stream =
-                reader.read(Box::pin(file_scan_stream)).await?.enumerate();
-
-            while let Some((index, record_batch)) = record_batch_stream.next().await {
-                let record_batch = record_batch?;
-
-                // iceberg_t1_source
-                let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-                if self.need_seq_num {
-                    let (mut columns, visibility) = chunk.into_parts();
-                    columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
-                        vec![data_sequence_number; visibility.len()],
-                    ))));
-                    chunk = DataChunk::from_parts(columns.into(), visibility)
-                };
-                if self.need_file_path_and_pos {
-                    let (mut columns, visibility) = chunk.into_parts();
-                    columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
-                        vec![data_file_path.as_str(); visibility.len()],
-                    ))));
-                    let index_start = (index * self.batch_size) as i64;
-                    columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
-                        (index_start..(index_start + visibility.len() as i64))
-                            .collect::<Vec<i64>>(),
-                    ))));
-                    chunk = DataChunk::from_parts(columns.into(), visibility)
-                }
+            #[for_await]
+            for chunk in scan_task_to_chunk(
+                table.clone(),
+                data_file_scan_task,
+                IcebergScanOpts {
+                    batch_size: self.batch_size,
+                    need_seq_num: self.need_seq_num,
+                    need_file_path_and_pos: self.need_file_path_and_pos,
+                },
+                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
+            ) {
+                let chunk = chunk?;
                 assert_eq!(chunk.data_types(), data_types);
-                read_bytes += chunk.estimated_heap_size() as u64;
                 yield chunk;
             }
         }
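
The stated motivation is reuse from the streaming Iceberg source. Building on the interface sketch after the commit message above, a purely illustrative stream-side caller could drive the same helper the way the batch executor now does; the `read_data_file` name, its parameters, and the constant batch size below are hypothetical, not part of this commit:

```rust
// Hypothetical sketch: a streaming source reader reusing scan_task_to_chunk
// instead of duplicating the batch scan loop. Names and types are assumed.
#[try_stream(ok = DataChunk, error = ConnectorError)]
async fn read_data_file(
    table: Table,
    task: FileScanTask,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    #[for_await]
    for chunk in scan_task_to_chunk(
        table.clone(),
        task,
        IcebergScanOpts {
            batch_size: 1024, // hypothetical default
            need_seq_num: true,
            need_file_path_and_pos: false,
        },
        metrics,
    ) {
        yield chunk?;
    }
}
```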

src/batch/src/monitor/stats.rs

+4-29
@@ -19,8 +19,9 @@ use prometheus::{
     histogram_opts, register_histogram_with_registry, register_int_counter_with_registry,
     Histogram, IntGauge, Registry,
 };
-use risingwave_common::metrics::{LabelGuardedIntCounterVec, TrAdderGauge};
+use risingwave_common::metrics::TrAdderGauge;
 use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
+use risingwave_connector::source::iceberg::IcebergScanMetrics;
 
 /// Metrics for batch executor.
 /// Currently, it contains:
@@ -93,8 +94,8 @@ impl BatchMetricsInner {
         &self.batch_manager_metrics
     }
 
-    pub fn iceberg_scan_metrics(&self) -> &IcebergScanMetrics {
-        &self.iceberg_scan_metrics
+    pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
+        self.iceberg_scan_metrics.clone()
     }
 
     pub fn for_test() -> BatchMetrics {
@@ -182,29 +183,3 @@ impl BatchSpillMetrics {
         Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone())
     }
 }
-
-#[derive(Clone)]
-pub struct IcebergScanMetrics {
-    pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
-}
-
-impl IcebergScanMetrics {
-    fn new(registry: &Registry) -> Self {
-        let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
-            "iceberg_read_bytes",
-            "Total size of iceberg read requests",
-            &["table_name"],
-            registry
-        )
-        .unwrap();
-
-        Self { iceberg_read_bytes }
-    }
-
-    pub fn for_test() -> Arc<Self> {
-        Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
-    }
-}
-
-pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
-    LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
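
A side effect of the move: `BatchMetricsInner::iceberg_scan_metrics()` now hands out an owned `Arc<IcebergScanMetrics>` rather than a borrow, so the batch executor can pass the handle across the crate boundary into the connector-side stream without tying that stream's lifetime to the executor. The snippet below only restates the call site from `iceberg_scan.rs` above:

```rust
// An optional owned Arc is passed into the connector helper instead of a
// borrowed &IcebergScanMetrics.
let metrics: Option<Arc<IcebergScanMetrics>> =
    self.metrics.as_ref().map(|m| m.iceberg_scan_metrics());
```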

src/batch/src/task/env.rs

+2-3
@@ -17,14 +17,13 @@ use std::sync::Arc;
 use risingwave_common::config::{BatchConfig, MetricLevel};
 use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::worker_util::WorkerNodeId;
+use risingwave_connector::source::iceberg::IcebergScanMetrics;
 use risingwave_connector::source::monitor::SourceMetrics;
 use risingwave_dml::dml_manager::DmlManagerRef;
 use risingwave_rpc_client::ComputeClientPoolRef;
 use risingwave_storage::StateStoreImpl;
 
-use crate::monitor::{
-    BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics, IcebergScanMetrics,
-};
+use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics};
 use crate::task::BatchManager;
 
 /// The global environment for task execution.

src/compute/src/server.rs

+1-1
@@ -18,7 +18,6 @@ use std::time::Duration;
 
 use risingwave_batch::monitor::{
     GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
-    GLOBAL_ICEBERG_SCAN_METRICS,
 };
 use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
 use risingwave_batch::spill::spill_op::SpillOp;
@@ -40,6 +39,7 @@ use risingwave_common::util::tokio_util::sync::CancellationToken;
 use risingwave_common::{GIT_SHA, RW_VERSION};
 use risingwave_common_heap_profiling::HeapProfiler;
 use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
+use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
 use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
 use risingwave_dml::dml_manager::DmlManager;
 use risingwave_pb::common::worker_node::Property;

src/connector/Cargo.toml

+1-1
@@ -121,6 +121,7 @@ rustls-native-certs = "0.8"
 rustls-pemfile = "2"
 rustls-pki-types = "1"
 rw_futures_util = { workspace = true }
+scopeguard = "1"
 sea-schema = { version = "0.16", default-features = false, features = [
   "discovery",
   "sqlx-postgres",
@@ -137,7 +138,6 @@ strum = "0.26"
 strum_macros = "0.26"
 tempfile = "3"
 thiserror = "1"
-
 thiserror-ext = { workspace = true }
 # To easiy get the type_name and impl IntoSql for rust_decimal, we fork the crate.
 # Another reason is that we are planning to refactor their IntoSql trait to allow specifying the type to convert.

@@ -0,0 +1,46 @@
+// Copyright 2025 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::{Arc, LazyLock};
+
+use prometheus::Registry;
+use risingwave_common::metrics::LabelGuardedIntCounterVec;
+use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
+use risingwave_common::register_guarded_int_counter_vec_with_registry;
+
+#[derive(Clone)]
+pub struct IcebergScanMetrics {
+    pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
+}
+
+impl IcebergScanMetrics {
+    fn new(registry: &Registry) -> Self {
+        let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
+            "iceberg_read_bytes",
+            "Total size of iceberg read requests",
+            &["table_name"],
+            registry
+        )
+        .unwrap();
+
+        Self { iceberg_read_bytes }
+    }
+
+    pub fn for_test() -> Arc<Self> {
+        Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
+    }
+}
+
+pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
+    LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
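
The `IcebergScanMetrics` struct and its global registry handle move verbatim into the connector crate (the other files now import `risingwave_connector::source::iceberg::IcebergScanMetrics`). The deferred byte-count reporting that used `scopeguard` in the batch executor presumably moves along with it, which is why `scopeguard = "1"` now appears in the connector's Cargo.toml. A rough sketch of that pattern, with local names assumed and not taken from this diff:

```rust
// Sketch of the deferred reporting pattern removed from iceberg_scan.rs;
// `table_name`, `metrics`, and the chunk loop are assumed locals inside the
// connector-side helper.
let mut report = scopeguard::guard(
    (0u64, table_name, metrics),
    |(read_bytes, table_name, metrics)| {
        if let Some(metrics) = metrics {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes);
        }
    },
);
// per yielded chunk (ScopeGuard derefs to the wrapped tuple):
report.0 += chunk.estimated_heap_size() as u64;
```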
