Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions src/knx_telegram_store/backends/base_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ def __init__(self, engine: AsyncEngine, retention_days: int | None = None) -> No
Column("data_secure", Boolean, nullable=True),
)

self.store_metadata = Table(
"store_metadata",
self._metadata,
Column("key", Text, primary_key=True),
Column("value", Text, nullable=True),
)

self._capabilities = StoreCapabilities(
supports_time_range=True,
supports_time_delta=True,
Expand All @@ -97,6 +104,16 @@ def max_telegrams(self) -> int | None:
"""SQL stores are typically not limited by count."""
return None

@wrap_store_errors
async def needs_migration(self) -> bool:
"""Check if any schema upgrades or migrations are pending."""
async with self.engine.connect() as conn:
return await conn.run_sync(self._needs_migration_sync)

def _needs_migration_sync(self, connection) -> bool:
"""Synchronously check if schema upgrades or legacy migrations are required."""
return False

@wrap_store_errors
async def initialize(self) -> None:
"""Set up the store (create tables, upgrades)."""
Expand Down Expand Up @@ -448,9 +465,12 @@ async def _populate_last_ga_telegrams_if_empty(self) -> None:
if count == 0:
t2 = self.telegrams.alias("t2")
subq = (
select(func.max(t2.c.timestamp))
.where(t2.c.destination_id == self.telegrams.c.destination_id)
.scalar_subquery()
select(
t2.c.destination_id,
func.max(t2.c.timestamp).label("max_ts"),
)
.group_by(t2.c.destination_id)
.subquery()
)

select_stmt = select(
Expand All @@ -468,7 +488,13 @@ async def _populate_last_ga_telegrams_if_empty(self) -> None:
self.telegrams.c.value_numeric,
self.telegrams.c.raw_data,
self.telegrams.c.data_secure,
).where(self.telegrams.c.timestamp == subq)
).join(
subq,
and_(
self.telegrams.c.destination_id == subq.c.destination_id,
self.telegrams.c.timestamp == subq.c.max_ts,
),
)

if self.engine.dialect.name == "sqlite":
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
Expand Down
202 changes: 201 additions & 1 deletion src/knx_telegram_store/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def _upgrade_schema(self, connection) -> None:
connection.execute(
text(
"UPDATE telegrams SET value = to_jsonb(value_numeric) "
"WHERE value IS NULL AND value_numeric IS NOT NULL"
"WHERE (value IS NULL OR value = 'null'::jsonb) AND value_numeric IS NOT NULL"
)
)

Expand All @@ -173,3 +173,203 @@ def _upgrade_schema(self, connection) -> None:
"WHERE value_numeric IS NULL AND value_legacy_float IS NOT NULL"
)
)

# 5. Data unwrapping pass for legacy {"value": ...} wrapped structures
try:
# Postgres supports casting JSONB to text, so we can cast value::text or payload::text
rows = connection.execute(
text(
"SELECT timestamp, source_id, destination_id, value::text, payload::text FROM telegrams "
"WHERE (value::text LIKE '{\"value\":%' AND value IS NOT NULL) "
"OR (payload::text LIKE '{\"value\":%' AND payload IS NOT NULL)"
)
).fetchall()

if rows:
import json

for row in rows:
timestamp = row[0]
source_id = row[1]
destination_id = row[2]
val_str = row[3]
pay_str = row[4]

new_val = None
new_pay = None
needs_update = False

def unwrap(s):
if s is None:
return None, False
try:
if isinstance(s, dict):
d = s
else:
d = json.loads(s)
if isinstance(d, dict) and "value" in d and len(d) == 1:
return d["value"], True
except Exception:
pass
return s, False

if val_str is not None:
unwrapped_val, unwrapped = unwrap(val_str)
if unwrapped:
new_val = unwrapped_val
needs_update = True
else:
new_val = val_str

if pay_str is not None:
unwrapped_pay, unwrapped = unwrap(pay_str)
if unwrapped:
new_pay = unwrapped_pay
needs_update = True
else:
new_pay = pay_str

if needs_update:

def to_json_str(orig_val, new_val_unwrapped, did_unwrap):
if did_unwrap:
return json.dumps(new_val_unwrapped)
if orig_val is None:
return None
if isinstance(orig_val, dict | list | int | float | bool):
return json.dumps(orig_val)
try:
json.loads(orig_val)
return orig_val
except Exception:
return json.dumps(orig_val)

json_val = to_json_str(val_str, new_val, val_str != new_val)
json_pay = to_json_str(pay_str, new_pay, pay_str != new_pay)

connection.execute(
text(
"UPDATE telegrams SET value = :value, payload = :payload "
"WHERE timestamp = :timestamp AND source_id = :source_id AND destination_id = :destination_id"
),
{
"value": json_val,
"payload": json_pay,
"timestamp": timestamp,
"source_id": source_id,
"destination_id": destination_id,
},
)

# Record successful migration state in store_metadata
connection.execute(
text(
"INSERT INTO store_metadata (key, value) VALUES ('data_unwrapped', 'true') "
"ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value"
)
)
except Exception:
pass

def _needs_migration_sync(self, connection) -> bool:
"""Synchronously check if legacy Postgres schema migration is required."""
inspector = inspect(connection)
try:
columns = inspector.get_columns("telegrams")
except Exception:
return False
existing_columns = {col["name"] for col in columns}

# 1. Handle renames from legacy SpectrumKNX schema
renames = {
"source_address",
"target_address",
"telegram_type",
"value_json",
}
for old in renames:
if old in existing_columns:
return True

# Special value float rename check
if "value" in existing_columns:
is_float = any(c["name"] == "value" and "double" in str(c["type"]).lower() for c in columns)
if is_float:
return True

# raw_data bytea check
if "raw_data" in existing_columns:
for col in columns:
if col["name"] == "raw_data" and "bytea" in str(col["type"]).lower():
return True

# 2. Handle normalization to string_lookup
if "source" in existing_columns:
return True

# Add *_id columns
cols_to_migrate = [
"source_id",
"destination_id",
"telegramtype_id",
"direction_id",
"source_name_id",
"destination_name_id",
]
for col_id in cols_to_migrate:
if col_id not in existing_columns:
return True

# Missing columns
expected_columns = {
"payload",
"dpt_main",
"dpt_sub",
"value",
"value_numeric",
"data_secure",
}
for col_name in expected_columns:
if col_name not in existing_columns:
return True

# 4.5. Check if there are any legacy 'null' values to recover from value_numeric
if "value" in existing_columns and "value_numeric" in existing_columns:
try:
row = connection.execute(
text(
"SELECT 1 FROM telegrams WHERE (value IS NULL OR value = 'null'::jsonb) AND value_numeric IS NOT NULL LIMIT 1"
)
).fetchone()
if row:
return True
except Exception:
pass

# 5. Check if any rows contain legacy {"value": ...} wrapped values
# Skip this scan entirely if the metadata table indicates we already unwrapped
is_unwrapped = False
try:
if inspector.has_table("store_metadata"):
row = connection.execute(
text("SELECT value FROM store_metadata WHERE key = 'data_unwrapped'")
).fetchone()
if row and row[0] == "true":
is_unwrapped = True
except Exception:
pass

if not is_unwrapped:
try:
row = connection.execute(
text(
"SELECT 1 FROM telegrams WHERE (value::text LIKE '{\"value\":%' AND value IS NOT NULL) "
"OR (payload::text LIKE '{\"value\":%' AND payload IS NOT NULL) LIMIT 1"
)
).fetchone()
if row:
return True
except Exception:
pass

return False
Loading
Loading