Skip to content

Commit 89b56a2

Browse files
committed
Create final records dataset from diffs and metrics
Why these changes are being introduced: With the full pipeline mostly formed it was observed that we could simplify the final artifacts produced by a run by combining the 'diffs' and 'metrics' datasets into a final 'records' dataset. This dovetails with the new optional env var 'PRESERVE_ARTIFACTS' where every other artifact except this one would be removed after successful creation. How this addresses that need: * Adds new core function 'create_final_records' that writes a final dataset containing all records and diff information needed for statistical and individual record analysis. Side effects of this change: * Fewer final disparate datasets for a run Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-370
1 parent fcd3d6e commit 89b56a2

File tree

7 files changed

+152
-10
lines changed

7 files changed

+152
-10
lines changed

abdiff/cli.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import shutil
34
from datetime import timedelta
45
from itertools import chain
56
from time import perf_counter
@@ -14,6 +15,7 @@
1415
calc_ab_diffs,
1516
calc_ab_metrics,
1617
collate_ab_transforms,
18+
create_final_records,
1719
init_run,
1820
run_ab_transforms,
1921
)
@@ -24,6 +26,8 @@
2426

2527
logger = logging.getLogger(__name__)
2628

29+
CONFIG = Config()
30+
2731

2832
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
2933
@click.option(
@@ -166,19 +170,31 @@ def run_diff(job_directory: str, input_files: str, message: str) -> None:
166170
image_tag_b=job_data["image_tag_b"],
167171
input_files=input_files_list,
168172
)
173+
169174
collated_dataset_path = collate_ab_transforms(
170175
run_directory=run_directory,
171176
ab_transformed_file_lists=ab_transformed_file_lists,
172177
)
178+
173179
diffs_dataset_path = calc_ab_diffs(
174180
run_directory=run_directory,
175181
collated_dataset_path=collated_dataset_path,
176182
)
177-
calc_ab_metrics(
183+
184+
if not CONFIG.preserve_artifacts:
185+
shutil.rmtree(collated_dataset_path)
186+
187+
metrics_dataset_path = calc_ab_metrics(
178188
run_directory=run_directory,
179189
diffs_dataset_path=diffs_dataset_path,
180190
)
181191

192+
create_final_records(run_directory, diffs_dataset_path, metrics_dataset_path)
193+
194+
if not CONFIG.preserve_artifacts:
195+
shutil.rmtree(diffs_dataset_path)
196+
shutil.rmtree(metrics_dataset_path)
197+
182198

183199
@main.command()
184200
@click.option(

abdiff/core/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from abdiff.core.calc_ab_diffs import calc_ab_diffs
88
from abdiff.core.calc_ab_metrics import calc_ab_metrics
99
from abdiff.core.collate_ab_transforms import collate_ab_transforms
10+
from abdiff.core.create_final_records import create_final_records
1011
from abdiff.core.init_job import init_job
1112
from abdiff.core.init_run import init_run
1213
from abdiff.core.run_ab_transforms import run_ab_transforms
@@ -19,4 +20,5 @@
1920
"collate_ab_transforms",
2021
"calc_ab_diffs",
2122
"calc_ab_metrics",
23+
"create_final_records",
2224
]

abdiff/core/calc_ab_diffs.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ def get_diffed_batches_iter(
8989
) as executor:
9090
pending_futures = []
9191
for batch_count, batch in enumerate(batches_iter):
92-
logger.info(f"Submitting batch {batch_count} for processing")
9392
future = executor.submit(process_batch, batch)
9493
pending_futures.append((batch_count, future))
9594

abdiff/core/calc_ab_metrics.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
def calc_ab_metrics(
2121
run_directory: str,
2222
diffs_dataset_path: str,
23-
) -> dict:
23+
) -> str:
2424

25-
os.makedirs(Path(run_directory) / "metrics", exist_ok=True)
25+
metrics_dataset = Path(run_directory) / "metrics"
26+
os.makedirs(metrics_dataset, exist_ok=True)
2627

