Skip to content

Commit 262e3cc

Browse files
committed
.
Signed-off-by: xxchan <[email protected]>
1 parent 61813d8 commit 262e3cc

File tree

18 files changed

+1004
-118
lines changed

18 files changed

+1004
-118
lines changed

Cargo.lock

+1-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,12 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f
155155
"prometheus",
156156
] }
157157
# branch dev_rebase_main_20241230
158-
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496", features = [
158+
iceberg = { path = "../iceberg-rust/crates/iceberg", features = [
159159
"storage-s3",
160160
"storage-gcs",
161161
] }
162-
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
163-
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
162+
iceberg-catalog-rest = { path = "../iceberg-rust/crates/catalog/rest" }
163+
iceberg-catalog-glue = { path = "../iceberg-rust/crates/catalog/glue" }
164164
opendal = "0.49"
165165
# used only by arrow-udf-flight
166166
arrow-flight = "53"

src/common/src/catalog/column.rs

+12
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,18 @@ impl ColumnCatalog {
424424
)),
425425
]
426426
}
427+
428+
pub fn is_row_id_column(&self) -> bool {
429+
self.column_desc.column_id == ROW_ID_COLUMN_ID
430+
}
431+
432+
// Partition
433+
// pub fn is_source_partition_or_offset_column(&self) -> bool {
434+
// self.column_desc
435+
// .additional_column
436+
// .column_type
437+
// .is_some_and(|col| matches!(col, ColumnType::Offset(_) | ColumnType::Partition(_)))
438+
// }
427439
}
428440

429441
impl From<PbColumnCatalog> for ColumnCatalog {

src/connector/src/source/iceberg/mod.rs

+32-6
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,18 @@ impl IcebergFileScanTask {
165165
IcebergFileScanTask::CountStar(_) => false,
166166
}
167167
}
168+
169+
pub fn files(&self) -> Vec<String> {
170+
match self {
171+
IcebergFileScanTask::Data(file_scan_tasks)
172+
| IcebergFileScanTask::EqualityDelete(file_scan_tasks)
173+
| IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
174+
.iter()
175+
.map(|task| task.data_file_path.clone())
176+
.collect(),
177+
IcebergFileScanTask::CountStar(_) => vec![],
178+
}
179+
}
168180
}
169181

170182
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -234,7 +246,9 @@ impl SplitEnumerator for IcebergSplitEnumerator {
234246
}
235247

236248
async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
237-
// Iceberg source does not support streaming queries
249+
// Like file source, iceberg streaming source has a List Executor and a Fetch Executor,
250+
// instead of relying on SplitEnumerator on meta.
251+
// TODO: add some validation logic here.
238252
Ok(vec![])
239253
}
240254
}
@@ -349,18 +363,30 @@ impl IcebergSplitEnumerator {
349363
let table_schema = table.metadata().current_schema();
350364
tracing::debug!("iceberg_table_schema: {:?}", table_schema);
351365

352-
let mut position_delete_files = vec![];
353-
let mut data_files = vec![];
354-
let mut equality_delete_files = vec![];
355366
let scan = table
356367
.scan()
357368
.with_filter(predicate)
358369
.snapshot_id(snapshot_id)
359370
.select(require_names)
360371
.build()
361-
.map_err(|e| anyhow!(e))?;
372+
.context("failed to build iceberg scan")?;
373+
Self::scan_to_splits(snapshot_id, scan, iceberg_scan_type, batch_parallelism).await
374+
}
362375

363-
let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
376+
pub async fn scan_to_splits(
377+
snapshot_id: i64,
378+
scan: TableScan,
379+
iceberg_scan_type: IcebergScanType,
380+
batch_parallelism: usize,
381+
) -> ConnectorResult<Vec<IcebergSplit>> {
382+
let mut position_delete_files = vec![];
383+
let mut data_files = vec![];
384+
let mut equality_delete_files = vec![];
385+
386+
let file_scan_stream = scan
387+
.plan_files()
388+
.await
389+
.context("failed to plan iceberg FileScanTask")?;
364390

365391
#[for_await]
366392
for task in file_scan_stream {

src/connector/src/source/reader/desc.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@ impl SourceDescBuilder {
9999
.map(|c| SourceColumnDesc::from(&c.column_desc))
100100
.collect();
101101

102-
for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
103-
if !existed {
104-
columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
102+
// currently iceberg uses other columns. See `extract_iceberg_columns`
103+
// TODO: unify logic.
104+
if connector_name != "iceberg" {
105+
for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
106+
if !existed {
107+
columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
108+
}
105109
}
106110
}
107111

@@ -173,6 +177,10 @@ impl SourceDescBuilder {
173177
metrics: self.metrics.clone(),
174178
})
175179
}
180+
181+
pub fn with_properties(&self) -> WithOptionsSecResolved {
182+
self.with_properties.clone()
183+
}
176184
}
177185

