Skip to content

Commit 46526a8

Browse files
Merge pull request #2 from martinhoefling/base-library
Initial code commit with in memory backend
2 parents e9106be + a3c7a4a commit 46526a8

8 files changed

Lines changed: 444 additions & 20 deletions

File tree

docs/design_and_implementation.md

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ A **standalone, host-agnostic Python library** (`knx-telegram-store`) that:
4040

4141
1. Defines a **single canonical data model** for a stored KNX telegram.
4242
2. Provides an **abstract storage interface** with pluggable backends.
43-
3. Ships three backends: **In-Memory** (testing and small deployments), **SQLite** (lightweight persistent), **PostgreSQL/TimescaleDB** (full scale).
44-
4. Provides a **unified query/filter model** implemented natively by all backends.
43+
3. Ships two primary backend types: **In-Memory** (testing and small deployments) and **SQLAlchemy-based SQL** (SQLite for lightweight persistence, PostgreSQL for full scale).
44+
4. Provides a **unified query/filter model** implemented natively by both backend types.
4545
5. Is usable from both Home Assistant KNX and SpectrumKNX **without pulling in their respective framework dependencies**.
4646

4747
---
@@ -288,13 +288,13 @@ class TelegramQueryResult:
288288

289289
### Graceful degradation matrix
290290

291-
| Feature | In-Memory | SQLite | PostgreSQL |
292-
|---|---|---|---|
293-
| Multi-value filters | ✅ list comprehension |SQL `IN()` | ✅ SQL `IN()` |
294-
| Time range | ✅ timestamp check |`WHERE ts BETWEEN` | ✅ hypertable-optimized |
295-
| Time-delta context | ✅ two-pass filter | ✅ SQL subquery | ✅ native (current SpectrumKNX impl) |
296-
| Pagination | ✅ list slicing |`LIMIT/OFFSET` |`LIMIT/OFFSET` |
297-
| Count |`len()` |`SELECT COUNT(*)` |`SELECT COUNT(*)` |
291+
| Feature | In-Memory | SQL (SQLite / Postgres) |
292+
|---|---|---|
293+
| Multi-value filters | ✅ list comprehension |SQLAlchemy `in_()` |
294+
| Time range | ✅ timestamp check |`ts.between()` |
295+
| Time-delta context | ✅ two-pass filter | ✅ SQL subquery / CTE |
296+
| Pagination | ✅ list slicing |`limit()` / `offset()` |
297+
| Count |`len()` |`select(func.count())` |
298298

299299
Since all backends support native filtering, the consumer can trust that the results returned by `query()` are accurate and do not require further client-side processing.
300300

