Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
"dash-bootstrap-templates (>=2.1.0,<3.0.0)",
"duckdb (==1.3.2)",
"ssb-poc-statlog-model (>=1.0.0,<2.0.0)",
"universal-pathlib (>=0.3.7,<0.4.0)",
]

[project.urls]
Expand Down
52 changes: 29 additions & 23 deletions src/ssb_dash_framework/modules/parquet_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import UTC
from datetime import datetime
from io import StringIO
from pathlib import Path
from upath import UPath
from typing import Any

import dash_ag_grid as dag
Expand Down Expand Up @@ -83,7 +83,7 @@ def __init__(
selected_inputs=id_vars, selected_states=[]
)
self.file_path = data_source
path = Path(data_source)
path = UPath(data_source)
self.log_filepath = get_log_path(data_source)
self.label = path.stem

Expand Down Expand Up @@ -323,12 +323,18 @@ def confirm_edit(
change_to_log = self._build_process_log_entry(pending_edit)

logger.debug(f"Record for changelog: {change_to_log}")
with open(self.log_filepath, "a", encoding="utf-8") as f:
logger.debug("Writing change")
f.write(
json.dumps(change_to_log, ensure_ascii=False, default=str) + "\n"
)
logger.debug("Change written.")
logger.debug("Writing change")
p = UPath(self.log_filepath)
if p.exists():
existing = p.read_text(encoding="utf-8")
else:
existing = ""

new_line = json.dumps(change_to_log, ensure_ascii=False, default=str) + "\n"
content = existing + new_line

p.write_text(content, encoding="utf-8")
logger.debug("Change written.")
error_log = [
create_alert(
"Prosesslogg oppdatert!",
Expand Down Expand Up @@ -481,7 +487,7 @@ def __init__(self, id_vars: list[str], file_path: str) -> None:
)
self.user = os.getenv("DAPLA_USER")
self.tz = zoneinfo.ZoneInfo("Europe/Oslo")
path = Path(file_path)
path = UPath(file_path)
self.log_filepath = get_log_path(file_path)
self.label = "Changes - " + path.stem

Expand Down Expand Up @@ -515,7 +521,7 @@ def layout(self) -> html.Div:
return html.Div(self.module_layout)


def get_log_path(parquet_path: str | Path) -> Path:
def get_log_path(parquet_path: str | UPath) -> UPath:
"""Return the expected log file path (.jsonl) for a given parquet file.

The function searches for known data-state subfolders in the parquet path
Expand All @@ -526,20 +532,20 @@ def get_log_path(parquet_path: str | Path) -> Path:
data_states = ["inndata", "klargjorte-data", "statistikk", "utdata"]
log_subpath = "temp/parqueteditor"

p = Path(parquet_path)
p = UPath(parquet_path)
posix = p.as_posix()

for state in data_states:
token = f"/{state}/"
if token in posix:
replaced = posix.replace(token, f"/{state}/{log_subpath}/")
return Path(replaced).with_suffix(".jsonl")
return UPath(replaced).with_suffix(".jsonl")

print(f"Expecting subfolder {data_states}. Log file path set to parquet path.")
return p.with_suffix(".jsonl")


def read_jsonl_log(path: str | Path) -> list[Any]:
def read_jsonl_log(path: str | UPath) -> list[Any]:
"""Reads the jsonl log.

Args:
Expand All @@ -550,14 +556,14 @@ def read_jsonl_log(path: str | Path) -> list[Any]:
"""
all_data = []
try:
with open(path, encoding="utf-8") as file:
with path.open(encoding="utf-8") as file:
data = json.load(file)
if isinstance(data, dict):
all_data.append(data)
elif isinstance(data, list):
all_data.extend(data)
except json.JSONDecodeError:
with open(path, encoding="utf-8") as file:
with path.open(encoding="utf-8") as file:
for line in file:
line = line.strip()
if line:
Expand Down Expand Up @@ -615,14 +621,14 @@ def _apply_change_detail(
return data_to_change


def read_jsonl_file_to_string(file_path: str | Path) -> str:
def read_jsonl_file_to_string(file_path: str | UPath) -> str:
"""Reads a JSONL file and returns its contents as a single string."""
file_path = Path(file_path)
file_path = UPath(file_path)
with file_path.open("r", encoding="utf-8") as f:
return f.read()


def log_as_text(file_path: str | Path) -> str:
def log_as_text(file_path: str | UPath) -> str:
"""Convert a JSONL string of change logs into a human-readable text format.

Returns a single string.
Expand Down Expand Up @@ -670,7 +676,7 @@ def _raise_if_duplicates(df: pd.DataFrame, subset: set[str] | list[str]) -> None
)


def apply_edits(parquet_path: str | Path) -> pd.DataFrame:
def apply_edits(parquet_path: str | UPath) -> pd.DataFrame:
"""Applies edits from the jsonl log to a parquet file.

Args:
Expand Down Expand Up @@ -722,27 +728,27 @@ def export_from_parqueteditor(
for entry in processlog:
if entry.get("data_target") == "data_target_placeholder":
entry["data_target"] = data_target
data_path = Path(data_target)
data_path = UPath(data_target)
bucket_root = data_path.parents[1]
relative = data_path.relative_to(bucket_root).with_suffix(".jsonl")
export_log_path = bucket_root / "logg" / "prosessdata" / relative
export_log_path.parent.mkdir(parents=True, exist_ok=True)
logger.debug(f"export_log_path:\n{export_log_path}")
Path(data_target).parent.mkdir(parents=True, exist_ok=True)
UPath(data_target).parent.mkdir(parents=True, exist_ok=True)
export_log_path.parent.mkdir(parents=True, exist_ok=True)
if export_log_path.exists() and not force_overwrite:
raise FileExistsError(
f"Process log '{export_log_path}' already exists. "
f"Use force_overwrite=True to overwrite."
)
with open(export_log_path, "w", encoding="utf-8") as f:
with export_log_path.open("w", encoding="utf-8") as f:
for entry in processlog:
f.write(json.dumps(entry, ensure_ascii=False, default=str) + "\n")
else:
raise FileNotFoundError(
f"Process log not found at '{log_path}'. No edits have been recorded for '{data_source}'."
)
data_target_path = Path(data_target)
data_target_path = UPath(data_target)
if data_target_path.exists() and not force_overwrite:
raise FileExistsError(
f"Target parquet file '{data_target}' already exists. "
Expand Down
Loading