Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ env/
.coverage
htmlcov/
.mypy_cache/
testdata/

# OS
.DS_Store
Thumbs.db

154 changes: 114 additions & 40 deletions src/knx_telegram_store/backends/base_sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import Sequence
from datetime import UTC, datetime, timedelta
from typing import Any
Expand All @@ -13,7 +12,6 @@
Double,
Integer,
MetaData,
String,
Table,
Text,
and_,
Expand All @@ -22,6 +20,7 @@
)
from sqlalchemy.ext.asyncio import AsyncEngine

from ..lookup import LookupCache, build_lookup_table
from ..model import StoredTelegram
from ..query import TelegramQuery, TelegramQueryResult
from ..store import StoreCapabilities, TelegramStore
Expand All @@ -35,25 +34,28 @@ def __init__(self, engine: AsyncEngine, retention_days: int | None = None) -> No
self.engine = engine
self._retention_days = retention_days
self._metadata = MetaData()
self._lookup_cache = LookupCache()
self.string_lookup = build_lookup_table(self._metadata)

self.telegrams = Table(
"telegrams",
self._metadata,
Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
Column("source", String(20), nullable=False),
Column("destination", String(20), nullable=False, index=True),
Column("telegramtype", String(50), nullable=False),
Column("direction", String(20), nullable=False, server_default=""),
Column("source_id", Integer, nullable=False),
Column("destination_id", Integer, nullable=False, index=True),
Column("telegramtype_id", Integer, nullable=False),
Comment thread
martinhoefling marked this conversation as resolved.
Column("direction_id", Integer, nullable=False),
Comment thread
martinhoefling marked this conversation as resolved.
Column("dpt_name_id", Integer, nullable=True),
Column("unit_id", Integer, nullable=True),
Comment thread
martinhoefling marked this conversation as resolved.
Outdated
Column("source_name_id", Integer, nullable=True),
Column("destination_name_id", Integer, nullable=True),
Column("payload", JSON, nullable=True),
Column("dpt_main", Integer, nullable=True),
Column("dpt_sub", Integer, nullable=True),
Column("dpt_name", String(100), nullable=True),
Column("unit", String(20), nullable=True),
Column("value", JSON, nullable=True),
Column("value_numeric", Double, nullable=True),
Column("raw_data", Text, nullable=True), # Hex encoded string
Column("data_secure", Boolean, nullable=True),
Column("source_name", String(255), server_default=""),
Column("destination_name", String(255), server_default=""),
)
self._capabilities = StoreCapabilities(
supports_time_range=True,
Expand All @@ -78,9 +80,10 @@ def max_telegrams(self) -> int | None:
"""SQL stores are typically not limited by count."""
return None

@abstractmethod
async def initialize(self) -> None:
"""Set up the store (create tables, upgrades)."""
# Subclasses should call this or implement their own with super().initialize()
await self._lookup_cache.warm(self.engine, self.string_lookup)

async def close(self) -> None:
"""Close the engine."""
Expand All @@ -95,29 +98,46 @@ async def store_many(self, telegrams: Sequence[StoredTelegram]) -> None:
if not telegrams:
return

values = [
{
"timestamp": t.timestamp,
"source": t.source,
"destination": t.destination,
"telegramtype": t.telegramtype,
"direction": t.direction,
"payload": t.payload,
"dpt_main": t.dpt_main,
"dpt_sub": t.dpt_sub,
"dpt_name": t.dpt_name,
"unit": t.unit,
"value": t.value,
"value_numeric": t.value_numeric,
"raw_data": t.raw_data,
"data_secure": t.data_secure,
"source_name": t.source_name,
"destination_name": t.destination_name,
}
for t in telegrams
]
# 1. Resolve lookup IDs
pairs: set[tuple[str, str]] = set()
for t in telegrams:
pairs.add(("source", t.source))
pairs.add(("destination", t.destination))
pairs.add(("telegramtype", t.telegramtype))
pairs.add(("direction", t.direction))
if t.dpt_name:
pairs.add(("dpt_name", t.dpt_name))
if t.unit:
pairs.add(("unit", t.unit))
pairs.add(("source_name", t.source_name))
pairs.add(("destination_name", t.destination_name))

async with self.engine.begin() as conn:
lookup_ids = await self._lookup_cache.get_or_create_ids(conn, self.string_lookup, pairs)

values = []
for t in telegrams:
values.append(
{
"timestamp": t.timestamp,
"source_id": lookup_ids[("source", t.source)],
"destination_id": lookup_ids[("destination", t.destination)],
"telegramtype_id": lookup_ids[("telegramtype", t.telegramtype)],
"direction_id": lookup_ids[("direction", t.direction)],
"dpt_name_id": lookup_ids.get(("dpt_name", t.dpt_name)) if t.dpt_name else None,
"unit_id": lookup_ids.get(("unit", t.unit)) if t.unit else None,
"source_name_id": lookup_ids[("source_name", t.source_name)],
"destination_name_id": lookup_ids[("destination_name", t.destination_name)],
"payload": t.payload,
"dpt_main": t.dpt_main,
"dpt_sub": t.dpt_sub,
"value": t.value,
"value_numeric": t.value_numeric,
"raw_data": t.raw_data,
"data_secure": t.data_secure,
}
)

await conn.execute(self.telegrams.insert(), values)

async def evict_older_than(self, cutoff: datetime, *, dry_run: bool = False) -> int:
Expand All @@ -142,18 +162,72 @@ async def evict_expired(self, *, dry_run: bool = False) -> int:

async def query(self, query: TelegramQuery) -> TelegramQueryResult:
"""Retrieve telegrams matching the given query."""
stmt = select(self.telegrams)
# Aliases for lookup JOINs
s_lk = self.string_lookup.alias("s_lk")
d_lk = self.string_lookup.alias("d_lk")
tt_lk = self.string_lookup.alias("tt_lk")
dir_lk = self.string_lookup.alias("dir_lk")
dn_lk = self.string_lookup.alias("dn_lk")
u_lk = self.string_lookup.alias("u_lk")
sn_lk = self.string_lookup.alias("sn_lk")
den_lk = self.string_lookup.alias("den_lk")

stmt = select(
self.telegrams.c.timestamp,
s_lk.c.value.label("source"),
d_lk.c.value.label("destination"),
tt_lk.c.value.label("telegramtype"),
dir_lk.c.value.label("direction"),
dn_lk.c.value.label("dpt_name"),
u_lk.c.value.label("unit"),
sn_lk.c.value.label("source_name"),
den_lk.c.value.label("destination_name"),
self.telegrams.c.payload,
self.telegrams.c.dpt_main,
self.telegrams.c.dpt_sub,
self.telegrams.c.value,
self.telegrams.c.value_numeric,
self.telegrams.c.raw_data,
self.telegrams.c.data_secure,
)

# Joins to lookup table
stmt = stmt.join(s_lk, and_(s_lk.c.id == self.telegrams.c.source_id, s_lk.c.category == "source"))
stmt = stmt.join(d_lk, and_(d_lk.c.id == self.telegrams.c.destination_id, d_lk.c.category == "destination"))
stmt = stmt.join(tt_lk, and_(tt_lk.c.id == self.telegrams.c.telegramtype_id, tt_lk.c.category == "telegramtype"))
stmt = stmt.join(dir_lk, and_(dir_lk.c.id == self.telegrams.c.direction_id, dir_lk.c.category == "direction"))
stmt = stmt.outerjoin(dn_lk, and_(dn_lk.c.id == self.telegrams.c.dpt_name_id, dn_lk.c.category == "dpt_name"))
stmt = stmt.outerjoin(u_lk, and_(u_lk.c.id == self.telegrams.c.unit_id, u_lk.c.category == "unit"))
stmt = stmt.outerjoin(
sn_lk, and_(sn_lk.c.id == self.telegrams.c.source_name_id, sn_lk.c.category == "source_name")
)
stmt = stmt.outerjoin(
den_lk, and_(den_lk.c.id == self.telegrams.c.destination_name_id, den_lk.c.category == "destination_name")
)

# 1. Base Filters
filters: list[Any] = []
if query.sources:
filters.append(self.telegrams.c.source.in_(query.sources))
# Subquery to get IDs for sources
source_ids = select(self.string_lookup.c.id).where(
self.string_lookup.c.category == "source", self.string_lookup.c.value.in_(query.sources)
)
filters.append(self.telegrams.c.source_id.in_(source_ids))
if query.destinations:
filters.append(self.telegrams.c.destination.in_(query.destinations))
dest_ids = select(self.string_lookup.c.id).where(
self.string_lookup.c.category == "destination", self.string_lookup.c.value.in_(query.destinations)
)
filters.append(self.telegrams.c.destination_id.in_(dest_ids))
if query.telegram_types:
filters.append(self.telegrams.c.telegramtype.in_(query.telegram_types))
tt_ids = select(self.string_lookup.c.id).where(
self.string_lookup.c.category == "telegramtype", self.string_lookup.c.value.in_(query.telegram_types)
)
filters.append(self.telegrams.c.telegramtype_id.in_(tt_ids))
if query.directions:
filters.append(self.telegrams.c.direction.in_(query.directions))
dir_ids = select(self.string_lookup.c.id).where(
self.string_lookup.c.category == "direction", self.string_lookup.c.value.in_(query.directions)
)
filters.append(self.telegrams.c.direction_id.in_(dir_ids))
if query.dpt_mains:
filters.append(self.telegrams.c.dpt_main.in_(query.dpt_mains))

Expand Down Expand Up @@ -186,7 +260,7 @@ async def query(self, query: TelegramQuery) -> TelegramQueryResult:
)

