@@ -4,16 +4,14 @@ use std::{
44} ;
55
66use moka:: future:: { Cache , CacheBuilder } ;
7- use serde:: { Serialize , de:: DeserializeOwned } ;
87use tracing:: Instrument ;
98
10- use crate :: { errors:: Error , metrics } ;
9+ use crate :: { RawCacheKey , errors:: Error } ;
1110
1211/// Type alias for cache values stored as bytes
1312pub type CacheValue = Vec < u8 > ;
1413
1514/// Enum wrapper for different cache driver implementations
16- #[ derive( Debug , Clone ) ]
1715#[ non_exhaustive]
1816pub enum Driver {
1917 InMemory ( InMemoryDriver ) ,
@@ -22,84 +20,49 @@ pub enum Driver {
2220impl Driver {
2321 /// Fetch multiple values from cache at once
2422 #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
25- pub async fn fetch_values < ' a > (
23+ pub async fn get < ' a > (
2624 & ' a self ,
2725 base_key : & ' a str ,
28- keys : & [ String ] ,
26+ keys : Vec < RawCacheKey > ,
2927 ) -> Result < Vec < Option < CacheValue > > , Error > {
3028 match self {
31- Driver :: InMemory ( d) => d. fetch_values ( base_key, keys) . await ,
29+ Driver :: InMemory ( d) => d. get ( base_key, keys) . await ,
3230 }
3331 }
3432
3533 /// Set multiple values in cache at once
3634 #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
37- pub async fn set_values < ' a > (
35+ pub async fn set < ' a > (
3836 & ' a self ,
3937 base_key : & ' a str ,
40- keys_values : Vec < ( String , CacheValue , i64 ) > ,
38+ keys_values : Vec < ( RawCacheKey , CacheValue , i64 ) > ,
4139 ) -> Result < ( ) , Error > {
4240 match self {
43- Driver :: InMemory ( d) => d. set_values ( base_key, keys_values) . await ,
41+ Driver :: InMemory ( d) => d. set ( base_key, keys_values) . await ,
4442 }
4543 }
4644
4745 /// Delete multiple keys from cache
4846 #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
49- pub async fn delete_keys < ' a > (
47+ pub async fn delete < ' a > (
5048 & ' a self ,
5149 base_key : & ' a str ,
52- keys : Vec < String > ,
50+ keys : Vec < RawCacheKey > ,
5351 ) -> Result < ( ) , Error > {
5452 match self {
55- Driver :: InMemory ( d) => d. delete_keys ( base_key, keys) . await ,
53+ Driver :: InMemory ( d) => d. delete ( base_key, keys) . await ,
5654 }
5755 }
5856
5957 /// Process a raw key into a driver-specific format
6058 ///
6159 /// Different implementations use different key formats:
6260 /// - In-memory uses simpler keys
63- pub fn process_key ( & self , base_key : & str , key : & impl crate :: CacheKey ) -> String {
61+ pub fn process_key ( & self , base_key : & str , key : & impl crate :: CacheKey ) -> RawCacheKey {
6462 match self {
6563 Driver :: InMemory ( d) => d. process_key ( base_key, key) ,
6664 }
6765 }
68-
69- /// Process a rate limit key into a driver-specific format
70- pub fn process_rate_limit_key (
71- & self ,
72- key : & impl crate :: CacheKey ,
73- remote_address : impl AsRef < str > ,
74- bucket : i64 ,
75- bucket_duration_ms : i64 ,
76- ) -> String {
77- match self {
78- Driver :: InMemory ( d) => {
79- d. process_rate_limit_key ( key, remote_address, bucket, bucket_duration_ms)
80- }
81- }
82- }
83-
84- /// Increment a rate limit counter and return the new count
85- #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
86- pub async fn rate_limit_increment < ' a > (
87- & ' a self ,
88- key : & ' a str ,
89- ttl_ms : i64 ,
90- ) -> Result < i64 , Error > {
91- match self {
92- Driver :: InMemory ( d) => d. rate_limit_increment ( key, ttl_ms) . await ,
93- }
94- }
95-
96- pub fn encode_value < T : Serialize > ( & self , value : & T ) -> Result < CacheValue , Error > {
97- serde_json:: to_vec ( value) . map_err ( Error :: SerdeEncode )
98- }
99-
100- pub fn decode_value < T : DeserializeOwned > ( & self , bytes : & [ u8 ] ) -> Result < T , Error > {
101- serde_json:: from_slice ( bytes) . map_err ( Error :: SerdeDecode )
102- }
10366}
10467
10568impl std:: fmt:: Display for Driver {
@@ -120,7 +83,7 @@ struct ExpiringValue {
12083}
12184
12285/// Cache expiry implementation for Moka
123- #[ derive( Clone , Debug ) ]
86+ #[ derive( Debug ) ]
12487struct ValueExpiry ;
12588
12689impl moka:: Expiry < String , ExpiringValue > for ValueExpiry {
@@ -170,55 +133,37 @@ impl moka::Expiry<String, ExpiringValue> for ValueExpiry {
170133}
171134
172135/// In-memory cache driver implementation using the moka crate
173- #[ derive( Clone ) ]
174136pub struct InMemoryDriver {
175- service_name : String ,
176137 cache : Cache < String , ExpiringValue > ,
177- /// In-memory rate limiting store - maps keys to hit counts with expiration
178- rate_limits : Cache < String , ExpiringValue > ,
179138}
180139
181140impl Debug for InMemoryDriver {
182141 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
183- f. debug_struct ( "InMemoryDriver" )
184- . field ( "service_name" , & self . service_name )
185- . finish ( )
142+ f. debug_struct ( "InMemoryDriver" ) . finish ( )
186143 }
187144}
188145
189146impl InMemoryDriver {
190- pub fn new ( service_name : String , max_capacity : u64 ) -> Self {
147+ pub fn new ( max_capacity : u64 ) -> Self {
191148 // Create a cache with ValueExpiry implementation for custom expiration times
192149 let cache = CacheBuilder :: new ( max_capacity)
193150 . expire_after ( ValueExpiry )
194151 . build ( ) ;
195152
196- // Create a separate cache for rate limiting with the same expiry mechanism
197- let rate_limits = CacheBuilder :: new ( max_capacity)
198- . expire_after ( ValueExpiry )
199- . build ( ) ;
200-
201- Self {
202- service_name,
203- cache,
204- rate_limits,
205- }
153+ Self { cache }
206154 }
207155
208- pub async fn fetch_values < ' a > (
156+ pub async fn get < ' a > (
209157 & ' a self ,
210158 _base_key : & ' a str ,
211- keys : & [ String ] ,
159+ keys : Vec < RawCacheKey > ,
212160 ) -> Result < Vec < Option < CacheValue > > , Error > {
213- let keys = keys. to_vec ( ) ;
214- let cache = self . cache . clone ( ) ;
215-
216161 let mut result = Vec :: with_capacity ( keys. len ( ) ) ;
217162
218163 // Async block for metrics
219164 async {
220165 for key in keys {
221- result. push ( cache. get ( & key) . await . map ( |x| x. value . clone ( ) ) ) ;
166+ result. push ( self . cache . get ( & * key) . await . map ( |x| x. value . clone ( ) ) ) ;
222167 }
223168 }
224169 . instrument ( tracing:: info_span!( "get" ) )
@@ -233,13 +178,11 @@ impl InMemoryDriver {
233178 Ok ( result)
234179 }
235180
236- pub async fn set_values < ' a > (
181+ pub async fn set < ' a > (
237182 & ' a self ,
238183 _base_key : & ' a str ,
239- keys_values : Vec < ( String , CacheValue , i64 ) > ,
184+ keys_values : Vec < ( RawCacheKey , CacheValue , i64 ) > ,
240185 ) -> Result < ( ) , Error > {
241- let cache = self . cache . clone ( ) ;
242-
243186 // Async block for metrics
244187 async {
245188 for ( key, value, expire_at) in keys_values {
@@ -250,7 +193,7 @@ impl InMemoryDriver {
250193 } ;
251194
252195 // Store in cache - expiry will be handled by ValueExpiry
253- cache. insert ( key, entry) . await ;
196+ self . cache . insert ( key. into ( ) , entry) . await ;
254197 }
255198 }
256199 . instrument ( tracing:: info_span!( "set" ) )
@@ -260,87 +203,26 @@ impl InMemoryDriver {
260203 Ok ( ( ) )
261204 }
262205
263- pub async fn delete_keys < ' a > (
206+ pub async fn delete < ' a > (
264207 & ' a self ,
265- base_key : & ' a str ,
266- keys : Vec < String > ,
208+ _base_key : & ' a str ,
209+ keys : Vec < RawCacheKey > ,
267210 ) -> Result < ( ) , Error > {
268- let cache = self . cache . clone ( ) ;
269-
270- metrics:: CACHE_PURGE_REQUEST_TOTAL
271- . with_label_values ( & [ base_key] )
272- . inc ( ) ;
273- metrics:: CACHE_PURGE_VALUE_TOTAL
274- . with_label_values ( & [ base_key] )
275- . inc_by ( keys. len ( ) as u64 ) ;
276-
277211 // Async block for metrics
278212 async {
279213 for key in keys {
280214 // Use remove instead of invalidate to ensure it's actually removed
281- cache. remove ( & key) . await ;
215+ self . cache . remove ( & * key) . await ;
282216 }
283217 }
284- . instrument ( tracing:: info_span!( "remove " ) )
218+ . instrument ( tracing:: info_span!( "delete " ) )
285219 . await ;
286220
287221 tracing:: trace!( "successfully deleted keys from in-memory cache" ) ;
288222 Ok ( ( ) )
289223 }
290224
291- pub fn process_key ( & self , base_key : & str , key : & impl crate :: CacheKey ) -> String {
292- format ! ( "{}:{}" , base_key, key. cache_key( ) )
293- }
294-
295- pub fn process_rate_limit_key (
296- & self ,
297- key : & impl crate :: CacheKey ,
298- remote_address : impl AsRef < str > ,
299- bucket : i64 ,
300- bucket_duration_ms : i64 ,
301- ) -> String {
302- format ! (
303- "rate_limit:{}:{}:{}:{}" ,
304- key. cache_key( ) ,
305- remote_address. as_ref( ) ,
306- bucket_duration_ms,
307- bucket,
308- )
309- }
310-
311- /// Increment a rate limit counter for in-memory storage
312- pub async fn rate_limit_increment < ' a > (
313- & ' a self ,
314- key : & ' a str ,
315- ttl_ms : i64 ,
316- ) -> Result < i64 , Error > {
317- let rate_limits = self . rate_limits . clone ( ) ;
318-
319- // Get current value or default to 0
320- let current_value = match rate_limits. get ( key) . await {
321- Some ( value) => {
322- // Try to decode the value as an integer
323- match serde_json:: from_slice :: < i64 > ( & value. value ) . map_err ( Error :: SerdeDecode ) {
324- Ok ( count) => count,
325- Err ( _) => 0 , // If we can't decode, reset to 0
326- }
327- }
328- None => 0 ,
329- } ;
330-
331- // Increment the value
332- let new_value = current_value + 1 ;
333- let encoded = serde_json:: to_vec ( & new_value) . map_err ( Error :: SerdeEncode ) ?;
334-
335- // Store with expiration
336- let entry = ExpiringValue {
337- value : encoded,
338- expiry_time : rivet_util:: timestamp:: now ( ) + ttl_ms,
339- } ;
340-
341- // Update the rate limit cache
342- rate_limits. insert ( key. to_string ( ) , entry) . await ;
343-
344- Ok ( new_value)
225+ pub fn process_key ( & self , base_key : & str , key : & impl crate :: CacheKey ) -> RawCacheKey {
226+ RawCacheKey :: from ( format ! ( "{}:{}" , base_key, key. cache_key( ) ) )
345227 }
346228}
0 commit comments