|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from collections import deque |
| 4 | +from collections.abc import Sequence |
| 5 | +from datetime import timedelta |
| 6 | + |
| 7 | +from ..model import StoredTelegram |
| 8 | +from ..query import TelegramQuery, TelegramQueryResult |
| 9 | +from ..store import StoreCapabilities, TelegramStore |
| 10 | + |
| 11 | + |
| 12 | +class MemoryStore(TelegramStore): |
| 13 | + """In-memory implementation of TelegramStore using a deque.""" |
| 14 | + |
| 15 | + def __init__(self, max_size: int = 500) -> None: |
| 16 | + """Initialize the memory store.""" |
| 17 | + self._max_size = max_size |
| 18 | + self._telegrams: deque[StoredTelegram] = deque(maxlen=max_size) |
| 19 | + self._capabilities = StoreCapabilities( |
| 20 | + supports_time_range=True, |
| 21 | + supports_time_delta=True, |
| 22 | + supports_pagination=True, |
| 23 | + supports_count=True, |
| 24 | + max_storage=max_size, |
| 25 | + ) |
| 26 | + |
| 27 | + @property |
| 28 | + def capabilities(self) -> StoreCapabilities: |
| 29 | + """Return the capabilities of this backend.""" |
| 30 | + return self._capabilities |
| 31 | + |
| 32 | + async def initialize(self) -> None: |
| 33 | + """Set up the store. Idempotent.""" |
| 34 | + |
| 35 | + async def close(self) -> None: |
| 36 | + """Tear down the store.""" |
| 37 | + |
| 38 | + async def store(self, telegram: StoredTelegram) -> None: |
| 39 | + """Persist a single telegram.""" |
| 40 | + self._telegrams.append(telegram) |
| 41 | + |
| 42 | + async def store_many(self, telegrams: Sequence[StoredTelegram]) -> None: |
| 43 | + """Persist multiple telegrams in a single batch.""" |
| 44 | + self._telegrams.extend(telegrams) |
| 45 | + |
| 46 | + async def query(self, query: TelegramQuery) -> TelegramQueryResult: |
| 47 | + """Retrieve telegrams matching the given query.""" |
| 48 | + results = list(self._telegrams) |
| 49 | + |
| 50 | + # 1. Multi-value filters (AND across, OR within) |
| 51 | + if query.sources: |
| 52 | + results = [t for t in results if t.source in query.sources] |
| 53 | + if query.destinations: |
| 54 | + results = [t for t in results if t.destination in query.destinations] |
| 55 | + if query.telegram_types: |
| 56 | + results = [t for t in results if t.telegramtype in query.telegram_types] |
| 57 | + if query.directions: |
| 58 | + results = [t for t in results if t.direction in query.directions] |
| 59 | + if query.dpt_mains: |
| 60 | + results = [t for t in results if t.dpt_main in query.dpt_mains] |
| 61 | + |
| 62 | + # 2. Time range |
| 63 | + if query.start_time: |
| 64 | + results = [t for t in results if t.timestamp >= query.start_time] |
| 65 | + if query.end_time: |
| 66 | + results = [t for t in results if t.timestamp <= query.end_time] |
| 67 | + |
| 68 | + # 3. Time-delta context window |
| 69 | + if query.delta_before_ms > 0 or query.delta_after_ms > 0: |
| 70 | + pivot_timestamps = [t.timestamp for t in results] |
| 71 | + |
| 72 | + delta_before = timedelta(milliseconds=query.delta_before_ms) |
| 73 | + delta_after = timedelta(milliseconds=query.delta_after_ms) |
| 74 | + |
| 75 | + # Re-collect all telegrams within any pivot's window |
| 76 | + # This implementation is O(N*M) but N is small for MemoryStore (500) |
| 77 | + context_results = set() |
| 78 | + for t in self._telegrams: |
| 79 | + for pivot_ts in pivot_timestamps: |
| 80 | + if (pivot_ts - delta_before) <= t.timestamp <= (pivot_ts + delta_after): |
| 81 | + context_results.add(t) |
| 82 | + break |
| 83 | + results = list(context_results) |
| 84 | + |
| 85 | + # 4. Ordering |
| 86 | + results.sort(key=lambda t: t.timestamp, reverse=query.order_descending) |
| 87 | + |
| 88 | + total_count = len(results) |
| 89 | + |
| 90 | + # 5. Pagination |
| 91 | + start = query.offset |
| 92 | + end = query.offset + query.limit |
| 93 | + paginated_results = results[start:end] |
| 94 | + |
| 95 | + limit_reached = len(results) > end |
| 96 | + |
| 97 | + return TelegramQueryResult( |
| 98 | + telegrams=paginated_results, |
| 99 | + total_count=total_count, |
| 100 | + limit_reached=limit_reached, |
| 101 | + ) |
| 102 | + |
| 103 | + async def count(self) -> int: |
| 104 | + """Return the total number of stored telegrams.""" |
| 105 | + return len(self._telegrams) |
| 106 | + |
| 107 | + async def clear(self) -> None: |
| 108 | + """Remove all stored telegrams.""" |
| 109 | + self._telegrams.clear() |
0 commit comments