# Use EXISTS to find rows within range of any pivot
stmt = select(self.telegrams).where(select(pivots).where(cond).exists())
stmt = stmt.where(select(pivots).where(cond).exists())
else:
if filters:
stmt = stmt.where(and_(*filters))
Expand Down Expand Up @@ -224,8 +298,8 @@ async def query(self, query: TelegramQuery) -> TelegramQueryResult:
value_numeric=row.value_numeric,
raw_data=row.raw_data,
data_secure=row.data_secure,
source_name=row.source_name,
destination_name=row.destination_name,
source_name=row.source_name or "",
destination_name=row.destination_name or "",
)
for row in rows
]
Expand Down
63 changes: 55 additions & 8 deletions src/knx_telegram_store/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ async def initialize(self) -> None:
# 4. Convert to hypertable (idempotent)
await conn.execute(text("SELECT create_hypertable('telegrams', 'timestamp', if_not_exists => TRUE)"))

# 5. Warm the cache
await super().initialize()

def _upgrade_schema(self, connection) -> None:
"""Synchronous part of schema upgrade (run via run_sync)."""
inspector = inspect(connection)
Expand Down Expand Up @@ -79,25 +82,69 @@ def _upgrade_schema(self, connection) -> None:
text("ALTER TABLE telegrams ALTER COLUMN raw_data TYPE TEXT USING encode(raw_data, 'hex')")
)

