Skip to content

Commit 75e1e38

Browse files
committed
feat(cache): add in flight deduping
1 parent 175706a commit 75e1e38

File tree

11 files changed

+984
-699
lines changed

11 files changed

+984
-699
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/cache/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition.workspace = true
88
[dependencies]
99
anyhow.workspace = true
1010
futures-util.workspace = true
11+
itertools.workspace = true
1112
lazy_static.workspace = true
1213
moka.workspace = true
1314
rivet-cache-result.workspace = true
@@ -16,8 +17,9 @@ rivet-env.workspace = true
1617
rivet-metrics.workspace = true
1718
rivet-pools.workspace = true
1819
rivet-util.workspace = true
19-
serde.workspace = true
20+
scc.workspace = true
2021
serde_json.workspace = true
22+
serde.workspace = true
2123
thiserror.workspace = true
2224
tokio.workspace = true
2325
tracing.workspace = true

engine/packages/cache/src/driver.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl Driver {
2323
pub async fn get<'a>(
2424
&'a self,
2525
base_key: &'a str,
26-
keys: Vec<RawCacheKey>,
26+
keys: &[RawCacheKey],
2727
) -> Result<Vec<Option<CacheValue>>, Error> {
2828
match self {
2929
Driver::InMemory(d) => d.get(base_key, keys).await,
@@ -156,14 +156,14 @@ impl InMemoryDriver {
156156
pub async fn get<'a>(
157157
&'a self,
158158
_base_key: &'a str,
159-
keys: Vec<RawCacheKey>,
159+
keys: &[RawCacheKey],
160160
) -> Result<Vec<Option<CacheValue>>, Error> {
161161
let mut result = Vec::with_capacity(keys.len());
162162

163163
// Async block for metrics
164164
async {
165165
for key in keys {
166-
result.push(self.cache.get(&*key).await.map(|x| x.value.clone()));
166+
result.push(self.cache.get(&**key).await.map(|x| x.value.clone()));
167167
}
168168
}
169169
.instrument(tracing::info_span!("get"))
Lines changed: 48 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
1-
use std::fmt::Debug;
1+
use std::{collections::HashMap, fmt::Debug};
22

33
use super::*;
44

55
/// Entry for a single value that is going to be read/written to the cache.
66
#[derive(Debug)]
7-
pub(super) struct GetterCtxKey<K, V> {
8-
/// `CacheKey` that will be used to build Redis keys.
9-
pub(super) key: K,
10-
7+
pub(super) struct GetterCtxEntry<V> {
118
/// The value that was read from the cache or getter.
129
value: Option<V>,
1310

@@ -23,81 +20,68 @@ pub struct GetterCtx<K, V>
2320
where
2421
K: CacheKey,
2522
{
26-
/// The name of the service-specific key to write this cached value to. For
27-
/// example, a team get service would use the "team_profile" key to store
28-
/// the profile a "team_members" to store a cache of members.
29-
///
30-
/// This is local to the service & source hash that caches this value.
31-
#[allow(unused)]
32-
base_key: String,
33-
34-
/// The keys to get/populate from the cache.
35-
keys: Vec<GetterCtxKey<K, V>>,
23+
/// The entries to get/populate from the cache.
24+
entries: HashMap<K, GetterCtxEntry<V>>,
3625
}
3726

3827
impl<K, V> GetterCtx<K, V>
3928
where
4029
K: CacheKey,
4130
{
42-
pub(super) fn new(base_key: String, keys: Vec<K>) -> Self {
31+
pub(super) fn new(keys: Vec<K>) -> Self {
4332
GetterCtx {
44-
base_key,
45-
keys: {
46-
// Create deduplicated ctx keys
47-
let mut ctx_keys = Vec::<GetterCtxKey<K, V>>::new();
48-
for key in keys {
49-
if !ctx_keys.iter().any(|x| x.key == key) {
50-
ctx_keys.push(GetterCtxKey {
51-
key,
33+
entries: keys
34+
.into_iter()
35+
.map(|k| {
36+
(
37+
k,
38+
GetterCtxEntry {
5239
value: None,
5340
from_cache: false,
54-
});
55-
}
56-
}
57-
ctx_keys
58-
},
41+
},
42+
)
43+
})
44+
.collect(),
5945
}
6046
}
6147

48+
pub(super) fn merge(&mut self, other: GetterCtx<K, V>) {
49+
self.entries.extend(other.entries);
50+
}
51+
6252
pub(super) fn into_values(self) -> Vec<(K, V)> {
63-
self.keys
53+
self.entries
6454
.into_iter()
65-
.filter_map(|k| {
66-
if let Some(v) = k.value {
67-
Some((k.key, v))
68-
} else {
69-
None
70-
}
71-
})
55+
.filter_map(|(k, x)| x.value.map(|v| (k, v)))
7256
.collect()
7357
}
7458

75-
/// All keys.
76-
pub(super) fn keys(&self) -> &[GetterCtxKey<K, V>] {
77-
&self.keys[..]
59+
/// All entries.
60+
pub(super) fn entries(&self) -> impl Iterator<Item = (&K, &GetterCtxEntry<V>)> {
61+
self.entries.iter()
7862
}
7963

80-
/// If all keys have an associated value.
81-
pub(super) fn all_keys_have_value(&self) -> bool {
82-
self.keys.iter().all(|x| x.value.is_some())
64+
/// If all entries have an associated value.
65+
pub(super) fn all_entries_have_value(&self) -> bool {
66+
self.entries.iter().all(|(_, x)| x.value.is_some())
8367
}
8468

8569
/// Keys that do not have a value yet.
8670
pub(super) fn unresolved_keys(&self) -> Vec<K> {
87-
self.keys
71+
self.entries
8872
.iter()
89-
.filter(|x| x.value.is_none())
90-
.map(|x| x.key.clone())
73+
.filter(|(_, x)| x.value.is_none())
74+
.map(|(k, _)| k.clone())
9175
.collect()
9276
}
9377

94-
/// Keys that have been resolved in a getter and need to be written to the
78+
/// Entries that have been resolved in a getter and need to be written to the
9579
/// cache.
96-
pub(super) fn values_needing_cache_write(&self) -> Vec<(&GetterCtxKey<K, V>, &V)> {
97-
self.keys
80+
pub(super) fn entries_needing_cache_write(&self) -> Vec<(&K, &V)> {
81+
self.entries
9882
.iter()
99-
.filter(|x| !x.from_cache)
100-
.filter_map(|k| k.value.as_ref().map(|v| (k, v)))
83+
.filter(|(_, x)| !x.from_cache)
84+
.filter_map(|(k, x)| x.value.as_ref().map(|v| (k, v)))
10185
.collect()
10286
}
10387
}
@@ -107,32 +91,26 @@ where
10791
K: CacheKey,
10892
V: Debug,
10993
{
110-
/// Sets a value with the value provided from the cache.
111-
pub(super) fn resolve_from_cache(&mut self, idx: usize, value: V) {
112-
if let Some(key) = self.keys.get_mut(idx) {
113-
key.value = Some(value);
114-
key.from_cache = true;
94+
/// Sets an entry with the value provided from the cache.
95+
pub(super) fn resolve_from_cache(&mut self, key: &K, value: V) {
96+
if let Some(entry) = self.entries.get_mut(key) {
97+
entry.value = Some(value);
98+
entry.from_cache = true;
11599
} else {
116-
tracing::warn!(?idx, ?value, "resolving cache key index out of range");
100+
tracing::warn!(?key, ?value, "resolving nonexistent cache entry");
117101
}
118102
}
119103

120-
/// Calls the callback with a mutable reference to a given key. Validates
121-
/// that the key does not already have a value.
122-
fn get_key_for_resolve(&mut self, key: &K, cb: impl FnOnce(&mut GetterCtxKey<K, V>)) {
123-
if let Some(key) = self.keys.iter_mut().find(|x| x.key == *key) {
124-
if key.value.is_some() {
125-
tracing::warn!(?key, "cache key already has value");
104+
/// Sets a value with the value provided from the getter function.
105+
pub fn resolve(&mut self, key: &K, value: V) {
106+
if let Some(entry) = self.entries.get_mut(key) {
107+
if entry.value.is_some() {
108+
tracing::warn!(?entry, "cache entry already has value");
126109
} else {
127-
cb(key);
110+
entry.value = Some(value);
128111
}
129112
} else {
130-
tracing::warn!(?key, "resolved value for nonexistent cache key");
113+
tracing::warn!(?key, "resolved value for nonexistent cache entry");
131114
}
132115
}
133-
134-
/// Sets a value with the value provided from the getter function.
135-
pub fn resolve(&mut self, key: &K, value: V) {
136-
self.get_key_for_resolve(key, |key| key.value = Some(value));
137-
}
138116
}

engine/packages/cache/src/inner.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::{fmt::Debug, sync::Arc};
22

3+
use tokio::sync::broadcast;
4+
35
use super::*;
46
use crate::driver::{Driver, InMemoryDriver};
57

@@ -8,6 +10,7 @@ pub type Cache = Arc<CacheInner>;
810
/// Utility type used to hold information relating to caching.
911
pub struct CacheInner {
1012
pub(crate) driver: Driver,
13+
pub(crate) in_flight: scc::HashMap<RawCacheKey, broadcast::Sender<()>>,
1114
pub(crate) ups: Option<universalpubsub::PubSub>,
1215
}
1316

@@ -33,7 +36,11 @@ impl CacheInner {
3336
#[tracing::instrument(skip(ups))]
3437
pub fn new_in_memory(max_capacity: u64, ups: Option<universalpubsub::PubSub>) -> Cache {
3538
let driver = Driver::InMemory(InMemoryDriver::new(max_capacity));
36-
Arc::new(CacheInner { driver, ups })
39+
Arc::new(CacheInner {
40+
driver,
41+
in_flight: scc::HashMap::new(),
42+
ups,
43+
})
3744
}
3845
}
3946

engine/packages/cache/src/key.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use serde::{Deserialize, Serialize};
2-
use std::{fmt::Debug, ops::Deref};
2+
use std::{fmt::Debug, hash::Hash, ops::Deref};
33

44
/// A type that can be serialized in to a key that can be used in the cache.
5-
pub trait CacheKey: Clone + Debug + PartialEq {
5+
pub trait CacheKey: Clone + Debug + Eq + PartialEq + Hash {
66
fn cache_key(&self) -> String;
77
}
88

@@ -85,7 +85,7 @@ impl_to_string!(isize);
8585
/// Unlike other types that implement `CacheKey` (which escape special characters like `:` and `\`),
8686
/// `RawCacheKey` uses the provided string as-is. This is useful when you already have a properly
8787
/// formatted cache key string or need to preserve the exact format without transformations.
88-
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
88+
#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)]
8989
pub struct RawCacheKey(String);
9090

9191
impl CacheKey for RawCacheKey {

0 commit comments

Comments
 (0)