Skip to content

Commit e4aa089

Browse files
authored
Merge pull request #153 from fjall-rs/2.8.0
2.8.0
2 parents e6dc698 + cfaee51 commit e4aa089

File tree

8 files changed

+138
-119
lines changed

8 files changed

+138
-119
lines changed

Diff for: Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "fjall"
33
description = "LSM-based key-value storage engine"
44
license = "MIT OR Apache-2.0"
5-
version = "2.7.6"
5+
version = "2.8.0"
66
edition = "2021"
77
rust-version = "1.75.0"
88
readme = "README.md"
@@ -28,7 +28,7 @@ __internal_whitebox = []
2828
[dependencies]
2929
byteorder = "1.5.0"
3030
byteview = "0.6.1"
31-
lsm-tree = { version = "=2.7.6", default-features = false, features = [] }
31+
lsm-tree = { version = "~2.8", default-features = false, features = [] }
3232
log = "0.4.21"
3333
std-semaphore = "0.1.0"
3434
tempfile = "3.10.1"

Diff for: benches/fjall.rs

+1-89
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,4 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
2-
use fjall::BlockCache;
3-
use lsm_tree::{
4-
segment::{
5-
block::{header::Header as BlockHeader, offset::BlockOffset},
6-
meta::CompressionType,
7-
value_block::ValueBlock,
8-
},
9-
InternalValue,
10-
};
11-
use rand::Rng;
12-
use std::sync::Arc;
132

143
fn batch_write(c: &mut Criterion) {
154
let dir = tempfile::tempdir().unwrap();
@@ -31,82 +20,5 @@ fn batch_write(c: &mut Criterion) {
3120
});
3221
}
3322

34-
fn block_cache_insert(c: &mut Criterion) {
35-
let block_cache = BlockCache::with_capacity_bytes(1_000);
36-
37-
let items = (0..100)
38-
.map(|_| {
39-
InternalValue::from_components(
40-
"a".repeat(16).as_bytes(),
41-
"a".repeat(100).as_bytes(),
42-
63,
43-
lsm_tree::ValueType::Tombstone,
44-
)
45-
})
46-
.collect();
47-
48-
let block = Arc::new(ValueBlock {
49-
items,
50-
header: BlockHeader {
51-
compression: CompressionType::Lz4,
52-
checksum: lsm_tree::Checksum::from_raw(0),
53-
previous_block_offset: BlockOffset(0),
54-
data_length: 0,
55-
uncompressed_length: 0,
56-
},
57-
});
58-
59-
let mut id = 0;
60-
61-
c.bench_function("BlockCache::insert_disk_block", |b| {
62-
b.iter(|| {
63-
block_cache.insert_disk_block((0, id).into(), BlockOffset(40), block.clone());
64-
id += 1;
65-
});
66-
});
67-
}
68-
69-
fn block_cache_get(c: &mut Criterion) {
70-
let block_cache = BlockCache::with_capacity_bytes(u64::MAX);
71-
72-
let items = (0..100)
73-
.map(|_| {
74-
InternalValue::from_components(
75-
"a".repeat(16).as_bytes(),
76-
"a".repeat(100).as_bytes(),
77-
63,
78-
lsm_tree::ValueType::Tombstone,
79-
)
80-
})
81-
.collect();
82-
83-
let seg_id = (0, 0).into();
84-
let block = Arc::new(ValueBlock {
85-
items,
86-
header: BlockHeader {
87-
compression: CompressionType::Lz4,
88-
checksum: lsm_tree::Checksum::from_raw(0),
89-
previous_block_offset: BlockOffset(0),
90-
data_length: 0,
91-
uncompressed_length: 0,
92-
},
93-
});
94-
95-
(0u64..100_000)
96-
.for_each(|idx| block_cache.insert_disk_block(seg_id, BlockOffset(idx), block.clone()));
97-
assert_eq!(100_000, block_cache.len());
98-
99-
let mut rng = rand::rng();
100-
101-
c.bench_function("BlockCache::get_disk_block", |b| {
102-
b.iter(|| {
103-
let key = rng.random_range(0u64..100_000);
104-
block_cache
105-
.get_disk_block(seg_id, BlockOffset(key))
106-
.unwrap();
107-
});
108-
});
109-
}
110-
111-
criterion_group!(benches, batch_write, block_cache_insert, block_cache_get);
23+
criterion_group!(benches, batch_write);
11224
criterion_main!(benches);

Diff for: src/config.rs

