Skip to content

Commit f4b560f

Browse files
committed
add new strategies
1 parent 84f6419 commit f4b560f

File tree

5 files changed

+207
-53
lines changed

5 files changed

+207
-53
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
statement ok
2+
create table fact(v1 int);
3+
4+
statement ok
5+
create table dim(v1 int);
6+
7+
statement ok
8+
create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1;
9+
10+
# Total update = 250_000 * 100 = 25M records
11+
12+
statement ok
13+
insert into dim select 1 from generate_series(1, 100);
14+
15+
statement ok
16+
insert into fact select 1 from generate_series(1, 250000);
17+
18+
statement ok
19+
flush;
20+
21+
# Let at least 16 barriers pass through
22+
# Then we have 1 * 2^16 = 65536
23+
24+
skipif in-memory
25+
sleep 10s
26+
27+
statement ok
28+
flush;
29+
30+
statement ok
31+
flush;
32+
33+
statement ok
34+
flush;
35+
36+
statement ok
37+
flush;
38+
39+
statement ok
40+
flush;
41+
42+
statement ok
43+
flush;
44+
45+
statement ok
46+
flush;
47+
48+
statement ok
49+
flush;
50+
51+
statement ok
52+
flush;
53+
54+
statement ok
55+
flush;
56+
57+
statement ok
58+
flush;
59+
60+
statement ok
61+
flush;
62+
63+
statement ok
64+
flush;
65+
66+
statement ok
67+
flush;
68+
69+
statement ok
70+
flush;
71+
72+
statement ok
73+
flush;
74+
75+
# statement ok
76+
# drop sink s1;
77+
78+
# statement ok
79+
# drop table fact;
80+
81+
# statement ok
82+
# drop table dim;
83+
84+
# statement ok
85+
# create materialized view m2 as select fact.v1 from fact join dim on fact.v1 = dim.v1;

e2e_test/backfill/adaptive-rate-limit/amplification-100.slt

+14-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ create table fact(v1 int);
44
statement ok
55
create table dim(v1 int);
66

7+
# Total backfill = 250_000 * 100 = 25M records
8+
79
statement ok
8-
insert into fact select 1 from generate_series(1, 1000000);
10+
insert into fact select 1 from generate_series(1, 250000);
911

1012
statement ok
1113
insert into dim select 1 from generate_series(1, 100);
@@ -17,7 +19,7 @@ statement ok
1719
set background_ddl = true;
1820

1921
statement ok
20-
create sink s1 as select fact.v1 from fact join dim on fact.v1 = dim.v1 with (connector = 'blackhole');
22+
create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1;
2123

2224
statement ok
2325
set background_ddl = false;
@@ -83,4 +85,13 @@ flush;
8385
# drop table fact;
8486

8587
# statement ok
86-
# drop table dim;
88+
# drop table dim;
89+
90+
statement ok
91+
set background_ddl = true;
92+
93+
statement ok
94+
create materialized view m2 as select fact.v1 from fact join dim on fact.v1 = dim.v1;
95+
96+
statement ok
97+
set background_ddl = false;

e2e_test/backfill/adaptive-rate-limit/amplification-500.slt

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ statement ok
2020
set background_ddl = true;
2121

2222
statement ok
23-
create sink s1 as select fact.v1 from fact join dim on fact.v1 = dim.v1 with (connector = 'blackhole');
23+
create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1;
2424

2525
statement ok
2626
set background_ddl = false;

src/stream/src/executor/backfill/arrangement_backfill.rs

+107-37
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ use crate::common::table::state_table::ReplicatedStateTable;
2929
#[cfg(debug_assertions)]
3030
use crate::executor::backfill::utils::METADATA_STATE_LEN;
3131
use crate::executor::backfill::utils::{
32-
compute_bounds, create_builder, create_limiter, create_limiter_with_state,
33-
get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
34-
owned_row_iter, persist_state_per_vnode, update_pos_by_vnode, BackfillProgressPerVnode,
35-
BackfillRateLimiter, BackfillState,
32+
compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
33+
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
34+
update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
3635
};
3736
use crate::executor::prelude::*;
3837
use crate::task::CreateMviewProgress;
@@ -113,6 +112,7 @@ where
113112
let mut upstream_table = self.upstream_table;
114113
let vnodes = upstream_table.vnodes().clone();
115114
let rate_limit = self.rate_limit;
115+
self.chunk_size = 1;
116116

