Skip to content

Commit 80b04d0

Browse files
authored
feat: support filter for in-memory cache, rename old to storage filter (#1094)
1 parent 241f5c5 commit 80b04d0

File tree

17 files changed

+222
-99
lines changed

17 files changed

+222
-99
lines changed

examples/hybrid_full.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ use std::{hash::BuildHasherDefault, num::NonZeroUsize};
1616

1717
use chrono::Datelike;
1818
use foyer::{
19-
BlockEngineBuilder, DeviceBuilder, FifoPicker, Filter, FsDeviceBuilder, HybridCache, HybridCacheBuilder,
20-
HybridCachePolicy, IoEngineBuilder, IopsCounter, LruConfig, PsyncIoEngineBuilder, RecoverMode, RejectAll, Result,
21-
RuntimeOptions, Throttle, TokioRuntimeOptions,
19+
BlockEngineBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder, HybridCache, HybridCacheBuilder, HybridCachePolicy,
20+
IoEngineBuilder, IopsCounter, LruConfig, PsyncIoEngineBuilder, RecoverMode, RejectAll, Result, RuntimeOptions,
21+
StorageFilter, Throttle, TokioRuntimeOptions,
2222
};
2323
use tempfile::tempdir;
2424

@@ -50,6 +50,7 @@ async fn main() -> anyhow::Result<()> {
5050
})
5151
.with_hash_builder(BuildHasherDefault::default())
5252
.with_weighter(|_key, value: &String| value.len())
53+
.with_filter(|_, _| true)
5354
.storage()
5455
.with_io_engine(io_engine)
5556
.with_engine_config(
@@ -62,8 +63,8 @@ async fn main() -> anyhow::Result<()> {
6263
.with_buffer_pool_size(256 * 1024 * 1024)
6364
.with_clean_block_threshold(4)
6465
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
65-
.with_admission_filter(Filter::new())
66-
.with_reinsertion_filter(Filter::new().with_condition(RejectAll))
66+
.with_admission_filter(StorageFilter::new())
67+
.with_reinsertion_filter(StorageFilter::new().with_condition(RejectAll))
6768
.with_tombstone_log(false),
6869
)
6970
.with_recover_mode(RecoverMode::Quiet)

foyer-memory/src/cache.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
s3fifo::{S3Fifo, S3FifoConfig},
3737
sieve::{Sieve, SieveConfig},
3838
},
39-
raw::{FetchContext, FetchState, FetchTarget, RawCache, RawCacheConfig, RawCacheEntry, RawFetch, Weighter},
39+
raw::{FetchContext, FetchState, FetchTarget, Filter, RawCache, RawCacheConfig, RawCacheEntry, RawFetch, Weighter},
4040
Piece, Pipe, Result,
4141
};
4242

@@ -427,6 +427,7 @@ where
427427

428428
hash_builder: S,
429429
weighter: Arc<dyn Weighter<K, V>>,
430+
filter: Arc<dyn Filter<K, V>>,
430431

431432
event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,
432433

@@ -450,6 +451,7 @@ where
450451

451452
hash_builder: Default::default(),
452453
weighter: Arc::new(|_, _| 1),
454+
filter: Arc::new(|_, _| true),
453455
event_listener: None,
454456

455457
registry: Box::new(NoopMetricsRegistry),
@@ -501,6 +503,7 @@ where
501503
eviction_config: self.eviction_config,
502504
hash_builder,
503505
weighter: self.weighter,
506+
filter: self.filter,
504507
event_listener: self.event_listener,
505508
registry: self.registry,
506509
metrics: self.metrics,
@@ -513,6 +516,21 @@ where
513516
self
514517
}
515518

519+
/// Set the filter for the in-memory cache.
520+
///
521+
/// The filter is used to decide whether to admit or reject an entry based on its key and value.
522+
///
523+
/// If the filter returns true, the key value can be inserted into the in-memory cache;
524+
/// otherwise, the key value cannot be inserted.
525+
///
526+
/// To ensure API consistency, the in-memory cache will still return a cache entry,
527+
/// but it will not count towards the in-memory cache usage,
528+
/// and it will be immediately reclaimed when the cache entry is dropped.
529+
pub fn with_filter(mut self, filter: impl Filter<K, V>) -> Self {
530+
self.filter = Arc::new(filter);
531+
self
532+
}
533+
516534
/// Set event listener.
517535
pub fn with_event_listener(mut self, event_listener: Arc<dyn EventListener<Key = K, Value = V>>) -> Self {
518536
self.event_listener = Some(event_listener);
@@ -560,6 +578,7 @@ where
560578
eviction_config,
561579
hash_builder: self.hash_builder,
562580
weighter: self.weighter,
581+
filter: self.filter,
563582
event_listener: self.event_listener,
564583
metrics,
565584
}))),
@@ -569,6 +588,7 @@ where
569588
eviction_config,
570589
hash_builder: self.hash_builder,
571590
weighter: self.weighter,
591+
filter: self.filter,
572592
event_listener: self.event_listener,
573593
metrics,
574594
}))),
@@ -578,6 +598,7 @@ where
578598
eviction_config,
579599
hash_builder: self.hash_builder,
580600
weighter: self.weighter,
601+
filter: self.filter,
581602
event_listener: self.event_listener,
582603
metrics,
583604
}))),
@@ -587,6 +608,7 @@ where
587608
eviction_config,
588609
hash_builder: self.hash_builder,
589610
weighter: self.weighter,
611+
filter: self.filter,
590612
event_listener: self.event_listener,
591613
metrics,
592614
}))),
@@ -596,6 +618,7 @@ where
596618
eviction_config,
597619
hash_builder: self.hash_builder,
598620
weighter: self.weighter,
621+
filter: self.filter,
599622
event_listener: self.event_listener,
600623
metrics,
601624
}))),

