Memory Component

Path: lib/src/holiday_peak_lib/agents/memory/
Design Pattern: Tiered memory with explicit injection
Purpose: Three-tier memory architecture (hot/warm/cold) injected into agents via AgentBuilder

Overview

Provides a unified memory abstraction for agents to store and retrieve conversation context, user profiles, and operational state across three performance tiers:

  • Hot (Redis): <50ms latency for real-time session state and frequently accessed data
  • Warm (Cosmos DB): 100-500ms latency for user profiles, search history, and structured metadata
  • Cold (Blob Storage): latency on the order of seconds for product images, catalog snapshots, and archival data

Applications inject only the tiers they need, avoiding unnecessary connections and costs.

Architecture

graph TB
    subgraph "Application"
        Agent[Agent/Service]
        Builder[AgentBuilder]
    end
    
    subgraph "Memory Tiers"
        HotTier[HotMemory]
        WarmTier[WarmMemory]
        ColdTier[ColdMemory]
    end
    
    subgraph "Azure Services"
        Redis[(Redis Cache)]
        Cosmos[(Cosmos DB)]
        Blob[(Blob Storage)]
    end
    
    Builder -->|injects| HotTier
    Builder -->|injects| WarmTier
    Builder -->|injects| ColdTier
    Agent -->|uses| HotTier
    Agent -->|uses| WarmTier
    Agent -->|uses| ColdTier
    HotTier --> Redis
    WarmTier --> Cosmos
    ColdTier --> Blob

Configuration and Usage

Memory tiers are constructed explicitly and injected into agents via AgentBuilder or build_service_app.

from holiday_peak_lib.agents.memory import HotMemory, WarmMemory, ColdMemory
from holiday_peak_lib.agents.memory import MemoryBuilder
from holiday_peak_lib.config import MemorySettings

settings = MemorySettings()

hot = HotMemory(settings.redis_url)
warm = WarmMemory(settings.cosmos_account_uri, settings.cosmos_database, settings.cosmos_container)
cold = ColdMemory(settings.blob_account_url, settings.blob_container)

memory_client = (
    MemoryBuilder()
    .with_hot(hot)
    .with_warm(warm)
    .with_cold(cold)
    .with_rules(read_fallback=True, promote_on_read=True, write_through=True)
    .build()
)

Env vars (via MemorySettings):

  • REDIS_URL
  • COSMOS_ACCOUNT_URI
  • COSMOS_DATABASE
  • COSMOS_CONTAINER
  • BLOB_ACCOUNT_URL
  • BLOB_CONTAINER
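
As a rough illustration of environment-driven configuration, the sketch below reads the six variables into a frozen dataclass and reports which tiers have enough configuration to be constructed. `EnvMemorySettings` and `load_settings` are hypothetical names used only for this example; the library's MemorySettings may be implemented differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class EnvMemorySettings:
    """Hypothetical stand-in for MemorySettings; fields mirror the env vars."""

    redis_url: Optional[str] = None
    cosmos_account_uri: Optional[str] = None
    cosmos_database: Optional[str] = None
    cosmos_container: Optional[str] = None
    blob_account_url: Optional[str] = None
    blob_container: Optional[str] = None

    @property
    def hot_enabled(self) -> bool:
        return self.redis_url is not None

    @property
    def warm_enabled(self) -> bool:
        parts = (self.cosmos_account_uri, self.cosmos_database, self.cosmos_container)
        return all(part is not None for part in parts)

    @property
    def cold_enabled(self) -> bool:
        return self.blob_account_url is not None and self.blob_container is not None


def load_settings(env: Optional[Mapping[str, str]] = None) -> EnvMemorySettings:
    """Read settings from `env` (defaults to os.environ); empty strings count as unset."""
    source = os.environ if env is None else env
    return EnvMemorySettings(
        redis_url=source.get("REDIS_URL") or None,
        cosmos_account_uri=source.get("COSMOS_ACCOUNT_URI") or None,
        cosmos_database=source.get("COSMOS_DATABASE") or None,
        cosmos_container=source.get("COSMOS_CONTAINER") or None,
        blob_account_url=source.get("BLOB_ACCOUNT_URL") or None,
        blob_container=source.get("BLOB_CONTAINER") or None,
    )
```

An application could check `hot_enabled`, `warm_enabled`, and `cold_enabled` before constructing each tier, so that only configured tiers open connections.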

Tier Implementations

HotMemory (Redis)

Implements connect, get, and set with TTL support using redis.asyncio.

WarmMemory (Cosmos DB)

Implements connect, upsert, and read using azure.cosmos.aio.CosmosClient and DefaultAzureCredential.

ColdMemory (Blob Storage)

Implements connect, upload_text, and download_text using azure.storage.blob.aio.BlobServiceClient.

MemoryBuilder Responsibilities

MemoryBuilder composes the three tiers and defines the cascading rules for reads and writes. It should focus on:

  • Latency management: track $p50$/$p95$ for reads and writes and tune promotion rules.
  • Retrieval quality: ensure promoted values are correct and consistent across tiers.
  • Content evaluation: validate payload shape before promoting or writing through.
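
Content evaluation could start as a simple shape check that runs before a value is promoted or written through. A minimal sketch, assuming payloads are JSON strings that encode objects and that a size ceiling applies; `validate_payload` and `MAX_PAYLOAD_BYTES` are illustrative names, not part of the library:

```python
import json

MAX_PAYLOAD_BYTES = 128 * 1024  # assumed ceiling; tune per tier


def validate_payload(value: str, required_keys: frozenset = frozenset()) -> bool:
    """Return True if `value` is a JSON object within the size limit that has all required keys."""
    if len(value.encode("utf-8")) > MAX_PAYLOAD_BYTES:
        return False
    try:
        doc = json.loads(value)
    except json.JSONDecodeError:
        return False
    # Only JSON objects are accepted; arrays and scalars are rejected.
    return isinstance(doc, dict) and required_keys.issubset(doc)
```

A caller would skip promotion (or reject the write) when the check returns False, rather than copying a malformed value into a faster tier.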

Builder-level features:

  • Cascading reads (hot → warm → cold) with optional promotion.
  • Write-through to warm and optional cold persistence.
  • Compression hooks for cold writes (large payloads).
  • Batch operations to reduce latency and improve throughput.
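
The cascading rules can be illustrated with in-memory stand-ins for the tiers. This is a sketch under simplifying assumptions, not the library's MemoryClient: `DictTier` and `CascadingClient` are hypothetical, every tier is given the same get/set interface (the real tiers expose different method names), and write-through here writes to all tiers rather than treating cold persistence as a separate option.

```python
import asyncio
from typing import Optional


class DictTier:
    """In-memory stand-in for a memory tier (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.data: dict[str, str] = {}

    async def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

    async def set(self, key: str, value: str) -> None:
        self.data[key] = value


class CascadingClient:
    """Illustrative cascade over tiers ordered fastest to slowest."""

    def __init__(self, tiers: list[DictTier], promote_on_read: bool = True,
                 write_through: bool = True) -> None:
        self.tiers = tiers
        self.promote_on_read = promote_on_read
        self.write_through = write_through

    async def get(self, key: str) -> Optional[str]:
        for index, tier in enumerate(self.tiers):
            value = await tier.get(key)
            if value is None:
                continue  # miss: fall through to the next, slower tier
            if self.promote_on_read:
                # Copy the value into every faster tier that missed.
                for faster in self.tiers[:index]:
                    await faster.set(key, value)
            return value
        return None

    async def set(self, key: str, value: str) -> None:
        # The fastest tier is always written; the rest only with write-through.
        targets = self.tiers if self.write_through else self.tiers[:1]
        for tier in targets:
            await tier.set(key, value)
```

With `promote_on_read=True`, a value found only in the cold tier is copied into warm and hot on the first read, so later reads are served from the hot tier.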

What's Implemented

HotMemory: Redis-backed hot tier with connect, get, and set (TTL supported)
WarmMemory: Cosmos DB-backed warm tier with connect, upsert, and read
ColdMemory: Blob-backed cold tier with connect, upload_text, and download_text
MemoryBuilder: Fluent builder that assembles tiers and rules into a MemoryClient
MemoryClient: Cascading read/write orchestration across tiers
MemorySettings: Environment-driven configuration for tier endpoints
Parallel I/O: Hot/warm/cold tier reads and writes execute concurrently via asyncio.gather (PR #800)
Memory Tools: get_memory, set_memory, search_memory tools exposed for agent use
gather_adapters: Concurrent adapter initialization helper in BaseRetailAgent
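
The parallel I/O item might look roughly like the following: every tier is queried concurrently and the value from the fastest tier that has the key wins. This is a guess at the general shape, not the code from the PR; `read_parallel` and the reader callables are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence

Reader = Callable[[str], Awaitable[Optional[str]]]


async def read_parallel(key: str, readers: Sequence[Reader]) -> Optional[str]:
    """Query all tiers at once; prefer the earliest reader in `readers` that returns a value."""
    results = await asyncio.gather(
        *(reader(key) for reader in readers), return_exceptions=True
    )
    # gather preserves argument order, so results line up with tier priority.
    for result in results:
        if isinstance(result, BaseException):
            continue  # a failing tier is treated as a miss
        if result is not None:
            return result
    return None
```

Because gather waits for every reader, total latency is bounded by the slowest tier; the trade-off is fewer sequential round-trips when the hot tier misses.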

What's NOT Implemented

Built-in metrics: $p50$/$p95$ latency tracking, retrieval quality scoring, and content evaluation hooks are not implemented yet
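
One possible starting point for latency tracking is a sliding window of samples per tier, summarized with the standard library. `LatencyTracker` is a hypothetical helper and the window size of 1000 is an arbitrary assumption:

```python
import statistics
import time
from collections import defaultdict, deque
from contextlib import contextmanager


class LatencyTracker:
    """Keeps a sliding window of latencies (ms) per tier and reports p50/p95."""

    def __init__(self, window: int = 1000) -> None:
        self._samples = defaultdict(lambda: deque(maxlen=window))

    def record(self, tier: str, duration_ms: float) -> None:
        self._samples[tier].append(duration_ms)

    @contextmanager
    def timed(self, tier: str):
        """Time the enclosed block and record it against `tier`."""
        start = time.perf_counter()
        try:
            yield
        finally:
            self.record(tier, (time.perf_counter() - start) * 1000)

    def percentiles(self, tier: str) -> dict:
        samples = list(self._samples.get(tier, ()))
        if len(samples) < 2:
            raise ValueError("need at least two samples")
        # n=100 yields 99 cut points; index 49 is p50 and index 94 is p95.
        cuts = statistics.quantiles(samples, n=100, method="inclusive")
        return {"p50": cuts[49], "p95": cuts[94]}
```

A tier call would be wrapped as `with tracker.timed("hot"): ...`, and `percentiles("hot")` read periodically to tune promotion rules.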

Cascading Reads/Writes

Cascading Reads: Hot → warm → cold with optional promotion to hot/warm
Write-Through: Optional warm write-through and optional cold persistence
Tiered Eviction: No demotion logic from hot→warm→cold

How to extend tiered eviction: Implement business rules that decide when a key should be demoted and where it should go. Typical rules include TTL expiration, access frequency, payload size, SLA, and compliance requirements. Add a policy on the MemoryClient that evaluates per-key metadata and triggers a demotion pipeline.

Example rules:

  • Access-based: if a key hasn’t been read in $N$ minutes, demote hot → warm.
  • Size-based: if payload > $X$ KB, demote hot → warm or warm → cold.
  • SLA-based: if p95 read latency for a key exceeds a threshold, demote hot → warm to reduce contention.
  • Compliance-based: if data is archival-only, always persist to cold and evict from hot/warm.

Demotion sketch:

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MemoryPolicy:
    max_hot_age: timedelta = timedelta(minutes=15)
    max_hot_bytes: int = 128 * 1024
    max_warm_age: timedelta = timedelta(days=7)

    def should_demote_from_hot(self, meta) -> bool:
        age = datetime.utcnow() - meta.last_accessed
        return age > self.max_hot_age or meta.size_bytes > self.max_hot_bytes

    def should_demote_from_warm(self, meta) -> bool:
        age = datetime.utcnow() - meta.last_accessed
        return age > self.max_warm_age

class EvictingMemoryClient(MemoryClient):
    def __init__(self, *args, policy: MemoryPolicy, **kwargs):
        super().__init__(*args, **kwargs)
        self._policy = policy

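    async def evict(self, key: str, meta) -> None:
        # `meta` is assumed to expose `last_accessed` (datetime) and `size_bytes` (int).
        # `delete()` on the hot and warm tiers is assumed; add it if the tier lacks it.
        # Demote hot -> warm: copy first, delete second, so a failed write loses nothing.
        if self.hot and self._policy.should_demote_from_hot(meta) and self.warm:
            value = await self.hot.get(key)
            if value is not None:
                await self.warm.upsert(key, value)
                await self.hot.delete(key)

        # Demote warm -> cold using the same copy-then-delete order.
        if self.warm and self._policy.should_demote_from_warm(meta) and self.cold:
            value = await self.warm.read(key)
            if value is not None:
                await self.cold.upload_text(key, value)
                await self.warm.delete(key)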

Compression

No Compression: Builder does not yet compress large payloads automatically
No Automatic Detection: No size-based compression policy by default

Builder-level addition: Add a compression policy in MemoryBuilder.with_rules(...) and apply it before ColdMemory.upload_text.

Add Compression:

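import gzip
from typing import Optional

# BlobColdMemory stands for a bytes-oriented cold tier exposing get/set;
# adapt the method names if building on ColdMemory's upload_text/download_text.
class CompressedBlobMemory(BlobColdMemory):
    async def set(self, key: str, value: bytes, content_type: str = "application/octet-stream") -> None:
        # Compress payloads larger than 1 MB and mark them with a .gz suffix
        if len(value) > 1_000_000:
            value = gzip.compress(value)
            key = f"{key}.gz"
        await super().set(key, value, content_type)

    async def get(self, key: str) -> Optional[bytes]:
        # Prefer the compressed variant if one exists
        blob = await super().get(f"{key}.gz")
        if blob is not None:
            return gzip.decompress(blob)

        # Fall back to the uncompressed blob
        return await super().get(key)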

Batch Operations

No Multi-Get: Builder does not expose mget across tiers
No Multi-Set: Builder does not expose mset across tiers

Builder-level addition: Provide MemoryClient.mget() and MemoryClient.mset() helpers for parallel tier operations.

Add Batch Support:

async def mget(self, keys: list[str]) -> dict[str, Optional[str]]:
    """Get multiple keys in parallel using cascading reads."""
    results = await asyncio.gather(*[self.get(k) for k in keys])
    return dict(zip(keys, results))

async def mset(self, items: dict[str, str]) -> None:
    """Set multiple keys in parallel using cascading writes."""
    await asyncio.gather(*[self.set(k, v) for k, v in items.items()])
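
An unbounded gather over a large key list may open more concurrent requests than the connection pools allow. A hedged variant that caps concurrency with a semaphore is sketched below; `mget_bounded` and the default limit of 50 are assumptions for illustration, and `get` stands for any single-key coroutine such as the client's cascading read.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def mget_bounded(
    get: Callable[[str], Awaitable[Optional[str]]],
    keys: list[str],
    max_concurrency: int = 50,
) -> dict[str, Optional[str]]:
    """Fetch many keys concurrently, with at most `max_concurrency` reads in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch(key: str) -> Optional[str]:
        async with semaphore:
            return await get(key)

    results = await asyncio.gather(*(fetch(key) for key in keys))
    return dict(zip(keys, results))
```

Setting `max_concurrency` at or below the tier's pool limit keeps batch reads from starving other requests of connections.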

Monitoring

No Hit/Miss Metrics: No tracking of cache hit rate per tier
No Latency Tracking: No P95/P99 per tier
No Connection Pool Stats: No visibility into Redis/Cosmos connection health

Add Observability:

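import logging
import os
import time
from typing import Optional

from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Ship log records to Application Insights
logger.addHandler(AzureLogHandler(connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]))

class ObservableMemoryClient(MemoryClient):
    async def get(self, key: str) -> Optional[str]:
        start = time.perf_counter()  # monotonic clock, suited to measuring durations
        try:
            value = await super().get(key)
        except Exception:
            logger.error("memory.get.error", exc_info=True)
            raise
        duration_ms = (time.perf_counter() - start) * 1000

        # AzureLogHandler exports custom fields nested under "custom_dimensions"
        logger.info("memory.get", extra={"custom_dimensions": {
            "hit": value is not None,
            "duration_ms": duration_ms
        }})
        return value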
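
The logging wrapper above records a hit flag per call; an aggregated hit rate per tier could be kept with a small counter. `HitRateCounter` is a hypothetical helper, shown only as a sketch:

```python
from collections import Counter


class HitRateCounter:
    """Counts hits and misses per tier and reports the hit rate."""

    def __init__(self) -> None:
        self._hits = Counter()
        self._misses = Counter()

    def record(self, tier: str, hit: bool) -> None:
        (self._hits if hit else self._misses)[tier] += 1

    def hit_rate(self, tier: str) -> float:
        total = self._hits[tier] + self._misses[tier]
        # A tier with no recorded reads reports 0.0 rather than dividing by zero.
        return self._hits[tier] / total if total else 0.0
```

Each tier read would call `record(tier, value is not None)`, and the resulting rate could feed a dashboard or an alert threshold.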

Connection Pooling

Redis Pool Limits: HotMemory supports max_connections and socket timeouts
Cosmos Connection Limit: WarmMemory supports connection_limit and custom client kwargs
Blob Transport Pooling: ColdMemory supports connection_pool_size and transport timeouts
Cosmos Retry Policy: No explicit retry policy configured for transient 429/503
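
Until a retry policy is configured, transient 429/503 responses could be handled by a generic wrapper with exponential backoff and jitter. A minimal sketch, assuming the raised exception carries a `status_code` attribute (as Azure SDK HTTP errors do); `retry_transient` and its defaults are illustrative:

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
RETRYABLE_STATUS = {429, 503}


async def retry_transient(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.2,
    max_delay: float = 5.0,
) -> T:
    """Retry `operation` when it raises an error whose status_code is 429 or 503."""
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            if status not in RETRYABLE_STATUS or attempt == attempts - 1:
                raise  # not transient, or out of attempts
            # Exponential backoff with full jitter, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))
    raise ValueError("attempts must be at least 1")
```

A warm-tier read could then be wrapped as `await retry_transient(lambda: warm.read(key))`.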

Connection Pool Usage:

from holiday_peak_lib.agents.memory import HotMemory

hot = HotMemory(
    "redis://localhost:6379",
    max_connections=50,
    socket_timeout=5.0,
    socket_connect_timeout=5.0,
    health_check_interval=30,
    retry_on_timeout=True,
)

# Warm tier: Cosmos DB connection limit and client options
from holiday_peak_lib.agents.memory import WarmMemory

warm = WarmMemory(
    "https://account.documents.azure.com",
    "db",
    "container",
    connection_limit=50,
    client_kwargs={
        "consistency_level": "Session",
        "enable_content_response_on_write": False,
    },
)

# Cold tier: Blob Storage transport pool size and timeouts
from holiday_peak_lib.agents.memory import ColdMemory

cold = ColdMemory(
    "https://storage.blob.core.windows.net",
    "container",
    connection_pool_size=50,
    connection_timeout=5.0,
    read_timeout=30.0,
)

Extension Guide

Adding a Custom Tier

To add a fourth tier (e.g., Warm-Plus with Azure Table Storage):

Step 1: Define protocol

from typing import Optional, Protocol

class WarmPlusMemory(Protocol):
    async def get(self, partition_key: str, row_key: str) -> Optional[dict]:
        ...

    async def set(self, partition_key: str, row_key: str, entity: dict) -> None:
        ...

Step 2: Implement client

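from typing import Optional

from azure.core.exceptions import ResourceNotFoundError
from azure.data.tables.aio import TableServiceClient

class TableWarmPlusMemory:
    def __init__(self, account_url: str, table_name: str, credential):
        self.service = TableServiceClient(account_url=account_url, credential=credential)
        self.table = self.service.get_table_client(table_name)

    async def get(self, partition_key: str, row_key: str) -> Optional[dict]:
        try:
            entity = await self.table.get_entity(partition_key, row_key)
            return dict(entity)
        except ResourceNotFoundError:
            # A missing entity is a miss; other errors propagate to the caller
            return None

    async def set(self, partition_key: str, row_key: str, entity: dict) -> None:
        # Table entities must carry their own PartitionKey and RowKey
        await self.table.upsert_entity({**entity, "PartitionKey": partition_key, "RowKey": row_key})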

Step 3: Add to builder

class MemoryBuilder:
    def with_warm_plus(self, account_url: str, table: str, credential) -> "MemoryBuilder":
        self._warm_plus = TableWarmPlusMemory(account_url, table, credential)
        return self
    
    def build(self) -> MemoryClient:
        return MemoryClient(
            hot=self._hot,
            warm=self._warm,
            warm_plus=self._warm_plus,
            cold=self._cold
        )

Customizing TTL Per Use Case

Default TTLs may not fit all scenarios. Expose TTL configuration:

class MemoryConfig:
    """TTL presets for different use cases."""
    SESSION_TTL = 300  # 5 minutes (hot)
    USER_PROFILE_TTL = 86400  # 1 day (warm)
    SEARCH_HISTORY_TTL = 2592000  # 30 days (warm)
    CATALOG_SNAPSHOT_TTL = None  # No expiration (cold)

# Usage
await memory.set(
    f"session:{session_id}",
    session_data,
    ttl=MemoryConfig.SESSION_TTL
)

Using Managed Identity

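Avoid keys in code; authenticate with DefaultAzureCredential. Only endpoint URLs are passed to the tiers, never account keys or connection strings:

# No credential object is passed in: WarmMemory creates DefaultAzureCredential
# internally (see Tier Implementations); ColdMemory is assumed to do the same.
warm = WarmMemory("https://account.documents.azure.com", "db", "container")
cold = ColdMemory("https://storage.blob.core.windows.net", "container")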

memory = (
    MemoryBuilder()
    .with_warm(warm)
    .with_cold(cold)
    .build()
)

Security Considerations

Current State

⚠️ Partial Security:

  • ✅ Redis supports password auth (enabled in builder)
  • ✅ Cosmos/Blob accept DefaultAzureCredential
  • No Key Vault: Connection strings in .env files
  • No Secret Rotation: Static passwords; no auto-refresh
  • No Encryption at Rest: Assumes Azure-managed encryption (not customer-managed keys)

Recommendations

Use Key Vault for Secrets:

from urllib.parse import quote

from azure.keyvault.secrets.aio import SecretClient
from azure.identity.aio import DefaultAzureCredential

credential = DefaultAzureCredential()
kv_client = SecretClient(vault_url="https://vault.vault.azure.net", credential=credential)

redis_password = await kv_client.get_secret("redis-password")

# Inject the secret through the Redis URL (TLS scheme); "..." stands for the cache host and port.
hot = HotMemory(f"rediss://:{quote(redis_password.value, safe='')}@...")
# Cosmos authenticates with DefaultAzureCredential, so no account key is fetched.
warm = WarmMemory("https://account.documents.azure.com", "db", "container")

memory = (
    MemoryBuilder()
    .with_hot(hot)
    .with_warm(warm)
    .build()
)

Enable Customer-Managed Keys (CMK):

  • Cosmos: Configure with Key Vault key for encryption at rest
  • Blob: Use CMK for BYOK (Bring Your Own Key)

Least Privilege Access:

  • Redis: Use RBAC instead of password (Azure Cache for Redis supports Entra ID)
  • Cosmos: Assign Cosmos DB Data Reader/Writer role, not account key
  • Blob: Assign Storage Blob Data Contributor, not account key

Observability

Current State

⚠️ Basic Logging Only:

  • ✅ Python logging configured for errors
  • No Structured Logs: No JSON format with tier/key/latency
  • No Distributed Tracing: No correlation IDs across tiers
  • No Metrics Dashboard: No Azure Monitor dashboard for hit rates
  • No Alerts: No alerts for cache misses > 50% or latency > SLA

Recommendations

Add Azure Monitor Integration:

import os

from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation, measure, view
from opencensus.stats import stats as stats_module

# Define metrics
hit_rate = measure.MeasureFloat("memory/hit_rate", "Cache hit rate", "%")
latency = measure.MeasureFloat("memory/latency", "Operation latency", "ms")

# Export to Azure Monitor
exporter = metrics_exporter.new_metrics_exporter(
    connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
)
view_manager = stats_module.stats.view_manager
view_manager.register_exporter(exporter)

# Track hit rate
view_manager.register_view(view.View(
    "memory_hit_rate",
    "Cache hit rate per tier",
    ["tier"],
    hit_rate,
    aggregation.LastValueAggregation()
))

Add Distributed Tracing:

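import os
from typing import Optional

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter

# Register a provider that batches spans and exports them to Azure Monitor
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(
    AzureMonitorTraceExporter(connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"])
))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

class TracedMemoryClient(MemoryClient):
    async def get(self, key: str) -> Optional[str]:
        with tracer.start_as_current_span("memory.get") as span:
            span.set_attribute("key", key)

            value = await super().get(key)
            span.set_attribute("hit", value is not None)
            return value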

Create Azure Monitor Dashboard:

  • Widget 1: Cache hit rate (last 24h) per tier
  • Widget 2: P95 latency per tier
  • Widget 3: Connection failures per service
  • Widget 4: Redis memory usage (MB)

Testing

Current State

⚠️ Limited Coverage:

  • No dedicated memory unit tests yet
  • No Integration Tests: No tests with real Redis/Cosmos/Blob
  • No Load Tests: No validation of 10k+ ops/sec throughput
  • No Failover Tests: No chaos testing for Redis failover

Recommendations

Add Integration Tests:

import pytest
from testcontainers.redis import RedisContainer

from holiday_peak_lib.agents.memory import HotMemory, MemoryBuilder

@pytest.mark.integration
@pytest.mark.asyncio
async def test_redis_hot_memory_integration():
    # Start Redis in Docker
    with RedisContainer("redis:7") as redis:
        # Resolve the host and the randomly mapped port for the container's 6379
        host = redis.get_container_host_ip()
        port = redis.get_exposed_port(6379)

        redis_url = f"redis://{host}:{port}"
        memory = MemoryBuilder().with_hot(HotMemory(redis_url)).build()
        
        # Test set/get
        await memory.set("test-key", "test-value")
        value = await memory.get("test-key")
        
        assert value == "test-value"

Add Load Tests:

import asyncio
import time

from holiday_peak_lib.agents.memory import HotMemory, MemoryBuilder

async def load_test_hot_tier():
    """Measure throughput for 10k concurrent writes to the hot tier."""
    memory = MemoryBuilder().with_hot(HotMemory("redis://localhost:6379")).build()

    start = time.perf_counter()  # monotonic clock for elapsed time
    # Schedule all writes up front; gather runs them concurrently
    tasks = [memory.set(f"key-{i}", f"value-{i}") for i in range(10_000)]
    await asyncio.gather(*tasks)

    duration = time.perf_counter() - start
    print(f"10k writes: {duration:.2f}s ({10_000/duration:.0f} ops/sec)")

if __name__ == "__main__":
    asyncio.run(load_test_hot_tier())

Add Failover Tests:

# Simulate Redis node failure
kubectl delete pod redis-master-0 -n memory

# Verify app reconnects and continues
pytest tests/integration/test_redis_failover.py

Performance Tuning

Current State

⚠️ No Tuning Done:

  • No Connection Pooling: Each request creates new Redis connection
  • No Pipelining: Redis commands sent individually (high latency)
  • No Compression: Large values (>1MB) stored uncompressed
  • No Prefetching: Warm tier not pre-loaded into hot tier

Recommendations

Enable Redis Pipelining:

async def mset_pipelined(self, items: dict[str, str], ttl: Optional[int] = None) -> None:
    """Set multiple keys in one round-trip."""
    pipe = self.client.pipeline()
    for key, value in items.items():
        if ttl:
            pipe.setex(key, ttl, value)
        else:
            pipe.set(key, value)
    await pipe.execute()

Prefetch Hot Data:

async def warm_hot_cache(self, keys: list[str]) -> None:
    """Pre-load warm tier data into hot tier."""
    # Fetch from Cosmos in parallel (WarmMemory exposes read, not get)
    docs = await asyncio.gather(*[self.warm.read(k) for k in keys])
    
    # Write to Redis pipeline
    hot_items = {k: doc["value"] for k, doc in zip(keys, docs) if doc}
    await self.hot.mset_pipelined(hot_items, ttl=300)

Optimize Cosmos Queries:

# Use partition key in all queries
await memory.warm.query({"partition_key": user_id}, limit=100)

# Avoid cross-partition queries (slow + expensive RUs)
# BAD: SELECT * FROM c WHERE c.status = 'active'  (scans all partitions)
# GOOD: SELECT * FROM c WHERE c.user_id = 'user-123' AND c.status = 'active'

Monitor RU Consumption:

# Check Cosmos DB RU usage
az cosmosdb sql container show \
  --resource-group holiday-peak-hub \
  --account-name memory-cosmos \
  --database-name memory \
  --name user-profiles \
  --query "resource.provisionedThroughput"

Runbooks (NOT PROVIDED)

Operational playbooks needed:

Redis OOM (Out of Memory)

Symptom: Redis evictions, cache misses spike
Diagnosis: Check INFO memory for used_memory_peak, evicted_keys
Remediation: Scale Redis to higher tier or enable eviction policy (allkeys-lru)

Cosmos High RU Consumption

Symptom: 429 rate limit errors, high latency
Diagnosis: Check Azure Portal metrics for RU/s usage, identify hot partition
Remediation: Scale container RUs, optimize queries to use partition key, implement retry with backoff

Blob Storage Throttling

Symptom: 503 errors on blob uploads, slow cold tier reads
Diagnosis: Check storage account metrics for throttled requests
Remediation: Increase storage account limits, use CDN for frequently accessed blobs, implement exponential backoff

Connection Pool Exhaustion

Symptom: Timeouts on get/set, "too many connections" errors
Diagnosis: Check Redis INFO clients for connected_clients, compare to pool limit
Remediation: Increase max_connections in pool config, audit app for connection leaks

TTL Not Expiring

Symptom: Memory growth, stale data served
Diagnosis: Check that the TTL property is enabled on the Cosmos container; inspect the remaining expiry of a Redis key with TTL <key>
Remediation: Enable TTL on Cosmos container, verify Redis TTL set correctly in code

Configuration

Environment Variables

Variable             Description            Default   Required
REDIS_URL            Redis connection URL   -
COSMOS_ACCOUNT_URI   Cosmos DB URI          -
COSMOS_DATABASE      Database name          -
COSMOS_CONTAINER     Container name         -
BLOB_ACCOUNT_URL     Storage account        -
BLOB_CONTAINER       Container name         -

Prod Note: Use Managed Identity instead of keys for Cosmos/Blob.

Related Components

  • Agents — Primary consumer of memory for conversation context
  • Orchestration — Uses memory for SAGA state persistence
  • Adapters — Can cache adapter responses in hot tier

Related ADRs


License: MIT + Microsoft | Author: Ricardo Cataldi | Last Updated: December 30, 2025