Skip to content

Commit ddfa969

Browse files
committed
fix(cache): clean up lib
1 parent 38080c6 commit ddfa969

File tree

8 files changed

+74
-383
lines changed

8 files changed

+74
-383
lines changed

engine/artifacts/config-schema.json

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/cache/src/driver.rs

Lines changed: 28 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@ use std::{
44
};
55

66
use moka::future::{Cache, CacheBuilder};
7-
use serde::{Serialize, de::DeserializeOwned};
87
use tracing::Instrument;
98

10-
use crate::{errors::Error, metrics};
9+
use crate::{RawCacheKey, errors::Error};
1110

1211
/// Type alias for cache values stored as bytes
1312
pub type CacheValue = Vec<u8>;
1413

1514
/// Enum wrapper for different cache driver implementations
16-
#[derive(Debug, Clone)]
1715
#[non_exhaustive]
1816
pub enum Driver {
1917
InMemory(InMemoryDriver),
@@ -22,84 +20,49 @@ pub enum Driver {
2220
impl Driver {
2321
/// Fetch multiple values from cache at once
2422
#[tracing::instrument(skip_all, fields(driver=%self))]
25-
pub async fn fetch_values<'a>(
23+
pub async fn get<'a>(
2624
&'a self,
2725
base_key: &'a str,
28-
keys: &[String],
26+
keys: Vec<RawCacheKey>,
2927
) -> Result<Vec<Option<CacheValue>>, Error> {
3028
match self {
31-
Driver::InMemory(d) => d.fetch_values(base_key, keys).await,
29+
Driver::InMemory(d) => d.get(base_key, keys).await,
3230
}
3331
}
3432

3533
/// Set multiple values in cache at once
3634
#[tracing::instrument(skip_all, fields(driver=%self))]
37-
pub async fn set_values<'a>(
35+
pub async fn set<'a>(
3836
&'a self,
3937
base_key: &'a str,
40-
keys_values: Vec<(String, CacheValue, i64)>,
38+
keys_values: Vec<(RawCacheKey, CacheValue, i64)>,
4139
) -> Result<(), Error> {
4240
match self {
43-
Driver::InMemory(d) => d.set_values(base_key, keys_values).await,
41+
Driver::InMemory(d) => d.set(base_key, keys_values).await,
4442
}
4543
}
4644

4745
/// Delete multiple keys from cache
4846
#[tracing::instrument(skip_all, fields(driver=%self))]
49-
pub async fn delete_keys<'a>(
47+
pub async fn delete<'a>(
5048
&'a self,
5149
base_key: &'a str,
52-
keys: Vec<String>,
50+
keys: Vec<RawCacheKey>,
5351
) -> Result<(), Error> {
5452
match self {
55-
Driver::InMemory(d) => d.delete_keys(base_key, keys).await,
53+
Driver::InMemory(d) => d.delete(base_key, keys).await,
5654
}
5755
}
5856

5957
/// Process a raw key into a driver-specific format
6058
///
6159
/// Different implementations use different key formats:
6260
/// - In-memory uses simpler keys
63-
pub fn process_key(&self, base_key: &str, key: &impl crate::CacheKey) -> String {
61+
pub fn process_key(&self, base_key: &str, key: &impl crate::CacheKey) -> RawCacheKey {
6462
match self {
6563
Driver::InMemory(d) => d.process_key(base_key, key),
6664
}
6765
}
68-
69-
/// Process a rate limit key into a driver-specific format
70-
pub fn process_rate_limit_key(
71-
&self,
72-
key: &impl crate::CacheKey,
73-
remote_address: impl AsRef<str>,
74-
bucket: i64,
75-
bucket_duration_ms: i64,
76-
) -> String {
77-
match self {
78-
Driver::InMemory(d) => {
79-
d.process_rate_limit_key(key, remote_address, bucket, bucket_duration_ms)
80-
}
81-
}
82-
}
83-
84-
/// Increment a rate limit counter and return the new count
85-
#[tracing::instrument(skip_all, fields(driver=%self))]
86-
pub async fn rate_limit_increment<'a>(
87-
&'a self,
88-
key: &'a str,
89-
ttl_ms: i64,
90-
) -> Result<i64, Error> {
91-
match self {
92-
Driver::InMemory(d) => d.rate_limit_increment(key, ttl_ms).await,
93-
}
94-
}
95-
96-
pub fn encode_value<T: Serialize>(&self, value: &T) -> Result<CacheValue, Error> {
97-
serde_json::to_vec(value).map_err(Error::SerdeEncode)
98-
}
99-
100-
pub fn decode_value<T: DeserializeOwned>(&self, bytes: &[u8]) -> Result<T, Error> {
101-
serde_json::from_slice(bytes).map_err(Error::SerdeDecode)
102-
}
10366
}
10467

