Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion engine/packages/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true
[dependencies]
anyhow.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
moka.workspace = true
rivet-cache-result.workspace = true
Expand All @@ -16,8 +17,9 @@ rivet-env.workspace = true
rivet-metrics.workspace = true
rivet-pools.workspace = true
rivet-util.workspace = true
serde.workspace = true
scc.workspace = true
serde_json.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/cache/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Driver {
pub async fn get<'a>(
&'a self,
base_key: &'a str,
keys: Vec<RawCacheKey>,
keys: &[RawCacheKey],
) -> Result<Vec<Option<CacheValue>>, Error> {
match self {
Driver::InMemory(d) => d.get(base_key, keys).await,
Expand Down Expand Up @@ -156,14 +156,14 @@ impl InMemoryDriver {
pub async fn get<'a>(
&'a self,
_base_key: &'a str,
keys: Vec<RawCacheKey>,
keys: &[RawCacheKey],
) -> Result<Vec<Option<CacheValue>>, Error> {
let mut result = Vec::with_capacity(keys.len());

// Async block for metrics
async {
for key in keys {
result.push(self.cache.get(&*key).await.map(|x| x.value.clone()));
result.push(self.cache.get(&**key).await.map(|x| x.value.clone()));
}
}
.instrument(tracing::info_span!("get"))
Expand Down
118 changes: 48 additions & 70 deletions engine/packages/cache/src/getter_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::fmt::Debug;
use std::{collections::HashMap, fmt::Debug};

use super::*;

/// Entry for a single value that is going to be read/written to the cache.
#[derive(Debug)]
pub(super) struct GetterCtxKey<K, V> {
/// `CacheKey` that will be used to build Redis keys.
pub(super) key: K,

pub(super) struct GetterCtxEntry<V> {
/// The value that was read from the cache or getter.
value: Option<V>,

Expand All @@ -23,81 +20,68 @@ pub struct GetterCtx<K, V>
where
K: CacheKey,
{
/// The name of the service-specific key to write this cached value to. For
/// example, a team get service would use the "team_profile" key to store
/// the profile a "team_members" to store a cache of members.
///
/// This is local to the service & source hash that caches this value.
#[allow(unused)]
base_key: String,

/// The keys to get/populate from the cache.
keys: Vec<GetterCtxKey<K, V>>,
/// The entries to get/populate from the cache.
entries: HashMap<K, GetterCtxEntry<V>>,
}

impl<K, V> GetterCtx<K, V>
where
K: CacheKey,
{
pub(super) fn new(base_key: String, keys: Vec<K>) -> Self {
pub(super) fn new(keys: Vec<K>) -> Self {
GetterCtx {
base_key,
keys: {
// Create deduplicated ctx keys
let mut ctx_keys = Vec::<GetterCtxKey<K, V>>::new();
for key in keys {
if !ctx_keys.iter().any(|x| x.key == key) {
ctx_keys.push(GetterCtxKey {
key,
entries: keys
.into_iter()
.map(|k| {
(
k,
GetterCtxEntry {
value: None,
from_cache: false,
});
}
}
ctx_keys
},
},
)
})
.collect(),
}
}

pub(super) fn merge(&mut self, other: GetterCtx<K, V>) {
self.entries.extend(other.entries);
}

pub(super) fn into_values(self) -> Vec<(K, V)> {
self.keys
self.entries
.into_iter()
.filter_map(|k| {
if let Some(v) = k.value {
Some((k.key, v))
} else {
None
}
})
.filter_map(|(k, x)| x.value.map(|v| (k, v)))
.collect()
}

/// All keys.
pub(super) fn keys(&self) -> &[GetterCtxKey<K, V>] {
&self.keys[..]
/// All entries.
pub(super) fn entries(&self) -> impl Iterator<Item = (&K, &GetterCtxEntry<V>)> {
self.entries.iter()
}

/// If all keys have an associated value.
pub(super) fn all_keys_have_value(&self) -> bool {
self.keys.iter().all(|x| x.value.is_some())
/// If all entries have an associated value.
pub(super) fn all_entries_have_value(&self) -> bool {
self.entries.iter().all(|(_, x)| x.value.is_some())
}

/// Keys that do not have a value yet.
pub(super) fn unresolved_keys(&self) -> Vec<K> {
self.keys
self.entries
.iter()
.filter(|x| x.value.is_none())
.map(|x| x.key.clone())
.filter(|(_, x)| x.value.is_none())
.map(|(k, _)| k.clone())
.collect()
}

/// Keys that have been resolved in a getter and need to be written to the
/// Entries that have been resolved in a getter and need to be written to the
/// cache.
pub(super) fn values_needing_cache_write(&self) -> Vec<(&GetterCtxKey<K, V>, &V)> {
self.keys
pub(super) fn entries_needing_cache_write(&self) -> Vec<(&K, &V)> {
self.entries
.iter()
.filter(|x| !x.from_cache)
.filter_map(|k| k.value.as_ref().map(|v| (k, v)))
.filter(|(_, x)| !x.from_cache)
.filter_map(|(k, x)| x.value.as_ref().map(|v| (k, v)))
.collect()
}
}
Expand All @@ -107,32 +91,26 @@ where
K: CacheKey,
V: Debug,
{
/// Sets a value with the value provided from the cache.
pub(super) fn resolve_from_cache(&mut self, idx: usize, value: V) {
if let Some(key) = self.keys.get_mut(idx) {
key.value = Some(value);
key.from_cache = true;
/// Sets an entry with the value provided from the cache.
pub(super) fn resolve_from_cache(&mut self, key: &K, value: V) {
if let Some(entry) = self.entries.get_mut(key) {
entry.value = Some(value);
entry.from_cache = true;
} else {
tracing::warn!(?idx, ?value, "resolving cache key index out of range");
tracing::warn!(?key, ?value, "resolving nonexistent cache entry");
}
}

/// Calls the callback with a mutable reference to a given key. Validates
/// that the key does not already have a value.
fn get_key_for_resolve(&mut self, key: &K, cb: impl FnOnce(&mut GetterCtxKey<K, V>)) {
if let Some(key) = self.keys.iter_mut().find(|x| x.key == *key) {
if key.value.is_some() {
tracing::warn!(?key, "cache key already has value");
/// Sets a value with the value provided from the getter function.
pub fn resolve(&mut self, key: &K, value: V) {
if let Some(entry) = self.entries.get_mut(key) {
if entry.value.is_some() {
tracing::warn!(?entry, "cache entry already has value");
} else {
cb(key);
entry.value = Some(value);
}
} else {
tracing::warn!(?key, "resolved value for nonexistent cache key");
tracing::warn!(?key, "resolved value for nonexistent cache entry");
}
}

/// Sets a value with the value provided from the getter function.
pub fn resolve(&mut self, key: &K, value: V) {
self.get_key_for_resolve(key, |key| key.value = Some(value));
}
}
9 changes: 8 additions & 1 deletion engine/packages/cache/src/inner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{fmt::Debug, sync::Arc};

use tokio::sync::broadcast;

use super::*;
use crate::driver::{Driver, InMemoryDriver};

Expand All @@ -8,6 +10,7 @@ pub type Cache = Arc<CacheInner>;
/// Utility type used to hold information relating to caching.
pub struct CacheInner {
pub(crate) driver: Driver,
pub(crate) in_flight: scc::HashMap<RawCacheKey, broadcast::Sender<()>>,
pub(crate) ups: Option<universalpubsub::PubSub>,
}

Expand All @@ -33,7 +36,11 @@ impl CacheInner {
#[tracing::instrument(skip(ups))]
pub fn new_in_memory(max_capacity: u64, ups: Option<universalpubsub::PubSub>) -> Cache {
let driver = Driver::InMemory(InMemoryDriver::new(max_capacity));
Arc::new(CacheInner { driver, ups })
Arc::new(CacheInner {
driver,
in_flight: scc::HashMap::new(),
ups,
})
}
}

Expand Down
6 changes: 3 additions & 3 deletions engine/packages/cache/src/key.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, ops::Deref};
use std::{fmt::Debug, hash::Hash, ops::Deref};

/// A type that can be serialized in to a key that can be used in the cache.
pub trait CacheKey: Clone + Debug + PartialEq {
pub trait CacheKey: Clone + Debug + Eq + PartialEq + Hash {
fn cache_key(&self) -> String;
}

Expand Down Expand Up @@ -85,7 +85,7 @@ impl_to_string!(isize);
/// Unlike other types that implement `CacheKey` (which escape special characters like `:` and `\`),
/// `RawCacheKey` uses the provided string as-is. This is useful when you already have a properly
/// formatted cache key string or need to preserve the exact format without transformations.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)]
pub struct RawCacheKey(String);

impl CacheKey for RawCacheKey {
Expand Down
Loading
Loading