// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod item;

use crate::{Keyspace, PartitionHandle, PersistMode};
use item::Item;
use lsm_tree::{AbstractTree, ValueType};
use std::{collections::HashSet, sync::Arc};

/// Partition key (a.k.a. column family, locality group)
pub type PartitionKey = Arc<str>;

/// An atomic write batch
///
/// Allows atomically writing across partitions inside the [`Keyspace`].
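///
/// # Examples
///
/// A minimal cross-partition sketch (the path and partition names are
/// placeholders, assuming the usual `Config`/`Keyspace` setup from the
/// crate root):
///
/// ```no_run
/// # use fjall::{Config, PartitionCreateOptions};
/// # fn main() -> fjall::Result<()> {
/// let keyspace = Config::new("my_keyspace").open()?;
/// let users = keyspace.open_partition("users", PartitionCreateOptions::default())?;
/// let posts = keyspace.open_partition("posts", PartitionCreateOptions::default())?;
///
/// let mut batch = keyspace.batch();
/// batch.insert(&users, "alice", "Alice");
/// batch.insert(&posts, "post#1", "hello world");
/// batch.remove(&posts, "post#0");
///
/// // Either all three operations become visible, or none of them do
/// batch.commit()?;
/// # Ok(())
/// # }
/// ```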
#[doc(alias = "WriteBatch")]
pub struct Batch {
pub(crate) data: Vec<Item>,
keyspace: Keyspace,
durability: Option<PersistMode>,
}

impl Batch {
/// Initializes a new write batch.
///
/// This function is called by [`Keyspace::batch`].
pub(crate) fn new(keyspace: Keyspace) -> Self {
Self {
data: Vec::new(),
keyspace,
durability: None,
}
}

/// Initializes a new write batch with preallocated capacity.
#[must_use]
pub fn with_capacity(keyspace: Keyspace, capacity: usize) -> Self {
Self {
data: Vec::with_capacity(capacity),
keyspace,
durability: None,
}
}

/// Sets the durability level used when committing this batch.
///
/// `None` (the default) skips the explicit persist step on commit.
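///
/// # Examples
///
/// A sketch of requesting a synced commit (assumes the `PersistMode::SyncAll`
/// variant exported at the crate root; the path is a placeholder):
///
/// ```no_run
/// # use fjall::{Config, PersistMode};
/// # fn main() -> fjall::Result<()> {
/// # let keyspace = Config::new("my_keyspace").open()?;
/// // The journal will be explicitly persisted when this batch is committed
/// let batch = keyspace.batch().durability(Some(PersistMode::SyncAll));
/// # Ok(())
/// # }
/// ```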
#[must_use]
pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
self.durability = mode;
self
}

/// Inserts a key-value pair into the batch
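///
/// The write is buffered and only becomes visible once the batch is committed.
///
/// # Examples
///
/// A sketch (partition name, key and value are placeholders):
///
/// ```no_run
/// # use fjall::{Config, PartitionCreateOptions};
/// # fn main() -> fjall::Result<()> {
/// # let keyspace = Config::new("my_keyspace").open()?;
/// # let items = keyspace.open_partition("items", PartitionCreateOptions::default())?;
/// let mut batch = keyspace.batch();
/// batch.insert(&items, "key", "value");
/// batch.commit()?;
/// # Ok(())
/// # }
/// ```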
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
&mut self,
p: &PartitionHandle,
key: K,
value: V,
) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
value.as_ref(),
ValueType::Value,
));
}

/// Adds a tombstone marker for a key
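///
/// The removal is buffered and only takes effect once the batch is committed,
/// at which point a tombstone is written for the key.
///
/// # Examples
///
/// A sketch (partition name and key are placeholders):
///
/// ```no_run
/// # use fjall::{Config, PartitionCreateOptions};
/// # fn main() -> fjall::Result<()> {
/// # let keyspace = Config::new("my_keyspace").open()?;
/// # let items = keyspace.open_partition("items", PartitionCreateOptions::default())?;
/// let mut batch = keyspace.batch();
/// batch.remove(&items, "key");
/// batch.commit()?;
/// # Ok(())
/// # }
/// ```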
pub fn remove<K: AsRef<[u8]>>(&mut self, p: &PartitionHandle, key: K) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
vec![],
ValueType::Tombstone,
));
}

/// Commits the batch to the [`Keyspace`] atomically
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn commit(mut self) -> crate::Result<()> {
use std::sync::atomic::Ordering;
log::trace!("batch: Acquiring journal writer");
let mut journal_writer = self.keyspace.journal.get_writer();
// IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
return Err(crate::Error::Poisoned);
}
let batch_seqno = self.keyspace.seqno.next();
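// All items share `batch_seqno` and are appended to the journal before any
// memtable is touched (write-ahead)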
let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno)?;
if let Some(mode) = self.durability {
if let Err(e) = journal_writer.persist(mode) {
self.keyspace.is_poisoned.store(true, Ordering::Release);
log::error!(
"persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
);
return Err(crate::Error::Poisoned);
}
}
#[allow(clippy::mutable_key_type)]
let mut partitions_with_possible_stall = HashSet::new();
let partitions = self.keyspace.partitions.read().expect("lock is poisoned");
let mut batch_size = 0u64;
log::trace!("Applying {} batched items to memtable(s)", self.data.len());
for item in std::mem::take(&mut self.data) {
let Some(partition) = partitions.get(&item.partition) else {
continue;
};
// TODO: need a better, generic write op
let (item_size, _) = match item.value_type {
ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
};
batch_size += u64::from(item_size);
// IMPORTANT: Clone the handle, because we don't want to keep the partitions lock open
partitions_with_possible_stall.insert(partition.clone());
}
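// Publish the batch: the visible sequence number is only raised past
// `batch_seqno` after every item has been applied to its memtable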
self.keyspace
.visible_seqno
.fetch_max(batch_seqno + 1, Ordering::AcqRel);
drop(journal_writer);
drop(partitions);
// IMPORTANT: Add batch size to current write buffer size
// Otherwise write buffer growth is unbounded when using batches
self.keyspace.flush_tracker.grow_buffer(batch_size);
// Check each affected partition for write stall/halt
for partition in partitions_with_possible_stall {
let memtable_size = partition.tree.active_memtable_size();
if let Err(e) = partition.check_memtable_overflow(memtable_size) {
log::error!("Failed memtable rotate check: {e:?}");
};
// IMPORTANT: Check write buffer as well
// Otherwise batch writes are never stalled/halted
let write_buffer_size = self.keyspace.flush_tracker.buffer_size();
partition.check_write_buffer_size(write_buffer_size);
}
Ok(())
}
}