10568
impl std::fmt::Display for Driver {
@@ -120,7 +83,7 @@ struct ExpiringValue {
12083
}
12184

12285
/// Cache expiry implementation for Moka
123-
#[derive(Clone, Debug)]
86+
#[derive(Debug)]
12487
struct ValueExpiry;
12588

12689
impl moka::Expiry<String, ExpiringValue> for ValueExpiry {
@@ -170,55 +133,37 @@ impl moka::Expiry<String, ExpiringValue> for ValueExpiry {
170133
}
171134

172135
/// In-memory cache driver implementation using the moka crate
173-
#[derive(Clone)]
174136
pub struct InMemoryDriver {
175-
service_name: String,
176137
cache: Cache<String, ExpiringValue>,
177-
/// In-memory rate limiting store - maps keys to hit counts with expiration
178-
rate_limits: Cache<String, ExpiringValue>,
179138
}
180139

181140
impl Debug for InMemoryDriver {
182141
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183-
f.debug_struct("InMemoryDriver")
184-
.field("service_name", &self.service_name)
185-
.finish()
142+
f.debug_struct("InMemoryDriver").finish()
186143
}
187144
}
188145

189146
impl InMemoryDriver {
190-
pub fn new(service_name: String, max_capacity: u64) -> Self {
147+
pub fn new(max_capacity: u64) -> Self {
191148
// Create a cache with ValueExpiry implementation for custom expiration times
192149
let cache = CacheBuilder::new(max_capacity)
193150
.expire_after(ValueExpiry)
194151
.build();
195152

196-
// Create a separate cache for rate limiting with the same expiry mechanism
197-
let rate_limits = CacheBuilder::new(max_capacity)
198-
.expire_after(ValueExpiry)
199-
.build();
200-
201-
Self {
202-
service_name,
203-
cache,
204-
rate_limits,
205-
}
153+
Self { cache }
206154
}
207155

208-
pub async fn fetch_values<'a>(
156+
pub async fn get<'a>(
209157
&'a self,
210158
_base_key: &'a str,
211-
keys: &[String],
159+
keys: Vec<RawCacheKey>,
212160
) -> Result<Vec<Option<CacheValue>>, Error> {
213-
let keys = keys.to_vec();
214-
let cache = self.cache.clone();
215-
216161
let mut result = Vec::with_capacity(keys.len());
217162

218163
// Async block for metrics
219164
async {
220165
for key in keys {
221-
result.push(cache.get(&key).await.map(|x| x.value.clone()));
166+
result.push(self.cache.get(&*key).await.map(|x| x.value.clone()));
222167
}
223168
}
224169
.instrument(tracing::info_span!("get"))
@@ -233,13 +178,11 @@ impl InMemoryDriver {
233178
Ok(result)
234179
}
235180

236-
pub async fn set_values<'a>(
181+
pub async fn set<'a>(
237182
&'a self,
238183
_base_key: &'a str,
239-
keys_values: Vec<(String, CacheValue, i64)>,
184+
keys_values: Vec<(RawCacheKey, CacheValue, i64)>,
240185
) -> Result<(), Error> {
241-
let cache = self.cache.clone();
242-
243186
// Async block for metrics
244187
async {
245188
for (key, value, expire_at) in keys_values {
@@ -250,7 +193,7 @@ impl InMemoryDriver {
250193
};
251194

252195
// Store in cache - expiry will be handled by ValueExpiry
253-
cache.insert(key, entry).await;
196+
self.cache.insert(key.into(), entry).await;
254197
}
255198
}
256199
.instrument(tracing::info_span!("set"))
@@ -260,87 +203,26 @@ impl InMemoryDriver {
260203
Ok(())
261204
}
262205

263-
pub async fn delete_keys<'a>(
206+
pub async fn delete<'a>(
264207
&'a self,
265-
base_key: &'a str,
266-
keys: Vec<String>,
208+
_base_key: &'a str,
209+
keys: Vec<RawCacheKey>,
267210
) -> Result<(), Error> {
268-
let cache = self.cache.clone();
269-
270-
metrics::CACHE_PURGE_REQUEST_TOTAL
271-
.with_label_values(&[base_key])
272-
.inc();
273-
metrics::CACHE_PURGE_VALUE_TOTAL
274-
.with_label_values(&[base_key])
275-
.inc_by(keys.len() as u64);
276-
277211
// Async block for metrics
278212
async {
279213
for key in keys {
280214
// Use remove instead of invalidate to ensure it's actually removed
281-
cache.remove(&key).await;
215+
self.cache.remove(&*key).await;
282216
}
283217
}
284-
.instrument(tracing::info_span!("remove"))
218+
.instrument(tracing::info_span!("delete"))
285219
.await;
286220

287221
tracing::trace!("successfully deleted keys from in-memory cache");
288222
Ok(())
289223
}
290224

291-
pub fn process_key(&self, base_key: &str, key: &impl crate::CacheKey) -> String {
292-
format!("{}:{}", base_key, key.cache_key())
293-
}
294-
295-
pub fn process_rate_limit_key(
296-
&self,
297-
key: &impl crate::CacheKey,
298-
remote_address: impl AsRef<str>,
299-
bucket: i64,
300-
bucket_duration_ms: i64,
301-
) -> String {
302-
format!(
303-
"rate_limit:{}:{}:{}:{}",
304-
key.cache_key(),
305-
remote_address.as_ref(),
306-
bucket_duration_ms,
307-
bucket,
308-
)
309-
}
310-
311-
/// Increment a rate limit counter for in-memory storage
312-
pub async fn rate_limit_increment<'a>(
313-
&'a self,
314-
key: &'a str,
315-
ttl_ms: i64,
316-
) -> Result<i64, Error> {
317-
let rate_limits = self.rate_limits.clone();
318-
319-
// Get current value or default to 0
320-
let current_value = match rate_limits.get(key).await {
321-
Some(value) => {
322-
// Try to decode the value as an integer
323-
match serde_json::from_slice::<i64>(&value.value).map_err(Error::SerdeDecode) {
324-
Ok(count) => count,
325-
Err(_) => 0, // If we can't decode, reset to 0
326-
}
327-
}
328-
None => 0,
329-
};
330-
331-
// Increment the value
332-
let new_value = current_value + 1;
333-
let encoded = serde_json::to_vec(&new_value).map_err(Error::SerdeEncode)?;
334-
335-
// Store with expiration
336-
let entry = ExpiringValue {
337-
value: encoded,
338-
expiry_time: rivet_util::timestamp::now() + ttl_ms,
339-
};
340-
341-
// Update the rate limit cache
342-
rate_limits.insert(key.to_string(), entry).await;
343-
344-
Ok(new_value)
225+
pub fn process_key(&self, base_key: &str, key: &impl crate::CacheKey) -> RawCacheKey {
226+
RawCacheKey::from(format!("{}:{}", base_key, key.cache_key()))
345227
}
346228
}

engine/packages/cache/src/inner.rs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,13 @@ pub type Cache = Arc<CacheInner>;
77

88
/// Utility type used to hold information relating to caching.
99
pub struct CacheInner {
10-
service_name: String,
1110
pub(crate) driver: Driver,
1211
pub(crate) ups: Option<universalpubsub::PubSub>,
1312
}
1413

1514
impl Debug for CacheInner {
1615
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
17-
f.debug_struct("CacheInner")
18-
.field("service_name", &self.service_name)
19-
.finish()
16+
f.debug_struct("CacheInner").finish()
2017
}
2118
}
2219

