2.8.0 #153

Merged
merged 13 commits on Mar 30, 2025
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -2,7 +2,7 @@
name = "fjall"
description = "LSM-based key-value storage engine"
license = "MIT OR Apache-2.0"
version = "2.7.6"
version = "2.8.0"
edition = "2021"
rust-version = "1.75.0"
readme = "README.md"
@@ -28,7 +28,7 @@ __internal_whitebox = []
[dependencies]
byteorder = "1.5.0"
byteview = "0.6.1"
lsm-tree = { version = "=2.7.6", default-features = false, features = [] }
lsm-tree = { version = "~2.8", default-features = false, features = [] }
log = "0.4.21"
std-semaphore = "0.1.0"
tempfile = "3.10.1"
90 changes: 1 addition & 89 deletions benches/fjall.rs
@@ -1,15 +1,4 @@
use criterion::{criterion_group, criterion_main, Criterion};
use fjall::BlockCache;
use lsm_tree::{
segment::{
block::{header::Header as BlockHeader, offset::BlockOffset},
meta::CompressionType,
value_block::ValueBlock,
},
InternalValue,
};
use rand::Rng;
use std::sync::Arc;

fn batch_write(c: &mut Criterion) {
let dir = tempfile::tempdir().unwrap();
@@ -31,82 +20,5 @@ fn batch_write(c: &mut Criterion) {
});
}

fn block_cache_insert(c: &mut Criterion) {
let block_cache = BlockCache::with_capacity_bytes(1_000);

let items = (0..100)
.map(|_| {
InternalValue::from_components(
"a".repeat(16).as_bytes(),
"a".repeat(100).as_bytes(),
63,
lsm_tree::ValueType::Tombstone,
)
})
.collect();

let block = Arc::new(ValueBlock {
items,
header: BlockHeader {
compression: CompressionType::Lz4,
checksum: lsm_tree::Checksum::from_raw(0),
previous_block_offset: BlockOffset(0),
data_length: 0,
uncompressed_length: 0,
},
});

let mut id = 0;

c.bench_function("BlockCache::insert_disk_block", |b| {
b.iter(|| {
block_cache.insert_disk_block((0, id).into(), BlockOffset(40), block.clone());
id += 1;
});
});
}

fn block_cache_get(c: &mut Criterion) {
let block_cache = BlockCache::with_capacity_bytes(u64::MAX);

let items = (0..100)
.map(|_| {
InternalValue::from_components(
"a".repeat(16).as_bytes(),
"a".repeat(100).as_bytes(),
63,
lsm_tree::ValueType::Tombstone,
)
})
.collect();

let seg_id = (0, 0).into();
let block = Arc::new(ValueBlock {
items,
header: BlockHeader {
compression: CompressionType::Lz4,
checksum: lsm_tree::Checksum::from_raw(0),
previous_block_offset: BlockOffset(0),
data_length: 0,
uncompressed_length: 0,
},
});

(0u64..100_000)
.for_each(|idx| block_cache.insert_disk_block(seg_id, BlockOffset(idx), block.clone()));
assert_eq!(100_000, block_cache.len());

let mut rng = rand::rng();

c.bench_function("BlockCache::get_disk_block", |b| {
b.iter(|| {
let key = rng.random_range(0u64..100_000);
block_cache
.get_disk_block(seg_id, BlockOffset(key))
.unwrap();
});
});
}

criterion_group!(benches, batch_write, block_cache_insert, block_cache_get);
criterion_group!(benches, batch_write);
criterion_main!(benches);
50 changes: 35 additions & 15 deletions src/config.rs
@@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use crate::{journal::error::RecoveryMode, path::absolute_path, Keyspace};
use lsm_tree::{descriptor_table::FileDescriptorTable, BlobCache, BlockCache};
use lsm_tree::{descriptor_table::FileDescriptorTable, Cache};
use std::{
path::{Path, PathBuf},
sync::Arc,
@@ -18,13 +18,10 @@ pub struct Config {
/// When true, the path will be deleted upon drop
pub(crate) clean_path_on_drop: bool,

/// Block cache that will be shared between partitions
#[doc(hidden)]
pub block_cache: Arc<BlockCache>,
pub(crate) cache: Arc<Cache>,

/// Blob cache that will be shared between partitions
#[doc(hidden)]
pub blob_cache: Arc<BlobCache>,
// TODO: remove in V3
monkey_patch_cache_size: u64,

/// Descriptor table that will be shared between partitions
pub(crate) descriptor_table: Arc<FileDescriptorTable>,
@@ -77,8 +74,6 @@ impl Default for Config {
Self {
path: absolute_path(".fjall_data"),
clean_path_on_drop: false,
block_cache: Arc::new(BlockCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
descriptor_table: Arc::new(FileDescriptorTable::new(get_open_file_limit(), 4)),
max_write_buffer_size_in_bytes: /* 64 MiB */ 64 * 1_024 * 1_024,
max_journaling_size_in_bytes: /* 512 MiB */ 512 * 1_024 * 1_024,
@@ -87,6 +82,9 @@
compaction_workers_count: cpus.min(4),
journal_recovery_mode: RecoveryMode::default(),
manual_journal_persist: false,

cache: Arc::new(Cache::with_capacity_bytes(/* 32 MiB */ 32*1_024*1_024)),
monkey_patch_cache_size: 0,
}
}
}
@@ -142,23 +140,37 @@ impl Config {
self
}

// TODO: remove in V3
/// Sets the block cache.
///
/// Defaults to a block cache with 16 MiB of capacity
/// shared between all partitions inside this keyspace.
#[must_use]
pub fn block_cache(mut self, block_cache: Arc<BlockCache>) -> Self {
self.block_cache = block_cache;
#[deprecated = "Use Config::cache_size instead"]
#[allow(deprecated)]
pub fn block_cache(mut self, block_cache: Arc<crate::BlockCache>) -> Self {
self.monkey_patch_cache_size += block_cache.capacity();
self
}

// TODO: remove in V3
/// Sets the blob cache.
///
/// Defaults to a blob cache with 16 MiB of capacity
/// shared between all partitions inside this keyspace.
#[must_use]
pub fn blob_cache(mut self, blob_cache: Arc<BlobCache>) -> Self {
self.blob_cache = blob_cache;
#[deprecated = "Use Config::cache_size instead"]
#[allow(deprecated)]
pub fn blob_cache(mut self, blob_cache: Arc<crate::BlobCache>) -> Self {
self.monkey_patch_cache_size += blob_cache.capacity();
self
}

/// Sets the cache capacity in bytes.
#[must_use]
pub fn cache_size(mut self, size_bytes: u64) -> Self {
self.monkey_patch_cache_size = 0;
self.cache = Arc::new(Cache::with_capacity_bytes(size_bytes));
self
}

@@ -224,7 +236,11 @@ impl Config {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn open(self) -> crate::Result<Keyspace> {
pub fn open(mut self) -> crate::Result<Keyspace> {
// TODO: remove in V3
if self.monkey_patch_cache_size > 0 {
self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size));
}
Keyspace::open(self)
}

@@ -234,7 +250,11 @@
///
/// Will return `Err` if an IO error occurs.
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
pub fn open_transactional(self) -> crate::Result<crate::TxKeyspace> {
pub fn open_transactional(mut self) -> crate::Result<crate::TxKeyspace> {
// TODO: remove in V3
if self.monkey_patch_cache_size > 0 {
self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size));
}
crate::TxKeyspace::open(self)
}

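For context, a minimal usage sketch of the unified-cache configuration this PR introduces (not part of the diff itself): it reuses the `Config::new(...).open()` flow and `tempfile` setup from the test added at the end of this PR, plus `Keyspace::cache_capacity` from the src/keyspace.rs hunk that follows.

```rust
use fjall::Config;

fn main() -> fjall::Result<()> {
    let folder = tempfile::tempdir()?;

    // A single unified cache replaces the separate block/blob caches;
    // `cache_size` overrides the 32 MiB default set in `Config::default()`.
    let keyspace = Config::new(&folder)
        .cache_size(/* 64 MiB */ 64 * 1_024 * 1_024)
        .open()?;

    // `cache_capacity` reports the configured capacity in bytes.
    assert_eq!(keyspace.cache_capacity(), 64 * 1_024 * 1_024);

    Ok(())
}
```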
14 changes: 8 additions & 6 deletions src/keyspace.rs
@@ -328,19 +328,21 @@ impl Keyspace {
Ok(())
}

#[doc(hidden)]
#[must_use]
pub fn cache_capacity(&self) -> u64 {
self.config.cache.capacity()
}

/// Opens a keyspace in the given directory.
///
/// # Errors
///
/// Returns an error if an IO error occurred.
pub fn open(config: Config) -> crate::Result<Self> {
log::debug!(
"block cache capacity={}MiB",
config.block_cache.capacity() / 1_024 / 1_024,
);
log::debug!(
"blob cache capacity={}MiB",
config.blob_cache.capacity() / 1_024 / 1_024,
"cache capacity={}MiB",
config.cache.capacity() / 1_024 / 1_024,
);

let keyspace = Self::create_or_recover(config)?;
49 changes: 46 additions & 3 deletions src/lib.rs
@@ -168,6 +168,49 @@ pub type LsmError = lsm_tree::Error;
#[doc(hidden)]
pub use lsm_tree::AbstractTree;

pub use lsm_tree::{
AnyTree, BlobCache, BlockCache, CompressionType, KvPair, Slice, TreeType, UserKey, UserValue,
};
pub use lsm_tree::{AnyTree, CompressionType, KvPair, Slice, TreeType, UserKey, UserValue};

// TODO: remove in V3

/// Block cache that caches frequently read disk blocks
#[deprecated = "Use Config::cache_size instead"]
pub struct BlockCache(u64);

#[allow(deprecated)]
impl BlockCache {
/// Creates a new cache with given capacity in bytes.
#[must_use]
pub fn with_capacity_bytes(bytes: u64) -> Self {
Self(bytes)
}

/// Returns the cache capacity in bytes.
#[must_use]
pub fn capacity(&self) -> u64 {
self.0
}
}

/// Blob cache that caches frequently read blobs
#[deprecated = "Use Config::cache_size instead"]
#[allow(deprecated)]
pub struct BlobCache(BlockCache);

#[allow(deprecated)]
impl BlobCache {
/// Creates a new cache with given capacity in bytes.
#[must_use]
pub fn with_capacity_bytes(bytes: u64) -> Self {
#[allow(deprecated)]
Self(BlockCache::with_capacity_bytes(bytes))
}
}

#[allow(deprecated)]
impl std::ops::Deref for BlobCache {
type Target = BlockCache;

fn deref(&self) -> &Self::Target {
&self.0
}
}
25 changes: 23 additions & 2 deletions src/partition/mod.rs
@@ -220,6 +220,28 @@ impl GarbageCollection for PartitionHandle {
}

impl PartitionHandle {
/// Ingests a sorted stream of key-value pairs into the partition.
///
/// Can only be called on a fresh, empty partition.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if the partition is **not** initially empty.
///
/// Also panics if the input iterator is not sorted in ascending order.
pub fn ingest<K: Into<UserKey>, V: Into<UserValue>>(
&self,
iter: impl Iterator<Item = (K, V)>,
) -> crate::Result<()> {
self.tree
.ingest(iter.map(|(k, v)| (k.into(), v.into())))
.map_err(Into::into)
}

pub(crate) fn from_keyspace(
keyspace: &Keyspace,
tree: AnyTree,
@@ -273,8 +295,7 @@ impl PartitionHandle {

let mut base_config = lsm_tree::Config::new(base_folder)
.descriptor_table(keyspace.config.descriptor_table.clone())
.block_cache(keyspace.config.block_cache.clone())
.blob_cache(keyspace.config.blob_cache.clone())
.use_cache(keyspace.config.cache.clone())
.data_block_size(config.data_block_size)
.index_block_size(config.index_block_size)
.level_count(config.level_count)
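A hedged usage sketch for the new `PartitionHandle::ingest` method, again not part of the diff: `open_partition`, `PartitionCreateOptions::default()` and `contains_key` are assumed from the existing fjall 2.x API, and `String` keys/values are assumed to satisfy the `Into<UserKey>`/`Into<UserValue>` bounds.

```rust
use fjall::{Config, PartitionCreateOptions};

fn main() -> fjall::Result<()> {
    let folder = tempfile::tempdir()?;
    let keyspace = Config::new(&folder).open()?;

    // `ingest` requires a fresh, empty partition and a key-sorted input.
    let items = keyspace.open_partition("items", PartitionCreateOptions::default())?;

    // Zero-padded keys keep lexicographic order aligned with numeric order,
    // satisfying the ascending-order requirement from the doc comment above.
    items.ingest((0u64..1_000).map(|i| (format!("{i:08}"), format!("value-{i}"))))?;

    assert!(items.contains_key("00000042")?);

    Ok(())
}
```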
3 changes: 1 addition & 2 deletions src/recovery.rs
@@ -76,8 +76,7 @@ pub fn recover_partitions(keyspace: &Keyspace) -> crate::Result<()> {

let mut base_config = lsm_tree::Config::new(path)
.descriptor_table(keyspace.config.descriptor_table.clone())
.block_cache(keyspace.config.block_cache.clone())
.blob_cache(keyspace.config.blob_cache.clone());
.use_cache(keyspace.config.cache.clone());

base_config.bloom_bits_per_key = recovered_config.bloom_bits_per_key;
base_config.data_block_size = recovered_config.data_block_size;
22 changes: 22 additions & 0 deletions tests/v2_cache_api.rs
@@ -0,0 +1,22 @@
// TODO: remove in V3

#[allow(deprecated)]
use fjall::{BlobCache, BlockCache};
use std::sync::Arc;

#[test_log::test]
#[allow(deprecated)]
fn v2_cache_api() -> fjall::Result<()> {
use fjall::Config;

let folder = tempfile::tempdir()?;

let keyspace = Config::new(&folder)
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64_000)))
.block_cache(Arc::new(BlockCache::with_capacity_bytes(64_000)))
.open()?;

assert_eq!(keyspace.cache_capacity(), 128_000);

Ok(())
}