# 2. Ensure all library columns exist
# 2. Handle normalization to string_lookup
if "source" in existing_columns:
cols_to_migrate = {
"source": "source",
"destination": "destination",
"telegramtype": "telegramtype",
"direction": "direction",
"dpt_name": "dpt_name",
"unit": "unit",
"source_name": "source_name",
"destination_name": "destination_name",
}

# Populate string_lookup table
for cat, old_col in cols_to_migrate.items():
if old_col in existing_columns:
connection.execute(
text(
f"INSERT INTO string_lookup (category, value) "
f"SELECT DISTINCT '{cat}', {old_col} FROM telegrams WHERE {old_col} IS NOT NULL "
f"ON CONFLICT DO NOTHING"
)
)

# Add *_id columns
for cat in cols_to_migrate:
id_col = f"{cat}_id"
if id_col not in existing_columns:
connection.execute(text(f"ALTER TABLE telegrams ADD COLUMN {id_col} INTEGER"))

# Update IDs using JOIN
for cat, old_col in cols_to_migrate.items():
connection.execute(
text(
f"UPDATE telegrams SET {cat}_id = sl.id "
f"FROM string_lookup sl WHERE sl.category='{cat}' AND sl.value=telegrams.{old_col}"
)
)

# Drop old columns
for old_col in cols_to_migrate.values():
connection.execute(text(f'ALTER TABLE telegrams DROP COLUMN "{old_col}"'))

# Re-fetch existing columns after drops
columns = inspector.get_columns("telegrams")
existing_columns = {col["name"] for col in columns}

# 3. Ensure all non-normalized library columns exist
expected_columns = {
"direction": "VARCHAR(20) DEFAULT 'Incoming'",
"value": "JSONB",
"value_numeric": "FLOAT",
"payload": "JSONB",
"dpt_name": "VARCHAR(100)",
"unit": "VARCHAR(20)",
"data_secure": "BOOLEAN",
"source_name": "VARCHAR(255) DEFAULT ''",
"destination_name": "VARCHAR(255) DEFAULT ''",
"dpt_main": "INTEGER",
"dpt_sub": "INTEGER",
}

for col_name, col_type in expected_columns.items():
if col_name not in existing_columns:
if col_name not in existing_columns and f"{col_name}_id" not in existing_columns:
connection.execute(text(f"ALTER TABLE telegrams ADD COLUMN {col_name} {col_type}"))
existing_columns.add(col_name)

# 3. Data migrations for old SpectrumKNX rows
# 4. Data migrations for old SpectrumKNX rows
# Old schema had value_numeric (FLOAT) and value_json (now payload),
# but no value (JSONB) column. Populate value from value_numeric
# so the library's query returns it correctly.
Expand Down
Loading
Loading