@@ -26,29 +23,17 @@ impl CacheInner {
2623
config: &rivet_config::Config,
2724
pools: rivet_pools::Pools,
2825
) -> Result<Cache, Error> {
29-
let service_name = rivet_env::service_name();
3026
let ups = pools.ups().ok();
3127

3228
match &config.cache().driver {
33-
rivet_config::config::CacheDriver::Redis => todo!(),
34-
rivet_config::config::CacheDriver::InMemory => {
35-
Ok(Self::new_in_memory(service_name.to_string(), 1000, ups))
36-
}
29+
rivet_config::config::CacheDriver::InMemory => Ok(Self::new_in_memory(10000, ups)),
3730
}
3831
}
3932

4033
#[tracing::instrument(skip(ups))]
41-
pub fn new_in_memory(
42-
service_name: String,
43-
max_capacity: u64,
44-
ups: Option<universalpubsub::PubSub>,
45-
) -> Cache {
46-
let driver = Driver::InMemory(InMemoryDriver::new(service_name.clone(), max_capacity));
47-
Arc::new(CacheInner {
48-
service_name,
49-
driver,
50-
ups,
51-
})
34+
pub fn new_in_memory(max_capacity: u64, ups: Option<universalpubsub::PubSub>) -> Cache {
35+
let driver = Driver::InMemory(InMemoryDriver::new(max_capacity));
36+
Arc::new(CacheInner { driver, ups })
5237
}
5338
}
5439

engine/packages/cache/src/key.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use serde::{Deserialize, Serialize};
2-
use std::fmt::Debug;
2+
use std::{fmt::Debug, ops::Deref};
33

44
/// A type that can be serialized in to a key that can be used in the cache.
55
pub trait CacheKey: Clone + Debug + PartialEq {
@@ -99,3 +99,17 @@ impl From<String> for RawCacheKey {
9999
RawCacheKey(value)
100100
}
101101
}
102+
103+
impl From<RawCacheKey> for String {
104+
fn from(value: RawCacheKey) -> Self {
105+
value.0
106+
}
107+
}
108+
109+
impl Deref for RawCacheKey {
110+
type Target = String;
111+
112+
fn deref(&self) -> &Self::Target {
113+
&self.0
114+
}
115+
}

0 commit comments

Comments
 (0)