Skip to content

Commit 15fc148

Browse files
authored
refactor: clarify region flush reasons (#8146)
* refactor: clarify region flush reasons Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
1 parent ba679dd commit 15fc148

5 files changed

Lines changed: 34 additions & 40 deletions

File tree

src/mito2/src/flush.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,6 @@ impl WriteBufferManager for WriteBufferManagerImpl {
206206
/// Reason of a flush task.
207207
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
208208
pub enum FlushReason {
209-
/// Other reasons.
210-
Others,
211209
/// Engine reaches flush threshold.
212210
EngineFull,
213211
/// Manual flush.
@@ -243,6 +241,8 @@ impl From<RegionFlushReason> for FlushReason {
243241
RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
244242
RegionFlushReason::Repartition => FlushReason::Repartition,
245243
RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
244+
RegionFlushReason::Closing => FlushReason::Closing,
245+
RegionFlushReason::Downgrading => FlushReason::Downgrading,
246246
}
247247
}
248248
}
@@ -1355,7 +1355,7 @@ mod tests {
13551355
let (output_tx, output_rx) = oneshot::channel();
13561356
let mut task = RegionFlushTask {
13571357
region_id: builder.region_id(),
1358-
reason: FlushReason::Others,
1358+
reason: FlushReason::Manual,
13591359
senders: Vec::new(),
13601360
request_sender: tx,
13611361
access_layer: env.access_layer.clone(),
@@ -1401,7 +1401,7 @@ mod tests {
14011401
let mut tasks: Vec<_> = (0..3)
14021402
.map(|_| RegionFlushTask {
14031403
region_id: builder.region_id(),
1404-
reason: FlushReason::Others,
1404+
reason: FlushReason::Manual,
14051405
senders: Vec::new(),
14061406
request_sender: tx.clone(),
14071407
access_layer: env.access_layer.clone(),
@@ -1587,7 +1587,7 @@ mod tests {
15871587
let mut tasks: Vec<_> = (0..2)
15881588
.map(|_| RegionFlushTask {
15891589
region_id: builder.region_id(),
1590-
reason: FlushReason::Others,
1590+
reason: FlushReason::Manual,
15911591
senders: Vec::new(),
15921592
request_sender: tx.clone(),
15931593
access_layer: env.access_layer.clone(),

src/mito2/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
11531153
continue;
11541154
}
11551155
DdlRequest::Flush(req) => {
1156-
self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
1156+
self.handle_flush_request(ddl.region_id, req, ddl.sender);
11571157
continue;
11581158
}
11591159
DdlRequest::Compact(req) => {

src/mito2/src/worker/handle_close.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
use common_telemetry::info;
1818
use store_api::logstore::LogStore;
1919
use store_api::logstore::provider::Provider;
20-
use store_api::region_request::RegionFlushRequest;
20+
use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
2121
use store_api::storage::RegionId;
2222

23-
use crate::flush::FlushReason;
2423
use crate::request::OptionOutputTx;
2524
use crate::worker::RegionWorkerLoop;
2625

@@ -51,8 +50,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
5150
info!("Region {} has pending data, waiting for flush", region_id);
5251
self.handle_flush_request(
5352
region_id,
54-
RegionFlushRequest::default(),
55-
Some(FlushReason::Closing),
53+
RegionFlushRequest {
54+
reason: Some(RegionFlushReason::Closing),
55+
..Default::default()
56+
},
5657
sender,
5758
);
5859
return;

src/mito2/src/worker/handle_flush.rs

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,14 @@ use crate::sst::index::IndexBuildType;
3131
use crate::worker::RegionWorkerLoop;
3232

3333
fn resolve_flush_reason(
34-
explicit_reason: Option<FlushReason>,
3534
request_reason: Option<RegionFlushReason>,
3635
is_downgrading: bool,
3736
) -> FlushReason {
38-
explicit_reason
39-
.or_else(|| request_reason.map(FlushReason::from))
40-
.unwrap_or({
41-
if is_downgrading {
42-
FlushReason::Downgrading
43-
} else {
44-
FlushReason::Manual
45-
}
46-
})
37+
match request_reason {
38+
Some(reason) => FlushReason::from(reason),
39+
None if is_downgrading => FlushReason::Downgrading,
40+
None => FlushReason::Manual,
41+
}
4742
}
4843

4944
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -181,7 +176,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
181176
&mut self,
182177
region_id: RegionId,
183178
request: RegionFlushRequest,
184-
reason: Option<FlushReason>,
185179
sender: OptionOutputTx,
186180
) {
187181
let region = match self.regions.flushable_region(region_id) {
@@ -197,7 +191,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
197191
// when handling flush request instead of in `schedule_flush` or `flush_finished`.
198192
self.update_topic_latest_entry_id(&region);
199193

200-
let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading());
194+
let reason = resolve_flush_reason(request.reason, region.is_downgrading());
201195
let mut task =
202196
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
203197
task.push_sender(sender);
@@ -369,38 +363,33 @@ impl<S: LogStore> RegionWorkerLoop<S> {
369363
mod tests {
370364
use super::*;
371365

372-
#[test]
373-
fn test_resolve_flush_reason_prefers_explicit_reason() {
374-
let reason = resolve_flush_reason(
375-
Some(FlushReason::Closing),
376-
Some(RegionFlushReason::RemoteWalPrune),
377-
true,
378-
);
379-
assert_eq!(reason, FlushReason::Closing);
380-
}
381-
382366
#[test]
383367
fn test_resolve_flush_reason_uses_request_reason() {
384368
assert_eq!(
385-
resolve_flush_reason(None, Some(RegionFlushReason::RegionMigration), true),
369+
resolve_flush_reason(Some(RegionFlushReason::RegionMigration), true),
386370
FlushReason::RegionMigration
387371
);
388372
assert_eq!(
389-
resolve_flush_reason(None, Some(RegionFlushReason::Repartition), false),
373+
resolve_flush_reason(Some(RegionFlushReason::Repartition), false),
390374
FlushReason::Repartition
391375
);
392376
assert_eq!(
393-
resolve_flush_reason(None, Some(RegionFlushReason::RemoteWalPrune), false),
377+
resolve_flush_reason(Some(RegionFlushReason::RemoteWalPrune), false),
394378
FlushReason::RemoteWalPrune
395379
);
380+
assert_eq!(
381+
resolve_flush_reason(Some(RegionFlushReason::Closing), false),
382+
FlushReason::Closing
383+
);
384+
assert_eq!(
385+
resolve_flush_reason(Some(RegionFlushReason::Downgrading), false),
386+
FlushReason::Downgrading
387+
);
396388
}
397389

398390
#[test]
399391
fn test_resolve_flush_reason_fallback_unchanged() {
400-
assert_eq!(
401-
resolve_flush_reason(None, None, true),
402-
FlushReason::Downgrading
403-
);
404-
assert_eq!(resolve_flush_reason(None, None, false), FlushReason::Manual);
392+
assert_eq!(resolve_flush_reason(None, true), FlushReason::Downgrading);
393+
assert_eq!(resolve_flush_reason(None, false), FlushReason::Manual);
405394
}
406395
}

src/store-api/src/region_request.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,10 @@ pub enum RegionFlushReason {
14141414
Repartition,
14151415
/// Flush triggered by remote WAL pruning.
14161416
RemoteWalPrune,
1417+
/// Flush region before closing region.
1418+
Closing,
1419+
/// Flush region before downgrading region.
1420+
Downgrading,
14171421
}
14181422

14191423
#[derive(Debug, Clone, Default)]

0 commit comments

Comments
 (0)