Skip to content

Commit bb815e8

Browse files
author
Francisco
committed
refactor: replace Entity SDK with direct service calls in MessageCache
- Removed dependency on `Entity` SDK for message operations. - Implemented lazy service accessors for improved modularity and reduced circular imports. - Updated cold-load paths to leverage `MessageService` and `NativeExecutionService` directly. - Enhanced error handling and logging during cache misses and cold-load failures. - Updated docstrings and comments for clarity.
1 parent 63c3c8d commit bb815e8

1 file changed

Lines changed: 106 additions & 46 deletions

File tree

src/api/entities_api/cache/message_cache.py

Lines changed: 106 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import os
55
from typing import Any, Dict, List, Union
66

7-
from projectdavid import Entity
87
from redis import Redis as SyncRedis
98

109
from src.api.entities_api.services.logging_service import LoggingUtility
@@ -22,22 +21,68 @@ class AsyncRedis:
2221

2322

2423
class MessageCache:
24+
"""
25+
Redis-backed message history cache.
26+
27+
SDK Entity client removed — all cold-load paths now go directly through
28+
MessageService (sync) or NativeExecutionService (async), eliminating the
29+
internal HTTP round-trip and the user-identity mismatch that caused 403s
30+
after ownership primitives were tightened.
31+
"""
32+
2533
def __init__(self, redis: Union[SyncRedis, "AsyncRedis"]):
2634
self.redis = redis
27-
self.client = Entity(
28-
base_url=os.getenv("ASSISTANTS_BASE_URL"),
29-
api_key=os.getenv("ADMIN_API_KEY"),
30-
)
35+
# No Entity / SDK client instantiated here.
36+
37+
# ------------------------------------------------------------------
38+
# Lazy service accessors (avoids circular imports at module load time)
39+
# ------------------------------------------------------------------
40+
41+
@property
42+
def _message_svc(self):
43+
"""
44+
Synchronous MessageService — used by the sync cold-load path.
45+
Instantiated once and cached on the instance.
46+
"""
47+
if not hasattr(self, "_message_svc_instance") or self._message_svc_instance is None:
48+
from src.api.entities_api.services.message_service import \
49+
MessageService
50+
51+
self._message_svc_instance = MessageService()
52+
return self._message_svc_instance
53+
54+
@property
55+
def _native_exec(self):
56+
"""
57+
NativeExecutionService — used by the async cold-load path.
58+
Instantiated once and cached on the instance.
59+
"""
60+
if not hasattr(self, "_native_exec_instance") or self._native_exec_instance is None:
61+
from src.api.entities_api.services.native_execution_service import \
62+
NativeExecutionService
63+
64+
self._native_exec_instance = NativeExecutionService()
65+
return self._native_exec_instance
66+
67+
# ------------------------------------------------------------------
68+
# Internal helpers
69+
# ------------------------------------------------------------------
3170

3271
def _cache_key(self, thread_id: str) -> str:
3372
return f"thread:{thread_id}:history"
3473

35-
# ──────────────────────────────────────────────────────────
36-
# Asynchronous Methods (Core)
37-
# ──────────────────────────────────────────────────────────
74+
# ------------------------------------------------------------------
75+
# Asynchronous Methods
76+
# ------------------------------------------------------------------
3877

3978
async def get_history(self, thread_id: str) -> List[Dict]:
40-
"""Retrieves history from Redis, falling back to DB if empty."""
79+
"""
80+
Retrieve history from Redis, falling back to DB on a cache miss.
81+
82+
Cold-load uses NativeExecutionService.get_formatted_messages which
83+
calls MessageService → DB directly with no ownership check needed
84+
(internal orchestration path).
85+
"""
4186
key = self._cache_key(thread_id)
4287

4388
if isinstance(self.redis, AsyncRedis):
@@ -48,18 +93,25 @@ async def get_history(self, thread_id: str) -> List[Dict]:
4893
if raw_list:
4994
return [json.loads(m) for m in raw_list]
5095

51-
LOG.debug(f"[CACHE] Miss for thread {thread_id}. Performing cold load.")
52-
full_hist = await asyncio.to_thread(
53-
self.client.messages.get_formatted_messages, thread_id, system_message=None
54-
)
96+
LOG.debug(f"[CACHE] Miss for thread {thread_id}. Performing async cold load.")
97+
98+
try:
99+
full_hist = await self._native_exec.get_formatted_messages(thread_id)
100+
except Exception as e:
101+
LOG.warning(
102+
"[CACHE] Async cold load failed for thread %s (%s). Returning empty history.",
103+
thread_id,
104+
e,
105+
)
106+
return []
55107

