Skip to content

Commit 6150e3a

Browse files
committed
make adaptive rate limit configurable
1 parent e842dad commit 6150e3a

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

e2e_test/backfill/adaptive-rate-limit/amplification-100.slt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ statement ok
1010
insert into fact select 1 from generate_series(1, 250000);
1111

1212
statement ok
13-
insert into dim select 1 from generate_series(1, 100);
13+
insert into dim select 1 from generate_series(1, 1000);
1414

1515
statement ok
1616
flush;

src/stream/src/executor/backfill/arrangement_backfill.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ where
111111
let upstream_table_id = self.upstream_table.table_id();
112112
let mut upstream_table = self.upstream_table;
113113
let vnodes = upstream_table.vnodes().clone();
114-
let rate_limit = self.rate_limit;
115-
self.chunk_size = 1;
114+
let mut rate_limit = self.rate_limit;
116115

117116
// These builders will build data chunks.
118117
// We must supply them with the full datatypes which correspond to
@@ -149,7 +148,9 @@ where
149148
};
150149
tracing::debug!(target: "adaptive_rate_limit", highest_barrier_latency, threshold_barrier_latency, "initial configs");
151150
let adaptive_rate_limit = true;
152-
let mut rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
151+
if adaptive_rate_limit {
152+
rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
153+
}
153154

154155
// Poll the upstream to get the first barrier.
155156
let first_barrier = expect_first_barrier(&mut upstream).await?;
@@ -557,7 +558,7 @@ where
557558

558559
// Adapt Rate Limit
559560
if adaptive_rate_limit {
560-
Self::adapt_rate_limit_3(
561+
Self::adapt_rate_limit_2(
561562
&self.actor_id,
562563
&self.metrics,
563564
threshold_barrier_latency,

0 commit comments

Comments
 (0)