Commit ad9e732
Handle presence of TXT files from Transmogrifier
Why these changes are being introduced:

It was overlooked that Transmogrifier writes a text (TXT) file with records to delete as part of its output, and that this needs to be captured when collating and deduping records. Records whose last action was a delete should be removed from the dataset.

How this addresses that need:

* Updates places where a .json extension was assumed
* Updates run_ab_transforms validation to look for output files that indicate Transmogrifier produced something as output

Side effects of this change:

* None

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-371
1 parent 2b03153 commit ad9e732
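For context on the collating and deduping mentioned above: once deletions are collated as rows whose record value is empty, a last-action-wins dedup can drop any record whose final action was a delete. Below is a minimal sketch of that idea in DuckDB (which this repo already uses); the table and column names are illustrative, not the project's actual schema:

```python
# Minimal sketch of last-action-wins dedup, where a trailing "delete" action
# removes the record entirely; table/column names are illustrative only.
import duckdb

con = duckdb.connect()
con.execute(
    """
    CREATE TABLE transformed_records AS
    SELECT * FROM (VALUES
        ('rec-1', 1, 'index', '{"title": "v1"}'),
        ('rec-1', 2, 'delete', NULL),
        ('rec-2', 1, 'index', '{"title": "v2"}')
    ) AS t(timdex_record_id, seq, action, record)
    """
)
final = con.execute(
    """
    WITH ranked AS (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY timdex_record_id ORDER BY seq DESC
        ) AS rn
        FROM transformed_records
    )
    SELECT timdex_record_id, record FROM ranked
    WHERE rn = 1 AND action != 'delete'
    """
).fetchall()
print(final)  # only rec-2 survives; rec-1's last action was a delete
```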

File tree: 4 files changed (+136, -39 lines changed)

abdiff/core/collate_ab_transforms.py

Lines changed: 29 additions & 14 deletions
@@ -10,6 +10,7 @@
 
 import duckdb
 import ijson
+import pandas as pd
 import pyarrow as pa
 
 from abdiff.core.exceptions import OutputValidationError
@@ -119,17 +120,34 @@ def get_transformed_records_iter(
     """
     version = get_transform_version(transformed_file)
     filename_details = parse_timdex_filename(transformed_file)
-    with open(transformed_file, "rb") as file:
-        for record in ijson.items(file, "item"):
+
+    base_record = {
+        "source": filename_details["source"],
+        "run_date": filename_details["run-date"],
+        "run_type": filename_details["run-type"],
+        "action": filename_details["action"],
+        "version": version,
+        "transformed_file_name": transformed_file.split("/")[-1],
+    }
+
+    # handle JSON files with records to index
+    if transformed_file.endswith(".json"):
+        with open(transformed_file, "rb") as file:
+            for record in ijson.items(file, "item"):
+                yield {
+                    **base_record,
+                    "timdex_record_id": record["timdex_record_id"],
+                    "record": json.dumps(record).encode(),
+                }
+
+    # handle TXT files with records to delete
+    else:
+        deleted_records_df = pd.read_csv(transformed_file, header=None)
+        for row in deleted_records_df.itertuples():
             yield {
-                "timdex_record_id": record["timdex_record_id"],
-                "source": filename_details["source"],
-                "run_date": filename_details["run-date"],  # use underscore for DuckDB
-                "run_type": filename_details["run-type"],  # use underscore for DuckDB
-                "action": filename_details["action"],
-                "record": json.dumps(record).encode(),
-                "version": version,
-                "transformed_file_name": transformed_file.split("/")[-1],
+                **base_record,
+                "timdex_record_id": row[1],
+                "record": None,
             }
 
 
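A note on the TXT branch above: pandas parses the deletions file as a headerless, one-column CSV, and with itertuples() position 0 is the DataFrame index while position 1 is that single column, i.e. the record id, which is why the code yields row[1]. A standalone illustration (the file contents are made up):

```python
import io

import pandas as pd

# stand-in for a Transmogrifier "...-records-to-delete.txt" file: one id per line
deletions_file = io.StringIO("abc123\ndef456\n")

deleted_records_df = pd.read_csv(deletions_file, header=None)
for row in deleted_records_df.itertuples():
    # row[0] is the DataFrame index; row[1] is the first (and only) column
    print(row[1])  # -> abc123, then def456
```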
@@ -364,10 +382,7 @@ def fetch_single_value(query: str) -> int:
 
 def get_transform_version(transformed_filepath: str) -> str:
     """Get A/B transform version, either 'a' or 'b'."""
-    match_result = re.match(
-        r".*transformed\/(.*)\/.*.json",
-        transformed_filepath,
-    )
+    match_result = re.match(r".*transformed\/(.*)\/.*", transformed_filepath)
     if not match_result:
         raise ValueError(f"Transformed filepath is invalid: {transformed_filepath}.")
 
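With the trailing .json dropped from the pattern, the version directory is now captured for both output types. A quick check against illustrative paths:

```python
import re

# illustrative paths; the pattern captures the "a"/"b" directory either way
for path in [
    "output/transformed/a/dspace-2024-04-10-daily-extracted-records-to-index.json",
    "output/transformed/b/dspace-2024-04-10-daily-extracted-records-to-delete.txt",
]:
    match_result = re.match(r".*transformed\/(.*)\/.*", path)
    assert match_result is not None
    print(match_result.group(1))  # -> "a", then "b"
```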

abdiff/core/run_ab_transforms.py

Lines changed: 32 additions & 17 deletions
@@ -110,7 +110,7 @@ def run_ab_transforms(
         "to complete successfully."
     )
     ab_transformed_file_lists = get_transformed_files(run_directory)
-    validate_output(ab_transformed_file_lists, len(input_files))
+    validate_output(ab_transformed_file_lists, input_files)
 
     # write and return results
     run_data = {
@@ -278,11 +278,11 @@ def get_transformed_files(run_directory: str) -> tuple[list[str], ...]:
 
     Returns:
         tuple[list[str]]: Tuple containing lists of paths to transformed
-            JSON files for each image, relative to 'run_directory'.
+            JSON and TXT (deletions) files for each image, relative to 'run_directory'.
     """
     ordered_files = []
     for version in ["a", "b"]:
-        absolute_filepaths = glob.glob(f"{run_directory}/transformed/{version}/*.json")
+        absolute_filepaths = glob.glob(f"{run_directory}/transformed/{version}/*")
         relative_filepaths = [
             os.path.relpath(file, run_directory) for file in absolute_filepaths
         ]
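Widening the glob from *.json to * is what lets deletion TXT files reach the collation step. A small demonstration (directory layout and filenames are made up):

```python
import glob
import os
import tempfile

# throwaway run directory holding both Transmogrifier output types (names made up)
run_directory = tempfile.mkdtemp()
os.makedirs(f"{run_directory}/transformed/a")
for name in ["records-to-index.json", "records-to-delete.txt"]:
    open(f"{run_directory}/transformed/a/{name}", "w").close()

# the old "*.json" glob would miss the TXT file; "*" collects both
print(sorted(glob.glob(f"{run_directory}/transformed/a/*")))
```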
@@ -291,24 +291,39 @@ def get_transformed_files(run_directory: str) -> tuple[list[str], ...]:
 
 
 def validate_output(
-    ab_transformed_file_lists: tuple[list[str], ...], input_files_count: int
+    ab_transformed_file_lists: tuple[list[str], ...], input_files: list[str]
 ) -> None:
     """Validate the output of run_ab_transforms.
 
-    This function checks that the number of files in each of the A/B
-    transformed file directories matches the number of input files
-    provided to run_ab_transforms (i.e., the expected number of
-    files that are transformed).
+    Transmogrifier produces JSON files for records that need indexing, and TXT files for
+    records that need deletion. Every run of Transmogrifier should produce one OR both of
+    these. Some TIMDEX sources provide one file to Transmogrifier that contains both
+    records to index and delete, and others provide separate files for each.
+
+    The net effect for validation is that, given an input file, we should expect to see
+    1+ files in the A and B output for that input file, ignoring if it's records to index
+    or delete.
     """
-    if any(
-        len(transformed_files) != input_files_count
-        for transformed_files in ab_transformed_file_lists
-    ):
-        raise OutputValidationError(  # noqa: TRY003
-            "At least one or more transformed JSON file(s) are missing. "
-            f"Expecting {input_files_count} transformed JSON file(s) per A/B version. "
-            "Check the transformed file directories."
-        )
+    for input_file in input_files:
+        file_parts = parse_timdex_filename(input_file)
+        logger.debug(f"Validating output for input file root: {file_parts}")
+
+        file_found = False
+        for version_files in ab_transformed_file_lists:
+            for version_file in version_files:
+                if (
+                    file_parts["source"] in version_file  # type: ignore[operator]
+                    and file_parts["run-date"] in version_file  # type: ignore[operator]
+                    and file_parts["run-type"] in version_file  # type: ignore[operator]
+                    and (not file_parts["index"] or file_parts["index"] in version_file)
+                ):
+                    file_found = True
+                    break
+
+        if not file_found:
+            raise OutputValidationError(  # noqa: TRY003
+                f"Transmogrifier output was not found for input file '{input_file}'"
+            )
 
 
 def get_transformed_filename(filename_details: dict) -> str:
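The new validation reduces to substring checks of the parsed input-filename parts against each output filename. Below is a self-contained sketch of that predicate; the fake_parse_timdex_filename helper is a hand-rolled stand-in, and the real parse_timdex_filename may behave differently:

```python
# Hand-rolled stand-in for parse_timdex_filename; the real helper may differ.
def fake_parse_timdex_filename(path: str) -> dict:
    # e.g. "alma-2024-04-10-daily-extracted-records-to-index_09.xml"
    name = path.rsplit("/", 1)[-1]
    parts = name.split("-")
    index = name.rsplit("_", 1)[-1].split(".")[0] if "_" in name else None
    return {
        "source": parts[0],
        "run-date": "-".join(parts[1:4]),
        "run-type": parts[4],
        "index": index,
    }

file_parts = fake_parse_timdex_filename(
    "s3://X/alma-2024-04-10-daily-extracted-records-to-index_09.xml"
)
output_file = "alma-2024-04-10-daily-extracted-records-to-index_09.json"

# same shape of check as validate_output: every parsed part must appear in the
# output filename, and the index suffix only when the input filename has one
assert file_parts["source"] in output_file
assert file_parts["run-date"] in output_file
assert file_parts["run-type"] in output_file
assert not file_parts["index"] or file_parts["index"] in output_file
print("output matches input file parts")
```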

tests/test_collate_ab_transforms.py

Lines changed: 3 additions & 3 deletions
@@ -75,7 +75,7 @@ def test_get_transformed_records_iter_success(example_transformed_directory):
     )
     timdex_record_dict = next(records_iter)
 
-    assert list(timdex_record_dict.keys()) == [
+    assert set(timdex_record_dict.keys()) == {
         "timdex_record_id",
         "source",
         "run_date",
@@ -84,7 +84,7 @@ def test_get_transformed_records_iter_success(example_transformed_directory):
         "record",
         "version",
         "transformed_file_name",
-    ]
+    }
     assert isinstance(timdex_record_dict["record"], bytes)
     assert timdex_record_dict["version"] == "a"
     assert (
@@ -104,7 +104,7 @@ def test_get_transformed_batches_iter_success(
 
     assert isinstance(transformed_batch, pa.RecordBatch)
     assert transformed_batch.num_rows <= READ_BATCH_SIZE
-    assert transformed_batch.schema.names == TRANSFORMED_DATASET_SCHEMA.names
+    assert set(transformed_batch.schema.names) == set(TRANSFORMED_DATASET_SCHEMA.names)
 
 
 def test_get_joined_batches_iter_success(transformed_parquet_dataset):
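These assertions switch from ordered comparisons to sets because the refactor builds yielded dicts by splatting base_record first, which changes key order (Python dicts preserve insertion order). For example:

```python
# Key order follows insertion order, so splatting base_record first reorders keys.
base_record = {"source": "dspace", "version": "a"}
old_style = {"timdex_record_id": "abc123", **base_record}
new_style = {**base_record, "timdex_record_id": "abc123"}

assert list(old_style) != list(new_style)  # ordered comparison would now fail
assert set(old_style) == set(new_style)  # set comparison is order-insensitive
```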

tests/test_run_ab_transforms.py

Lines changed: 72 additions & 5 deletions
@@ -166,19 +166,86 @@ def test_get_transformed_files_success(
     )
 
 
-def test_validate_output_success():
+@pytest.mark.parametrize(
+    ("ab_files", "input_files"),
+    [
+        # single JSON from single file
+        (
+            (
+                ["dspace-2024-04-10-daily-extracted-records-to-index.json"],
+                ["dspace-2024-04-10-daily-extracted-records-to-index.json"],
+            ),
+            ["s3://X/dspace-2024-04-10-daily-extracted-records-to-index.xml"],
+        ),
+        # JSON and TXT from single file
+        (
+            (
+                [
+                    "dspace-2024-04-10-daily-extracted-records-to-index.json",
+                    "dspace-2024-04-10-daily-extracted-records-to-delete.txt",
+                ],
+                [
+                    "dspace-2024-04-10-daily-extracted-records-to-index.json",
+                    "dspace-2024-04-10-daily-extracted-records-to-delete.txt",
+                ],
+            ),
+            ["s3://X/dspace-2024-04-10-daily-extracted-records-to-index.xml"],
+        ),
+        # handles indexed files when multiple
+        (
+            (
+                ["alma-2024-04-10-daily-extracted-records-to-index_09.json"],
+                ["alma-2024-04-10-daily-extracted-records-to-index_09.json"],
+            ),
+            ["s3://X/alma-2024-04-10-daily-extracted-records-to-index_09.xml"],
+        ),
+        # handles deletes only for alma deletes
+        (
+            (
+                ["alma-2024-04-10-daily-extracted-records-to-delete.txt"],
+                ["alma-2024-04-10-daily-extracted-records-to-delete.txt"],
+            ),
+            ["s3://X/alma-2024-04-10-daily-extracted-records-to-delete.xml"],
+        ),
+    ],
+)
+def test_validate_output_success(ab_files, input_files):
     assert (
         validate_output(
-            ab_transformed_file_lists=(["transformed/a/file1"], ["transformed/b/file2"]),
-            input_files_count=1,
+            ab_transformed_file_lists=ab_files,
+            input_files=input_files,
         )
         is None
     )
 
 
-def test_validate_output_error():
+@pytest.mark.parametrize(
+    ("ab_files", "input_files"),
+    [
+        # nothing returned
+        (
+            ([], []),
+            ["s3://X/dspace-2024-04-10-daily-extracted-records-to-index.xml"],
+        ),
+        # output files don't have index, or wrong index, so not direct match
+        (
+            (
+                [
+                    "alma-2024-04-10-daily-extracted-records-to-index.json",
+                    "alma-2024-04-10-daily-extracted-records-to-index_04.json",
+                ],
+                [
+                    "alma-2024-04-10-daily-extracted-records-to-index.json",
+                    "alma-2024-04-10-daily-extracted-records-to-index_04.json",
+                ],
+            ),
+            ["s3://X/alma-2024-04-10-daily-extracted-records-to-index_09.xml"],
+        ),
+    ],
+)
+def test_validate_output_error(ab_files, input_files):
     with pytest.raises(OutputValidationError):
-        validate_output(ab_transformed_file_lists=([], []), input_files_count=1)
+        validate_output(ab_transformed_file_lists=ab_files, input_files=input_files)
 
 
 def test_get_output_filename_success():