foyer-memory/src/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,5 @@ pub use crate::{
1919
error::{Error, Result},
2020
eviction::{fifo::FifoConfig, lfu::LfuConfig, lru::LruConfig, s3fifo::S3FifoConfig, Eviction, Op},
2121
pipe::{Piece, Pipe},
22-
raw::{FetchContext, FetchState, FetchTarget, Weighter},
22+
raw::{FetchContext, FetchState, FetchTarget, Filter, Weighter},
2323
};

foyer-memory/src/raw.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ use crate::{
6060
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
6161
impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
6262

63+
/// The filter for the in-memory cache.
64+
///
65+
/// The filter is used to decide whether to admit or reject an entry based on its key and value.
66+
///
67+
/// If the filter returns true, the key value can be inserted into the in-memory cache;
68+
/// otherwise, the key value cannot be inserted.
69+
///
70+
/// To ensure API consistency, the in-memory cache will still return a cache entry,
71+
/// but it will not count towards the in-memory cache usage,
72+
/// and it will be immediately reclaimed when the cache entry is dropped.
73+
pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
74+
impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
75+
6376
pub struct RawCacheConfig<E, S>
6477
where
6578
E: Eviction,
@@ -70,6 +83,7 @@ where
7083
pub eviction_config: E::Config,
7184
pub hash_builder: S,
7285
pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
86+
pub filter: Arc<dyn Filter<E::Key, E::Value>>,
7387
pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
7488
pub metrics: Arc<Metrics>,
7589
}
@@ -398,6 +412,7 @@ where
398412

399413
hash_builder: Arc<S>,
400414
weighter: Arc<dyn Weighter<E::Key, E::Value>>,
415+
filter: Arc<dyn Filter<E::Key, E::Value>>,
401416

