-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add configurable on-cache-hit hooks to gcache #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d399a3f
8202d88
68816cb
95780c0
b3a1132
0ebf679
4ad1ad3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,29 @@ | ||
| from gcache.config import CacheConfigProvider, CacheLayer, GCacheConfig, GCacheKey, GCacheKeyConfig, RedisConfig | ||
| from gcache.config import ( | ||
| CacheCallContext, | ||
| CacheConfigProvider, | ||
| CacheHitDecision, | ||
| CacheHitHook, | ||
| CacheLayer, | ||
| EvictAndFallback, | ||
| GCacheConfig, | ||
| GCacheKey, | ||
| GCacheKeyConfig, | ||
| RedisConfig, | ||
| ReturnCached, | ||
| ) | ||
| from gcache.gcache import GCache | ||
|
|
||
| __all__ = ["CacheConfigProvider", "CacheLayer", "GCache", "GCacheConfig", "GCacheKey", "GCacheKeyConfig", "RedisConfig"] | ||
| __all__ = [ | ||
| "CacheCallContext", | ||
| "CacheConfigProvider", | ||
| "CacheHitDecision", | ||
| "CacheHitHook", | ||
| "CacheLayer", | ||
| "EvictAndFallback", | ||
| "GCache", | ||
| "GCacheConfig", | ||
| "GCacheKey", | ||
| "GCacheKeyConfig", | ||
| "RedisConfig", | ||
| "ReturnCached", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| import inspect | ||
| from dataclasses import dataclass | ||
| from typing import Any | ||
|
|
||
| from gcache._internal.metrics import GCacheMetrics | ||
| from gcache._internal.state import _GLOBAL_GCACHE_STATE | ||
| from gcache.config import CacheCallContext, CacheHitHook, CacheLayer, EvictAndFallback, GCacheKey, ReturnCached | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class BypassCurrentLayer: | ||
| """ | ||
| Internal signal to ignore this layer for the current request only. | ||
|
|
||
| This is used when the hook machinery is unreliable rather than the cached | ||
| value itself being known-bad. Two cases currently map here: | ||
| - the hook raised an exception | ||
| - the hook returned an unsupported decision type | ||
|
|
||
| In those situations we want fail-open behavior: | ||
| - do not fail the caller request | ||
| - do not evict the cached value, because the problem may be in the hook | ||
| - continue through the normal fallback chain as if this layer had no usable hit | ||
| """ | ||
|
|
||
|
|
||
| async def run_cache_hit_hook( | ||
| *, | ||
| key: GCacheKey, | ||
| layer: CacheLayer, | ||
| value: Any, | ||
| on_cache_hit: CacheHitHook | None, | ||
| ) -> ReturnCached | EvictAndFallback | BypassCurrentLayer: | ||
| """ | ||
| Execute the optional cache-hit hook and normalize the result. | ||
|
|
||
| Returning `BypassCurrentLayer` is reserved for hook execution/contract | ||
| failures. It lets cache layers skip a hit without deleting it, which keeps | ||
| hook bugs from turning into request failures or unnecessary evictions. | ||
| """ | ||
| if on_cache_hit is None: | ||
| return ReturnCached() | ||
|
|
||
| context = CacheCallContext(key=key, layer=layer) | ||
|
|
||
| try: | ||
| decision = on_cache_hit(context, value) | ||
| if inspect.isawaitable(decision): | ||
| decision = await decision | ||
| except Exception: | ||
| _GLOBAL_GCACHE_STATE.logger.error( | ||
| "Error executing cache hit hook", | ||
| extra={"use_case": key.use_case, "key_type": key.key_type, "layer": layer.name}, | ||
| exc_info=True, | ||
| ) | ||
| GCacheMetrics.HIT_HOOK_ERROR_COUNTER.labels(key.use_case, key.key_type, layer.name).inc() | ||
| return BypassCurrentLayer() | ||
|
|
||
| if isinstance(decision, ReturnCached): | ||
| GCacheMetrics.HIT_HOOK_ACTION_COUNTER.labels( | ||
| key.use_case, | ||
| key.key_type, | ||
| layer.name, | ||
| "return", | ||
| "none", | ||
| ).inc() | ||
| return decision | ||
|
|
||
| if isinstance(decision, EvictAndFallback): | ||
| GCacheMetrics.HIT_HOOK_ACTION_COUNTER.labels( | ||
| key.use_case, | ||
| key.key_type, | ||
| layer.name, | ||
| "evict", | ||
| decision.reason or "none", | ||
| ).inc() | ||
| return decision | ||
|
|
||
| _GLOBAL_GCACHE_STATE.logger.error( | ||
| "Cache hit hook returned invalid decision type", | ||
| extra={ | ||
| "use_case": key.use_case, | ||
| "key_type": key.key_type, | ||
| "layer": layer.name, | ||
| "decision_type": type(decision).__name__, | ||
| }, | ||
| ) | ||
| GCacheMetrics.HIT_HOOK_ERROR_COUNTER.labels(key.use_case, key.key_type, layer.name).inc() | ||
| return BypassCurrentLayer() | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3,10 +3,11 @@ | |||||
|
|
||||||
| from cachetools import TTLCache | ||||||
|
|
||||||
| from gcache._internal.cache_hit import BypassCurrentLayer, run_cache_hit_hook | ||||||
| from gcache._internal.cache_interface import CacheInterface, Fallback | ||||||
| from gcache._internal.constants import LOCAL_CACHE_MAX_SIZE | ||||||
| from gcache._internal.state import _GLOBAL_GCACHE_STATE | ||||||
| from gcache.config import CacheConfigProvider, CacheLayer, GCacheKey | ||||||
| from gcache.config import CacheConfigProvider, CacheHitHook, CacheLayer, EvictAndFallback, GCacheKey, ReturnCached | ||||||
| from gcache.exceptions import MissingKeyConfig | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -44,14 +45,41 @@ async def _get_ttl_cache(self, key: GCacheKey) -> TTLCache: | |||||
|
|
||||||
| return cache | ||||||
|
|
||||||
| async def get(self, key: GCacheKey, fallback: Fallback) -> Any: | ||||||
| async def _exec_fallback(self, key: GCacheKey, fallback: Fallback) -> Any: | ||||||
| value = await fallback() | ||||||
| await self.put(key, value) | ||||||
| return value | ||||||
|
|
||||||
| async def get( | ||||||
| self, | ||||||
| key: GCacheKey, | ||||||
| fallback: Fallback, | ||||||
| *, | ||||||
| on_cache_hit: CacheHitHook | None = None, | ||||||
| ) -> Any: | ||||||
| _GLOBAL_GCACHE_STATE.logger.debug("Calling local cache") | ||||||
| cache = await self._get_ttl_cache(key) | ||||||
|
|
||||||
| if key not in cache: | ||||||
| await self.put(key, await fallback()) | ||||||
|
|
||||||
| return cache[key] | ||||||
| return await self._exec_fallback(key, fallback) | ||||||
|
|
||||||
| cached_value = cache[key] | ||||||
|
|
||||||
| decision = await run_cache_hit_hook( | ||||||
| key=key, | ||||||
| layer=self.layer(), | ||||||
| value=cached_value, | ||||||
| on_cache_hit=on_cache_hit, | ||||||
| ) | ||||||
| if isinstance(decision, ReturnCached): | ||||||
| return cached_value | ||||||
| if isinstance(decision, EvictAndFallback): | ||||||
| cache.pop(key, None) | ||||||
| return await self._exec_fallback(key, fallback) | ||||||
| if isinstance(decision, BypassCurrentLayer): | ||||||
| return await fallback() | ||||||
|
|
||||||
| return cached_value | ||||||
|
||||||
| return cached_value | |
| raise AssertionError(f"Unexpected cache hit decision: {decision!r}") |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,11 +9,20 @@ | |||||||
|
|
||||||||
| from redis.asyncio import Redis, RedisCluster | ||||||||
|
|
||||||||
| from gcache._internal.cache_hit import BypassCurrentLayer, run_cache_hit_hook | ||||||||
| from gcache._internal.cache_interface import CacheInterface, Fallback | ||||||||
| from gcache._internal.constants import ASYNC_PICKLE_THRESHOLD_BYTES, WATERMARK_TTL_SECONDS | ||||||||
| from gcache._internal.metrics import GCacheMetrics | ||||||||
| from gcache._internal.state import _GLOBAL_GCACHE_STATE | ||||||||
| from gcache.config import CacheConfigProvider, CacheLayer, GCacheKey, RedisConfig | ||||||||
| from gcache.config import ( | ||||||||
| CacheConfigProvider, | ||||||||
| CacheHitHook, | ||||||||
| CacheLayer, | ||||||||
| EvictAndFallback, | ||||||||
| GCacheKey, | ||||||||
| RedisConfig, | ||||||||
| ReturnCached, | ||||||||
| ) | ||||||||
| from gcache.exceptions import MissingKeyConfig | ||||||||
|
|
||||||||
|
|
||||||||
|
|
@@ -128,7 +137,13 @@ async def _async_pickle_loads(data: bytes) -> Any: | |||||||
| loop = asyncio.get_event_loop() | ||||||||
| return await loop.run_in_executor(RedisCache._executor, pickle.loads, data) | ||||||||
|
|
||||||||
| async def get(self, key: GCacheKey, fallback: Fallback) -> Any: | ||||||||
| async def get( | ||||||||
| self, | ||||||||
| key: GCacheKey, | ||||||||
| fallback: Fallback, | ||||||||
| *, | ||||||||
| on_cache_hit: CacheHitHook | None = None, | ||||||||
| ) -> Any: | ||||||||
| _GLOBAL_GCACHE_STATE.logger.debug("Calling Redis Cache") | ||||||||
|
|
||||||||
| watermark_ms = None | ||||||||
|
|
@@ -142,29 +157,44 @@ async def get(self, key: GCacheKey, fallback: Fallback) -> Any: | |||||||
| val_pickle = await self.client.get(key.urn) | ||||||||
| if val_pickle is not None: | ||||||||
| start_sec = time.monotonic() | ||||||||
| serialization_timer = GCacheMetrics.SERIALIZATION_TIMER.labels( | ||||||||
| key.use_case, key.key_type, self.layer().name, "load" | ||||||||
| ) | ||||||||
|
|
||||||||
| deserialized_value: RedisValue = ( | ||||||||
| pickle.loads(val_pickle) | ||||||||
| if len(val_pickle) < ASYNC_PICKLE_THRESHOLD_BYTES | ||||||||
| else await RedisCache._async_pickle_loads(val_pickle) | ||||||||
| ) | ||||||||
|
|
||||||||
| # Ignore invalidated remote entries before payload deserialization or hook execution. | ||||||||
| if watermark_ms is not None: | ||||||||
| watermark_ms = int(watermark_ms) | ||||||||
| if watermark_ms >= deserialized_value.created_at_ms: | ||||||||
| serialization_timer.observe(time.monotonic() - start_sec) | ||||||||
| return await self._exec_fallback(key, watermark_ms, fallback) | ||||||||
|
|
||||||||
| # Load payload using custom serializer if present. | ||||||||
| payload = deserialized_value.payload | ||||||||
| if key.serializer is not None: | ||||||||
| payload = await key.serializer.load(payload) | ||||||||
|
|
||||||||
| ( | ||||||||
| GCacheMetrics.SERIALIZATION_TIMER.labels(key.use_case, key.key_type, self.layer().name, "load").observe( | ||||||||
| time.monotonic() - start_sec | ||||||||
| ) | ||||||||
| serialization_timer.observe(time.monotonic() - start_sec) | ||||||||
|
|
||||||||
| decision = await run_cache_hit_hook( | ||||||||
| key=key, | ||||||||
| layer=self.layer(), | ||||||||
| value=payload, | ||||||||
| on_cache_hit=on_cache_hit, | ||||||||
| ) | ||||||||
| if isinstance(decision, EvictAndFallback): | ||||||||
| await self.delete(key) | ||||||||
| return await self._exec_fallback(key, watermark_ms, fallback) | ||||||||
| if isinstance(decision, BypassCurrentLayer): | ||||||||
| return await fallback() | ||||||||
| if not isinstance(decision, ReturnCached): | ||||||||
| return await fallback() | ||||||||
|
Comment on lines
+195
to
+196
|
||||||||
| if not isinstance(decision, ReturnCached): | |
| return await fallback() | |
| assert isinstance(decision, ReturnCached) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EvictAndFallback.reasonis used directly as a Prometheus label value (reason=decision.reason). Since this is a public API and callers can supply arbitrary strings, it creates a real risk of unbounded label cardinality (e.g., if someone includes IDs/details in the reason), which can destabilize metrics storage.Consider constraining
reasonto a small enum-like set (or validating/sanitizing it before labeling, e.g., mapping unknown/long values to a fixed bucket) to keep cardinality bounded.