diff --git a/Cargo.toml b/Cargo.toml index d1bfdc4..8c104eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "fjall" description = "LSM-based key-value storage engine" license = "MIT OR Apache-2.0" -version = "2.7.6" +version = "2.8.0" edition = "2021" rust-version = "1.75.0" readme = "README.md" @@ -28,7 +28,7 @@ __internal_whitebox = [] [dependencies] byteorder = "1.5.0" byteview = "0.6.1" -lsm-tree = { version = "=2.7.6", default-features = false, features = [] } +lsm-tree = { version = "~2.8", default-features = false, features = [] } log = "0.4.21" std-semaphore = "0.1.0" tempfile = "3.10.1" diff --git a/benches/fjall.rs b/benches/fjall.rs index 25b9df2..873a17d 100644 --- a/benches/fjall.rs +++ b/benches/fjall.rs @@ -1,15 +1,4 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use fjall::BlockCache; -use lsm_tree::{ - segment::{ - block::{header::Header as BlockHeader, offset::BlockOffset}, - meta::CompressionType, - value_block::ValueBlock, - }, - InternalValue, -}; -use rand::Rng; -use std::sync::Arc; fn batch_write(c: &mut Criterion) { let dir = tempfile::tempdir().unwrap(); @@ -31,82 +20,5 @@ fn batch_write(c: &mut Criterion) { }); } -fn block_cache_insert(c: &mut Criterion) { - let block_cache = BlockCache::with_capacity_bytes(1_000); - - let items = (0..100) - .map(|_| { - InternalValue::from_components( - "a".repeat(16).as_bytes(), - "a".repeat(100).as_bytes(), - 63, - lsm_tree::ValueType::Tombstone, - ) - }) - .collect(); - - let block = Arc::new(ValueBlock { - items, - header: BlockHeader { - compression: CompressionType::Lz4, - checksum: lsm_tree::Checksum::from_raw(0), - previous_block_offset: BlockOffset(0), - data_length: 0, - uncompressed_length: 0, - }, - }); - - let mut id = 0; - - c.bench_function("BlockCache::insert_disk_block", |b| { - b.iter(|| { - block_cache.insert_disk_block((0, id).into(), BlockOffset(40), block.clone()); - id += 1; - }); - }); -} - -fn block_cache_get(c: &mut Criterion) { - let block_cache 
= BlockCache::with_capacity_bytes(u64::MAX); - - let items = (0..100) - .map(|_| { - InternalValue::from_components( - "a".repeat(16).as_bytes(), - "a".repeat(100).as_bytes(), - 63, - lsm_tree::ValueType::Tombstone, - ) - }) - .collect(); - - let seg_id = (0, 0).into(); - let block = Arc::new(ValueBlock { - items, - header: BlockHeader { - compression: CompressionType::Lz4, - checksum: lsm_tree::Checksum::from_raw(0), - previous_block_offset: BlockOffset(0), - data_length: 0, - uncompressed_length: 0, - }, - }); - - (0u64..100_000) - .for_each(|idx| block_cache.insert_disk_block(seg_id, BlockOffset(idx), block.clone())); - assert_eq!(100_000, block_cache.len()); - - let mut rng = rand::rng(); - - c.bench_function("BlockCache::get_disk_block", |b| { - b.iter(|| { - let key = rng.random_range(0u64..100_000); - block_cache - .get_disk_block(seg_id, BlockOffset(key)) - .unwrap(); - }); - }); -} - -criterion_group!(benches, batch_write, block_cache_insert, block_cache_get); +criterion_group!(benches, batch_write); criterion_main!(benches); diff --git a/src/config.rs b/src/config.rs index 47056f1..4bb2521 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use crate::{journal::error::RecoveryMode, path::absolute_path, Keyspace}; -use lsm_tree::{descriptor_table::FileDescriptorTable, BlobCache, BlockCache}; +use lsm_tree::{descriptor_table::FileDescriptorTable, Cache}; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -18,13 +18,10 @@ pub struct Config { /// When true, the path will be deleted upon drop pub(crate) clean_path_on_drop: bool, - /// Block cache that will be shared between partitions - #[doc(hidden)] - pub block_cache: Arc, + pub(crate) cache: Arc, - /// Blob cache that will be shared between partitions - #[doc(hidden)] - pub blob_cache: Arc, + // TODO: remove in V3 (sums the capacities passed to the deprecated block_cache/blob_cache setters; 0 = unused) + monkey_patch_cache_size: u64, /// Descriptor table that will be shared between partitions pub(crate) descriptor_table: Arc, @@ -77,8 
+74,6 @@ impl Default for Config { Self { path: absolute_path(".fjall_data"), clean_path_on_drop: false, - block_cache: Arc::new(BlockCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)), - blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)), descriptor_table: Arc::new(FileDescriptorTable::new(get_open_file_limit(), 4)), max_write_buffer_size_in_bytes: /* 64 MiB */ 64 * 1_024 * 1_024, max_journaling_size_in_bytes: /* 512 MiB */ 512 * 1_024 * 1_024, @@ -87,6 +82,9 @@ impl Default for Config { compaction_workers_count: cpus.min(4), journal_recovery_mode: RecoveryMode::default(), manual_journal_persist: false, + + cache: Arc::new(Cache::with_capacity_bytes(/* 32 MiB */ 32*1_024*1_024)), + monkey_patch_cache_size: 0, } } } @@ -142,23 +140,37 @@ impl Config { self } + // TODO: remove in V3 (only the capacity of the passed cache is used; it is added to monkey_patch_cache_size) /// Sets the block cache. /// /// Defaults to a block cache with 16 MiB of capacity /// shared between all partitions inside this keyspace. #[must_use] - pub fn block_cache(mut self, block_cache: Arc) -> Self { - self.block_cache = block_cache; + #[deprecated = "Use Config::cache_size instead"] + #[allow(deprecated)] + pub fn block_cache(mut self, block_cache: Arc) -> Self { + self.monkey_patch_cache_size += block_cache.capacity(); self } + // TODO: remove in V3 (only the capacity of the passed cache is used; it is added to monkey_patch_cache_size) /// Sets the blob cache. /// /// Defaults to a block cache with 16 MiB of capacity /// shared between all partitions inside this keyspace. #[must_use] - pub fn blob_cache(mut self, blob_cache: Arc) -> Self { - self.blob_cache = blob_cache; + #[deprecated = "Use Config::cache_size instead"] + #[allow(deprecated)] + pub fn blob_cache(mut self, blob_cache: Arc) -> Self { + self.monkey_patch_cache_size += blob_cache.capacity(); + self + } + + /// Sets the cache capacity in bytes, discarding any capacity previously requested through the deprecated `block_cache`/`blob_cache` setters. 
+ #[must_use] + pub fn cache_size(mut self, size_bytes: u64) -> Self { + self.monkey_patch_cache_size = 0; + self.cache = Arc::new(Cache::with_capacity_bytes(size_bytes)); self } @@ -224,7 +236,11 @@ impl Config { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn open(self) -> crate::Result { + pub fn open(mut self) -> crate::Result { + // TODO: remove in V3 (a capacity requested via the deprecated cache setters replaces the configured cache) + if self.monkey_patch_cache_size > 0 { + self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size)); + } Keyspace::open(self) } @@ -234,7 +250,11 @@ impl Config { /// /// Will return `Err` if an IO error occurs. #[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))] - pub fn open_transactional(self) -> crate::Result { + pub fn open_transactional(mut self) -> crate::Result { + // TODO: remove in V3 (a capacity requested via the deprecated cache setters replaces the configured cache) + if self.monkey_patch_cache_size > 0 { + self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size)); + } crate::TxKeyspace::open(self) } diff --git a/src/keyspace.rs b/src/keyspace.rs index bb2c064..2a69bd6 100644 --- a/src/keyspace.rs +++ b/src/keyspace.rs @@ -328,6 +328,12 @@ impl Keyspace { Ok(()) } + #[doc(hidden)] + #[must_use] + pub fn cache_capacity(&self) -> u64 { + self.config.cache.capacity() + } + /// Opens a keyspace in the given directory. /// /// # Errors @@ -335,12 +341,8 @@ impl Keyspace { /// Returns error, if an IO error occurred. 
pub fn open(config: Config) -> crate::Result { log::debug!( - "block cache capacity={}MiB", - config.block_cache.capacity() / 1_024 / 1_024, - ); - log::debug!( - "blob cache capacity={}MiB", - config.blob_cache.capacity() / 1_024 / 1_024, + "cache capacity={}MiB", + config.cache.capacity() / 1_024 / 1_024, ); let keyspace = Self::create_or_recover(config)?; diff --git a/src/lib.rs b/src/lib.rs index 61c5f50..0e20976 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -168,6 +168,49 @@ pub type LsmError = lsm_tree::Error; #[doc(hidden)] pub use lsm_tree::AbstractTree; -pub use lsm_tree::{ - AnyTree, BlobCache, BlockCache, CompressionType, KvPair, Slice, TreeType, UserKey, UserValue, -}; +pub use lsm_tree::{AnyTree, CompressionType, KvPair, Slice, TreeType, UserKey, UserValue}; + +// TODO: remove in V3 + +/// Deprecated placeholder for the former block cache: it only records a capacity in bytes +#[deprecated = "Use Config::cache_size instead"] +pub struct BlockCache(u64); + +#[allow(deprecated)] +impl BlockCache { + /// Creates a placeholder that records the given capacity in bytes. + #[must_use] + pub fn with_capacity_bytes(bytes: u64) -> Self { + Self(bytes) + } + + /// Returns the recorded capacity in bytes. + #[must_use] + pub fn capacity(&self) -> u64 { + self.0 + } +} + +/// Deprecated placeholder for the former blob cache: it only wraps a [`BlockCache`] capacity record +#[deprecated = "Use Config::cache_size instead"] +#[allow(deprecated)] +pub struct BlobCache(BlockCache); + +#[allow(deprecated)] +impl BlobCache { + /// Creates a placeholder that records the given capacity in bytes. 
+ #[must_use] + pub fn with_capacity_bytes(bytes: u64) -> Self { + #[allow(deprecated)] + Self(BlockCache::with_capacity_bytes(bytes)) + } +} + +#[allow(deprecated)] +impl std::ops::Deref for BlobCache { + type Target = BlockCache; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/src/partition/mod.rs b/src/partition/mod.rs index ad5849b..b9294e9 100644 --- a/src/partition/mod.rs +++ b/src/partition/mod.rs @@ -220,6 +220,28 @@ impl GarbageCollection for PartitionHandle { } impl PartitionHandle { + /// Ingests a sorted stream of key-value pairs into the partition. + /// + /// Can only be called on a fresh, empty partition. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + /// + /// # Panics + /// + /// Panics if the partition is **not** initially empty. + /// + /// Panics if the input iterator is not sorted in ascending order. + pub fn ingest, V: Into>( + &self, + iter: impl Iterator, + ) -> crate::Result<()> { + self.tree + .ingest(iter.map(|(k, v)| (k.into(), v.into()))) + .map_err(Into::into) + } + pub(crate) fn from_keyspace( keyspace: &Keyspace, tree: AnyTree, @@ -273,8 +295,7 @@ impl PartitionHandle { let mut base_config = lsm_tree::Config::new(base_folder) .descriptor_table(keyspace.config.descriptor_table.clone()) - .block_cache(keyspace.config.block_cache.clone()) - .blob_cache(keyspace.config.blob_cache.clone()) + .use_cache(keyspace.config.cache.clone()) .data_block_size(config.data_block_size) .index_block_size(config.index_block_size) .level_count(config.level_count) diff --git a/src/recovery.rs b/src/recovery.rs index 70d58c4..a155879 100644 --- a/src/recovery.rs +++ b/src/recovery.rs @@ -76,8 +76,7 @@ pub fn recover_partitions(keyspace: &Keyspace) -> crate::Result<()> { let mut base_config = lsm_tree::Config::new(path) .descriptor_table(keyspace.config.descriptor_table.clone()) - .block_cache(keyspace.config.block_cache.clone()) - .blob_cache(keyspace.config.blob_cache.clone()); 
+ .use_cache(keyspace.config.cache.clone()); base_config.bloom_bits_per_key = recovered_config.bloom_bits_per_key; base_config.data_block_size = recovered_config.data_block_size; diff --git a/tests/v2_cache_api.rs b/tests/v2_cache_api.rs new file mode 100644 index 0000000..2b8cda8 --- /dev/null +++ b/tests/v2_cache_api.rs @@ -0,0 +1,22 @@ +// TODO: remove in V3 (verifies the deprecated cache setters still add up to the keyspace cache capacity) + +#[allow(deprecated)] +use fjall::{BlobCache, BlockCache}; +use std::sync::Arc; + +#[test_log::test] +#[allow(deprecated)] +fn v2_cache_api() -> fjall::Result<()> { + use fjall::Config; + + let folder = tempfile::tempdir()?; + + let keyspace = Config::new(&folder) + .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64_000))) + .block_cache(Arc::new(BlockCache::with_capacity_bytes(64_000))) + .open()?; + + assert_eq!(keyspace.cache_capacity(), 128_000); + + Ok(()) +}