
Commit b936b99

xxchan and wcy-fdu authored

feat: support streaming iceberg source (#20568)

Signed-off-by: xxchan <[email protected]>
Co-authored-by: congyi <[email protected]>

1 parent 895412b · commit b936b99
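In short: an Iceberg source can now feed streaming jobs, so materialized views built on it keep ingesting files as they are appended to the table, rather than the source being usable only for batch scans. A minimal usage sketch, distilled from the new e2e test below (same connector options as the test; object names are illustrative):

    CREATE SOURCE iceberg_src
    WITH (
        connector = 'iceberg',
        catalog.type = 'storage',
        warehouse.path = 's3a://icebergdata/demo',
        database.name = 'demo_db',
        table.name = 'test_streaming',
        s3.endpoint = 'http://127.0.0.1:9301',
        s3.region = 'us-east-1',
        s3.access.key = 'hummockadmin',
        s3.secret.key = 'hummockadmin'
    );

    -- The new capability: this MV is kept up to date as files land in the table.
    CREATE MATERIALIZED VIEW iceberg_mv AS SELECT * FROM iceberg_src;

Per the config comments added in src/common/src/config.rs, the machinery behind this is an IcebergListExecutor that lists new files on a fixed interval and an IcebergFetchExecutor that fetches them in batches.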

30 files changed: +1446 -122 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.
test_case/iceberg_source_streaming.slt

Lines changed: 215 additions & 0 deletions

@@ -0,0 +1,215 @@
+statement ok
+SET RW_IMPLICIT_FLUSH = true;
+
+statement ok
+CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar, PRIMARY KEY(i1, i2));
+
+statement ok
+CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;
+
+statement ok
+CREATE SINK sink1 AS select * from mv1 WITH (
+    connector = 'iceberg',
+    type = 'upsert',
+    database.name = 'demo_db',
+    table.name = 'test_streaming',
+    catalog.name = 'demo',
+    catalog.type = 'storage',
+    warehouse.path = 's3a://icebergdata/demo',
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-east-1',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
+    create_table_if_not_exists = 'true',
+    commit_checkpoint_interval = 1,
+    primary_key = 'i1,i2',
+    partition_by = 'i1'
+);
+
+statement ok
+INSERT INTO s1 (i1, i2, i3) values(1,'1','1'),(2,'2','2'),(3,'3','3'),(4,'4','4'),(5,'5','5');
+
+statement ok
+CREATE SOURCE iceberg_t1_source
+WITH (
+    connector = 'iceberg',
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-east-1',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
+    catalog.type = 'storage',
+    warehouse.path = 's3a://icebergdata/demo',
+    database.name = 'demo_db',
+    table.name = 'test_streaming',
+);
+
+statement ok
+create materialized view mv2 as select * from iceberg_t1_source;
+
+# select only some columns
+statement ok
+create materialized view mv3 as select i2, i1 * 3 as abc from iceberg_t1_source;
+
+sleep 5s
+
+query I retry 3 backoff 5s
+select * from mv2 order by i1;
+----
+1 1 1
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+
+
+query I retry 3 backoff 5s
+select * from mv3 order by i2;
+----
+1 3
+2 6
+3 9
+4 12
+5 15
+
+
+# delete cannot be reflected
+statement ok
+DELETE FROM s1 WHERE i1 < 3;
+
+sleep 5s
+
+query I
+select count(*) from iceberg_t1_source;
+----
+3
+
+query I retry 3 backoff 5s
+select count(*) from mv2;
+----
+5
+
+query I retry 3 backoff 5s
+select count(*) from mv3;
+----
+5
+
+
+# insert more data
+statement ok
+INSERT INTO s1 (i1, i2, i3) values(6,'6','6'),(7,'7','7'),(8,'8','8'),(9,'9','9'),(10,'10','10');
+
+sleep 5s
+
+query I
+select count(*) from iceberg_t1_source;
+----
+8
+
+query I retry 3 backoff 5s
+select count(*) from mv2;
+----
+10
+
+query I retry 3 backoff 5s
+select count(*) from mv3;
+----
+10
+
+
+# insert more data -- with upsert
+statement ok
+INSERT INTO s1 (i1, i2, i3) values(9,'9','99'),(10,'10','1010'), (11,'11','1111');
+
+sleep 5s
+
+query I
+select * from iceberg_t1_source order by i1;
+----
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 99
+10 10 1010
+11 11 1111
+
+query I retry 3 backoff 5s
+select * from mv2 order by i1, i3;
+----
+1 1 1
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 9
+9 9 99
+10 10 10
+10 10 1010
+11 11 1111
+
+query I retry 3 backoff 5s
+select count(*) from mv3;
+----
+13
+
+# test recovery
+statement ok
+RECOVER;
+
+# insert more data
+statement ok
+INSERT INTO s1 (i1, i2, i3) values(12,'12','1212'),(13,'13','1313'),(14,'14','1414'),(15,'15','1515'),(16,'16','1616');
+
+sleep 5s
+
+query I
+select count(*) from iceberg_t1_source;
+----
+14
+
+query I retry 3 backoff 5s
+select count(*) from mv2;
+----
+18
+
+query I retry 3 backoff 5s
+select count(*) from mv3;
+----
+18
+
+# insert large batch of data
+statement ok
+INSERT INTO s1 (i1, i2, i3) select 1, i::varchar, i::varchar from generate_series(10001, 20000) as i;
+
+sleep 5s
+
+query I
+select count(*) from iceberg_t1_source;
+----
+10014
+
+query I retry 3 backoff 5s
+select count(*) from mv2;
+----
+10018
+
+query I retry 3 backoff 5s
+select count(*) from mv3;
+----
+10018
+
+# TODO: also test compaction
+
+statement ok
+DROP SINK sink1;
+
+statement ok
+DROP SOURCE iceberg_t1_source CASCADE;
+
+statement ok
+DROP TABLE s1 cascade;
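The behavior the test pins down: the streaming source is append-only. A batch SELECT on the source reads the table's current snapshot, so after the DELETE propagates through sink1 its count drops from 5 to 3, while the materialized views never receive the delete and stay at 5; upserts likewise arrive downstream as fresh appends (mv2 ends up with both "9 9 9" and "9 9 99"). The RECOVER step then checks that ingestion resumes across a cluster recovery. Condensed:

    -- after the DELETE has been committed to Iceberg:
    SELECT count(*) FROM iceberg_t1_source;  -- returns 3: a batch scan sees the delete
    SELECT count(*) FROM mv2;                -- returns 5: the streaming side only appends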
Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.test_streaming PURGE',
+]
+
+slt = 'test_case/iceberg_source_streaming.slt'
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.test_streaming PURGE',
+    'DROP SCHEMA IF EXISTS demo_db',
+]
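This test-case config registers the new SLT file with the iceberg e2e harness: presumably the init_sqls run before the SLT script and the drop_sqls after it, so demo_db.test_streaming is created fresh and purged around each run.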

src/batch/executors/src/executor/iceberg_scan.rs

Lines changed: 4 additions & 4 deletions
@@ -39,7 +39,7 @@ pub struct IcebergScanExecutor {
     #[allow(dead_code)]
     snapshot_id: Option<i64>,
     file_scan_tasks: Option<IcebergFileScanTask>,
-    batch_size: usize,
+    chunk_size: usize,
     schema: Schema,
     identity: String,
     metrics: Option<BatchMetrics>,
@@ -66,7 +66,7 @@ impl IcebergScanExecutor {
         iceberg_config: IcebergProperties,
         snapshot_id: Option<i64>,
         file_scan_tasks: IcebergFileScanTask,
-        batch_size: usize,
+        chunk_size: usize,
         schema: Schema,
         identity: String,
         metrics: Option<BatchMetrics>,
@@ -76,7 +76,7 @@ impl IcebergScanExecutor {
         Self {
             iceberg_config,
             snapshot_id,
-            batch_size,
+            chunk_size,
             schema,
             file_scan_tasks: Some(file_scan_tasks),
             identity,
@@ -113,7 +113,7 @@ impl IcebergScanExecutor {
                 table.clone(),
                 data_file_scan_task,
                 IcebergScanOpts {
-                    batch_size: self.batch_size,
+                    chunk_size: self.chunk_size,
                     need_seq_num: self.need_seq_num,
                     need_file_path_and_pos: self.need_file_path_and_pos,
                 },
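This is a pure rename with no behavior change: the executor's batch_size field becomes chunk_size, matching the chunk_size field of the IcebergScanOpts it populates.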

src/common/src/catalog/column.rs

Lines changed: 4 additions & 0 deletions
@@ -418,6 +418,10 @@ impl ColumnCatalog {
             )),
         ]
     }
+
+    pub fn is_row_id_column(&self) -> bool {
+        self.column_desc.column_id == ROW_ID_COLUMN_ID
+    }
 }
 
 impl From<PbColumnCatalog> for ColumnCatalog {
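A small predicate on ColumnCatalog: a column is the hidden row-id column exactly when its column id equals ROW_ID_COLUMN_ID, i.e. the implicit _row_id column RisingWave adds when no primary key is declared. Presumably the streaming-source code paths added in this commit use it to skip that column; the commit only introduces the helper here.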

src/common/src/config.rs

Lines changed: 24 additions & 0 deletions
@@ -1232,6 +1232,18 @@ pub struct StreamingDeveloperConfig {
 
     #[serde(default)]
     pub compute_client_config: RpcClientConfig,
+
+    /// `IcebergListExecutor`: The interval in seconds for Iceberg source to list new files.
+    #[serde(default = "default::developer::iceberg_list_interval_sec")]
+    pub iceberg_list_interval_sec: u64,
+
+    /// `IcebergFetchExecutor`: The number of files the executor will fetch concurrently in a batch.
+    #[serde(default = "default::developer::iceberg_fetch_batch_size")]
+    pub iceberg_fetch_batch_size: u64,
+
+    /// `IcebergSink`: The size of the cache for positional delete in the sink.
+    #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
+    pub iceberg_sink_positional_delete_cache_size: usize,
 }
 
 /// The subsections `[batch.developer]`.
@@ -2244,6 +2256,18 @@ pub mod default {
     pub fn rpc_client_connect_timeout_secs() -> u64 {
         5
     }
+
+    pub fn iceberg_list_interval_sec() -> u64 {
+        1
+    }
+
+    pub fn iceberg_fetch_batch_size() -> u64 {
+        1024
+    }
+
+    pub fn iceberg_sink_positional_delete_cache_size() -> usize {
+        1024
+    }
 }
 
 pub use crate::system_param::default as system;
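Three developer-level knobs back the feature: the listing interval of the streaming Iceberg source (default 1 second), the number of files fetched concurrently per batch (default 1024), and the positional-delete cache size of the Iceberg sink (default 1024). As with other StreamingDeveloperConfig fields, they surface in TOML under [streaming.developer] with a stream_ prefix, as the example.toml hunk below shows.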

src/config/example.toml

Lines changed: 3 additions & 0 deletions
@@ -182,6 +182,9 @@ stream_switch_jdbc_pg_to_native = false
 stream_max_barrier_batch_size = 1024
 stream_hash_join_entry_state_max_rows = 30000
 stream_enable_explain_analyze_stats = true
+stream_iceberg_list_interval_sec = 1
+stream_iceberg_fetch_batch_size = 1024
+stream_iceberg_sink_positional_delete_cache_size = 1024
 
 [streaming.developer.stream_compute_client_config]
 connect_timeout_secs = 5

src/connector/src/sink/iceberg/mod.rs

Lines changed: 4 additions & 1 deletion
@@ -840,7 +840,10 @@ impl IcebergSinkWriter {
         MonitoredPositionDeleteWriterBuilder::new(
             SortPositionDeleteWriterBuilder::new(
                 parquet_writer_builder.clone(),
-                1024,
+                writer_param
+                    .streaming_config
+                    .developer
+                    .iceberg_sink_positional_delete_cache_size,
                 None,
                 None,
             ),
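The new config default (1024) is exactly the literal it replaces, so default behavior is unchanged; the cache size is now simply tunable.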

src/connector/src/sink/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -67,6 +67,7 @@ use prometheus::Registry;
 use risingwave_common::array::ArrayError;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, Field, Schema};
+use risingwave_common::config::StreamingConfig;
 use risingwave_common::hash::ActorId;
 use risingwave_common::metrics::{
     LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
@@ -491,6 +492,7 @@ pub struct SinkWriterParam {
     pub sink_id: SinkId,
     pub sink_name: String,
     pub connector: String,
+    pub streaming_config: StreamingConfig,
 }
 
 #[derive(Clone)]
@@ -586,6 +588,7 @@ impl SinkWriterParam {
             sink_id: SinkId::new(1),
             sink_name: "test_sink".to_owned(),
             connector: "test_connector".to_owned(),
+            streaming_config: StreamingConfig::default(),
         }
     }
 }
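This is the plumbing that lets the Iceberg writer above reach the new knob: SinkWriterParam now carries the full StreamingConfig, and the test constructor (the one building test_sink) fills it with StreamingConfig::default().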
