Skip to content

feat(streaming): adapt backfill rate limit according to barrier latency #16678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
statement ok
create table fact(v1 int);

statement ok
create table dim(v1 int);

statement ok
create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1;

# Total update = 250_000 * 100 = 25M records

statement ok
insert into dim select 1 from generate_series(1, 100);

statement ok
insert into fact select 1 from generate_series(1, 250000);

statement ok
flush;

# Let at least 16 barriers pass through
# Then we have 1 * 2^16 = 65536

skipif in-memory
sleep 10s

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

# statement ok
# drop sink s1;

# statement ok
# drop table fact;

# statement ok
# drop table dim;

# statement ok
# create materialized view m2 as select fact.v1 from fact join dim on fact.v1 = dim.v1;
100 changes: 100 additions & 0 deletions e2e_test/backfill/adaptive-rate-limit/amplification-100.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
statement ok
set streaming_parallelism = 2;

statement ok
create table fact(v1 int);

statement ok
create table dim(v1 int);

# Total backfill = 50_000 * 100 = 5M records

statement ok
insert into fact select 1 from generate_series(1, 250000);

statement ok
insert into dim select 1 from generate_series(1, 2000);

statement ok
flush;

statement ok
set background_ddl = true;

statement ok
create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1;

statement ok
set background_ddl = false;

# Let at least 16 barriers pass through
# Then we have 1 * 2^16 = 65536

skipif in-memory
sleep 10s

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

# statement ok
# drop sink s1;

# statement ok
# drop table fact;

# statement ok
# drop table dim;

statement ok
set background_ddl = true;

statement ok
create materialized view m2 as select fact.v1 from fact join dim on fact.v1 = dim.v1;

statement ok
set background_ddl = false;
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
statement ok
create table fact(v1 int);

statement ok
create table dim1(v1 int);

statement ok
create table dim2(v1 int);

statement ok
create table dim3(v1 int);

statement ok
create table dim4(v1 int);

statement ok
create table dim5(v1 int);

statement ok
insert into fact select 1 from generate_series(1, 1000000);

statement ok
insert into dim1 select 1 from generate_series(1, 2);

statement ok
insert into dim2 select 1 from generate_series(1, 5);

statement ok
insert into dim3 select 1 from generate_series(1, 10);

statement ok
insert into dim4 select 1 from generate_series(1, 10);

statement ok
insert into dim5 select 1 from generate_series(1, 10);

statement ok
flush;

statement ok
set background_ddl = true;

statement ok
create sink s1 as select fact.v1 from
fact
join dim1 on fact.v1 = dim1.v1
join dim2 on dim1.v1 = dim2.v1
join dim3 on dim2.v1 = dim3.v1
join dim4 on dim3.v1 = dim4.v1
join dim5 on dim4.v1 = dim5.v1
with (connector = 'blackhole');

statement ok
create sink s2 as select fact.v1 from
fact
join dim1 on fact.v1 = dim1.v1
join dim2 on dim1.v1 = dim2.v1
join dim3 on dim2.v1 = dim3.v1
join dim4 on dim3.v1 = dim4.v1
join dim5 on dim4.v1 = dim5.v1
with (connector = 'blackhole');

statement ok
set background_ddl = false;

# Let at least 16 barriers pass through
# Then we have 1 * 2^16 = 65536

skipif in-memory
sleep 10s

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

# statement ok
# drop sink s1;

# statement ok
# drop table fact;

# statement ok
# drop table dim;
Loading
Loading