From 34e7a58bbcca550abfb8417caba5cee0f7e715b0 Mon Sep 17 00:00:00 2001 From: ssb-dor Date: Thu, 11 Dec 2025 16:28:59 +0100 Subject: [PATCH] Solution that might work --- poetry.lock | 36 ++++++++++++- pyproject.toml | 1 + .../modules/parquet_editor.py | 52 +++++++++++-------- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/poetry.lock b/poetry.lock index 7ecbc796..322cbc64 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3498,6 +3498,18 @@ files = [ {file = "parsy-2.2.tar.gz", hash = "sha256:e943147644a8cf0d82d1bcb5c5867dd517495254cea3e3eb058b1e421cb7561f"}, ] +[[package]] +name = "pathlib-abc" +version = "0.5.2" +description = "Backport of pathlib ABCs" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "pathlib_abc-0.5.2-py3-none-any.whl", hash = "sha256:4c9d94cf1b23af417ce7c0417b43333b06a106c01000b286c99de230d95eefbb"}, + {file = "pathlib_abc-0.5.2.tar.gz", hash = "sha256:fcd56f147234645e2c59c7ae22808b34c364bb231f685ddd9f96885aed78a94c"}, +] + [[package]] name = "pathspec" version = "0.12.1" @@ -6253,6 +6265,28 @@ tzdata = {version = "*", markers = "platform_system == \"Windows\""} [package.extras] devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"] +[[package]] +name = "universal-pathlib" +version = "0.3.7" +description = "pathlib api extended to use fsspec backends" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "universal_pathlib-0.3.7-py3-none-any.whl", hash = "sha256:fb95117b20b5981f86ef9d887fddbf9c61d3596634ba42cccea444931d87c201"}, + {file = "universal_pathlib-0.3.7.tar.gz", hash = "sha256:36331056fa59a7d7cd3b61b4045f3a3418f446f23ec1a01d281c4510814b4b05"}, +] + +[package.dependencies] +fsspec = ">=2024.5.0" +pathlib-abc = ">=0.5.1,<0.6.0" + +[package.extras] +dev = ["adlfs (>=2024)", "cheroot", "fsspec[adl,gcs,github,http,s3,smb,ssh] (>=2024.5.0)", "gcsfs (>=2024.5.0)", "huggingface_hub", "moto[s3,server]", "pyftpdlib", "s3fs (>=2024.5.0)", "typing_extensions ; python_version < \"3.11\"", "webdav4[fsspec]", "wsgidav"] +dev-third-party = ["pydantic", "pydantic-settings"] +tests = ["mypy (>=1.10.0)", "packaging", "pydantic (>=2)", "pylint (>=2.17.4)", "pytest (>=8)", "pytest-cov (>=4.1.0)", "pytest-mock (>=3.12.0)", "pytest-mypy-plugins (>=3.1.2)", "pytest-sugar (>=0.9.7)"] +typechecking = ["mypy (>=1.10.0)", "pytest-mypy-plugins (>=3.1.2)"] + [[package]] name = "urllib3" version = "2.5.0" @@ -6904,4 +6938,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "8af368699c45529d77111acca2460fd693cde1567c1226d64efc1e829ce25c90" +content-hash = "90aa728c3387f1479a8679cb43fe3ae9ad7ca8b33919b8fc7a76bf852afeea5e" diff --git a/pyproject.toml b/pyproject.toml index 72408eed..de1d44bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "dash-bootstrap-templates (>=2.1.0,<3.0.0)", "duckdb (==1.3.2)", "ssb-poc-statlog-model (>=1.0.0,<2.0.0)", + "universal-pathlib (>=0.3.7,<0.4.0)", ] [project.urls] diff --git a/src/ssb_dash_framework/modules/parquet_editor.py b/src/ssb_dash_framework/modules/parquet_editor.py index 716b0ce6..a177e4a4 100644 --- a/src/ssb_dash_framework/modules/parquet_editor.py +++ b/src/ssb_dash_framework/modules/parquet_editor.py @@ -5,7 +5,7 @@ from datetime import UTC from datetime import datetime from io import StringIO -from pathlib import Path +from upath import UPath from typing import Any import dash_ag_grid as dag @@ -83,7 +83,7 @@ def __init__( selected_inputs=id_vars, selected_states=[] ) self.file_path = data_source - path = Path(data_source) + path = UPath(data_source) self.log_filepath = get_log_path(data_source) self.label = path.stem @@ -323,12 +323,18 @@ def confirm_edit( change_to_log = self._build_process_log_entry(pending_edit) logger.debug(f"Record for changelog: {change_to_log}") - with open(self.log_filepath, "a", encoding="utf-8") as f: - logger.debug("Writing change") - f.write( - json.dumps(change_to_log, ensure_ascii=False, default=str) + "\n" - ) - logger.debug("Change written.") + logger.debug("Writing change") + p = UPath(self.log_filepath) + if p.exists(): + existing = p.read_text(encoding="utf-8") + else: + existing = "" + + new_line = json.dumps(change_to_log, ensure_ascii=False, default=str) + "\n" + content = existing + new_line + + p.write_text(content, encoding="utf-8") + logger.debug("Change written.") error_log = [ create_alert( "Prosesslogg oppdatert!", @@ -481,7 +487,7 @@ def __init__(self, id_vars: list[str], file_path: str) -> None: ) self.user = os.getenv("DAPLA_USER") self.tz = zoneinfo.ZoneInfo("Europe/Oslo") - path = Path(file_path) + path = UPath(file_path) self.log_filepath = get_log_path(file_path) self.label = "Changes - " + path.stem @@ -515,7 +521,7 @@ def layout(self) -> html.Div: return html.Div(self.module_layout) -def get_log_path(parquet_path: str | Path) -> Path: +def get_log_path(parquet_path: str | UPath) -> UPath: """Return the expected log file path (.jsonl) for a given parquet file. The function searches for known data-state subfolders in the parquet path @@ -526,20 +532,20 @@ def get_log_path(parquet_path: str | Path) -> Path: data_states = ["inndata", "klargjorte-data", "statistikk", "utdata"] log_subpath = "temp/parqueteditor" - p = Path(parquet_path) + p = UPath(parquet_path) posix = p.as_posix() for state in data_states: token = f"/{state}/" if token in posix: replaced = posix.replace(token, f"/{state}/{log_subpath}/") - return Path(replaced).with_suffix(".jsonl") + return UPath(replaced).with_suffix(".jsonl") print(f"Expecting subfolder {data_states}. Log file path set to parquet path.") return p.with_suffix(".jsonl") -def read_jsonl_log(path: str | Path) -> list[Any]: +def read_jsonl_log(path: str | UPath) -> list[Any]: """Reads the jsonl log. Args: @@ -550,14 +556,14 @@ def read_jsonl_log(path: str | Path) -> list[Any]: """ all_data = [] try: - with open(path, encoding="utf-8") as file: + with path.open(encoding="utf-8") as file: data = json.load(file) if isinstance(data, dict): all_data.append(data) elif isinstance(data, list): all_data.extend(data) except json.JSONDecodeError: - with open(path, encoding="utf-8") as file: + with path.open(encoding="utf-8") as file: for line in file: line = line.strip() if line: @@ -615,14 +621,14 @@ def _apply_change_detail( return data_to_change -def read_jsonl_file_to_string(file_path: str | Path) -> str: +def read_jsonl_file_to_string(file_path: str | UPath) -> str: """Reads a JSONL file and returns its contents as a single string.""" - file_path = Path(file_path) + file_path = UPath(file_path) with file_path.open("r", encoding="utf-8") as f: return f.read() -def log_as_text(file_path: str | Path) -> str: +def log_as_text(file_path: str | UPath) -> str: """Convert a JSONL string of change logs into a human-readable text format. Returns a single string. @@ -670,7 +676,7 @@ def _raise_if_duplicates(df: pd.DataFrame, subset: set[str] | list[str]) -> None ) -def apply_edits(parquet_path: str | Path) -> pd.DataFrame: +def apply_edits(parquet_path: str | UPath) -> pd.DataFrame: """Applies edits from the jsonl log to a parquet file. Args: @@ -722,27 +728,27 @@ def export_from_parqueteditor( for entry in processlog: if entry.get("data_target") == "data_target_placeholder": entry["data_target"] = data_target - data_path = Path(data_target) + data_path = UPath(data_target) bucket_root = data_path.parents[1] relative = data_path.relative_to(bucket_root).with_suffix(".jsonl") export_log_path = bucket_root / "logg" / "prosessdata" / relative export_log_path.parent.mkdir(parents=True, exist_ok=True) logger.debug(f"export_log_path:\n{export_log_path}") - Path(data_target).parent.mkdir(parents=True, exist_ok=True) + UPath(data_target).parent.mkdir(parents=True, exist_ok=True) export_log_path.parent.mkdir(parents=True, exist_ok=True) if export_log_path.exists() and not force_overwrite: raise FileExistsError( f"Process log '{export_log_path}' already exists. " f"Use force_overwrite=True to overwrite." ) - with open(export_log_path, "w", encoding="utf-8") as f: + with export_log_path.open("w", encoding="utf-8") as f: for entry in processlog: f.write(json.dumps(entry, ensure_ascii=False, default=str) + "\n") else: raise FileNotFoundError( f"Process log not found at '{log_path}'. No edits have been recorded for '{data_source}'." ) - data_target_path = Path(data_target) + data_target_path = UPath(data_target) if data_target_path.exists() and not force_overwrite: raise FileExistsError( f"Target parquet file '{data_target}' already exists. "