diff --git a/packages/helpermodules/update_config.py b/packages/helpermodules/update_config.py index 12ae89ddbe..a20a79d835 100644 --- a/packages/helpermodules/update_config.py +++ b/packages/helpermodules/update_config.py @@ -8,6 +8,7 @@ from pathlib import Path import re import time +import threading from typing import List, Optional from paho.mqtt.client import Client as MqttClient, MQTTMessage @@ -3077,51 +3078,76 @@ def upgrade(topic: str, payload) -> Optional[dict]: self._append_datastore_version(121) def upgrade_datastore_122(self) -> None: - for folder in ("daily_log", "monthly_log"): - path_list = Path(Path(__file__).resolve().parents[2]/"data"/folder).glob('**/*.json') - for path in path_list: - with open(path, "r+") as jsonFile: - try: - content = json.load(jsonFile) - for entry in content["entries"]: - if entry.get("prices") is not None and entry["prices"].get("fault_state") is None: - entry["prices"]["fault_state"] = None - for cp in entry.get("cp", {}).values(): - if cp.get("fault_state") is None: - cp["fault_state"] = None - for ev_data in entry.get("ev", {}).values(): - if ev_data.get("fault_state") is None: - ev_data["fault_state"] = None - for counter in entry.get("counter", {}).values(): - if counter.get("fault_state") is None: - counter["fault_state"] = None - for pv in entry.get("pv", {}).values(): - if pv.get("fault_state") is None: - pv["fault_state"] = None - for bat in entry.get("bat", {}).values(): - if bat.get("fault_state") is None: - bat["fault_state"] = None - if entry.get("hc") is not None and entry["hc"].get("all") is not None: - if entry["hc"]["all"].get("fault_state") is None: - entry["hc"]["all"]["fault_state"] = None + """Process log files with fault_state updates. Latest file first (synchronous), + then process remaining files in a background thread.""" - jsonFile.seek(0) - json.dump(content, jsonFile) - jsonFile.truncate() - log.debug(f"Format der Logdatei '{path}' aktualisiert.") - except FileNotFoundError: - pass - except Exception: - log.exception(f"Logdatei '{path}' konnte nicht konvertiert werden.") - self._append_datastore_version(122) + def process_file(file_path: Path) -> bool: + """Process a single file and return True if successful, False otherwise.""" + try: + with open(file_path, "r+") as jsonFile: + content = json.load(jsonFile) + for entry in content["entries"]: + if entry.get("prices") is not None and entry["prices"].get("fault_state") is None: + entry["prices"]["fault_state"] = None + for cp in entry.get("cp", {}).values(): + if cp.get("fault_state") is None: + cp["fault_state"] = None + for ev_data in entry.get("ev", {}).values(): + if ev_data.get("fault_state") is None: + ev_data["fault_state"] = None + for counter in entry.get("counter", {}).values(): + if counter.get("fault_state") is None: + counter["fault_state"] = None + for pv in entry.get("pv", {}).values(): + if pv.get("fault_state") is None: + pv["fault_state"] = None + for bat in entry.get("bat", {}).values(): + if bat.get("fault_state") is None: + bat["fault_state"] = None + if entry.get("hc") is not None and entry["hc"].get("all") is not None: + if entry["hc"]["all"].get("fault_state") is None: + entry["hc"]["all"]["fault_state"] = None - def upgrade_datastore_123(self) -> None: - def upgrade(topic: str, payload) -> Optional[dict]: - if re.search("^openWB/system/backup_cloud/config$", topic) is not None: - configuration_payload = decode_payload(payload) - if (configuration_payload.get("type") == "nextcloud" and - configuration_payload["configuration"].get("base_path") is None): - configuration_payload["configuration"].update({"base_path": None}) - return {topic: configuration_payload} - self._loop_all_received_topics(upgrade) - self._append_datastore_version(123) + jsonFile.seek(0) + json.dump(content, jsonFile) + jsonFile.truncate() + log.debug(f"Format der Logdatei '{file_path}' aktualisiert.") + return True + except FileNotFoundError: + pass + except Exception: + log.exception(f"Logdatei '{file_path}' konnte nicht konvertiert werden.") + return False + + def process_files(file_paths: List[Path]) -> None: + """Process remaining files and append datastore version afterwards.""" + for file_path in file_paths: + process_file(file_path) + self._append_datastore_version(122) + + all_remaining_files = [] + for folder in ("daily_log", "monthly_log"): + folder_path = self.base_path / "data" / folder + # Get all JSON files and sort by name (date format yyyymm(dd).json, newest first) + path_list = sorted( + folder_path.glob('**/*.json'), + key=lambda p: p.name, + reverse=True + ) + + if not path_list: + continue + + # Process latest file synchronously + latest_file = path_list[0] + log.debug(f"Processing latest file synchronously: {latest_file}") + process_file(latest_file) + all_remaining_files.extend(path_list[1:]) + + # Process all remaining files in background to avoid blocking startup + if len(all_remaining_files) > 0: + log.debug(f"Starting background thread to process {len(all_remaining_files)} remaining files") + threading.Thread(target=process_files, args=(all_remaining_files,), daemon=True).start() + else: + # No remaining files, just append datastore version + self._append_datastore_version(122) diff --git a/packages/helpermodules/update_config_test.py b/packages/helpermodules/update_config_test.py index 6aa1001c75..87ef3bf33f 100644 --- a/packages/helpermodules/update_config_test.py +++ b/packages/helpermodules/update_config_test.py @@ -1,9 +1,12 @@ -from unittest.mock import Mock, patch, mock_open -from helpermodules import update_config import json +import random +import threading from pathlib import Path +from unittest.mock import Mock, mock_open, patch import pytest + +from helpermodules import update_config from helpermodules.update_config import UpdateConfig @@ -110,9 +113,23 @@ def test_upgrade_datastore_122(name, monkeypatch): mock_dump = Mock() monkeypatch.setattr(update_config.json, "dump", mock_dump) - mock_glob = Mock(return_value=["dummy_path"]) + mock_glob = Mock(return_value=[Path("20240512.json")]) monkeypatch.setattr(update_config.Path, "glob", mock_glob) + def immediate_thread(*args, **kwargs): + thread = threading.Thread(*args, **kwargs) + + def immediate_start(): + target = kwargs.get("target") + thread_args = kwargs.get("args", ()) + if target is not None: + target(*thread_args) + + thread.start = immediate_start + return thread + + monkeypatch.setattr(update_config.threading, "Thread", immediate_thread) + # Act with patch("builtins.open", mock_open(read_data=json.dumps(log_content))): uc.upgrade_datastore_122() @@ -120,3 +137,90 @@ def test_upgrade_datastore_122(name, monkeypatch): # Assert assert mock_dump.call_args_list[0].args[0] == expected_content assert uc.all_received_topics["openWB/system/datastore_version"] == [122] + + +@pytest.mark.parametrize( + "folders", + [ + pytest.param( + { + "daily_log": [ + "20240501.json" + ], + "monthly_log": [ + "202401.json" + ], + }, + id="single_file_in_each_log_folder", + ), + pytest.param( + { + "daily_log": [ + "20240501.json", "20240502.json", "20240503.json", "20240504.json", + "20240505.json", "20240506.json", "20240507.json", "20240508.json", + "20240509.json", "20240510.json", "20240511.json", "20240512.json", + "20240513.json", + ], + "monthly_log": [ + "202401.json", "202402.json", "202403.json", "202404.json", + "202405.json", "202406.json", "202407.json", "202408.json", + "202409.json", "202410.json", "202411.json", "202412.json", + ], + }, + id="multiple_files_in_log_folders", + ), + ], +) +def test_latest_file_processed_sync_rest_async(monkeypatch, folders): + """Verify latest file per folder is processed synchronously and the rest in one async batch.""" + uc = UpdateConfig() + uc.base_path = Path("/mock_base") + uc.all_received_topics = {"openWB/system/datastore_version": []} + + shuffled_paths_by_folder = {} + for folder_name, file_names in folders.items(): + shuffled_names = file_names[:] + random.shuffle(shuffled_names) + shuffled_paths_by_folder[folder_name] = [ + Path(f"/mock_base/data/{folder_name}/{file_name}") + for file_name in shuffled_names + ] + + def mock_glob(self, pattern): + folder_name = self.name + return shuffled_paths_by_folder.get(folder_name, []) + + monkeypatch.setattr(update_config.Path, "glob", mock_glob) + monkeypatch.setattr( + "builtins.open", + mock_open(read_data=json.dumps({"entries": [{"prices": {}}]})), + ) + + thread_calls = [] + + def capture_thread(*args, **kwargs): + thread_calls.append(kwargs) + return type("MockThread", (), {"start": lambda self: None})() + + monkeypatch.setattr(update_config.threading, "Thread", capture_thread) + + uc.upgrade_datastore_122() + + expected_remaining_count = ( + sum(len(file_names) for file_names in folders.values()) - len(folders) + ) + if expected_remaining_count == 0: + assert thread_calls == [], "No thread should be created when there are no remaining files to process" + return + + remaining_files = thread_calls[0]["args"][0] + assert len(remaining_files) == expected_remaining_count + assert thread_calls[0].get("daemon") is True, "Thread should be created as daemon" + + expected_remaining = [] + for folder_name, file_names in folders.items(): + expected_remaining.extend( + Path(f"/mock_base/data/{folder_name}/{file_name}") + for file_name in sorted(file_names, reverse=True)[1:] + ) + assert remaining_files == expected_remaining