Skip to content

Commit 78e65cd

Browse files
chenzl25yuhao-suclaude
authored
feat(stream): support partitioned gapfill (#25287)
Co-authored-by: Yuhao Su <yuhaosu@outlook.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 59bf027 commit 78e65cd

18 files changed

Lines changed: 1972 additions & 1052 deletions

File tree

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
# Test for GAP_FILL with PARTITION BY in normal streaming mode.
2+
# Tests that gap filling operates independently per partition.
3+
4+
statement ok
5+
SET RW_IMPLICIT_FLUSH TO true;
6+
7+
# ============================================
8+
# Test 1: Basic PARTITION BY with single column
9+
# ============================================
10+
11+
statement ok
12+
CREATE TABLE sensor_data (
13+
device_id INT,
14+
ts TIMESTAMP,
15+
value DOUBLE,
16+
PRIMARY KEY (device_id, ts)
17+
);
18+
19+
statement ok
20+
INSERT INTO sensor_data VALUES
21+
(1, '2024-05-21 10:00:00', 10.0),
22+
(1, '2024-05-21 10:03:00', 40.0),
23+
(2, '2024-05-21 10:00:00', 100.0),
24+
(2, '2024-05-21 10:02:00', 120.0);
25+
26+
# Gap fill per device - each device should be filled independently
27+
statement ok
28+
CREATE MATERIALIZED VIEW gf_partitioned AS
29+
SELECT device_id, ts, value
30+
FROM GAP_FILL(sensor_data, ts, INTERVAL '1' MINUTE, LOCF(value), PARTITION_BY(device_id));
31+
32+
# Device 1: gaps at 10:01, 10:02 filled with LOCF from 10:00 (value=10)
33+
# Device 2: gap at 10:01 filled with LOCF from 10:00 (value=100)
34+
query ITR
35+
SELECT device_id, ts, value FROM gf_partitioned ORDER BY device_id, ts;
36+
----
37+
1 2024-05-21 10:00:00 10
38+
1 2024-05-21 10:01:00 10
39+
1 2024-05-21 10:02:00 10
40+
1 2024-05-21 10:03:00 40
41+
2 2024-05-21 10:00:00 100
42+
2 2024-05-21 10:01:00 100
43+
2 2024-05-21 10:02:00 120
44+
45+
# Insert into the gap for device 1 only
46+
statement ok
47+
INSERT INTO sensor_data VALUES
48+
(1, '2024-05-21 10:01:00', 20.0);
49+
50+
# Device 1: 10:01 now has real data (20); 10:02 remains a fill row, now LOCF from 10:01 (20)
51+
# Device 2: unchanged
52+
query ITR
53+
SELECT device_id, ts, value FROM gf_partitioned ORDER BY device_id, ts;
54+
----
55+
1 2024-05-21 10:00:00 10
56+
1 2024-05-21 10:01:00 20
57+
1 2024-05-21 10:02:00 20
58+
1 2024-05-21 10:03:00 40
59+
2 2024-05-21 10:00:00 100
60+
2 2024-05-21 10:01:00 100
61+
2 2024-05-21 10:02:00 120
62+
63+
# Delete a data point from device 2
64+
statement ok
65+
DELETE FROM sensor_data WHERE device_id = 2 AND ts = '2024-05-21 10:02:00';
66+
67+
# Device 2 now only has 10:00, no gap to fill (no next anchor)
68+
query ITR
69+
SELECT device_id, ts, value FROM gf_partitioned ORDER BY device_id, ts;
70+
----
71+
1 2024-05-21 10:00:00 10
72+
1 2024-05-21 10:01:00 20
73+
1 2024-05-21 10:02:00 20
74+
1 2024-05-21 10:03:00 40
75+
2 2024-05-21 10:00:00 100
76+
77+
# Add data back to device 2 with a larger gap
78+
statement ok
79+
INSERT INTO sensor_data VALUES
80+
(2, '2024-05-21 10:03:00', 130.0);
81+
82+
# Device 2: gaps at 10:01, 10:02 filled with LOCF from 10:00 (100)
83+
query ITR
84+
SELECT device_id, ts, value FROM gf_partitioned ORDER BY device_id, ts;
85+
----
86+
1 2024-05-21 10:00:00 10
87+
1 2024-05-21 10:01:00 20
88+
1 2024-05-21 10:02:00 20
89+
1 2024-05-21 10:03:00 40
90+
2 2024-05-21 10:00:00 100
91+
2 2024-05-21 10:01:00 100
92+
2 2024-05-21 10:02:00 100
93+
2 2024-05-21 10:03:00 130
94+
95+
statement ok
96+
DROP MATERIALIZED VIEW gf_partitioned;
97+
98+
statement ok
99+
DROP TABLE sensor_data;
100+
101+
# ============================================
102+
# Test 2: PARTITION BY with INTERPOLATE
103+
# ============================================
104+
105+
statement ok
106+
CREATE TABLE metrics (
107+
device_id INT,
108+
ts TIMESTAMP,
109+
temp DOUBLE,
110+
PRIMARY KEY (device_id, ts)
111+
);
112+
113+
statement ok
114+
INSERT INTO metrics VALUES
115+
(1, '2024-05-21 10:00:00', 20.0),
116+
(1, '2024-05-21 10:04:00', 24.0),
117+
(2, '2024-05-21 10:00:00', 100.0),
118+
(2, '2024-05-21 10:02:00', 200.0);
119+
120+
statement ok
121+
CREATE MATERIALIZED VIEW gf_interpolated AS
122+
SELECT device_id, ts, temp
123+
FROM GAP_FILL(metrics, ts, INTERVAL '1' MINUTE, INTERPOLATE(temp), PARTITION_BY(device_id));
124+
125+
# Device 1: interpolate 20→24 over 4 minutes: 21, 22, 23
126+
# Device 2: interpolate 100→200 over 2 minutes: 150
127+
query ITR
128+
SELECT device_id, ts, temp FROM gf_interpolated ORDER BY device_id, ts;
129+
----
130+
1 2024-05-21 10:00:00 20
131+
1 2024-05-21 10:01:00 21
132+
1 2024-05-21 10:02:00 22
133+
1 2024-05-21 10:03:00 23
134+
1 2024-05-21 10:04:00 24
135+
2 2024-05-21 10:00:00 100
136+
2 2024-05-21 10:01:00 150
137+
2 2024-05-21 10:02:00 200
138+
139+
statement ok
140+
DROP MATERIALIZED VIEW gf_interpolated;
141+
142+
statement ok
143+
DROP TABLE metrics;
144+
145+
# ============================================
146+
# Test 3: PARTITION BY with multiple columns
147+
# ============================================
148+
149+
statement ok
150+
CREATE TABLE multi_partition (
151+
region VARCHAR,
152+
device_id INT,
153+
ts TIMESTAMP,
154+
value DOUBLE,
155+
PRIMARY KEY (region, device_id, ts)
156+
);
157+
158+
statement ok
159+
INSERT INTO multi_partition VALUES
160+
('us', 1, '2024-05-21 10:00:00', 10.0),
161+
('us', 1, '2024-05-21 10:02:00', 30.0),
162+
('eu', 1, '2024-05-21 10:00:00', 100.0),
163+
('eu', 1, '2024-05-21 10:02:00', 300.0);
164+
165+
statement ok
166+
CREATE MATERIALIZED VIEW gf_multi_part AS
167+
SELECT region, device_id, ts, value
168+
FROM GAP_FILL(multi_partition, ts, INTERVAL '1' MINUTE, LOCF(value), PARTITION_BY(region, device_id));
169+
170+
# (us, 1): gap at 10:01 → LOCF from 10 = 10
171+
# (eu, 1): gap at 10:01 → LOCF from 100 = 100
172+
query TITR
173+
SELECT region, device_id, ts, value FROM gf_multi_part ORDER BY region, device_id, ts;
174+
----
175+
eu 1 2024-05-21 10:00:00 100
176+
eu 1 2024-05-21 10:01:00 100
177+
eu 1 2024-05-21 10:02:00 300
178+
us 1 2024-05-21 10:00:00 10
179+
us 1 2024-05-21 10:01:00 10
180+
us 1 2024-05-21 10:02:00 30
181+
182+
statement ok
183+
DROP MATERIALIZED VIEW gf_multi_part;
184+
185+
statement ok
186+
DROP TABLE multi_partition;
187+
188+
# ============================================
189+
# Test 4: Update within partitioned gap fill
190+
# ============================================
191+
192+
statement ok
193+
CREATE TABLE update_test (
194+
device_id INT,
195+
ts TIMESTAMP,
196+
value DOUBLE,
197+
PRIMARY KEY (device_id, ts)
198+
);
199+
200+
statement ok
201+
INSERT INTO update_test VALUES
202+
(1, '2024-05-21 10:00:00', 10.0),
203+
(1, '2024-05-21 10:03:00', 40.0);
204+
205+
statement ok
206+
CREATE MATERIALIZED VIEW gf_update AS
207+
SELECT device_id, ts, value
208+
FROM GAP_FILL(update_test, ts, INTERVAL '1' MINUTE, LOCF(value), PARTITION_BY(device_id));
209+
210+
# Initial state: gaps at 10:01, 10:02 filled with LOCF(10)
211+
query ITR
212+
SELECT device_id, ts, value FROM gf_update ORDER BY device_id, ts;
213+
----
214+
1 2024-05-21 10:00:00 10
215+
1 2024-05-21 10:01:00 10
216+
1 2024-05-21 10:02:00 10
217+
1 2024-05-21 10:03:00 40
218+
219+
# Update the endpoint - should retract old fills and generate new ones (with LOCF the regenerated fills still carry 10; only the endpoint value changes to 50)
220+
statement ok
221+
UPDATE update_test SET "value" = 50.0 WHERE device_id = 1 AND ts = '2024-05-21 10:03:00';
222+
223+
query ITR
224+
SELECT device_id, ts, value FROM gf_update ORDER BY device_id, ts;
225+
----
226+
1 2024-05-21 10:00:00 10
227+
1 2024-05-21 10:01:00 10
228+
1 2024-05-21 10:02:00 10
229+
1 2024-05-21 10:03:00 50
230+
231+
statement ok
232+
DROP MATERIALIZED VIEW gf_update;
233+
234+
statement ok
235+
DROP TABLE update_test;
236+
237+
# ============================================
238+
# Test 5: no-PK input and NULL time pass-through
239+
# ============================================
240+
241+
statement ok
242+
CREATE TABLE no_pk_null_time (
243+
device_id INT,
244+
ts TIMESTAMP,
245+
value INT
246+
);
247+
248+
statement ok
249+
INSERT INTO no_pk_null_time VALUES
250+
(1, '2024-05-21 10:00:00', 10),
251+
(1, NULL, 999),
252+
(1, '2024-05-21 10:02:00', 20),
253+
(2, '2024-05-21 10:00:00', 30),
254+
(2, '2024-05-21 10:02:00', 40);
255+
256+
statement ok
257+
CREATE MATERIALIZED VIEW gf_no_pk_null_time AS
258+
SELECT device_id, ts, value
259+
FROM GAP_FILL(no_pk_null_time, ts, INTERVAL '1' MINUTE, LOCF(value), PARTITION_BY(device_id));
260+
261+
# NULL time rows pass through unchanged and are not inserted into gap-fill state.
262+
# The no-PK input also verifies that filled rows whose hidden stream-key columns are NULL are valid.
263+
query ITR
264+
SELECT device_id, ts, value FROM gf_no_pk_null_time ORDER BY device_id, ts NULLS FIRST, value;
265+
----
266+
1 NULL 999
267+
1 2024-05-21 10:00:00 10
268+
1 2024-05-21 10:01:00 10
269+
1 2024-05-21 10:02:00 20
270+
2 2024-05-21 10:00:00 30
271+
2 2024-05-21 10:01:00 30
272+
2 2024-05-21 10:02:00 40
273+
274+
statement ok
275+
DROP MATERIALIZED VIEW gf_no_pk_null_time;
276+
277+
statement ok
278+
DROP TABLE no_pk_null_time;
279+
280+
# ============================================
281+
# Test 6: update a MIDDLE anchor (regression for dangling UpdateDelete)
282+
# ============================================
283+
# Updating an anchor that has both a prev and a next neighbor previously emitted a
284+
# dangling UpdateDelete (its U- separated from the paired U+ by interleaved fill rows),
285+
# violating the U-/U+ adjacency invariant and panicking update_check / downstream.
286+
287+
statement ok
288+
CREATE TABLE mid_update (
289+
device_id INT,
290+
ts TIMESTAMP,
291+
value DOUBLE,
292+
PRIMARY KEY (device_id, ts)
293+
);
294+
295+
statement ok
296+
INSERT INTO mid_update VALUES
297+
(1, '2024-05-21 10:00:00', 10.0),
298+
(1, '2024-05-21 10:03:00', 40.0),
299+
(1, '2024-05-21 10:06:00', 70.0);
300+
301+
statement ok
302+
CREATE MATERIALIZED VIEW gf_mid AS
303+
SELECT device_id, ts, value
304+
FROM GAP_FILL(mid_update, ts, INTERVAL '1' MINUTE, LOCF(value), PARTITION_BY(device_id));
305+
306+
query ITR
307+
SELECT device_id, ts, value FROM gf_mid ORDER BY device_id, ts;
308+
----
309+
1 2024-05-21 10:00:00 10
310+
1 2024-05-21 10:01:00 10
311+
1 2024-05-21 10:02:00 10
312+
1 2024-05-21 10:03:00 40
313+
1 2024-05-21 10:04:00 40
314+
1 2024-05-21 10:05:00 40
315+
1 2024-05-21 10:06:00 70
316+
317+
# Update the middle anchor (10:03): must not panic; LOCF fills after it follow the new value.
318+
statement ok
319+
UPDATE mid_update SET "value" = 50.0 WHERE device_id = 1 AND ts = '2024-05-21 10:03:00';
320+
321+
query ITR
322+
SELECT device_id, ts, value FROM gf_mid ORDER BY device_id, ts;
323+
----
324+
1 2024-05-21 10:00:00 10
325+
1 2024-05-21 10:01:00 10
326+
1 2024-05-21 10:02:00 10
327+
1 2024-05-21 10:03:00 50
328+
1 2024-05-21 10:04:00 50
329+
1 2024-05-21 10:05:00 50
330+
1 2024-05-21 10:06:00 70
331+
332+
statement ok
333+
DROP MATERIALIZED VIEW gf_mid;
334+
335+
statement ok
336+
DROP TABLE mid_update;

proto/stream_plan.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,7 @@ message EowcGapFillNode {
11191119
repeated string fill_strategies = 4;
11201120
catalog.Table buffer_table = 5;
11211121
catalog.Table prev_row_table = 6;
1122+
repeated uint32 partition_by_indices = 7;
11221123
}
11231124

11241125
message GapFillNode {
@@ -1127,6 +1128,8 @@ message GapFillNode {
11271128
repeated uint32 fill_columns = 3;
11281129
repeated string fill_strategies = 4;
11291130
catalog.Table state_table = 5;
1131+
repeated uint32 partition_by_indices = 6;
1132+
repeated uint32 pointer_key_indices = 7;
11301133
}
11311134

11321135
message StreamNode {

src/common/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ pub mod default {
368368
2048
369369
}
370370

371+
pub fn stream_high_gap_fill_amplification_threshold() -> usize {
372+
2048
373+
}
374+
371375
/// Default to 1 to be compatible with the behavior before this config is introduced.
372376
pub fn stream_exchange_connection_pool_size() -> Option<u16> {
373377
Some(1)

src/common/src/config/streaming.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ pub struct StreamingDeveloperConfig {
166166
/// it will be logged.
167167
pub high_join_amplification_threshold: usize,
168168

169+
#[serde(default = "default::developer::stream_high_gap_fill_amplification_threshold")]
170+
/// If number of rows generated by gap fill between two anchor rows exceeds this threshold
171+
/// number, it will be logged.
172+
pub high_gap_fill_amplification_threshold: usize,
173+
169174
/// Actor tokio metrics is enabled if `enable_actor_tokio_metrics` is set or metrics level >= Debug.
170175
#[serde(default = "default::developer::enable_actor_tokio_metrics")]
171176
pub enable_actor_tokio_metrics: bool,

src/config/docs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ This page is automatically generated by `./risedev generate-example-config`
378378
| exchange_initial_permits | The initial permits that a channel holds, i.e., the maximum row count can be buffered in the channel. | 2048 |
379379
| hash_agg_max_dirty_groups_heap_size | The max heap size of dirty groups of `HashAggExecutor`. | 67108864 |
380380
| hash_join_entry_state_max_rows | Configure the system-wide cache row cardinality of hash join. For example, if this is set to 1000, it means we can have at most 1000 rows in cache. | 30000 |
381+
| high_gap_fill_amplification_threshold | If number of rows generated by gap fill between two anchor rows exceeds this threshold number, it will be logged. | 2048 |
381382
| high_join_amplification_threshold | If number of hash join matches exceeds this threshold number, it will be logged. | 2048 |
382383
| iceberg_fetch_batch_size | `IcebergFetchExecutor`: The number of files the executor will fetch concurrently in a batch. | 1024 |
383384
| iceberg_list_interval_sec | `IcebergListExecutor`: The interval in seconds for Iceberg source to list new files. | 10 |

src/config/example.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ memory_controller_sequence_tls_lag = 32
233233
enable_arrangement_backfill = true
234234
enable_snapshot_backfill = true
235235
high_join_amplification_threshold = 2048
236+
high_gap_fill_amplification_threshold = 2048
236237
enable_actor_tokio_metrics = true
237238
exchange_connection_pool_size = 1
238239
enable_auto_schema_change = true

0 commit comments

Comments
 (0)