Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
ef1fa4b
Update update_config.py - update log files in background
tpd-opitz May 8, 2026
7c39914
Update update_config.py - fix flake
tpd-opitz May 8, 2026
3df4f32
Update update_config.py - import asyncio
tpd-opitz May 8, 2026
354c395
Update update_config.py - delete whitespace
tpd-opitz May 8, 2026
3f63235
Update update_config.py - delete whitespace again
tpd-opitz May 8, 2026
9d7443c
Add 2 second wait time for async action to complete after line 119
fdai0127 May 8, 2026
dcd793a
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 11, 2026
0c71d77
address copilot issues
tpd-opitz May 12, 2026
71620d0
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 12, 2026
1d42379
fix concurrency issues addressed by github copilot
tpd-opitz May 13, 2026
f29b08c
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 13, 2026
ad5fe64
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 15, 2026
365c348
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 18, 2026
8ad6ded
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 18, 2026
d770836
Merge remote-tracking branch 'upstream/master' into improvement/speed…
tpd-opitz May 19, 2026
6ca816e
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 20, 2026
00c2794
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 20, 2026
3f64ba2
Merge branch 'openWB:master' into improvement/speed-up-boot-time
tpd-opitz May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 72 additions & 46 deletions packages/helpermodules/update_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
import re
import time
import threading
from typing import List, Optional
from paho.mqtt.client import Client as MqttClient, MQTTMessage

Expand Down Expand Up @@ -3077,51 +3078,76 @@ def upgrade(topic: str, payload) -> Optional[dict]:
self._append_datastore_version(121)

def upgrade_datastore_122(self) -> None:
for folder in ("daily_log", "monthly_log"):
path_list = Path(Path(__file__).resolve().parents[2]/"data"/folder).glob('**/*.json')
for path in path_list:
with open(path, "r+") as jsonFile:
try:
content = json.load(jsonFile)
for entry in content["entries"]:
if entry.get("prices") is not None and entry["prices"].get("fault_state") is None:
entry["prices"]["fault_state"] = None
for cp in entry.get("cp", {}).values():
if cp.get("fault_state") is None:
cp["fault_state"] = None
for ev_data in entry.get("ev", {}).values():
if ev_data.get("fault_state") is None:
ev_data["fault_state"] = None
for counter in entry.get("counter", {}).values():
if counter.get("fault_state") is None:
counter["fault_state"] = None
for pv in entry.get("pv", {}).values():
if pv.get("fault_state") is None:
pv["fault_state"] = None
for bat in entry.get("bat", {}).values():
if bat.get("fault_state") is None:
bat["fault_state"] = None
if entry.get("hc") is not None and entry["hc"].get("all") is not None:
if entry["hc"]["all"].get("fault_state") is None:
entry["hc"]["all"]["fault_state"] = None
"""Process log files with fault_state updates. Latest file first (synchronous),
then process remaining files in a background thread."""

jsonFile.seek(0)
json.dump(content, jsonFile)
jsonFile.truncate()
log.debug(f"Format der Logdatei '{path}' aktualisiert.")
except FileNotFoundError:
pass
except Exception:
log.exception(f"Logdatei '{path}' konnte nicht konvertiert werden.")
self._append_datastore_version(122)
def process_file(file_path: Path) -> bool:
"""Process a single file and return True if successful, False otherwise."""
try:
with open(file_path, "r+") as jsonFile:
content = json.load(jsonFile)
for entry in content["entries"]:
if entry.get("prices") is not None and entry["prices"].get("fault_state") is None:
entry["prices"]["fault_state"] = None
for cp in entry.get("cp", {}).values():
if cp.get("fault_state") is None:
cp["fault_state"] = None
for ev_data in entry.get("ev", {}).values():
if ev_data.get("fault_state") is None:
ev_data["fault_state"] = None
for counter in entry.get("counter", {}).values():
if counter.get("fault_state") is None:
counter["fault_state"] = None
for pv in entry.get("pv", {}).values():
if pv.get("fault_state") is None:
pv["fault_state"] = None
for bat in entry.get("bat", {}).values():
if bat.get("fault_state") is None:
bat["fault_state"] = None
if entry.get("hc") is not None and entry["hc"].get("all") is not None:
if entry["hc"]["all"].get("fault_state") is None:
entry["hc"]["all"]["fault_state"] = None

def upgrade_datastore_123(self) -> None:
def upgrade(topic: str, payload) -> Optional[dict]:
if re.search("^openWB/system/backup_cloud/config$", topic) is not None:
configuration_payload = decode_payload(payload)
if (configuration_payload.get("type") == "nextcloud" and
configuration_payload["configuration"].get("base_path") is None):
configuration_payload["configuration"].update({"base_path": None})
return {topic: configuration_payload}
self._loop_all_received_topics(upgrade)
self._append_datastore_version(123)
jsonFile.seek(0)
json.dump(content, jsonFile)
jsonFile.truncate()
log.debug(f"Format der Logdatei '{file_path}' aktualisiert.")
return True
except FileNotFoundError:
pass
except Exception:
log.exception(f"Logdatei '{file_path}' konnte nicht konvertiert werden.")
return False

