// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::manager::Task;
use crate::{
    compaction::manager::CompactionManager, flush_tracker::FlushTracker,
    snapshot_tracker::SnapshotTracker, PartitionHandle,
};
use lsm_tree::{AbstractTree, Segment, SeqNo};
use std::sync::Arc;

/// Flushes a single sealed memtable, producing at most one segment.
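///
/// Returns `Ok(None)` if the flush produced no segment, which can
/// happen when every item in the memtable is evictable at the given
/// eviction threshold.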
fn run_flush_worker(task: &Arc<Task>, eviction_threshold: SeqNo) -> crate::Result<Option<Segment>> {
    #[rustfmt::skip]
    let segment = task.partition.tree.flush_memtable(
        // IMPORTANT: Segment has to get the task ID,
        // otherwise segment ID and memtable ID will not line up
        task.id,
        &task.sealed_memtable,
        eviction_threshold,
    );

    // TODO: test this after a failed flush
    if segment.is_err() {
        // IMPORTANT: Need to decrement pending segments counter
        if let crate::AnyTree::Blob(tree) = &task.partition.tree {
            tree.pending_segments
                .fetch_sub(1, std::sync::atomic::Ordering::Release);
        }
    }

    Ok(segment?)
}

/// Result of flushing all queued memtables of a single partition.
struct MultiFlushResultItem {
    partition: PartitionHandle,

    /// Segments that were created by the flush workers
    created_segments: Vec<Segment>,

    /// Size sum of sealed memtables that have been flushed
    size: u64,
}

type MultiFlushResults = Vec<crate::Result<MultiFlushResultItem>>;

/// Distributes tasks of multiple partitions over multiple worker threads.
///
/// Each thread is responsible for the tasks of one partition.
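///
/// Within each partition thread, every queued memtable is in turn flushed
/// on a thread of its own, so up to (number of partitions) x
/// (memtables per partition) flushes may run concurrently.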
fn run_multi_flush(
    partitioned_tasks: Vec<Vec<Arc<Task>>>,
    eviction_threshold: SeqNo,
) -> MultiFlushResults {
    log::debug!("spawning {} worker threads", partitioned_tasks.len());

    // NOTE: Don't trust clippy; the collect is needed so that all
    // threads are spawned before we start joining them below
    #[allow(clippy::needless_collect)]
    let threads = partitioned_tasks
        .into_iter()
        .map(|tasks| {
            std::thread::spawn(move || {
                let partition = tasks
                    .first()
                    .expect("should always have at least one task")
                    .partition
                    .clone();

                log::trace!(
                    "flushing {} memtables for partition {:?}",
                    tasks.len(),
                    partition.name
                );

                let memtables_size: u64 = tasks
                    .iter()
                    .map(|t| u64::from(t.sealed_memtable.size()))
                    .sum();

                // NOTE: Don't trust clippy; same as above, collecting makes
                // all flush workers spawn before any of them is joined
                #[allow(clippy::needless_collect)]
                let flush_workers = tasks
                    .into_iter()
                    .map(|task| {
                        std::thread::spawn(move || run_flush_worker(&task, eviction_threshold))
                    })
                    .collect::<Vec<_>>();

                let mut created_segments = Vec::with_capacity(flush_workers.len());

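                // NOTE: The `?` below returns on the first failed flush;
                // the remaining join handles are dropped, which detaches
                // (but does not stop) their worker threads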
                for t in flush_workers {
                    if let Some(segment) = t.join().expect("should join")? {
                        created_segments.push(segment);
                    }
                }

                Ok(MultiFlushResultItem {
                    partition,
                    created_segments,
                    size: memtables_size,
                })
            })
        })
        .collect::<Vec<_>>();

    threads
        .into_iter()
        .map(|t| t.join().expect("should join"))
        .collect::<Vec<_>>()
}

/// Runs flush logic.
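///
/// Collects the queued flush tasks, flushes them in parallel,
/// registers the created segments, notifies the compaction manager
/// and finally attempts journal maintenance.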
#[allow(clippy::too_many_lines)]
pub fn run(
    flush_tracker: &FlushTracker,
    compaction_manager: &CompactionManager,
    snapshot_tracker: &SnapshotTracker,
    parallelism: usize,
) {
log::debug!("read locking flush manager");
let partitioned_tasks = flush_tracker.collect_tasks(parallelism);
if partitioned_tasks.is_empty() {
log::debug!("No tasks collected");
return;
}
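    // NOTE: The seqno that is safe to GC serves as the eviction threshold;
    // MVCC versions that are not visible to any open snapshot may be
    // dropped during the flush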
    for result in run_multi_flush(partitioned_tasks, snapshot_tracker.get_seqno_safe_to_gc()) {
        match result {
            Ok(MultiFlushResultItem {
                partition,
                created_segments,
                size: memtables_size,
            }) => {
                // IMPORTANT: Flushed segments need to be applied *atomically* into the tree,
                // otherwise we could cover up an unwritten journal, which would result in data loss
                if let Err(e) = partition.tree.register_segments(&created_segments) {
                    log::error!("Failed to register segments: {e:?}");
                } else {
                    log::debug!("write locking flush manager to submit results");
                    log::debug!(
                        "Dequeuing flush tasks: {} => {}",
                        partition.name,
                        created_segments.len()
                    );
                    flush_tracker.dequeue_tasks(&partition.name, created_segments.len());
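
                    // NOTE: Presumably frees the flushed bytes from the
                    // write buffer size counter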
                    flush_tracker.shrink_buffer(memtables_size);

                    compaction_manager.notify(partition);
                }
            }
            Err(e) => {
                log::error!("Flush error: {e:?}");
            }
        }
    }
log::debug!("write locking journal manager to maybe do maintenance");
if let Err(e) = flush_tracker.maintenance() {
log::error!("journal GC failed: {e:?}");
}
log::debug!("fully done");
}