178186
pub mod test_utils {

src/connector/src/with_options.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,12 @@ pub trait WithPropertiesExt: Get + Sized {
150150

151151
fn connector_need_pk(&self) -> bool {
152152
// Currently only iceberg connector doesn't need primary key
153-
!self.is_iceberg_connector()
153+
// introduced in https://github.com/risingwavelabs/risingwave/pull/14971
154+
// XXX: This seems not the correct way. Iceberg doesn't necessarily has a PK.
155+
// "batch source" doesn't need a PK?
156+
// For streaming, if it has a PK, do we want to use it? It seems not safe.
157+
// !self.is_iceberg_connector()
158+
true
154159
}
155160

156161
fn is_legacy_fs_connector(&self) -> bool {

src/frontend/src/optimizer/plan_node/generic/source.rs

+104-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::rc::Rc;
1616

1717
use educe::Educe;
18-
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
18+
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
1919
use risingwave_common::types::DataType;
2020
use risingwave_common::util::sort_util::OrderType;
2121
use risingwave_connector::WithPropertiesExt;
@@ -77,6 +77,8 @@ impl GenericPlanNode for Source {
7777
}
7878

7979
fn stream_key(&self) -> Option<Vec<usize>> {
80+
// FIXME: output col idx is not set. But iceberg source can prune cols.
81+
// XXX: there's a RISINGWAVE_ICEBERG_ROW_ID. Should we use it?
8082
self.row_id_index.map(|idx| vec![idx])
8183
}
8284

@@ -96,6 +98,79 @@ impl GenericPlanNode for Source {
9698
}
9799

98100
impl Source {
101+
/// The output is [`risingwave_connector::source::filesystem::FsPageItem`] / [`risingwave_connector::source::iceberg::IcebergSplit`]
102+
pub fn file_list_node(core: Self) -> Self {
103+
let column_catalog = if core.is_iceberg_connector() {
104+
vec![
105+
ColumnCatalog {
106+
column_desc: ColumnDesc::from_field_with_column_id(
107+
&Field {
108+
name: "partition_id".to_owned(),
109+
data_type: DataType::Varchar,
110+
},
111+
0,
112+
),
113+
is_hidden: false,
114+
},
115+
ColumnCatalog {
116+
column_desc: ColumnDesc::from_field_with_column_id(
117+
&Field {
118+
name: "split".to_owned(),
119+
data_type: DataType::Jsonb,
120+
},
121+
0,
122+
),
123+
is_hidden: false,
124+
},
125+
]
126+
} else if core.is_new_fs_connector() {
127+
vec![
128+
ColumnCatalog {
129+
column_desc: ColumnDesc::from_field_with_column_id(
130+
&Field {
131+
name: "filename".to_owned(),
132+
data_type: DataType::Varchar,
133+
},
134+
0,
135+
),
136+
is_hidden: false,
137+
},
138+
// This columns seems unused.
139+
ColumnCatalog {
140+
column_desc: ColumnDesc::from_field_with_column_id(
141+
&Field {
142+
name: "last_edit_time".to_owned(),
143+
data_type: DataType::Timestamptz,
144+
sub_fields: vec![],
145+
type_name: "".to_owned(),
146+
},
147+
1,
148+
),
149+
is_hidden: false,
150+
},
151+
ColumnCatalog {
152+
column_desc: ColumnDesc::from_field_with_column_id(
153+
&Field {
154+
name: "file_size".to_owned(),
155+
data_type: DataType::Int64,
156+
sub_fields: vec![],
157+
type_name: "".to_owned(),
158+
},
159+
0,
160+
),
161+
is_hidden: false,
162+
},
163+
]
164+
} else {
165+
unreachable!()
166+
};
167+
Self {
168+
column_catalog,
169+
row_id_index: None,
170+
..core
171+
}
172+
}
173+
99174
pub fn is_new_fs_connector(&self) -> bool {
100175
self.catalog
101176
.as_ref()
@@ -119,6 +194,34 @@ impl Source {
119194
self.is_iceberg_connector()
120195
}
121196

197+
pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
198+
let Some(catalog) = &mut self.catalog else {
199+
return self;
200+
};
201+
if catalog.info.is_shared() {
202+
// for shared source, we should produce all columns
203+
return self;
204+
}
205+
if self.kind != SourceNodeKind::CreateMViewOrBatch {
206+
return self;
207+
}
208+
209+
let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();
210+
211+
// minus the number of hidden columns before row_id_index.
212+
self.row_id_index = self.row_id_index.map(|idx| {
213+
let mut cnt = 0;
214+
for col in self.column_catalog.iter().take(idx + 1) {
215+
if prune(col) {
216+
cnt += 1;
217+
}
218+
}
219+
idx - cnt
220+
});
221+
self.column_catalog.retain(|c| !prune(c));
222+
self
223+
}
224+
122225
/// The columns in stream/batch source node indicate the actual columns it will produce,
123226
/// instead of the columns defined in source catalog. The difference is generated columns.
124227
pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {

src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,19 @@ impl LogicalIcebergScan {
6464
pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
6565
assert!(!required_cols.is_empty());
6666
let mut core = self.core.clone();
67+
let mut has_row_id = false;
6768
core.column_catalog = required_cols
6869
.iter()
69-
.map(|idx| core.column_catalog[*idx].clone())
70+
.map(|idx| {
71+
if Some(*idx) == core.row_id_index {
72+
has_row_id = true;
73+
}
74+
core.column_catalog[*idx].clone()
75+
})
7076
.collect();
77+
if !has_row_id {
78+
core.row_id_index = None;
79+
}
7180
let base = PlanBase::new_logical_with_core(&core);
7281

7382
LogicalIcebergScan {

0 commit comments

Comments
 (0)