
Commit 268edd1
Create final records dataset from diffs and metrics
Why these changes are being introduced:

With the full pipeline mostly formed, it was observed that we could simplify the final artifacts produced by a run by combining the 'diffs' and 'metrics' datasets into a final 'records' dataset. This dovetails with the new optional env var 'PRESERVE_ARTIFACTS', where every other artifact except this one is removed after successful creation.

How this addresses that need:

* Adds new core function 'create_final_records' that writes a final dataset containing all records and diff information needed for statistical and individual record analysis.

Side effects of this change:

* Fewer disparate final datasets for a run.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-370
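For context on the new env var, here is a minimal sketch of how a `Config.preserve_artifacts` property might be implemented. The attribute name comes from the cli.py diff below, but the env var parsing shown here is an assumption, not the repository's actual implementation:

```python
# Hypothetical sketch of Config.preserve_artifacts reading the optional
# PRESERVE_ARTIFACTS env var; the truthy-string parsing is an assumption.
import os


class Config:
    @property
    def preserve_artifacts(self) -> bool:
        # Unset or falsey means intermediate artifacts are cleaned up.
        return os.getenv("PRESERVE_ARTIFACTS", "false").strip().lower() in {
            "true",
            "1",
            "yes",
        }
```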
1 parent ca25f5c commit 268edd1

File tree

21 files changed: +221 −23374 lines

abdiff/cli.py

Lines changed: 17 additions & 1 deletion
```diff
@@ -1,5 +1,6 @@
 import json
 import logging
+import shutil
 from datetime import timedelta
 from itertools import chain
 from time import perf_counter
@@ -14,6 +15,7 @@
     calc_ab_diffs,
     calc_ab_metrics,
     collate_ab_transforms,
+    create_final_records,
     download_input_files,
     init_run,
     run_ab_transforms,
@@ -25,6 +27,8 @@

 logger = logging.getLogger(__name__)

+CONFIG = Config()
+

 @click.group(context_settings={"help_option_names": ["-h", "--help"]})
 @click.option(
@@ -181,19 +185,31 @@ def run_diff(
         input_files=input_files_list,
         use_local_s3=download_files,
     )
+
     collated_dataset_path = collate_ab_transforms(
         run_directory=run_directory,
         ab_transformed_file_lists=ab_transformed_file_lists,
     )
+
     diffs_dataset_path = calc_ab_diffs(
         run_directory=run_directory,
         collated_dataset_path=collated_dataset_path,
     )
-    calc_ab_metrics(
+
+    if not CONFIG.preserve_artifacts:
+        shutil.rmtree(collated_dataset_path)
+
+    metrics_dataset_path = calc_ab_metrics(
         run_directory=run_directory,
         diffs_dataset_path=diffs_dataset_path,
     )

+    create_final_records(run_directory, diffs_dataset_path, metrics_dataset_path)
+
+    if not CONFIG.preserve_artifacts:
+        shutil.rmtree(diffs_dataset_path)
+        shutil.rmtree(metrics_dataset_path)
+

 @main.command()
 @click.option(
```

abdiff/core/__init__.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -7,6 +7,7 @@
 from abdiff.core.calc_ab_diffs import calc_ab_diffs
 from abdiff.core.calc_ab_metrics import calc_ab_metrics
 from abdiff.core.collate_ab_transforms import collate_ab_transforms
+from abdiff.core.create_final_records import create_final_records
 from abdiff.core.init_job import init_job
 from abdiff.core.init_run import init_run
 from abdiff.core.run_ab_transforms import run_ab_transforms
@@ -21,4 +22,5 @@
     "collate_ab_transforms",
     "calc_ab_diffs",
     "calc_ab_metrics",
+    "create_final_records",
 ]
```

abdiff/core/calc_ab_diffs.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -89,7 +89,6 @@ def get_diffed_batches_iter(
     ) as executor:
         pending_futures = []
         for batch_count, batch in enumerate(batches_iter):
-            logger.info(f"Submitting batch {batch_count} for processing")
             future = executor.submit(process_batch, batch)
             pending_futures.append((batch_count, future))

```

abdiff/core/calc_ab_metrics.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -20,9 +20,10 @@
 def calc_ab_metrics(
     run_directory: str,
     diffs_dataset_path: str,
-) -> dict:
+) -> str:

-    os.makedirs(Path(run_directory) / "metrics", exist_ok=True)
+    metrics_dataset = Path(run_directory) / "metrics"
+    os.makedirs(metrics_dataset, exist_ok=True)

     # build field diffs dataframe
     field_matrix_dataset_filepath = create_record_diff_matrix_dataset(
@@ -37,7 +38,7 @@ def calc_ab_metrics(
         run_directory=run_directory, new_data={"metrics": metrics_data}
     )

-    return metrics_data
+    return str(metrics_dataset)


 def create_record_diff_matrix_dataset(
```
abdiff/core/create_final_records.py (new file)

Lines changed: 114 additions & 0 deletions

```python
import logging
from collections.abc import Generator
from pathlib import Path

import duckdb
import pyarrow as pa
import pyarrow.dataset as ds

from abdiff.config import Config
from abdiff.core.utils import load_dataset, write_to_dataset

logger = logging.getLogger(__name__)

CONFIG = Config()

READ_BATCH_SIZE = 1_000


def create_final_records(
    run_directory: str, diffs_dataset_path: str, metrics_dataset_path: str
) -> str:
    """Produce a single, final dataset that contains all records and diff information.

    This dataset is produced by joining the "diffs" dataset (which contains the full
    A and B records, and the JSON diff) with the "metrics" dataset (which is a sparse
    matrix of TIMDEX fields, with a 1 or 0 indicating whether that record has a diff
    for that field). This dataset should be sufficient for supporting any webapp data
    needs.

    This dataset is partitioned by 'source' and the 'has_diff' boolean.
    """
    logger.info("Creating final records dataset from 'diffs' and 'metrics' datasets.")

    diffs_dataset = load_dataset(diffs_dataset_path)
    metrics_dataset = load_dataset(metrics_dataset_path)

    # get list of unique columns from metrics dataset, and create final dataset schema
    metrics_timdex_field_columns = [
        name
        for name in metrics_dataset.schema.names
        if name not in diffs_dataset.schema.names
    ]
    metrics_columns = (
        pa.field(name, pa.int64())
        for name in metrics_dataset.schema.names
        if name in metrics_timdex_field_columns
    )
    final_records_dataset_schema = pa.schema(
        (
            pa.field("timdex_record_id", pa.string()),
            pa.field("source", pa.string()),
            pa.field("record_a", pa.binary()),
            pa.field("record_b", pa.binary()),
            pa.field("ab_diff", pa.string()),
            pa.field("modified_timdex_fields", pa.list_(pa.string())),
            pa.field("has_diff", pa.string()),
            *metrics_columns,  # type: ignore[arg-type]
        )
    )

    records_dataset_path = str(Path(run_directory) / "records")
    write_to_dataset(
        get_final_records_iter(
            diffs_dataset, metrics_dataset, metrics_timdex_field_columns
        ),
        base_dir=records_dataset_path,
        schema=final_records_dataset_schema,
        partition_columns=["source", "has_diff"],
    )

    return records_dataset_path


def get_final_records_iter(
    diffs_dataset: ds.Dataset,
    metrics_dataset: ds.Dataset,
    metrics_timdex_field_columns: list[str],
) -> Generator[pa.RecordBatch, None, None]:

    with duckdb.connect(":memory:") as conn:

        # register datasets in DuckDB for use
        conn.register("diffs", diffs_dataset.to_table())
        conn.register("metrics", metrics_dataset.to_table())

        # prepare select columns
        select_columns = ",".join(
            [
                "d.timdex_record_id",
                "d.source",
                "d.record_a",
                "d.record_b",
                "d.ab_diff",
                "d.modified_timdex_fields",
                "d.has_diff",
                *[f"m.{name}" for name in metrics_timdex_field_columns],
            ]
        )

        results = conn.execute(
            f"""
            select {select_columns}
            from diffs d
            inner join metrics m on m.timdex_record_id = d.timdex_record_id
            """
        ).fetch_record_batch(READ_BATCH_SIZE)

        count = 0
        while True:
            try:
                count += 1
                logger.info(f"Yielding final records dataset batch: {count}")
                yield results.read_next_batch()
            except StopIteration:
                break
```
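Because the final dataset is hive-partitioned by `source` and `has_diff`, consumers can prune partitions at read time. Below is a minimal sketch using pyarrow.dataset; the dataset path, the "alma" source value, and the "true"/"false" partition values are assumptions for illustration only:

```python
# Hypothetical consumer of the partitioned "records" dataset; the path and
# filter values here are illustrative assumptions.
import pyarrow.dataset as ds

records = ds.dataset(
    "runs/2024-01-01_00-00-00/records",  # i.e. path returned by create_final_records()
    format="parquet",
    partitioning="hive",
)

# Filtering on partition columns lets pyarrow skip entire directories, so
# only partitions containing diffs for this source are actually read.
table = records.to_table(
    filter=(ds.field("source") == "alma") & (ds.field("has_diff") == "true")
)
print(table.num_rows)
```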

abdiff/webapp/utils.py

Lines changed: 16 additions & 5 deletions
```diff
@@ -6,6 +6,14 @@
 import pandas as pd
 from flask import g

+SPARSE_MATRIX_SKIP_FIELDS = [
+    "record_a",
+    "record_b",
+    "ab_diff",
+    "modified_timdex_fields",
+    "has_diff",
+]
+

 def get_run_directory(run_timestamp: str) -> str:
     return str(Path(g.job_directory) / "runs" / run_timestamp)
@@ -16,7 +24,7 @@ def get_record_a_b_versions(
 ) -> tuple[dict, dict]:
     """Retrieve A and B versions of a single record from diffs dataset."""
     with duckdb.connect() as conn:
-        parquet_glob_pattern = f"{run_directory}/diffs/**/*.parquet"
+        parquet_glob_pattern = f"{run_directory}/records/**/*.parquet"
         conn.execute(
             """
             select record_a, record_b
@@ -62,11 +70,13 @@ def duckdb_query_run_metrics(
     with duckdb.connect() as conn:

         # prepare view of record diff matrix
-        parquet_glob_pattern = f"{run_directory}/metrics/**/*.parquet"
+        parquet_glob_pattern = f"{run_directory}/records/**/*.parquet"
         conn.execute(
             f"""
             create view record_diff_matrix as (
-                select * from read_parquet(
+                select
+                    * exclude ({",".join([f'"{col}"' for col in SPARSE_MATRIX_SKIP_FIELDS])})
+                from read_parquet(
                     '{parquet_glob_pattern}',
                     hive_partitioning=true
                 )
@@ -146,9 +156,10 @@ def get_record_field_diff_summary(run_directory: str, timdex_record_id: str) ->
     )
     record_row = results_df.iloc[0].to_dict()

-    skip_fields = ["timdex_record_id", "source", "has_diff"]
     timdex_fields = {
-        field: value for field, value in record_row.items() if field not in skip_fields
+        field: value
+        for field, value in record_row.items()
+        if field not in SPARSE_MATRIX_SKIP_FIELDS
     }
     fields_with_diffs = [field for field, value in timdex_fields.items() if value == 1]
```
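The `* exclude (...)` syntax in the view above is DuckDB's star modifier for selecting all columns except a named set, which trims the sparse-matrix view down to the per-field diff columns now that everything lives in the single 'records' dataset. A standalone illustration follows; the table and column names here are hypothetical:

```python
# Standalone illustration of DuckDB's "* EXCLUDE (...)" star modifier, as
# used in the record_diff_matrix view; table/column names are hypothetical.
import duckdb

with duckdb.connect() as conn:
    conn.execute(
        "create table t (timdex_record_id varchar, title integer, record_a blob)"
    )
    conn.execute("insert into t values ('rec-1', 1, 'a'::blob)")

    # Select every column except the heavy record payload column.
    rows = conn.execute("select * exclude (record_a) from t").fetchall()
    print(rows)  # [('rec-1', 1)]
```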

tests/conftest.py

Lines changed: 12 additions & 1 deletion
```diff
@@ -16,7 +16,7 @@
 from click.testing import CliRunner
 from freezegun import freeze_time

-from abdiff.core import calc_ab_diffs, init_job, init_run
+from abdiff.core import calc_ab_diffs, create_final_records, init_job, init_run
 from abdiff.core.calc_ab_metrics import (
     _prepare_duckdb_context,
     create_record_diff_matrix_dataset,
@@ -526,3 +526,14 @@ def duckdb_context_with_diff_matrix(
         function_duckdb_connection, diff_matrix_dataset_filepath
     )
     return function_duckdb_connection, fields, sources
+
+
+@pytest.fixture
+def final_records_dataset_path(
+    run_directory, diffs_dataset_directory, diff_matrix_dataset_filepath
+):
+    return create_final_records(
+        run_directory=run_directory,
+        diffs_dataset_path=diffs_dataset_directory,
+        metrics_dataset_path=diff_matrix_dataset_filepath,
+    )
```
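A hypothetical test consuming this fixture might assert on the hive partition layout produced by create_final_records; the assertion below is illustrative, not from the repository:

```python
# Hypothetical usage of the final_records_dataset_path fixture; the
# partition-layout assertion is illustrative.
from pathlib import Path


def test_final_records_dataset_is_partitioned(final_records_dataset_path):
    # create_final_records returns the "records" dataset path, which is
    # hive-partitioned by source and has_diff.
    partitions = list(Path(final_records_dataset_path).glob("source=*/has_diff=*"))
    assert partitions, "expected at least one source=/has_diff= partition directory"
```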
Lines changed: 2 additions & 2 deletions
```diff
@@ -1,6 +1,6 @@
 {
   "job_directory": "tests/fixtures/jobs/example-job-2",
   "job_message": "small job for testing purposes",
-  "image_tag_a": "transmogrifier-example-job-2-395e612:latest",
-  "image_tag_b": "transmogrifier-example-job-2-cf1024c:latest"
+  "image_tag_a": "transmogrifier-abdiff-395e612:latest",
+  "image_tag_b": "transmogrifier-abdiff-cf1024c:latest"
 }
```
Binary file not shown.
Binary file not shown.