402417
metrics: Arc<Metrics>,
403418
event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
@@ -491,6 +506,7 @@ where
491506
capacity: config.capacity,
492507
hash_builder: Arc::new(config.hash_builder),
493508
weighter: config.weighter,
509+
filter: config.filter,
494510
metrics: config.metrics,
495511
event_listener: config.event_listener,
496512
pipe: ArcSwap::new(Arc::new(pipe)),
@@ -559,10 +575,13 @@ where
559575
&self,
560576
key: E::Key,
561577
value: E::Value,
562-
properties: E::Properties,
578+
mut properties: E::Properties,
563579
) -> RawCacheEntry<E, S, I> {
564580
let hash = self.inner.hash_builder.hash_one(&key);
565581
let weight = (self.inner.weighter)(&key, &value);
582+
if !(self.inner.filter)(&key, &value) {
583+
properties = properties.with_disposable(true);
584+
}
566585
let record = Arc::new(Record::new(Data {
567586
key,
568587
value,
@@ -582,6 +601,17 @@ where
582601
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
583602
fn insert_inner(&self, record: Arc<Record<E>>) -> RawCacheEntry<E, S, I> {
584603
if record.properties().disposable().unwrap_or_default() {
604+
// Remove the stale record if it exists.
605+
self.inner.shards[self.shard(record.hash())]
606+
.write()
607+
.remove(record.hash(), record.key())
608+
.inspect(|r| {
609+
// Deallocate data out of the lock critical section.
610+
if let Some(listener) = self.inner.event_listener.as_ref() {
611+
listener.on_leave(Event::Replace, r.key(), r.value());
612+
}
613+
});
614+
585615
// If the record is disposable, we do not insert it into the cache.
586616
// Instead, we just return it and let it be dropped immediately after the last reference drops.
587617
record.inc_refs(1);
@@ -1212,6 +1242,7 @@ mod tests {
12121242
eviction_config: FifoConfig::default(),
12131243
hash_builder: Default::default(),
12141244
weighter: Arc::new(|_, _| 1),
1245+
filter: Arc::new(|_, _| true),
12151246
event_listener: None,
12161247
metrics: Arc::new(Metrics::noop()),
12171248
})
@@ -1226,6 +1257,7 @@ mod tests {
12261257
eviction_config: S3FifoConfig::default(),
12271258
hash_builder: Default::default(),
12281259
weighter: Arc::new(|_, _| 1),
1260+
filter: Arc::new(|_, _| true),
12291261
event_listener: None,
12301262
metrics: Arc::new(Metrics::noop()),
12311263
})
@@ -1240,6 +1272,7 @@ mod tests {
12401272
eviction_config: LruConfig::default(),
12411273
hash_builder: Default::default(),
12421274
weighter: Arc::new(|_, _| 1),
1275+
filter: Arc::new(|_, _| true),
12431276
event_listener: None,
12441277
metrics: Arc::new(Metrics::noop()),
12451278
})
@@ -1254,6 +1287,7 @@ mod tests {
12541287
eviction_config: LfuConfig::default(),
12551288
hash_builder: Default::default(),
12561289
weighter: Arc::new(|_, _| 1),
1290+
filter: Arc::new(|_, _| true),
12571291
event_listener: None,
12581292
metrics: Arc::new(Metrics::noop()),
12591293
})
@@ -1268,6 +1302,7 @@ mod tests {
12681302
eviction_config: SieveConfig {},
12691303
hash_builder: Default::default(),
12701304
weighter: Arc::new(|_, _| 1),
1305+
filter: Arc::new(|_, _| true),
12711306
event_listener: None,
12721307
metrics: Arc::new(Metrics::noop()),
12731308
})
@@ -1306,6 +1341,43 @@ mod tests {
13061341
assert_eq!(fifo.usage(), 0);
13071342
drop(e2a);
13081343
assert_eq!(fifo.usage(), 0);
1344+
1345+
let fifo = fifo_cache_for_test();
1346+
fifo.insert(1, 1);
1347+
assert_eq!(fifo.usage(), 1);
1348+
assert_eq!(fifo.get(&1).unwrap().value(), &1);
1349+
let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_disposable(true));
1350+
assert_eq!(fifo.usage(), 0);
1351+
drop(e2);
1352+
assert_eq!(fifo.usage(), 0);
1353+
assert!(fifo.get(&1).is_none());
1354+
}
1355+
1356+
#[expect(clippy::type_complexity)]
1357+
#[test_log::test]
1358+
fn test_insert_filter() {
1359+
let fifo: RawCache<
1360+
Fifo<u64, u64, TestProperties>,
1361+
ModHasher,
1362+
HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1363+
> = RawCache::new(RawCacheConfig {
1364+
capacity: 256,
1365+
shards: 4,
1366+
eviction_config: FifoConfig::default(),
1367+
hash_builder: Default::default(),
1368+
weighter: Arc::new(|_, _| 1),
1369+
filter: Arc::new(|k, _| !matches!(*k, 42)),
1370+
event_listener: None,
1371+
metrics: Arc::new(Metrics::noop()),
1372+
});
1373+
1374+
fifo.insert(1, 1);
1375+
fifo.insert(2, 2);
1376+
fifo.insert(42, 42);
1377+
assert_eq!(fifo.usage(), 2);
1378+
assert_eq!(fifo.get(&1).unwrap().value(), &1);
1379+
assert_eq!(fifo.get(&2).unwrap().value(), &2);
1380+
assert!(fifo.get(&42).is_none());
13091381
}
13101382

13111383
#[test]
@@ -1338,6 +1410,7 @@ mod tests {
13381410
eviction_config: FifoConfig::default(),
13391411
hash_builder: Default::default(),
13401412
weighter: Arc::new(|k, v| k.len() + v.len()),
1413+
filter: Arc::new(|_, _| true),
13411414
event_listener: None,
13421415
metrics: Arc::new(Metrics::noop()),
13431416
});
@@ -1440,6 +1513,7 @@ mod tests {
14401513
eviction_config: FifoConfig::default(),
14411514
hash_builder: Default::default(),
14421515
weighter: Arc::new(|_, _| 1),
1516+
filter: Arc::new(|_, _| true),
14431517
event_listener: None,
14441518
metrics: Arc::new(Metrics::noop()),
14451519
});
@@ -1455,6 +1529,7 @@ mod tests {
14551529
eviction_config: S3FifoConfig::default(),
14561530
hash_builder: Default::default(),
14571531
weighter: Arc::new(|_, _| 1),
1532+
filter: Arc::new(|_, _| true),
14581533
event_listener: None,
14591534
metrics: Arc::new(Metrics::noop()),
14601535
});
@@ -1470,6 +1545,7 @@ mod tests {
14701545
eviction_config: LruConfig::default(),
14711546
hash_builder: Default::default(),
14721547
weighter: Arc::new(|_, _| 1),
1548+
filter: Arc::new(|_, _| true),
14731549
event_listener: None,
14741550
metrics: Arc::new(Metrics::noop()),
14751551
});
@@ -1485,6 +1561,7 @@ mod tests {
14851561
eviction_config: LfuConfig::default(),
14861562
hash_builder: Default::default(),
14871563
weighter: Arc::new(|_, _| 1),
1564+
filter: Arc::new(|_, _| true),
14881565
event_listener: None,
14891566
metrics: Arc::new(Metrics::noop()),
14901567
});
@@ -1500,6 +1577,7 @@ mod tests {
15001577
eviction_config: SieveConfig {},
15011578
hash_builder: Default::default(),
15021579
weighter: Arc::new(|_, _| 1),
1580+
filter: Arc::new(|_, _| true),
15031581
event_listener: None,
15041582
metrics: Arc::new(Metrics::noop()),
15051583
});

0 commit comments

Comments
 (0)