Skip to content

Commit 0b77efb

Browse files
committed
refactor: remove unnecessary check_iceberg_source
- Originally added in #14971 (when columns could be specified manually). - After #15415, columns are derived from the iceberg source automatically, so the check became useless. Signed-off-by: xxchan <[email protected]>
1 parent 1eab263 commit 0b77efb

File tree

5 files changed

+8
-63
lines changed

5 files changed

+8
-63
lines changed

.git-blame-ignore-revs

+3
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,6 @@ c583e2c6c054764249acf484438c7bf7197765f4
4848

4949
# refactor: split catalog to smaller files (#19870)
5050
d6341b74be3f1913cc93993a95c147999df1ff74
51+
52+
# refactor: add 2 pedantic clippy lints for strings (#19614)
53+
119e17eee482b6add5d5cb6ceb3b84326bf5ce68

src/common/src/array/data_chunk.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::util::value_encoding::{
4343
};
4444

4545
/// [`DataChunk`] is a collection of Columns,
46-
/// a with visibility mask for each row.
46+
/// with a visibility mask for each row.
4747
/// For instance, we could have a [`DataChunk`] of this format.
4848
///
4949
/// | v1 | v2 | v3 |

src/common/src/catalog/column.rs

-6
Original file line numberDiff line numberDiff line change
@@ -401,12 +401,6 @@ impl ColumnCatalog {
401401
]
402402
}
403403

404-
pub fn is_iceberg_hidden_col(&self) -> bool {
405-
self.column_desc.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME
406-
|| self.column_desc.name == ICEBERG_FILE_PATH_COLUMN_NAME
407-
|| self.column_desc.name == ICEBERG_FILE_POS_COLUMN_NAME
408-
}
409-
410404
/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
411405
/// May also look for the usage of `SourceColumnType`.
412406
pub fn debezium_cdc_source_cols() -> [Self; 3] {

src/frontend/src/handler/create_source.rs

+3-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::sync::LazyLock;
1919
use anyhow::{anyhow, Context};
2020
use either::Either;
2121
use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22-
use external_schema::iceberg::check_iceberg_source;
2322
use external_schema::nexmark::check_nexmark_schema;
2423
use itertools::Itertools;
2524
use maplit::{convert_args, hashmap, hashset};
@@ -28,7 +27,7 @@ use rand::Rng;
2827
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
2928
use risingwave_common::bail_not_implemented;
3029
use risingwave_common::catalog::{
31-
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
30+
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, TableId,
3231
INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, ROW_ID_COLUMN_NAME,
3332
};
3433
use risingwave_common::license::Feature;
@@ -620,7 +619,7 @@ pub(super) fn bind_source_watermark(
620619
///
621620
/// One should only call this function after all properties of all columns are resolved, like
622621
/// generated column descriptors.
623-
pub(super) async fn check_format_encode(
622+
pub(super) fn check_format_encode(
624623
props: &WithOptionsSecResolved,
625624
row_id_index: Option<usize>,
626625
columns: &[ColumnCatalog],
@@ -631,10 +630,6 @@ pub(super) async fn check_format_encode(
631630

632631
if connector == NEXMARK_CONNECTOR {
633632
check_nexmark_schema(props, row_id_index, columns)
634-
} else if connector == ICEBERG_CONNECTOR {
635-
Ok(check_iceberg_source(props, columns)
636-
.await
637-
.map_err(|err| ProtocolError(err.to_report_string()))?)
638633
} else {
639634
Ok(())
640635
}
@@ -892,7 +887,7 @@ pub async fn bind_create_source_or_table_with_connector(
892887
sql_columns_defs.to_vec(),
893888
&pk_col_ids,
894889
)?;
895-
check_format_encode(&with_properties, row_id_index, &columns).await?;
890+
check_format_encode(&with_properties, row_id_index, &columns)?;
896891

897892
let definition = handler_args.normalized_sql.clone();
898893

src/frontend/src/handler/create_source/external_schema/iceberg.rs

+1-48
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use super::*;
1616

17+
/// TODO: make hidden columns additional columns, instead of normal columns?
1718
pub async fn extract_iceberg_columns(
1819
with_properties: &WithOptionsSecResolved,
1920
) -> anyhow::Result<Vec<ColumnCatalog>> {
@@ -51,51 +52,3 @@ pub async fn extract_iceberg_columns(
5152
)))
5253
}
5354
}
54-
55-
pub async fn check_iceberg_source(
56-
props: &WithOptionsSecResolved,
57-
columns: &[ColumnCatalog],
58-
) -> anyhow::Result<()> {
59-
let props = ConnectorProperties::extract(props.clone(), true)?;
60-
let ConnectorProperties::Iceberg(properties) = props else {
61-
return Err(anyhow!(format!(
62-
"Invalid properties for iceberg source: {:?}",
63-
props
64-
)));
65-
};
66-
67-
let schema = Schema {
68-
fields: columns
69-
.iter()
70-
.filter(|&c| !c.is_iceberg_hidden_col())
71-
.cloned()
72-
.map(|c| c.column_desc.into())
73-
.collect(),
74-
};
75-
76-
let table = properties.load_table().await?;
77-
78-
let iceberg_schema =
79-
::iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
80-
81-
for f1 in schema.fields() {
82-
if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) {
83-
return Err(anyhow::anyhow!(format!(
84-
"Column {} not found in iceberg table",
85-
f1.name
86-
)));
87-
}
88-
}
89-
90-
let new_iceberg_field = iceberg_schema
91-
.fields
92-
.iter()
93-
.filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name))
94-
.cloned()
95-
.collect::<Vec<_>>();
96-
let new_iceberg_schema = arrow_schema_iceberg::Schema::new(new_iceberg_field);
97-
98-
risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
99-
100-
Ok(())
101-
}

0 commit comments

Comments
 (0)