56108
if full_hist:
57109
await self.set_history(thread_id, full_hist)
58110

59-
return full_hist
111+
return full_hist or []
60112

61113
async def set_history(self, thread_id: str, messages: List[Dict]):
62-
"""Overwrite/Initialize the cache for a thread."""
114+
"""Overwrite / initialise the cache for a thread."""
63115
key = self._cache_key(thread_id)
64116
serialized = [json.dumps(m) for m in messages[-200:]]
65117

@@ -96,34 +148,44 @@ async def delete_history(self, thread_id: str):
96148
else:
97149
await asyncio.to_thread(self.redis.delete, key)
98150

99-
# ──────────────────────────────────────────────────────────
100-
# Synchronous Helpers (The Bridge)
101-
# ──────────────────────────────────────────────────────────
151+
# ------------------------------------------------------------------
152+
# Synchronous Methods (hot path for context building)
153+
# ------------------------------------------------------------------
154+
102155
def get_history_sync(self, thread_id: str) -> List[Dict]:
156+
"""
157+
Retrieve history from Redis, falling back to DB on a cache miss.
158+
159+
This is the hot path called by ContextMixin._set_up_context_window.
160+
Cold-load uses MessageService.get_formatted_messages directly —
161+
no SDK client, no HTTP hop, no user-identity mismatch.
162+
"""
163+
if not isinstance(self.redis, SyncRedis):
164+
return []
165+
103166
key = self._cache_key(thread_id)
104-
if isinstance(self.redis, SyncRedis):
105-
raw_list = self.redis.lrange(key, 0, -1)
106-
if raw_list:
107-
return [json.loads(m) for m in raw_list]
108-
# Cache miss — attempt cold load, but never block the stream
109-
try:
110-
full_hist = self.client.messages.get_formatted_messages(
111-
thread_id, system_message=None
112-
)
113-
if full_hist:
114-
self.set_history_sync(thread_id, full_hist)
115-
return full_hist or []
116-
except Exception as e:
117-
LOG.warning(
118-
"[CACHE-SYNC] Cold load failed for thread %s (%s). " "Returning empty history.",
119-
thread_id,
120-
e,
121-
)
122-
return []
123-
return []
167+
raw_list = self.redis.lrange(key, 0, -1)
168+
169+
if raw_list:
170+
return [json.loads(m) for m in raw_list]
171+
172+
LOG.debug(f"[CACHE-SYNC] Miss for thread {thread_id}. Performing sync cold load.")
173+
174+
try:
175+
full_hist = self._message_svc.get_formatted_messages(thread_id)
176+
if full_hist:
177+
self.set_history_sync(thread_id, full_hist)
178+
return full_hist or []
179+
except Exception as e:
180+
LOG.warning(
181+
"[CACHE-SYNC] Cold load failed for thread %s (%s). Returning empty history.",
182+
thread_id,
183+
e,
184+
)
185+
return []
124186

125187
def set_history_sync(self, thread_id: str, messages: List[Dict]):
126-
"""Synchronous wrapper to initialize the cache."""
188+
"""Synchronous wrapper to initialise the cache."""
127189
if isinstance(self.redis, SyncRedis):
128190
key = self._cache_key(thread_id)
129191
serialized = [json.dumps(m) for m in messages[-200:]]
@@ -152,15 +214,13 @@ def append_message_sync(self, thread_id: str, message: Dict):
152214
asyncio.run(self.append_message(thread_id, message))
153215

154216

155-
# ──────────────────────────────────────────────────────────
156-
# Standalone Factory Function (OUTSIDE THE CLASS)
157-
# ──────────────────────────────────────────────────────────
158-
159-
217+
# ------------------------------------------------------------------
218+
# Standalone factory (imported by mixins and services)
219+
# ------------------------------------------------------------------
160220
def get_sync_message_cache() -> MessageCache:
161221
"""
162-
Standalone factory to create a synchronous MessageCache instance.
163-
This is what your Mixins will import.
222+
Create a synchronous MessageCache instance backed by a SyncRedis client.
223+
This is what ContextMixin and ThreadService import.
164224
"""
165225
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
166226
client = SyncRedis.from_url(redis_url, decode_responses=True)

0 commit comments

Comments
 (0)