+35-15
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{journal::error::RecoveryMode, path::absolute_path, Keyspace};
6-
use lsm_tree::{descriptor_table::FileDescriptorTable, BlobCache, BlockCache};
6+
use lsm_tree::{descriptor_table::FileDescriptorTable, Cache};
77
use std::{
88
path::{Path, PathBuf},
99
sync::Arc,
@@ -18,13 +18,10 @@ pub struct Config {
1818
/// When true, the path will be deleted upon drop
1919
pub(crate) clean_path_on_drop: bool,
2020

21-
/// Block cache that will be shared between partitions
22-
#[doc(hidden)]
23-
pub block_cache: Arc<BlockCache>,
21+
pub(crate) cache: Arc<Cache>,
2422

25-
/// Blob cache that will be shared between partitions
26-
#[doc(hidden)]
27-
pub blob_cache: Arc<BlobCache>,
23+
// TODO: remove in V3
24+
monkey_patch_cache_size: u64,
2825

2926
/// Descriptor table that will be shared between partitions
3027
pub(crate) descriptor_table: Arc<FileDescriptorTable>,
@@ -77,8 +74,6 @@ impl Default for Config {
7774
Self {
7875
path: absolute_path(".fjall_data"),
7976
clean_path_on_drop: false,
80-
block_cache: Arc::new(BlockCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
81-
blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
8277
descriptor_table: Arc::new(FileDescriptorTable::new(get_open_file_limit(), 4)),
8378
max_write_buffer_size_in_bytes: /* 64 MiB */ 64 * 1_024 * 1_024,
8479
max_journaling_size_in_bytes: /* 512 MiB */ 512 * 1_024 * 1_024,
@@ -87,6 +82,9 @@ impl Default for Config {
8782
compaction_workers_count: cpus.min(4),
8883
journal_recovery_mode: RecoveryMode::default(),
8984
manual_journal_persist: false,
85+
86+
cache: Arc::new(Cache::with_capacity_bytes(/* 32 MiB */ 32*1_024*1_024)),
87+
monkey_patch_cache_size: 0,
9088
}
9189
}
9290
}
@@ -142,23 +140,37 @@ impl Config {
142140
self
143141
}
144142

143+
// TODO: remove in V3
145144
/// Sets the block cache.
146145
///
147146
/// Defaults to a block cache with 16 MiB of capacity
148147
/// shared between all partitions inside this keyspace.
149148
#[must_use]
150-
pub fn block_cache(mut self, block_cache: Arc<BlockCache>) -> Self {
151-
self.block_cache = block_cache;
149+
#[deprecated = "Use Config::cache_size instead"]
150+
#[allow(deprecated)]
151+
pub fn block_cache(mut self, block_cache: Arc<crate::BlockCache>) -> Self {
152+
self.monkey_patch_cache_size += block_cache.capacity();
152153
self
153154
}
154155

156+
// TODO: remove in V3
155157
/// Sets the blob cache.
156158
///
157159
/// Defaults to a block cache with 16 MiB of capacity
158160
/// shared between all partitions inside this keyspace.
159161
#[must_use]
160-
pub fn blob_cache(mut self, blob_cache: Arc<BlobCache>) -> Self {
161-
self.blob_cache = blob_cache;
162+
#[deprecated = "Use Config::cache_size instead"]
163+
#[allow(deprecated)]
164+
pub fn blob_cache(mut self, blob_cache: Arc<crate::BlobCache>) -> Self {
165+
self.monkey_patch_cache_size += blob_cache.capacity();
166+
self
167+
}
168+
169+
/// Sets the cache capacity in bytes.
170+
#[must_use]
171+
pub fn cache_size(mut self, size_bytes: u64) -> Self {
172+
self.monkey_patch_cache_size = 0;
173+
self.cache = Arc::new(Cache::with_capacity_bytes(size_bytes));
162174
self
163175
}
164176

@@ -224,7 +236,11 @@ impl Config {
224236
/// # Errors
225237
///
226238
/// Will return `Err` if an IO error occurs.
227-
pub fn open(self) -> crate::Result<Keyspace> {
239+
pub fn open(mut self) -> crate::Result<Keyspace> {
240+
// TODO: remove in V3
241+
if self.monkey_patch_cache_size > 0 {
242+
self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size));
243+
}
228244
Keyspace::open(self)
229245
}
230246

@@ -234,7 +250,11 @@ impl Config {
234250
///
235251
/// Will return `Err` if an IO error occurs.
236252
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
237-
pub fn open_transactional(self) -> crate::Result<crate::TxKeyspace> {
253+
pub fn open_transactional(mut self) -> crate::Result<crate::TxKeyspace> {
254+
// TODO: remove in V3
255+
if self.monkey_patch_cache_size > 0 {
256+
self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size));
257+
}
238258
crate::TxKeyspace::open(self)
239259
}
240260

Diff for: src/keyspace.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -328,19 +328,21 @@ impl Keyspace {
328328
Ok(())
329329
}
330330

331+
#[doc(hidden)]
332+
#[must_use]
333+
pub fn cache_capacity(&self) -> u64 {
334+
self.config.cache.capacity()
335+
}
336+
331337
/// Opens a keyspace in the given directory.
332338
///
333339
/// # Errors
334340
///
335341
/// Returns error, if an IO error occurred.
336342
pub fn open(config: Config) -> crate::Result<Self> {
337343
log::debug!(
338-
"block cache capacity={}MiB",
339-
config.block_cache.capacity() / 1_024 / 1_024,
340-
);
341-
log::debug!(
342-
"blob cache capacity={}MiB",
343-
config.blob_cache.capacity() / 1_024 / 1_024,
344+
"cache capacity={}MiB",
345+
config.cache.capacity() / 1_024 / 1_024,
344346
);
345347

346348
let keyspace = Self::create_or_recover(config)?;

Diff for: src/lib.rs

+46-3
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,49 @@ pub type LsmError = lsm_tree::Error;
168168
#[doc(hidden)]
169169
pub use lsm_tree::AbstractTree;
170170

171-
pub use lsm_tree::{
172-
AnyTree, BlobCache, BlockCache, CompressionType, KvPair, Slice, TreeType, UserKey, UserValue,
173-
};
171+
pub use lsm_tree::{AnyTree, CompressionType, KvPair, Slice, TreeType, UserKey, UserValue};
172+
173+
// TODO: remove in V3
174+
175+
/// Block cache that caches frequently read disk blocks
176+
#[deprecated = "Use Config::cache_size instead"]
177+
pub struct BlockCache(u64);
178+
179+
#[allow(deprecated)]
180+
impl BlockCache {
181+
/// Creates a new cache with given capacity in bytes.
182+
#[must_use]
183+
pub fn with_capacity_bytes(bytes: u64) -> Self {
184+
Self(bytes)
185+
}
186+
187+
/// Returns the cache capacity in bytes.
188+
#[must_use]
189+
pub fn capacity(&self) -> u64 {
190+
self.0
191+
}
192+
}
193+
194+
/// Blob cache that caches frequently read blobs
195+
#[deprecated = "Use Config::cache_size instead"]
196+
#[allow(deprecated)]
197+
pub struct BlobCache(BlockCache);
198+
199+
#[allow(deprecated)]
200+
impl BlobCache {
201+
/// Creates a new cache with given capacity in bytes.
202+
#[must_use]
203+
pub fn with_capacity_bytes(bytes: u64) -> Self {
204+
#[allow(deprecated)]
205+
Self(BlockCache::with_capacity_bytes(bytes))
206+
}
207+
}
208+
209+
#[allow(deprecated)]
210+
impl std::ops::Deref for BlobCache {
211+
type Target = BlockCache;
212+
213+
fn deref(&self) -> &Self::Target {
214+
&self.0
215+
}
216+
}

Diff for: src/partition/mod.rs

+23-2
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,28 @@ impl GarbageCollection for PartitionHandle {
220220
}
221221

222222
impl PartitionHandle {
223+
/// Ingests a sorted stream of key-value pairs into the partition.
224+
///
225+
/// Can only be called on a new fresh, empty partition.
226+
///
227+
/// # Errors
228+
///
229+
/// Will return `Err` if an IO error occurs.
230+
///
231+
/// # Panics
232+
///
233+
/// Panics if the partition is **not** initially empty.
234+
///
235+
/// Will panic if the input iterator is not sorted in ascending order.
236+
pub fn ingest<K: Into<UserKey>, V: Into<UserValue>>(
237+
&self,
238+
iter: impl Iterator<Item = (K, V)>,
239+
) -> crate::Result<()> {
240+
self.tree
241+
.ingest(iter.map(|(k, v)| (k.into(), v.into())))
242+
.map_err(Into::into)
243+
}
244+
223245
pub(crate) fn from_keyspace(
224246
keyspace: &Keyspace,
225247
tree: AnyTree,
@@ -273,8 +295,7 @@ impl PartitionHandle {
273295

274296
let mut base_config = lsm_tree::Config::new(base_folder)
275297
.descriptor_table(keyspace.config.descriptor_table.clone())
276-
.block_cache(keyspace.config.block_cache.clone())
277-
.blob_cache(keyspace.config.blob_cache.clone())
298+
.use_cache(keyspace.config.cache.clone())
278299
.data_block_size(config.data_block_size)
279300
.index_block_size(config.index_block_size)
280301
.level_count(config.level_count)

Diff for: src/recovery.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ pub fn recover_partitions(keyspace: &Keyspace) -> crate::Result<()> {
7676

7777
let mut base_config = lsm_tree::Config::new(path)
7878
.descriptor_table(keyspace.config.descriptor_table.clone())
79-
.block_cache(keyspace.config.block_cache.clone())
80-
.blob_cache(keyspace.config.blob_cache.clone());
79+
.use_cache(keyspace.config.cache.clone());
8180

8281
base_config.bloom_bits_per_key = recovered_config.bloom_bits_per_key;
8382
base_config.data_block_size = recovered_config.data_block_size;

Diff for: tests/v2_cache_api.rs

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// TODO: remove in V3
2+
3+
#[allow(deprecated)]
4+
use fjall::{BlobCache, BlockCache};
5+
use std::sync::Arc;
6+
7+
#[test_log::test]
8+
#[allow(deprecated)]
9+
fn v2_cache_api() -> fjall::Result<()> {
10+
use fjall::Config;
11+
12+
let folder = tempfile::tempdir()?;
13+
14+
let keyspace = Config::new(&folder)
15+
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64_000)))
16+
.block_cache(Arc::new(BlockCache::with_capacity_bytes(64_000)))
17+
.open()?;
18+
19+
assert_eq!(keyspace.cache_capacity(), 128_000);
20+
21+
Ok(())
22+
}

0 commit comments

Comments
 (0)