117117
// These builders will build data chunks.
118118
// We must supply them with the full datatypes which correspond to
@@ -139,14 +139,15 @@ where
139139
// Query the current barrier latency from meta.
140140
// Permit a 2x fluctuation in barrier latency, with a minimum threshold of 20s.
141141
let mut total_barrier_latency = Self::get_total_barrier_latency(&self.metrics);
142-
let current_barrier_latency = Self::get_barrier_latency(&self.metrics);
142+
let mut highest_barrier_latency = Self::get_barrier_latency(&self.metrics);
143143
let threshold_barrier_latency = {
144-
if current_barrier_latency < 5.0 {
145-
15.0
144+
if highest_barrier_latency <= 10.0 {
145+
20.0
146146
} else {
147-
current_barrier_latency * 2.0
147+
highest_barrier_latency * 2.0
148148
}
149149
};
150+
tracing::debug!(target: "adaptive_rate_limit", highest_barrier_latency, threshold_barrier_latency, "initial configs");
150151
let adaptive_rate_limit = true;
151152
let mut rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
152153

@@ -556,15 +557,14 @@ where
556557

557558
// Adapt Rate Limit
558559
if adaptive_rate_limit {
559-
let rate_limiter_curr = rate_limiter.take();
560-
rate_limiter = Self::adapt_rate_limit(
560+
Self::adapt_rate_limit_3(
561561
&self.actor_id,
562562
&self.metrics,
563563
threshold_barrier_latency,
564-
current_barrier_latency,
564+
&mut highest_barrier_latency,
565565
&mut total_barrier_latency,
566566
&mut rate_limit,
567-
rate_limiter_curr,
567+
&mut rate_limiter,
568568
)
569569
}
570570

@@ -638,60 +638,137 @@ where
638638
}
639639
}
640640

