Skip to content

cache file descriptor #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions benches/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use criterion::{criterion_group, criterion_main, Criterion};
use rand::{Rng, RngCore};
use std::{
collections::BTreeMap,
fs::File,
io::BufReader,
sync::{Arc, RwLock},
};
use value_log::{
BlobCache, Compressor, Config, IndexReader, IndexWriter, UserKey, UserValue, ValueHandle,
ValueLog, ValueLogId,
BlobCache, BlobFileId, Compressor, Config, FDCache, IndexReader, IndexWriter, UserKey,
UserValue, ValueHandle, ValueLog, ValueLogId,
};

type MockIndexInner = RwLock<BTreeMap<UserKey, (ValueHandle, u32)>>;
Expand Down Expand Up @@ -89,6 +91,13 @@ impl BlobCache for NoCacher {
fn insert(&self, _: ValueLogId, _: &ValueHandle, _: UserValue) {}
}

/// No-op file descriptor cache used by the benchmarks: nothing is ever
/// stored, so every lookup is a miss.
impl FDCache for NoCacher {
    /// Discards the reader instead of caching it.
    fn insert(&self, _vlog_id: ValueLogId, _blob_file_id: BlobFileId, _fd: BufReader<File>) {}

    /// Always reports a cache miss.
    fn get(&self, _vlog_id: ValueLogId, _blob_file_id: BlobFileId) -> Option<BufReader<File>> {
        None
    }
}