@@ -554,16 +554,12 @@ dev = ["pytest", "pytest-asyncio", "pytest-cov", "aiosqlite", "asyncpg", "sqlalc
554554
3. **Unit tests** — Shared test suite parametrized across backends
555555
4. **Project Setup** — Initialize `LICENSE` (MIT) and project documentation
556556

557-
### Phase 2: PostgreSQL Backend
558-
559-
7. **Extract from SpectrumKNX** — Port the SQLAlchemy storage and time-delta query logic into `PostgresStore`
560-
8. **Refactor SpectrumKNX** — Replace `models.py` + inline queries with the library
561-
9. **Verify** — SpectrumKNX integration tests, existing live/history views work
562-
563-
### Phase 3: SQLite Backend
557+
### Phase 2: SQL Backends (SQLAlchemy)
564558

565-
10. **Implement SQLite** — Async SQLite with full query support
566-
11. **Test in both consumers** — HA with SQLite backend, SpectrumKNX with SQLite backend
559+
5. **Define SQL Schema** — Create a shared SQLAlchemy model for `telegrams`
560+
6. **Implement PostgresStore** — Port existing SpectrumKNX logic using the shared SQL base
561+
7. **Implement SqliteStore** — Leverage SQLAlchemy's async SQLite support
562+
8. **Verify** — Both backends pass the shared test suite
567563
12. **Publish to PyPI**
568564

569565
### Phase 5: Frontend Enhancements (future)

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ classifiers = [
2424
dependencies = []
2525

2626
[project.urls]
27-
Homepage = "https://github.com/mar-v-in/knx-telegram-store"
28-
Bug-Tracker = "https://github.com/mar-v-in/knx-telegram-store/issues"
27+
Homepage = "https://github.com/martinhoefling/knx-telegram-store"
28+
Bug-Tracker = "https://github.com/martinhoefling/knx-telegram-store/issues"
2929

3030
[tool.setuptools.packages.find]
3131
where = ["src"]

src/knx_telegram_store/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from .model import StoredTelegram
2+
from .query import TelegramQuery, TelegramQueryResult
3+
from .store import StoreCapabilities, TelegramStore
4+
5+
__all__ = [
6+
"StoredTelegram",
7+
"TelegramQuery",
8+
"TelegramQueryResult",
9+
"StoreCapabilities",
10+
"TelegramStore",
11+
]
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from __future__ import annotations
2+
3+
from collections import deque
4+
from collections.abc import Sequence
5+
from datetime import timedelta
6+
7+
from ..model import StoredTelegram
8+
from ..query import TelegramQuery, TelegramQueryResult
9+
from ..store import StoreCapabilities, TelegramStore
10+
11+
12+
class MemoryStore(TelegramStore):
13+
"""In-memory implementation of TelegramStore using a deque."""
14+
15+
def __init__(self, max_size: int = 500) -> None:
16+
"""Initialize the memory store."""
17+
self._max_size = max_size
18+
self._telegrams: deque[StoredTelegram] = deque(maxlen=max_size)
19+
self._capabilities = StoreCapabilities(
20+
supports_time_range=True,
21+
supports_time_delta=True,
22+
supports_pagination=True,
23+
supports_count=True,
24+
max_storage=max_size,
25+
)
26+
27+
@property
28+
def capabilities(self) -> StoreCapabilities:
29+
"""Return the capabilities of this backend."""
30+
return self._capabilities
31+
32+
async def initialize(self) -> None:
33+
"""Set up the store. Idempotent."""
34+
35+
async def close(self) -> None:
36+
"""Tear down the store."""
37+
38+
async def store(self, telegram: StoredTelegram) -> None:
39+
"""Persist a single telegram."""
40+
self._telegrams.append(telegram)
41+
42+
async def store_many(self, telegrams: Sequence[StoredTelegram]) -> None:
43+
"""Persist multiple telegrams in a single batch."""
44+
self._telegrams.extend(telegrams)
45+
46+
async def query(self, query: TelegramQuery) -> TelegramQueryResult:
47+
"""Retrieve telegrams matching the given query."""
48+
results = list(self._telegrams)
49+
50+
# 1. Multi-value filters (AND across, OR within)
51+
if query.sources:
52+
results = [t for t in results if t.source in query.sources]
53+
if query.destinations:
54+
results = [t for t in results if t.destination in query.destinations]
55+
if query.telegram_types:
56+
results = [t for t in results if t.telegramtype in query.telegram_types]
57+
if query.directions:
58+
results = [t for t in results if t.direction in query.directions]
59+
if query.dpt_mains:
60+
results = [t for t in results if t.dpt_main in query.dpt_mains]
61+
62+
# 2. Time range
63+
if query.start_time:
64+
results = [t for t in results if t.timestamp >= query.start_time]
65+
if query.end_time:
66+
results = [t for t in results if t.timestamp <= query.end_time]
67+
68+
# 3. Time-delta context window
69+
if query.delta_before_ms > 0 or query.delta_after_ms > 0:
70+
pivot_timestamps = [t.timestamp for t in results]
71+
72+
delta_before = timedelta(milliseconds=query.delta_before_ms)
73+
delta_after = timedelta(milliseconds=query.delta_after_ms)
74+
75+
# Re-collect all telegrams within any pivot's window
76+
# This implementation is O(N*M) but N is small for MemoryStore (500)
77+
context_results = set()
78+
for t in self._telegrams:
79+
for pivot_ts in pivot_timestamps:
80+
if (pivot_ts - delta_before) <= t.timestamp <= (pivot_ts + delta_after):
81+
context_results.add(t)
82+
break
83+
results = list(context_results)
84+
85+
# 4. Ordering
86+
results.sort(key=lambda t: t.timestamp, reverse=query.order_descending)
87+
88+
total_count = len(results)
89+
90+
# 5. Pagination
91+
start = query.offset
92+
end = query.offset + query.limit
93+
paginated_results = results[start:end]
94+
95+
limit_reached = len(results) > end
96+
97+
return TelegramQueryResult(
98+
telegrams=paginated_results,
99+
total_count=total_count,
100+
limit_reached=limit_reached,
101+
)
102+
103+
async def count(self) -> int:
104+
"""Return the total number of stored telegrams."""
105+
return len(self._telegrams)
106+
107+
async def clear(self) -> None:
108+
"""Remove all stored telegrams."""
109+
self._telegrams.clear()

src/knx_telegram_store/model.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
from datetime import datetime
5+
from typing import Any
6+
7+
8+
@dataclass(frozen=True, slots=True)
9+
class StoredTelegram:
10+
"""A KNX telegram in its stored/serialized form."""
11+
12+
# ── Core identity ─────────────────────────────────────────────
13+
timestamp: datetime # timezone-aware UTC
14+
15+
# ── Addressing ────────────────────────────────────────────────
16+
source: str # Individual address, e.g. "1.2.3"
17+
destination: str # Group address, e.g. "1/2/3"
18+
19+
# ── Telegram classification ───────────────────────────────────
20+
telegramtype: str # "GroupValueWrite" | "GroupValueRead" | "GroupValueResponse"
21+
direction: str # "Incoming" | "Outgoing"
22+
23+
# ── Payload ───────────────────────────────────────────────────
24+
payload: int | tuple[int, ...] | None = None # Raw KNX payload (DPTBinary int or DPTArray tuple)
25+
26+
# ── DPT metadata ─────────────────────────────────────────────
27+
dpt_main: int | None = None
28+
dpt_sub: int | None = None
29+
dpt_name: str | None = None
30+
unit: str | None = None
31+
32+
# ── Decoded value (consumer-enriched at write time) ───────────
33+
value: bool | str | int | float | dict[str, Any] | None = None
34+
35+
# ── Numeric value for time-series queries (SQL backends) ──────
36+
value_numeric: float | None = None
37+
38+
# ── Raw bytes (hex-encoded string for JSON safety) ────────────
39+
raw_data: str | None = None # e.g. "0a1b2c"
40+
41+
# ── Security ──────────────────────────────────────────────────
42+
data_secure: bool | None = None
43+
44+
# ── Display names (consumer-enriched at write time) ───────────
45+
source_name: str = ""
46+
destination_name: str = ""

src/knx_telegram_store/query.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass, field
4+
from datetime import datetime
5+
from typing import TYPE_CHECKING
6+
7+
if TYPE_CHECKING:
8+
from .model import StoredTelegram
9+
10+
@dataclass(frozen=True, kw_only=True, slots=True)
11+
class TelegramQuery:
12+
"""Declarative query for telegram retrieval.
13+
14+
Filter semantics:
15+
- Empty list = no restriction (pass-through)
16+
- Within a category = OR logic (any match passes)
17+
- Across categories = AND logic (must pass all active categories)
18+
"""
19+
20+
# ── Multi-value filters (OR within, AND across) ───────────────
21+
sources: list[str] = field(default_factory=list)
22+
destinations: list[str] = field(default_factory=list)
23+
telegram_types: list[str] = field(default_factory=list)
24+
directions: list[str] = field(default_factory=list)
25+
dpt_mains: list[int] = field(default_factory=list)
26+
27+
# ── Time range ────────────────────────────────────────────────
28+
start_time: datetime | None = None
29+
end_time: datetime | None = None
30+
31+
# ── Time-delta context window (milliseconds) ──────────────────
32+
# When set, rows matching the filters are found first, then
33+
# ALL rows within ±delta of any matching row's timestamp are
34+
# included — even if they don't match the filters themselves.
35+
delta_before_ms: int = 0
36+
delta_after_ms: int = 0
37+
38+
# ── Pagination ────────────────────────────────────────────────
39+
limit: int = 25_000
40+
offset: int = 0
41+
42+
# ── Ordering ──────────────────────────────────────────────────
43+
order_descending: bool = True # newest first by default
44+
45+
46+
@dataclass(frozen=True, kw_only=True, slots=True)
47+
class TelegramQueryResult:
48+
"""Result of a telegram query."""
49+
50+
telegrams: list[StoredTelegram]
51+
total_count: int
52+
limit_reached: bool # True = more results exist beyond limit

src/knx_telegram_store/store.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from collections.abc import Sequence
5+
from dataclasses import dataclass
6+
7+
from .model import StoredTelegram
8+
from .query import TelegramQuery, TelegramQueryResult
9+
10+
11+
@dataclass(frozen=True, slots=True)
12+
class StoreCapabilities:
13+
"""Declares what a backend can do natively."""
14+
supports_time_range: bool = False
15+
supports_time_delta: bool = False
16+
supports_pagination: bool = False
17+
supports_count: bool = False
18+
max_storage: int | None = None # None = unlimited
19+
20+
21+
class TelegramStore(ABC):
22+
"""Abstract interface for KNX telegram persistence."""
23+
24+
@property
25+
@abstractmethod
26+
def capabilities(self) -> StoreCapabilities:
27+
"""Return the capabilities of this backend."""
28+
29+
@abstractmethod
30+
async def initialize(self) -> None:
31+
"""Set up the store (create tables, open connections, etc.).
32+
33+
Called once at startup. Must be idempotent.
34+
"""
35+
36+
@abstractmethod
37+
async def close(self) -> None:
38+
"""Tear down the store (close connections, flush buffers).
39+
40+
Called once at shutdown.
41+
"""
42+
43+
@abstractmethod
44+
async def store(self, telegram: StoredTelegram) -> None:
45+
"""Persist a single telegram."""
46+
47+
@abstractmethod
48+
async def store_many(self, telegrams: Sequence[StoredTelegram]) -> None:
49+
"""Persist multiple telegrams in a single batch."""
50+
51+
@abstractmethod
52+
async def query(self, query: TelegramQuery) -> TelegramQueryResult:
53+
"""Retrieve telegrams matching the given query.
54+
55+
All backends MUST implement full filtering as defined in TelegramQuery.
56+
"""
57+
58+
@abstractmethod
59+
async def count(self) -> int:
60+
"""Return the total number of stored telegrams."""
61+
62+
async def clear(self) -> None:
63+
"""Remove all stored telegrams.
64+
65+
Optional — backends may raise NotImplementedError.
66+
"""
67+
raise NotImplementedError

0 commit comments

Comments
 (0)