def process_files(file_paths: List[Path]) -> None:
"""Process remaining files and append datastore version afterwards."""
for file_path in file_paths:
process_file(file_path)
self._append_datastore_version(122)

Comment on lines +3122 to +3127
all_remaining_files = []
for folder in ("daily_log", "monthly_log"):
folder_path = self.base_path / "data" / folder
# Get all JSON files and sort by name (date format yyyymm(dd).json, newest first)
path_list = sorted(
folder_path.glob('**/*.json'),
key=lambda p: p.name,
reverse=True
)

if not path_list:
continue

# Process latest file synchronously
latest_file = path_list[0]
log.debug(f"Processing latest file synchronously: {latest_file}")
process_file(latest_file)
all_remaining_files.extend(path_list[1:])

# Process all remaining files in background to avoid blocking startup
if len(all_remaining_files) > 0:
log.debug(f"Starting background thread to process {len(all_remaining_files)} remaining files")
threading.Thread(target=process_files, args=(all_remaining_files,), daemon=True).start()
else:
# No remaining files, just append datastore version
self._append_datastore_version(122)
110 changes: 107 additions & 3 deletions packages/helpermodules/update_config_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from unittest.mock import Mock, patch, mock_open
from helpermodules import update_config
import json
import random
import threading
from pathlib import Path
from unittest.mock import Mock, mock_open, patch

import pytest

from helpermodules import update_config
from helpermodules.update_config import UpdateConfig


Expand Down Expand Up @@ -110,13 +113,114 @@ def test_upgrade_datastore_122(name, monkeypatch):

mock_dump = Mock()
monkeypatch.setattr(update_config.json, "dump", mock_dump)
mock_glob = Mock(return_value=["dummy_path"])
mock_glob = Mock(return_value=[Path("20240512.json")])
monkeypatch.setattr(update_config.Path, "glob", mock_glob)

def immediate_thread(*args, **kwargs):
thread = threading.Thread(*args, **kwargs)

def immediate_start():
target = kwargs.get("target")
thread_args = kwargs.get("args", ())
if target is not None:
target(*thread_args)

thread.start = immediate_start
return thread

monkeypatch.setattr(update_config.threading, "Thread", immediate_thread)
Comment thread
tpd-opitz marked this conversation as resolved.

# Act
with patch("builtins.open", mock_open(read_data=json.dumps(log_content))):
uc.upgrade_datastore_122()

# Assert
assert mock_dump.call_args_list[0].args[0] == expected_content
assert uc.all_received_topics["openWB/system/datastore_version"] == [122]


@pytest.mark.parametrize(
"folders",
[
pytest.param(
{
"daily_log": [
"20240501.json"
],
"monthly_log": [
"202401.json"
],
},
id="single_file_in_each_log_folder",
),
pytest.param(
{
"daily_log": [
"20240501.json", "20240502.json", "20240503.json", "20240504.json",
"20240505.json", "20240506.json", "20240507.json", "20240508.json",
"20240509.json", "20240510.json", "20240511.json", "20240512.json",
"20240513.json",
],
"monthly_log": [
"202401.json", "202402.json", "202403.json", "202404.json",
"202405.json", "202406.json", "202407.json", "202408.json",
"202409.json", "202410.json", "202411.json", "202412.json",
],
},
id="multiple_files_in_log_folders",
),
],
)
def test_latest_file_processed_sync_rest_async(monkeypatch, folders):
"""Verify latest file per folder is processed synchronously and the rest in one async batch."""
uc = UpdateConfig()
uc.base_path = Path("/mock_base")
uc.all_received_topics = {"openWB/system/datastore_version": []}

shuffled_paths_by_folder = {}
for folder_name, file_names in folders.items():
shuffled_names = file_names[:]
random.shuffle(shuffled_names)
shuffled_paths_by_folder[folder_name] = [
Path(f"/mock_base/data/{folder_name}/{file_name}")
for file_name in shuffled_names
]
Comment on lines +180 to +187

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no flakiness caused by the random file order


def mock_glob(self, pattern):
folder_name = self.name
return shuffled_paths_by_folder.get(folder_name, [])

monkeypatch.setattr(update_config.Path, "glob", mock_glob)
monkeypatch.setattr(
"builtins.open",
mock_open(read_data=json.dumps({"entries": [{"prices": {}}]})),
)

thread_calls = []

def capture_thread(*args, **kwargs):
thread_calls.append(kwargs)
return type("MockThread", (), {"start": lambda self: None})()

monkeypatch.setattr(update_config.threading, "Thread", capture_thread)

uc.upgrade_datastore_122()

expected_remaining_count = (
sum(len(file_names) for file_names in folders.values()) - len(folders)
)
if expected_remaining_count == 0:
assert thread_calls == [], "No thread should be created when there are no remaining files to process"
return

remaining_files = thread_calls[0]["args"][0]
assert len(remaining_files) == expected_remaining_count
assert thread_calls[0].get("daemon") is True, "Thread should be created as daemon"

expected_remaining = []
for folder_name, file_names in folders.items():
expected_remaining.extend(
Path(f"/mock_base/data/{folder_name}/{file_name}")
for file_name in sorted(file_names, reverse=True)[1:]
)
assert remaining_files == expected_remaining
Loading