Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies = [
"aiohttp>=3.10.5",
"aiohttp-retry>=2.9.1",
"pytest-asyncio>=0.24.0",
"gcsfs>=2025.5.1,<2026",
"gcsfs>=2025.5.1",
"deprecated>=1.2.18",
"types-deprecated>=1.2.15.20250304",
]
Expand Down
215 changes: 166 additions & 49 deletions src/dapla_pseudo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dapla_pseudo.v1.models.core import PseudoKeyset
from dapla_pseudo.v1.models.core import PseudoRule
from dapla_pseudo.v1.models.core import RedactKeywordArgs
from dapla_pseudo.v1.mutable_dataframe import FieldMatch
from dapla_pseudo.v1.mutable_dataframe import MutableDataFrame
from dapla_pseudo.v1.supported_file_format import SupportedOutputFileFormat

Expand Down Expand Up @@ -161,54 +162,170 @@ def build_pseudo_field_request(
"""Builds a FieldRequest object."""
mutable_df.match_rules(rules, target_rules)
matched_fields = mutable_df.get_matched_fields()
requests: list[PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest] = (
[]
if mutable_df.hierarchical:
return _build_hierarchical_field_requests(
pseudo_operation=pseudo_operation,
mutable_df=mutable_df,
matched_fields=matched_fields,
custom_keyset=custom_keyset,
target_custom_keyset=target_custom_keyset,
target_rules=target_rules,
)

else:
return _build_tabular_field_requests(
pseudo_operation=pseudo_operation,
matched_fields=matched_fields,
custom_keyset=custom_keyset,
target_custom_keyset=target_custom_keyset,
target_rules=target_rules,
)


def _build_tabular_field_requests(
    pseudo_operation: PseudoOperation,
    matched_fields: dict[str, FieldMatch],
    custom_keyset: PseudoKeyset | str | None,
    target_custom_keyset: PseudoKeyset | str | None,
    target_rules: list[PseudoRule] | None,
) -> list[PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest]:
    """Build one field request per matched tabular column.

    Tabular (non-hierarchical) data never needs batching, so every matched
    field maps one-to-one onto a request named after its own path.
    """
    requests: list[
        PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest
    ] = []
    for match in matched_fields.values():
        requests.append(
            _build_single_field_request(
                pseudo_operation=pseudo_operation,
                request_name=match.path,
                representative=match,
                values=match.get_value(),
                custom_keyset=custom_keyset,
                target_custom_keyset=target_custom_keyset,
                target_rules=target_rules,
            )
        )
    return requests


def _build_hierarchical_field_requests(
    pseudo_operation: PseudoOperation,
    mutable_df: MutableDataFrame,
    matched_fields: dict[str, FieldMatch],
    custom_keyset: PseudoKeyset | str | None,
    target_custom_keyset: PseudoKeyset | str | None,
    target_rules: list[PseudoRule] | None,
) -> list[PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest]:
    """Build batched field requests for hierarchical data.

    Matches are first grouped (see ``_group_hierarchical_fields_for_requests``);
    each group becomes one request whose pattern and pseudo function(s) come
    from the group's first member, and whose values are the concatenation of
    every member's values in group order.
    """
    requests: list[
        PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest
    ] = []
    for request_name, group in _group_hierarchical_fields_for_requests(
        mutable_df=mutable_df,
        matched_fields=matched_fields,
    ):
        flattened_values = [
            value for member in group for value in member.get_value()
        ]
        requests.append(
            _build_single_field_request(
                pseudo_operation=pseudo_operation,
                request_name=request_name,
                representative=group[0],
                values=flattened_values,
                custom_keyset=custom_keyset,
                target_custom_keyset=target_custom_keyset,
                target_rules=target_rules,
            )
        )
    return requests


def _build_single_field_request(
pseudo_operation: PseudoOperation,
request_name: str,
representative: FieldMatch,
values: list[str | int | None],
custom_keyset: PseudoKeyset | str | None,
target_custom_keyset: PseudoKeyset | str | None,
target_rules: list[PseudoRule] | None,
) -> PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest:
req: PseudoFieldRequest | DepseudoFieldRequest | RepseudoFieldRequest
match pseudo_operation:
case PseudoOperation.PSEUDONYMIZE:
for field in matched_fields.values():
try:
req = PseudoFieldRequest(
pseudo_func=field.func,
name=field.path,
pattern=field.pattern,
values=field.get_value(),
keyset=KeyWrapper(custom_keyset).keyset,
)
requests.append(req)
except ValidationError as e:
raise Exception(f"Path or column: {field.path}") from e
case PseudoOperation.DEPSEUDONYMIZE:
for field in matched_fields.values():
try:
req = DepseudoFieldRequest(
pseudo_func=field.func,
name=field.path,
pattern=field.pattern,
values=field.get_value(),
keyset=KeyWrapper(custom_keyset).keyset,
)
requests.append(req)
except ValidationError as e:
raise Exception(f"Path or column: {field.path}") from e

case PseudoOperation.REPSEUDONYMIZE:
if target_rules is not None:
for field in matched_fields.values():
try:
req = RepseudoFieldRequest(
source_pseudo_func=field.func,
target_pseudo_func=field.target_func,
name=field.path,
pattern=field.pattern,
values=field.get_value(),
source_keyset=KeyWrapper(custom_keyset).keyset,
target_keyset=KeyWrapper(target_custom_keyset).keyset,
)
requests.append(req)
except ValidationError as e:
raise Exception(f"Path or column: {field.path}") from e
else:
raise ValueError("Found no target rules")
return requests

try:
match pseudo_operation:
case PseudoOperation.PSEUDONYMIZE:
req = PseudoFieldRequest(
pseudo_func=representative.func,
name=request_name,
pattern=representative.pattern,
values=values,
keyset=KeyWrapper(custom_keyset).keyset,
)
case PseudoOperation.DEPSEUDONYMIZE:
req = DepseudoFieldRequest(
pseudo_func=representative.func,
name=request_name,
pattern=representative.pattern,
values=values,
keyset=KeyWrapper(custom_keyset).keyset,
)
case PseudoOperation.REPSEUDONYMIZE:
if target_rules is None:
raise ValueError("Found no target rules")

req = RepseudoFieldRequest(
source_pseudo_func=representative.func,
target_pseudo_func=representative.target_func,
name=request_name,
pattern=representative.pattern,
values=values,
source_keyset=KeyWrapper(custom_keyset).keyset,
target_keyset=KeyWrapper(target_custom_keyset).keyset,
)
except ValidationError as e:
raise Exception(f"Path or column: {request_name}") from e

return req


def _group_hierarchical_fields_for_requests(
    mutable_df: MutableDataFrame,
    matched_fields: dict[str, FieldMatch],
) -> list[tuple[str, list[FieldMatch]]]:
    """Group hierarchical field matches into pseudo-service requests.

    Example input paths:
    - ``person_info[0]/fnr``
    - ``person_info[1]/fnr``
    - ``person_info[2]/fnr``

    These collapse into one request named ``person_info/fnr`` carrying all
    values from all matching leaf paths.

    Two paths share a request only when all of these agree:
    - normalized request name (array indices removed)
    - pattern
    - source pseudo function
    - target pseudo function (for repseudonymize)

    """
    buckets: dict[tuple[str, str, str, str | None], list[FieldMatch]] = {}

    # Bucket the matches that are allowed to share one API request.
    for match in matched_fields.values():
        normalized = _remove_array_indices(match.path)
        key = (
            normalized,
            match.pattern,
            str(match.func),
            str(match.target_func) if match.target_func else None,
        )
        bucket = buckets.get(key)
        if bucket is None:
            buckets[key] = [match]
        else:
            bucket.append(match)

    grouped_matches: list[tuple[str, list[FieldMatch]]] = []

    for (normalized, _pattern, _func, _target), members in buckets.items():
        if len(members) > 1:
            # Record slice boundaries so one flat response list can later be
            # scattered back onto the original leaf paths.
            mutable_df.map_batch_to_leaf_slices(
                normalized,
                [(member.path, len(member.get_value())) for member in members],
            )
            grouped_matches.append((normalized, members))
        else:
            # A lone field keeps its concrete (indexed) path as request name.
            grouped_matches.append((members[0].path, members))

    return grouped_matches


def _remove_array_indices(path: str) -> str:
return re.sub(r"\[\d+]", "", path)
39 changes: 39 additions & 0 deletions src/dapla_pseudo/v1/mutable_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
"""Initialize the class."""
self.dataset: pl.DataFrame | dict[str, Any] | pl.LazyFrame = dataframe
self.matched_fields: dict[str, FieldMatch] = {}
self.batched_fields: dict[str, list[tuple[str, int, int]]] = {}
self.matched_fields_metrics: dict[str, int] | None = None
self.hierarchical: bool = hierarchical
self.schema = (
Expand All @@ -75,6 +76,7 @@ def match_rules(
self, rules: list[PseudoRule], target_rules: list[PseudoRule] | None
) -> None:
"""Create references to all the columns that matches the given pseudo rules."""
self.batched_fields = {}
if self.hierarchical is False:
assert isinstance(self.dataset, pl.DataFrame) or isinstance(
self.dataset, pl.LazyFrame
Expand Down Expand Up @@ -119,6 +121,40 @@ def extract_column_data(
for match in matches:
self.matched_fields[match.path] = match

def map_batch_to_leaf_slices(
    self, batch_name: str, segments: list[tuple[str, int]]
) -> None:
    """Record how a batched response list splits back into concrete leaf paths.

    A hierarchical batch request flattens values from multiple concrete paths
    into one list sent to the pseudo service. Example:

    - ``person_info[0]/fnr`` contributes 1 value
    - ``person_info[1]/fnr`` contributes 2 values
    - ``person_info[2]/fnr`` contributes 1 value

    Each ``(leaf_path, value_count)`` entry in ``segments`` is converted into
    half-open ``[start, end)`` bounds over the flat response list for
    ``batch_name`` (e.g. ``person_info/fnr``):

    - ``person_info[0]/fnr`` -> ``[0:1]``
    - ``person_info[1]/fnr`` -> ``[1:3]``
    - ``person_info[2]/fnr`` -> ``[3:4]``

    Later, ``update(batch_name, data)`` uses these bounds to split one flat
    response list and write each chunk back to the correct leaf path.
    """
    # A single-segment (or empty) batch needs no mapping.
    if len(segments) <= 1:
        return

    bounds: list[tuple[str, int, int]] = []
    start = 0
    for leaf_path, count in segments:
        end = start + count
        bounds.append((leaf_path, start, end))
        start = end

    self.batched_fields[batch_name] = bounds

def get_matched_fields(self) -> dict[str, FieldMatch]:
"""Get a reference to all the columns that matched pseudo rules."""
return self.matched_fields
Expand All @@ -130,6 +166,9 @@ def update(self, path: str, data: list[str | None]) -> None:
self.dataset, pl.LazyFrame
)
self.dataset = self.dataset.with_columns(pl.Series(data).alias(path))
elif (batched_segments := self.batched_fields.get(path)) is not None:
for leaf_path, start, end in batched_segments:
self.update(leaf_path, data[start:end])
elif (field_match := self.matched_fields.get(path)) is not None:
assert isinstance(self.dataset, dict)
tree = self.dataset
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,7 @@
[
{
"short_name": "fnr",
"data_element_path": "person_info[0].fnr",
"pseudonymization": {
"encryption_algorithm": "REDACT",
"encryption_algorithm_parameters": [
{
"placeholder": ":"
}
]
}
},
{
"short_name": "fnr",
"data_element_path": "person_info[1].fnr",
"pseudonymization": {
"encryption_algorithm": "REDACT",
"encryption_algorithm_parameters": [
{
"placeholder": ":"
}
]
}
},
{
"short_name": "fnr",
"data_element_path": "person_info[2].fnr",
"data_element_path": "person_info.fnr",
"pseudonymization": {
"encryption_algorithm": "REDACT",
"encryption_algorithm_parameters": [
Expand Down
26 changes: 26 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,29 @@ def test_build_repseudo_field_request() -> None:
pattern="**/foo",
values=["baz"],
)


def test_build_pseudo_field_request_hierarchical_batching() -> None:
    """Hierarchical matches differing only by array index collapse to one request."""
    records = [
        {"struct": {"foo": "baz"}},
        {"struct": {"foo": "qux"}},
    ]
    mutable_df = MutableDataFrame(pl.DataFrame(records), hierarchical=True)
    pseudo_rules = [
        PseudoRule.from_json(
            '{"name":"my-rule","pattern":"**/foo","path":"struct/foo","func":"daead(keyId=ssb-common-key-1)"}'
        )
    ]

    field_requests = build_pseudo_field_request(
        PseudoOperation.PSEUDONYMIZE, mutable_df, pseudo_rules
    )

    assert len(field_requests) == 1
    assert field_requests[0].name == "struct/foo"
    assert field_requests[0].pattern == "**/foo"
    assert field_requests[0].values == ["baz", "qux"]

    # Ensure batched responses can be scattered back to concrete paths.
    mutable_df.update("struct/foo", ["#", "#"])
    round_tripped = mutable_df.to_polars()
    assert round_tripped["struct"][0]["foo"] == "#"
    assert round_tripped["struct"][1]["foo"] == "#"
Loading
Loading