fn prefetch(c: &mut Criterion) {
let mut group = c.benchmark_group("prefetch range");

Expand All @@ -101,7 +110,11 @@ fn prefetch(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap();
let value_log = ValueLog::open(
vl_path,
Config::<_, _, NoCompressor>::new(NoCacher, NoCacher),
)
.unwrap();

let mut writer = value_log.get_writer().unwrap();

Expand Down Expand Up @@ -184,7 +197,11 @@ fn load_value(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap();
let value_log = ValueLog::open(
vl_path,
Config::<_, _, NoCompressor>::new(NoCacher, NoCacher),
)
.unwrap();

let mut writer = value_log.get_writer().unwrap();

Expand Down
12 changes: 8 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{blob_cache::BlobCache, compression::Compressor};
use crate::{blob_cache::BlobCache, compression::Compressor, fd_cache::BlobFileId, FDCache};

/// Value log configuration
pub struct Config<BC: BlobCache, C: Compressor + Clone> {
pub struct Config<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> {
/// Target size of vLog segments
pub(crate) segment_size_bytes: u64,

/// Blob cache to use
pub(crate) blob_cache: BC,

/// File descriptor cache to use
pub(crate) fd_cache: FDC,

/// Compression to use
pub(crate) compression: C,
}

impl<BC: BlobCache, C: Compressor + Clone + Default> Config<BC, C> {
impl<BC: BlobCache, FDC: FDCache, C: Compressor + Clone + Default> Config<BC, FDC, C> {
/// Creates a new configuration builder.
pub fn new(blob_cache: BC) -> Self {
pub fn new(blob_cache: BC, fd_cache: FDC) -> Self {
Self {
blob_cache,
fd_cache,
compression: Default::default(),
segment_size_bytes: 128 * 1_024 * 1_024,
}
Expand Down
19 changes: 19 additions & 0 deletions src/fd_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::ValueLogId;
use std::{fs::File, io::BufReader};

/// The unique identifier for a value log blob file.
///
/// The value log passes a value handle's segment ID wherever a `BlobFileId`
/// is expected, so this is effectively another name for `SegmentId`.
pub type BlobFileId = u64;

/// File descriptor cache, used to keep blob files open after they have been opened once.
/// Reduces the number of file opens needed when accessing the same blob file repeatedly.
///
/// A cached reader is identified by the value log ID together with the blob file ID.
///
/// NOTE(review): `get` returns an owned `BufReader<File>`, which is not `Clone`, so an
/// implementation presumably has to give up its entry on a hit (or duplicate the
/// underlying handle) - confirm against the concrete cache implementations.
pub trait FDCache: Clone {
/// Caches an open file, wrapped in its `BufReader`, for the given blob file.
///
/// The reader's current position carries no meaning: the value log seeks to an
/// absolute offset each time it takes a reader out of the cache.
fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: BufReader<File>);

/// Retrieves the cached reader for the given blob file, or `None` if it could not be found.
///
/// On `None`, the value log falls back to opening the blob file itself.
fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option<BufReader<File>>;
}
18 changes: 11 additions & 7 deletions src/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

pub mod report;

use crate::{id::SegmentId, BlobCache, Compressor, ValueLog};
use crate::{id::SegmentId, BlobCache, Compressor, FDCache, ValueLog};

/// GC strategy
#[allow(clippy::module_name_repetitions)]
pub trait GcStrategy<BC: BlobCache, C: Compressor + Clone> {
pub trait GcStrategy<BC: BlobCache, FD: FDCache, C: Compressor + Clone> {
/// Picks segments based on a predicate.
fn pick(&self, value_log: &ValueLog<BC, C>) -> Vec<SegmentId>;
fn pick(&self, value_log: &ValueLog<BC, FD, C>) -> Vec<SegmentId>;
}

/// Picks segments that have a certain percentage of stale blobs
Expand All @@ -32,8 +32,10 @@ impl StaleThresholdStrategy {
}
}

impl<BC: BlobCache, C: Compressor + Clone> GcStrategy<BC, C> for StaleThresholdStrategy {
fn pick(&self, value_log: &ValueLog<BC, C>) -> Vec<SegmentId> {
impl<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> GcStrategy<BC, FDC, C>
for StaleThresholdStrategy
{
fn pick(&self, value_log: &ValueLog<BC, FDC, C>) -> Vec<SegmentId> {
value_log
.manifest
.segments
Expand Down Expand Up @@ -62,9 +64,11 @@ impl SpaceAmpStrategy {
}
}

impl<BC: BlobCache, C: Compressor + Clone> GcStrategy<BC, C> for SpaceAmpStrategy {
impl<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> GcStrategy<BC, FDC, C>
for SpaceAmpStrategy
{
#[allow(clippy::cast_precision_loss, clippy::significant_drop_tightening)]
fn pick(&self, value_log: &ValueLog<BC, C>) -> Vec<SegmentId> {
fn pick(&self, value_log: &ValueLog<BC, FDC, C>) -> Vec<SegmentId> {
let space_amp_target = self.0;
let current_space_amp = value_log.space_amp();

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#![cfg_attr(not(feature = "bytes"), forbid(unsafe_code))]

mod blob_cache;
mod fd_cache;

#[doc(hidden)]
pub mod coding;
Expand Down Expand Up @@ -82,6 +83,7 @@ pub use {
compression::Compressor,
config::Config,
error::{Error, Result},
fd_cache::{BlobFileId, FDCache},
gc::report::GcReport,
gc::{GcStrategy, SpaceAmpStrategy, StaleThresholdStrategy},
handle::ValueHandle,
Expand Down
4 changes: 4 additions & 0 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl<C: Compressor + Clone> Reader<C> {
self.compression = Some(compressor);
self
}

/// Consumes the reader and returns the `BufReader<File>` held in `inner`,
/// so the caller can keep using the already-open file handle.
pub(crate) fn into_inner(self) -> BufReader<File> {
self.inner
}
}

impl<C: Compressor + Clone> Iterator for Reader<C> {
Expand Down
44 changes: 32 additions & 12 deletions src/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
scanner::{Scanner, SizeMap},
segment::merge::MergeReader,
version::Version,
BlobCache, Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter,
BlobCache, Compressor, Config, FDCache, GcStrategy, IndexReader, SegmentReader, SegmentWriter,
UserValue, ValueHandle,
};
use std::{
Expand Down Expand Up @@ -43,30 +43,35 @@ fn unlink_blob_files(base_path: &Path, ids: &[SegmentId]) {

/// A disk-resident value log
#[derive(Clone)]
pub struct ValueLog<BC: BlobCache, C: Compressor + Clone>(Arc<ValueLogInner<BC, C>>);
pub struct ValueLog<BC: BlobCache, FDC: FDCache, C: Compressor + Clone>(
Arc<ValueLogInner<BC, FDC, C>>,
);

impl<BC: BlobCache, C: Compressor + Clone> std::ops::Deref for ValueLog<BC, C> {
type Target = ValueLogInner<BC, C>;
impl<BC: BlobCache, C: Compressor + Clone, FDC: FDCache> std::ops::Deref for ValueLog<BC, FDC, C> {
type Target = ValueLogInner<BC, FDC, C>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<BC: BlobCache, C: Compressor + Clone> {
pub struct ValueLogInner<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> {
/// Unique value log ID
id: u64,

/// Base folder
pub path: PathBuf,

/// Value log configuration
config: Config<BC, C>,
config: Config<BC, FDC, C>,

/// In-memory blob cache
blob_cache: BC,

/// In-memory FD cache
fd_cache: FDC,

/// Segment manifest
#[doc(hidden)]
pub manifest: SegmentManifest<C>,
Expand All @@ -80,15 +85,15 @@ pub struct ValueLogInner<BC: BlobCache, C: Compressor + Clone> {
pub rollover_guard: Mutex<()>,
}

impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
impl<BC: BlobCache, C: Compressor + Clone, FDC: FDCache> ValueLog<BC, FDC, C> {
/// Creates or recovers a value log in the given directory.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn open<P: Into<PathBuf>>(
path: P, // TODO: move path into config?
config: Config<BC, C>,
config: Config<BC, FDC, C>,
) -> crate::Result<Self> {
let path = path.into();

Expand Down Expand Up @@ -143,7 +148,7 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
/// Creates a new empty value log in a directory.
pub(crate) fn create_new<P: Into<PathBuf>>(
path: P,
config: Config<BC, C>,
config: Config<BC, FDC, C>,
) -> crate::Result<Self> {
let path = absolute_path(path.into());
log::trace!("Creating value-log at {}", path.display());
Expand Down Expand Up @@ -174,20 +179,25 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
}

let blob_cache = config.blob_cache.clone();
let fd_cache = config.fd_cache.clone();
let manifest = SegmentManifest::create_new(&path)?;

Ok(Self(Arc::new(ValueLogInner {
id: get_next_vlog_id(),
config,
path,
blob_cache,
fd_cache,
manifest,
id_generator: IdGenerator::default(),
rollover_guard: Mutex::new(()),
})))
}

pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<BC, C>) -> crate::Result<Self> {
pub(crate) fn recover<P: Into<PathBuf>>(
path: P,
config: Config<BC, FDC, C>,
) -> crate::Result<Self> {
let path = path.into();
log::info!("Recovering vLog at {}", path.display());

Expand All @@ -204,6 +214,7 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
}

let blob_cache = config.blob_cache.clone();
let fd_cache = config.fd_cache.clone();
let manifest = SegmentManifest::recover(&path)?;

let highest_id = manifest
Expand All @@ -220,6 +231,7 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
config,
path,
blob_cache,
fd_cache,
manifest,
id_generator: IdGenerator::new(highest_id + 1),
rollover_guard: Mutex::new(()),
Expand Down Expand Up @@ -270,7 +282,11 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
return Ok(None);
};

let mut reader = BufReader::new(File::open(&segment.path)?);
let mut reader = match self.fd_cache.get(self.id, vhandle.segment_id) {
Some(fd) => fd,
None => BufReader::new(File::open(&segment.path)?),
};

reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
.use_compression(self.config.compression.clone());
Expand Down Expand Up @@ -302,6 +318,10 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
self.blob_cache.insert(self.id, &value_handle, val);
}

// Cache the BufReader for future use. Its position is left wherever this read ended, so anyone taking it from the cache must seek with SeekFrom::Start before reading.
self.fd_cache
.insert(self.id, vhandle.segment_id, reader.into_inner());

Ok(Some(val))
}

Expand Down Expand Up @@ -499,7 +519,7 @@ impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
/// Will return `Err` if an IO error occurs.
pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
&self,
strategy: &impl GcStrategy<BC, C>,
strategy: &impl GcStrategy<BC, FDC, C>,
index_reader: &R,
index_writer: W,
) -> crate::Result<u64> {
Expand Down
5 changes: 4 additions & 1 deletion tests/accidental_drop_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ fn accidental_drop_rc() -> value_log::Result<()> {

let index = MockIndex::default();

let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?;
let value_log = ValueLog::open(
vl_path,
Config::<_, _, NoCompressor>::new(NoCacher, NoCacher),
)?;

for key in ["a", "b"] {
let value = &key;
Expand Down
5 changes: 4 additions & 1 deletion tests/basic_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ fn basic_gc() -> value_log::Result<()> {

let index = MockIndex::default();

let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?;
let value_log = ValueLog::open(
vl_path,
Config::<_, _, NoCompressor>::new(NoCacher, NoCacher),
)?;

{
let items = ["a", "b", "c", "d", "e"];
Expand Down
Loading