Skip to content

Commit b498060

Browse files
authored
Merge branch 'main' into flush_tracker
2 parents ac96a73 + fa09be9 commit b498060

File tree

18 files changed

+133
-94
lines changed

18 files changed

+133
-94
lines changed

Diff for: Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "fjall"
33
description = "LSM-based key-value storage engine"
44
license = "MIT OR Apache-2.0"
5-
version = "2.4.4"
5+
version = "2.5.0"
66
edition = "2021"
77
rust-version = "1.74.0"
88
readme = "README.md"
@@ -28,7 +28,7 @@ bytes = ["lsm-tree/bytes"]
2828

2929
[dependencies]
3030
byteorder = "1.5.0"
31-
lsm-tree = { version = "2.4.0", default-features = false }
31+
lsm-tree = { version = "2.5.0", default-features = false }
3232
log = "0.4.21"
3333
std-semaphore = "0.1.0"
3434
tempfile = "3.10.1"

Diff for: examples/permuterm/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ fn main() -> Result<()> {
8080

8181
for word in WORDS {
8282
for term in permuterm(word) {
83-
db.insert(format!("{term}#{word}"), word).unwrap();
83+
db.insert(format!("{term}#{word}"), *word).unwrap();
8484
}
8585
}
8686

Diff for: examples/tx-blob-cas/src/main.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl Cas {
125125
let rc = self.decrement_ref_count(&mut tx, &prev_content_hash)?;
126126
if rc == 0 {
127127
// No more references
128-
tx.remove(&self.blobs, &prev_content_hash);
128+
tx.remove(&self.blobs, prev_content_hash);
129129
}
130130
}
131131

@@ -156,7 +156,7 @@ impl Cas {
156156
let rc = self.decrement_ref_count(&mut tx, &content_hash)?;
157157
if rc == 0 {
158158
// No more references
159-
tx.remove(&self.blobs, &content_hash);
159+
tx.remove(&self.blobs, content_hash);
160160
}
161161

162162
tx.commit()?;

Diff for: examples/tx-mpmc-queue/src/main.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,12 @@ fn main() -> fjall::Result<()> {
6363
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
6464
// would be good for this type of workload
6565
if let Some((key, _)) = tx.first_key_value(&tasks)? {
66-
tx.remove(&tasks, &key);
66+
let task_id = std::str::from_utf8(&key).unwrap().to_owned();
67+
68+
tx.remove(&tasks, key);
6769

6870
tx.commit()?;
6971

70-
let task_id = std::str::from_utf8(&key).unwrap();
7172
println!("consumer {idx} completed task {task_id}");
7273

7374
counter.fetch_add(1, Relaxed);

Diff for: examples/tx-partition-move/src/main.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ fn main() -> fjall::Result<()> {
3333
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
3434
// would be good for this type of workload
3535
if let Some((key, value)) = tx.first_key_value(&src)? {
36-
tx.remove(&src, &key);
37-
tx.insert(&dst, &key, &value);
36+
let task_id = std::str::from_utf8(&key).unwrap().to_owned();
37+
38+
tx.remove(&src, key.clone());
39+
tx.insert(&dst, key, value);
3840

3941
tx.commit()?;
4042

41-
let task_id = std::str::from_utf8(&key).unwrap();
4243
println!("consumer {idx} moved {task_id}");
4344

4445
let ms = rng.gen_range(10..100);

Diff for: examples/tx-ssi-mpmc-queue/src/main.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,14 @@ fn main() -> fjall::Result<()> {
6565
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
6666
// would be good for this type of workload
6767
if let Some((key, _)) = tx.first_key_value(&tasks)? {
68-
tx.remove(&tasks, &key);
68+
let task_id = std::str::from_utf8(&key).unwrap().to_owned();
69+
70+
tx.remove(&tasks, key);
6971

7072
if tx.commit()?.is_ok() {
7173
counter.fetch_add(1, Relaxed);
7274
}
7375

74-
let task_id = std::str::from_utf8(&key).unwrap();
7576
println!("consumer {idx} completed task {task_id}");
7677

7778
let ms = rng.gen_range(50..200);

Diff for: examples/tx-ssi-partition-move/src/main.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ fn main() -> fjall::Result<()> {
3333
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
3434
// would be good for this type of workload
3535
if let Some((key, value)) = tx.first_key_value(&src)? {
36-
tx.remove(&src, &key);
37-
tx.insert(&dst, &key, &value);
36+
let task_id = std::str::from_utf8(&key).unwrap().to_owned();
37+
38+
tx.remove(&src, key.clone());
39+
tx.insert(&dst, key, value);
3840

3941
tx.commit()?.ok();
4042

41-
let task_id = std::str::from_utf8(&key).unwrap();
4243
println!("consumer {idx} moved {task_id}");
4344

4445
let ms = rng.gen_range(10..100);

Diff for: src/batch/mod.rs

+7-15
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub mod item;
66

77
use crate::{Keyspace, PartitionHandle, PersistMode};
88
use item::Item;
9-
use lsm_tree::{AbstractTree, ValueType};
9+
use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
1010
use std::{collections::HashSet, sync::Arc};
1111

1212
/// Partition key (a.k.a. column family, locality group)
@@ -52,28 +52,20 @@ impl Batch {
5252
}
5353

5454
/// Inserts a key-value pair into the batch
55-
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
55+
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
5656
&mut self,
5757
p: &PartitionHandle,
5858
key: K,
5959
value: V,
6060
) {
61-
self.data.push(Item::new(
62-
p.name.clone(),
63-
key.as_ref(),
64-
value.as_ref(),
65-
ValueType::Value,
66-
));
61+
self.data
62+
.push(Item::new(p.name.clone(), key, value, ValueType::Value));
6763
}
6864

6965
/// Adds a tombstone marker for a key
70-
pub fn remove<K: AsRef<[u8]>>(&mut self, p: &PartitionHandle, key: K) {
71-
self.data.push(Item::new(
72-
p.name.clone(),
73-
key.as_ref(),
74-
vec![],
75-
ValueType::Tombstone,
76-
));
66+
pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
67+
self.data
68+
.push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
7769
}
7870

7971
/// Commits the batch to the [`Keyspace`] atomically

Diff for: src/compaction/worker.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
use super::manager::CompactionManager;
66
use crate::snapshot_tracker::SnapshotTracker;
77
use lsm_tree::AbstractTree;
8+
use std::sync::atomic::AtomicUsize;
89

910
/// Runs a single run of compaction.
10-
pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTracker) {
11+
pub fn run(
12+
compaction_manager: &CompactionManager,
13+
snapshot_tracker: &SnapshotTracker,
14+
compaction_counter: &AtomicUsize,
15+
) {
1116
let Some(item) = compaction_manager.pop() else {
1217
return;
1318
};
@@ -21,10 +26,12 @@ pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTr
2126

2227
// TODO: loop if there's more work to do
2328

29+
compaction_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2430
if let Err(e) = item
2531
.tree
2632
.compact(strategy.inner(), snapshot_tracker.get_seqno_safe_to_gc())
2733
{
2834
log::error!("Compaction failed: {e:?}");
2935
};
36+
compaction_counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
3037
}

Diff for: src/flush/worker.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,13 @@ pub fn run(
141141
partition.name,
142142
created_segments.len()
143143
);
144-
flush_tracker.dequeue_tasks(&partition.name, created_segments.len());
145144

145+
flush_tracker.dequeue_tasks(&partition.name, created_segments.len());
146146
flush_tracker.decrement_buffer_size(memtables_size);
147-
compaction_manager.notify(partition);
147+
148+
for _ in 0..parallelism {
149+
compaction_manager.notify(partition.clone());
150+
}
148151
}
149152
}
150153
Err(e) => {

Diff for: src/keyspace.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ pub struct KeyspaceInner {
6868
/// True if fsync failed
6969
pub(crate) is_poisoned: Arc<AtomicBool>,
7070

71+
/// Active compaction conter
72+
pub(crate) active_compaction_count: Arc<AtomicUsize>,
73+
7174
#[doc(hidden)]
7275
pub snapshot_tracker: Arc<SnapshotTracker>,
7376
}
@@ -178,6 +181,14 @@ impl Keyspace {
178181
self.flush_tracker.buffer_size()
179182
}
180183

184+
/// Returns the number of active compactions currently running.
185+
#[doc(hidden)]
186+
#[must_use]
187+
pub fn active_compactions(&self) -> usize {
188+
self.active_compaction_count
189+
.load(std::sync::atomic::Ordering::Relaxed)
190+
}
191+
181192
/// Returns the amount of journals on disk.
182193
///
183194
/// # Examples
@@ -546,6 +557,7 @@ impl Keyspace {
546557
active_background_threads: Arc::default(),
547558
is_poisoned: Arc::default(),
548559
snapshot_tracker: Arc::default(),
560+
active_compaction_count: Arc::default(),
549561
};
550562

551563
let keyspace = Self(Arc::new(inner));
@@ -769,6 +781,7 @@ impl Keyspace {
769781
active_background_threads: Arc::default(),
770782
is_poisoned: Arc::default(),
771783
snapshot_tracker: Arc::default(),
784+
active_compaction_count: Arc::default(),
772785
};
773786

774787
// NOTE: Lastly, fsync .fjall marker, which contains the version
@@ -848,6 +861,7 @@ impl Keyspace {
848861
let stop_signal = self.stop_signal.clone();
849862
let thread_counter = self.active_background_threads.clone();
850863
let snapshot_tracker = self.snapshot_tracker.clone();
864+
let compaction_counter = self.active_compaction_count.clone();
851865

852866
thread_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
853867

@@ -858,7 +872,11 @@ impl Keyspace {
858872
log::trace!("compaction: waiting for work");
859873
compaction_manager.wait_for();
860874

861-
crate::compaction::worker::run(&compaction_manager, &snapshot_tracker);
875+
crate::compaction::worker::run(
876+
&compaction_manager,
877+
&snapshot_tracker,
878+
&compaction_counter,
879+
);
862880
}
863881

864882
log::trace!("compaction thread: exiting because keyspace is dropping");

Diff for: src/partition/mod.rs

+11-7
Original file line numberDiff line numberDiff line change
@@ -835,15 +835,19 @@ impl PartitionHandle {
835835
/// # Errors
836836
///
837837
/// Will return `Err` if an IO error occurs.
838-
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> {
838+
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
839+
&self,
840+
key: K,
841+
value: V,
842+
) -> crate::Result<()> {
839843
use std::sync::atomic::Ordering;
840844

841845
if self.is_deleted.load(Ordering::Relaxed) {
842846
return Err(crate::Error::PartitionDeleted);
843847
}
844848

845-
let key = key.as_ref();
846-
let value = value.as_ref();
849+
let key = key.into();
850+
let value = value.into();
847851

848852
let mut journal_writer = self.journal.get_writer();
849853

@@ -854,7 +858,7 @@ impl PartitionHandle {
854858
return Err(crate::Error::Poisoned);
855859
}
856860

857-
journal_writer.write_raw(&self.name, key, value, lsm_tree::ValueType::Value, seqno)?;
861+
journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?;
858862

859863
if !self.config.manual_journal_persist {
860864
journal_writer
@@ -914,14 +918,14 @@ impl PartitionHandle {
914918
/// # Errors
915919
///
916920
/// Will return `Err` if an IO error occurs.
917-
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<()> {
921+
pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
918922
use std::sync::atomic::Ordering;
919923

920924
if self.is_deleted.load(Ordering::Relaxed) {
921925
return Err(crate::Error::PartitionDeleted);
922926
}
923927

924-
let key = key.as_ref();
928+
let key = key.into();
925929

926930
let mut journal_writer = self.journal.get_writer();
927931

@@ -932,7 +936,7 @@ impl PartitionHandle {
932936
return Err(crate::Error::Poisoned);
933937
}
934938

935-
journal_writer.write_raw(&self.name, key, &[], lsm_tree::ValueType::Tombstone, seqno)?;
939+
journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?;
936940

937941
if !self.config.manual_journal_persist {
938942
journal_writer

Diff for: src/partition/options.rs

+1
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ impl lsm_tree::coding::Decode for CreateOptions {
231231
l0_threshold,
232232
target_size,
233233
level_ratio,
234+
..Default::default()
234235
})
235236
}
236237
1 => {

Diff for: src/tx/conflict_manager.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,18 @@ impl ConflictManager {
3131
}
3232
}
3333

34-
pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) {
35-
self.push_read(partition, Read::Single(key.clone()));
34+
pub fn mark_read(&mut self, partition: &PartitionKey, key: Slice) {
35+
self.push_read(partition, Read::Single(key));
3636
}
3737

38-
pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) {
38+
pub fn mark_conflict(&mut self, partition: &PartitionKey, key: Slice) {
3939
if let Some(tbl) = self.conflict_keys.get_mut(partition) {
40-
tbl.insert(key.into());
40+
tbl.insert(key);
4141
} else {
4242
self.conflict_keys
4343
.entry(partition.clone())
4444
.or_default()
45-
.insert(key.into());
45+
.insert(key);
4646
}
4747
}
4848

0 commit comments

Comments
 (0)