641-
fn adapt_rate_limit(
641+
// 2x rate limit indefinitely. Backpressure will kick in and slow down the ingestion rate.
642+
fn adapt_rate_limit_3(
642643
actor_id: &ActorId,
643644
metrics: &StreamingMetrics,
644645
threshold_barrier_latency: f64,
645-
current_barrier_latency: f64,
646+
highest_barrier_latency: &mut f64,
646647
total_barrier_latency: &mut f64,
647648
rate_limit: &mut Option<usize>,
648-
rate_limiter: Option<BackfillRateLimiter>,
649-
) -> Option<BackfillRateLimiter> {
649+
rate_limiter: &mut Option<BackfillRateLimiter>,
650+
) {
651+
if let Some(rate_limit_setting) = rate_limit {
652+
*rate_limit_setting *= 2;
653+
*rate_limiter = create_limiter(*rate_limit_setting)
654+
}
655+
}
656+
657+
fn adapt_rate_limit_2(
658+
actor_id: &ActorId,
659+
metrics: &StreamingMetrics,
660+
threshold_barrier_latency: f64,
661+
highest_barrier_latency: &mut f64,
662+
total_barrier_latency: &mut f64,
663+
rate_limit: &mut Option<usize>,
664+
rate_limiter: &mut Option<BackfillRateLimiter>,
665+
) {
650666
let new_total_barrier_latency = Self::get_total_barrier_latency(metrics);
651667
let new_barrier_latency = new_total_barrier_latency - *total_barrier_latency;
652-
*total_barrier_latency = new_total_barrier_latency;
653-
let new_rate_limit = if new_barrier_latency == 0.0 {
668+
*highest_barrier_latency = f64::max(new_barrier_latency, *highest_barrier_latency);
669+
tracing::debug!(
670+
target: "adaptive_rate_limit",
671+
new_barrier_latency,
672+
);
673+
let new_rate_limit = if *highest_barrier_latency > 2_f64 * threshold_barrier_latency {
674+
Some(INITIAL_ADAPTIVE_RATE_LIMIT)
675+
} else if *highest_barrier_latency > threshold_barrier_latency
676+
&& let Some(rate_limit_set) = rate_limit
677+
{
654678
tracing::debug!(
655679
target: "adaptive_rate_limit",
656-
?rate_limit,
657-
"waiting for barrier latency"
680+
"barrier latency keep constant"
658681
);
659682
*rate_limit
660-
// do nothing
661-
} else if new_barrier_latency > threshold_barrier_latency {
683+
} else if new_total_barrier_latency > *total_barrier_latency
684+
&& let Some(rate_limit_set) = rate_limit
685+
{
686+
let scaling_factor = 1.1_f64;
687+
let scaled_rate_limit = (*rate_limit_set as f64) * scaling_factor;
688+
let new_rate_limit = scaled_rate_limit.ceil() as usize;
689+
Some(new_rate_limit)
690+
} else {
691+
*rate_limit
692+
};
693+
*total_barrier_latency = new_total_barrier_latency;
694+
*highest_barrier_latency = new_barrier_latency;
695+
if *rate_limit != new_rate_limit
696+
&& let Some(rate_limit_setting) = new_rate_limit
697+
{
698+
*rate_limit = new_rate_limit;
699+
tracing::trace!(
700+
target: "adaptive_rate_limit",
701+
actor_id,
702+
?rate_limit,
703+
"adjusted rate limit"
704+
);
705+
*rate_limiter = create_limiter(rate_limit_setting)
706+
}
707+
}
708+
709+
fn adapt_rate_limit(
710+
actor_id: &ActorId,
711+
metrics: &StreamingMetrics,
712+
threshold_barrier_latency: f64,
713+
highest_barrier_latency: &mut f64,
714+
total_barrier_latency: &mut f64,
715+
rate_limit: &mut Option<usize>,
716+
rate_limiter: &mut Option<BackfillRateLimiter>,
717+
) {
718+
let new_total_barrier_latency = Self::get_total_barrier_latency(metrics);
719+
// let new_barrier_latency = new_total_barrier_latency - *total_barrier_latency;
720+
let new_barrier_latency = Self::get_barrier_latency(metrics);
721+
let new_rate_limit = if new_barrier_latency > 2_f64 * threshold_barrier_latency {
662722
tracing::debug!(
663723
target: "adaptive_rate_limit",
664724
new_barrier_latency,
665-
"barrier latency exceeds threshold, reset to initial rate limit"
725+
"barrier latency exceeds threshold * 2, reset to initial rate limit"
666726
);
667727
Some(INITIAL_ADAPTIVE_RATE_LIMIT)
668-
} else if let Some(rate_limit_set) = rate_limit {
728+
} else if new_barrier_latency > threshold_barrier_latency
729+
&& let Some(rate_limit_set) = rate_limit
730+
{
731+
tracing::debug!(
732+
target: "adaptive_rate_limit",
733+
new_barrier_latency,
734+
"barrier latency exceeds threshold, exponential decrease"
735+
);
736+
Some(usize::max(1, *rate_limit_set / 2))
737+
} else if new_total_barrier_latency > *total_barrier_latency
738+
&& let Some(rate_limit_set) = rate_limit
739+
{
669740
// We use the following inputs to determine the scaling factor:
670741
// 1. The barrier latency "left" before we reach the threshold.
671742
// If we have a lot left, we can scale more aggressively.
672743
// 2. The change in barrier latency.
673744
// If the barrier latency increases significantly, we should scale less.
674745
// That said, we should not let it drop to 0 either while there is still headroom below the threshold.
675746
// So we clamp it to a small lower bound, like 0.1.
676-
let barrier_latency_surplus_ratio =
677-
(threshold_barrier_latency - new_barrier_latency) / threshold_barrier_latency;
747+
let barrier_latency_surplus_ratio = (threshold_barrier_latency - new_barrier_latency)
748+
/ (threshold_barrier_latency * 1.3);
678749
let barrier_latency_diff_ratio = (1_f64
679-
- (new_barrier_latency - current_barrier_latency) / current_barrier_latency)
750+
- (new_barrier_latency - *highest_barrier_latency) / *highest_barrier_latency)
680751
.clamp(0.1_f64, 1_f64);
681752
let scaling_factor = 1_f64 + barrier_latency_surplus_ratio * barrier_latency_diff_ratio;
682753
let scaled_rate_limit = (*rate_limit_set as f64) * scaling_factor;
683-
let new_rate_limit = f64::min(1_f64, scaled_rate_limit).round() as usize;
754+
let new_rate_limit = f64::max(1_f64, scaled_rate_limit).round() as usize;
684755
tracing::debug!(
685756
target: "adaptive_rate_limit",
686757
new_rate_limit,
758+
barrier_latency_surplus_ratio,
759+
barrier_latency_diff_ratio,
687760
scaling_factor,
688761
"scaling rate limit"
689762
);
690763
Some(new_rate_limit)
691764
} else {
692765
*rate_limit
693766
};
694-
if *rate_limit != new_rate_limit {
767+
*total_barrier_latency = new_total_barrier_latency;
768+
*highest_barrier_latency = new_barrier_latency;
769+
if *rate_limit != new_rate_limit
770+
&& let Some(rate_limit_setting) = new_rate_limit
771+
{
695772
*rate_limit = new_rate_limit;
696773
tracing::trace!(
697774
target: "adaptive_rate_limit",
@@ -700,14 +777,7 @@ where
700777
?rate_limit,
701778
"adjusted rate limit"
702779
);
703-
}
704-
if let Some(rate_limit) = rate_limit
705-
&& let Some(rate_limiter) = rate_limiter
706-
{
707-
let store = rate_limiter.into_state_store();
708-
create_limiter_with_state(*rate_limit, store)
709-
} else {
710-
None
780+
*rate_limiter = create_limiter(rate_limit_setting)
711781
}
712782
}
713783

src/stream/src/executor/backfill/utils.rs

-12
Original file line numberDiff line numberDiff line change
@@ -826,18 +826,6 @@ pub fn create_builder(
826826
}
827827
}
828828

829-
pub fn create_limiter_with_state(
830-
rate_limit: usize,
831-
rate_limit_state: InMemoryState,
832-
) -> Option<BackfillRateLimiter> {
833-
if rate_limit == 0 {
834-
return None;
835-
}
836-
let quota = Quota::per_second(NonZeroU32::new(rate_limit as u32).unwrap());
837-
let clock = MonotonicClock;
838-
Some(RateLimiter::new(quota, rate_limit_state, &clock))
839-
}
840-
841829
pub fn create_limiter(rate_limit: usize) -> Option<BackfillRateLimiter> {
842830
if rate_limit == 0 {
843831
return None;

0 commit comments

Comments
 (0)