Skip to content

Commit a407270

Browse files
committed
add basic adaptive rate limit
1 parent 1493689 commit a407270

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

src/stream/src/executor/backfill/arrangement_backfill.rs

+69
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
6767
rate_limit: Option<usize>,
6868
}
6969

70+
const INITIAL_ADAPTIVE_RATE_LIMIT: usize = 1;
71+
7072
impl<S, SD> ArrangementBackfillExecutor<S, SD>
7173
where
7274
S: StateStore,
@@ -133,6 +135,22 @@ where
133135

134136
let mut upstream = self.upstream.execute();
135137

138+
// Query the current barrier latency from meta.
139+
// Permit a +10% fluctuation in barrier latency. Set baseline to 5s.
140+
let baseline_barrier_latency = {
141+
let avg_latency = Self::get_barrier_latency(&self.metrics);
142+
let fluctuation = avg_latency * 0.1;
143+
if avg_latency < 5.0 {
144+
5.0
145+
} else {
146+
avg_latency + fluctuation
147+
}
148+
};
149+
let adaptive_rate_limit = true;
150+
let mut rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
151+
let mut threshold_rate_limit = 50_000;
152+
let mut rate_limit_linear_offset = 1000;
153+
136154
// Poll the upstream to get the first barrier.
137155
let first_barrier = expect_first_barrier(&mut upstream).await?;
138156
let mut paused = first_barrier.is_pause_on_startup();
@@ -530,6 +548,45 @@ where
530548
}
531549
}
532550

551+
// Adapt Rate Limit
552+
if adaptive_rate_limit {
553+
// First, check if we exceed the barrier latency
554+
let barrier_latency = Self::get_barrier_latency(&self.metrics);
555+
if barrier_latency > baseline_barrier_latency {
556+
// Exceed
557+
threshold_rate_limit /= 2;
558+
tracing::debug!(
559+
target: "adaptive_rate_limit",
560+
actor = self.actor_id,
561+
barrier_latency,
562+
threshold_rate_limit,
563+
"barrier latency exceeds threshold"
564+
);
565+
rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
566+
// We don't want to immediately zero the threshold, so we should let the system stabilize back to baseline first,
567+
// Before going back to exponential increase.
568+
} else {
569+
// Does not exceed
570+
if let Some(rate_limit_set) = rate_limit {
571+
let new_rate_limit = if rate_limit_set < threshold_rate_limit {
572+
rate_limit_set * 2
573+
} else {
574+
// rate_limit_set >= threshold_rate_limit
575+
rate_limit_set + rate_limit_linear_offset
576+
};
577+
rate_limit = Some(new_rate_limit);
578+
}
579+
}
580+
tracing::trace!(
581+
target: "adaptive_rate_limit",
582+
actor = self.actor_id,
583+
barrier_latency,
584+
rate_limit = ?rate_limit,
585+
"adjusted rate limit"
586+
);
587+
rate_limiter = rate_limit.and_then(create_limiter);
588+
}
589+
533590
yield Message::Barrier(barrier);
534591

535592
// We will switch snapshot at the start of the next iteration of the backfill loop.
@@ -614,6 +671,18 @@ where
614671
}
615672
}
616673

674+
// FIXME(kwannoel): Is there some way for an actor to directly query meta?
675+
// FIXME(kwannoel): IIUC These metrics are per parallelism.
676+
// It is a good-enough approximate,
677+
// but it can be further improved, e.g. another parallelism might be the bottleneck,
678+
// so we should still consider the global inflight barrier latency,
679+
// since chunks could be exchanged from this parallelism
680+
// to the hot parallelism.
681+
fn get_barrier_latency(metrics: &StreamingMetrics) -> f64 {
682+
let histogram = &metrics.barrier_inflight_latency;
683+
histogram.get_sample_sum() / histogram.get_sample_count() as f64
684+
}
685+
617686
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
618687
async fn make_snapshot_stream<'a>(
619688
upstream_table: &'a ReplicatedStateTable<S, SD>,

0 commit comments

Comments
 (0)