2728
# build field diffs dataframe
2829
field_matrix_dataset_filepath = create_record_diff_matrix_dataset(
@@ -37,7 +38,7 @@ def calc_ab_metrics(
3738
run_directory=run_directory, new_data={"metrics": metrics_data}
3839
)
3940

40-
return metrics_data
41+
return str(metrics_dataset)
4142

4243

4344
def create_record_diff_matrix_dataset(
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import logging
2+
from collections.abc import Generator
3+
from pathlib import Path
4+
5+
import duckdb
6+
import pyarrow as pa
7+
import pyarrow.dataset as ds
8+
9+
from abdiff.config import Config
10+
from abdiff.core.utils import load_dataset, write_to_dataset
11+
12+
logger = logging.getLogger(__name__)
13+
14+
CONFIG = Config()
15+
16+
READ_BATCH_SIZE = 1_000
17+
18+
19+
def create_final_records(
    run_directory: str, diffs_dataset_path: str, metrics_dataset_path: str
) -> str:
    """Produce a single, final dataset that contains all records and diff information.

    This dataset is produced by joining the "diffs" dataset (which contains the full
    A and B records, and the JSON diff) with the "metrics" dataset (which is a sparse
    matrix of TIMDEX fields and boolean 1 or 0 if that record has a diff for that field).
    This dataset should be sufficient for supporting any webapp data needs.

    Args:
        run_directory: root directory for this run; the final dataset is written to
            "<run_directory>/records".
        diffs_dataset_path: path to the "diffs" parquet dataset.
        metrics_dataset_path: path to the "metrics" parquet dataset.

    Returns:
        Path to the final "records" dataset as a string.
    """
    logger.info("Creating final records dataset from 'diffs' and 'metrics' datasets.")

    diffs_dataset = load_dataset(diffs_dataset_path)
    metrics_dataset = load_dataset(metrics_dataset_path)

    # get list of unique columns from metrics dataset, and create final dataset schema
    # NOTE: a set of the diffs column names is used for O(1) membership tests, and the
    # resulting column list is iterated directly (the original re-scanned the metrics
    # schema a second time with a redundant list-membership filter).
    diffs_column_names = set(diffs_dataset.schema.names)
    metrics_timdex_field_columns = [
        name
        for name in metrics_dataset.schema.names
        if name not in diffs_column_names
    ]
    metrics_columns = (
        pa.field(name, pa.int64()) for name in metrics_timdex_field_columns
    )
    final_records_dataset_schema = pa.schema(
        (
            pa.field("timdex_record_id", pa.string()),
            pa.field("source", pa.string()),
            pa.field("record_a", pa.binary()),
            pa.field("record_b", pa.binary()),
            pa.field("ab_diff", pa.string()),
            pa.field("modified_timdex_fields", pa.list_(pa.string())),
            pa.field("has_diff", pa.string()),
            *metrics_columns,  # type: ignore[arg-type]
        )
    )

    records_dataset_path = str(Path(run_directory) / "records")
    write_to_dataset(
        get_final_records_iter(
            diffs_dataset, metrics_dataset, metrics_timdex_field_columns
        ),
        base_dir=records_dataset_path,
        schema=final_records_dataset_schema,
        partition_columns=["source", "has_diff"],
    )

    return records_dataset_path
70+
def get_final_records_iter(
    diffs_dataset: ds.Dataset,
    metrics_dataset: ds.Dataset,
    metrics_timdex_field_columns: list[str],
) -> Generator[pa.RecordBatch, None, None]:
    """Yield record batches for the final dataset via a DuckDB join.

    Joins the "diffs" and "metrics" datasets on 'timdex_record_id' and streams the
    joined result back in batches of READ_BATCH_SIZE rows.

    Args:
        diffs_dataset: pyarrow dataset of diffs (full A/B records + JSON diff).
        metrics_dataset: pyarrow dataset of per-field diff indicators.
        metrics_timdex_field_columns: metrics-only column names to include in the join.

    Yields:
        pyarrow RecordBatch objects for the joined result set.
    """
    with duckdb.connect(":memory:") as conn:

        # register datasets in DuckDB for use
        conn.register("diffs", diffs_dataset.to_table())
        conn.register("metrics", metrics_dataset.to_table())

        # prepare select columns
        select_columns = ",".join(
            [
                "d.timdex_record_id",
                "d.source",
                "d.record_a",
                "d.record_b",
                "d.ab_diff",
                "d.modified_timdex_fields",
                "d.has_diff",
                *[f"m.{name}" for name in metrics_timdex_field_columns],
            ]
        )

        results = conn.execute(
            f"""
            select {select_columns}
            from diffs d
            inner join metrics m on m.timdex_record_id = d.timdex_record_id
            """
        ).fetch_record_batch(READ_BATCH_SIZE)

        # read first, then count/log/yield: the original incremented and logged before
        # attempting the read, emitting a spurious "Yielding ... batch N+1" log line
        # when the reader was already exhausted
        count = 0
        while True:
            try:
                batch = results.read_next_batch()
            except StopIteration:
                break
            count += 1
            logger.info(f"Yielding final records dataset batch: {count}")
            yield batch

tests/test_calc_ab_metrics.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,10 @@ def test_full_metrics_data_has_expected_structure(diff_matrix_dataset_filepath):
163163

164164

165165
def test_core_function_updates_run_data(run_directory, diffs_dataset_directory):
166-
metrics = calc_ab_metrics(run_directory, diffs_dataset_directory)
167-
run_data = read_run_json(run_directory)
166+
metrics_dataset_filepath = calc_ab_metrics(run_directory, diffs_dataset_directory)
167+
168+
metrics_dataset = load_dataset(metrics_dataset_filepath)
169+
assert isinstance(metrics_dataset, ds.Dataset)
168170

169-
assert isinstance(metrics, dict)
170-
assert run_data["metrics"] == metrics
171+
run_data = read_run_json(run_directory)
172+
assert isinstance(run_data["metrics"], dict)

tests/test_cli.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,22 @@ def test_view_job_webapp_host_and_port_configurable(
136136
@patch("abdiff.cli.collate_ab_transforms")
137137
@patch("abdiff.cli.calc_ab_diffs")
138138
@patch("abdiff.cli.calc_ab_metrics")
139+
@patch("abdiff.cli.create_final_records")
139140
def test_run_diff_success(
140141
mock_init_run,
141142
mock_run_ab_transforms,
142143
mock_collate_ab_transforms,
143144
mock_calc_ab_diffs,
144145
mock_calc_ab_metrics,
146+
mock_create_final_records,
145147
caplog,
146148
runner,
147149
example_job_directory,
150+
monkeypatch,
148151
):
152+
# skip any attempts to remove any mocked artifacts
153+
monkeypatch.setenv("PRESERVE_ARTIFACTS", "true")
154+
149155
# mock initialization of run
150156
mock_init_run.return_value = str(
151157
Path(example_job_directory) / "runs" / "2024-10-15_12-00-00"
@@ -170,7 +176,10 @@ def test_run_diff_success(
170176
mock_calc_ab_diffs.return_value = "path/to/run/diffs"
171177

172178
# mock metrics generation
173-
mock_calc_ab_metrics.return_value = {"msg": "these are the from the diffs metrics"}
179+
mock_calc_ab_metrics.return_value = "path/to/run/metrics"
180+
181+
# mock final records dataset
182+
mock_create_final_records.return_value = "path/to/run/records"
174183

175184
caplog.set_level("DEBUG")
176185
result = runner.invoke(
@@ -191,3 +200,4 @@ def test_run_diff_success(
191200
mock_collate_ab_transforms.assert_called()
192201
mock_calc_ab_diffs.assert_called()
193202
mock_calc_ab_metrics.assert_called()
203+
mock_create_final_records.assert_called()

0 commit comments

Comments
 (0)