
Commit 9931069

Merge pull request #219 from MITLibraries/TIMX-405-write-output-to-dataset
TIMX 405 - support output to TIMDEX parquet dataset
2 parents: d3a6018 + 0b9bcc5

10 files changed: +650 −446 lines

Pipfile (+1 −2)

@@ -10,19 +10,18 @@ click = "*"
 jsonlines = "*"
 lxml = "*"
 lxml-stubs = "*"
-pyarrow = "*"
 python-dateutil = "*"
 sentry-sdk = "*"
 smart-open = {version = "*", extras = ["s3"]}
 types-python-dateutil = "*"
+timdex-dataset-api = {ref = "8bf085a", git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}
 
 [dev-packages]
 black = "*"
 coveralls = "*"
 ipython = "*"
 mypy = "*"
 pre-commit = "*"
-pyarrow-stubs = "*"
 pytest = "*"
 ruff = "*"
 safety = "*"

Pipfile.lock (+388 −262)

Generated file; diff not rendered by default.

README.md (+19 −9)

@@ -2,7 +2,7 @@
 
 An application to transform source records to the TIMDEX data model to facilitate ingest into an OpenSearch index.
 
-TIMDEX ingests records from various sources with different metadata formats, necessitating an application to transform those source records to a common metadata format, the TIMDEX data model in this case. This application processes both XML and JSON source records and outputs a JSON file of records formatted according to the TIMDEX data model.
+TIMDEX ingests records from various sources with different metadata formats, necessitating an application to transform those source records to a common metadata format, the TIMDEX data model in this case. This application processes source records, creates records formatted according to the TIMDEX data model, and writes to a TIMDEX parquet dataset.
 
 ```mermaid
 ---
@@ -14,21 +14,21 @@ flowchart TD
     GeoData
     MARC
     transmogrifier((transmogrifier))
-    JSON
+    timdex-dataset
     timdex-index-manager
     ArchivesSpace[("ArchivesSpace<br>(EAD XML)")] --> transmogrifier
     DSpace[("DSpace<br>(METS XML)")] --> transmogrifier
     GeoData[("GeoData<br>(Aardvark JSON)")] --> transmogrifier
     MARC[("Alma<br>(MARCXML)")] --> transmogrifier
-    transmogrifier --> JSON["TIMDEX JSON"]
-    JSON[TIMDEX JSON file] --> timdex-index-manager((timdex-index-manager))
+    transmogrifier --> timdex-dataset["TIMDEX Parquet Dataset"]
+    timdex-dataset["TIMDEX Parquet Dataset"] --> timdex-index-manager((timdex-index-manager))
 ```
 
 The TIMDEX data model is designed to produce records that can be successfully ingested into an OpenSearch index and contains data fields that are broadly applicable to various types of records. `transmogrifier` contains different validators to ensure that the record is structured properly and that certain types of values, such as dates, align with OpenSearch's expectations.
 
 Each source is defined with configuration values and a dedicated transform class to process records from that source. For each transform class, various errors and warnings are logged. Some errors are logged and the entire source record is skipped because the severity implies it should not be processed until fixed, while others are merely logged as warnings for later review. The application also determines which records are marked as deleted in each source and removes those record from the OpenSearch index.
 
-After the JSON file of transformed records is produced, it is processed by `timdex-index-manager` for ingest into an OpenSearch index.
+After Transmogrifier writes the transformed files to the TIMDEX parquet dataset, it is processed by `timdex-index-manager` for ingest into an OpenSearch index.
 
 ## Development
 
@@ -63,10 +63,20 @@ ETL_VERSION=### Version number of the TIMDEX ETL infrastructure. This can be us
 Usage: -c [OPTIONS]
 
 Options:
-  -i, --input-file TEXT   Filepath for harvested input records to
-                          transform  [required]
-  -o, --output-file TEXT  Filepath to write output TIMDEX JSON records
-                          to  [required]
+  -i, --input-file TEXT       Filepath of input records to transform.
+                              The filename must be in the format
+                              <source>-<YYYY-MM-DD>-<run-type>-extracted-records-
+                              to-<action><index[optional]>.<extension>.
+                              Examples: 'gisogm-2024-03-28-daily-extracted-
+                              records-to-index.jsonl' or
+                              'alma-2023-01-13-full-extracted-records-to-
+                              index_17.xml'.  [required]
+  --output-file TEXT          Filepath to write output TIMDEX JSON records
+                              to. NOTE: this option will be removed when
+                              output to parquet is finalized.
+  -o, --output-location TEXT  Location of TIMDEX parquet dataset to write
+                              to.
   -s, --source [alma|aspace|dspace|jpal|libguides|gismit|gisogm|researchdatabases|whoas|zenodo]
                           Source records were harvested from, must
                           choose from list of options [required]
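As an aside, the `--input-file` filename convention documented above carries the run metadata (source, run date, run type) that the application parses out. A minimal sketch of how such a filename could be parsed — the regex and the `parse_input_filename` helper are illustrative assumptions, not transmogrifier's actual implementation:

```python
import re

# Hypothetical pattern mirroring the documented filename convention;
# the application's real parsing logic may differ.
FILENAME_PATTERN = re.compile(
    r"(?P<source>[a-z]+)-"
    r"(?P<run_date>\d{4}-\d{2}-\d{2})-"
    r"(?P<run_type>full|daily)-"
    r"extracted-records-to-"
    r"(?P<action>[a-z]+)"
    r"(?:_(?P<index>\d+))?"  # optional numeric suffix, e.g. index_17
    r"\.(?P<extension>\w+)$"
)


def parse_input_filename(filepath: str) -> dict:
    """Extract run metadata from a TIMDEX input filename."""
    match = FILENAME_PATTERN.search(filepath)
    if match is None:
        raise ValueError(f"unrecognized filename format: {filepath}")
    return match.groupdict()


# Both example filenames from the help text parse cleanly:
print(parse_input_filename("gisogm-2024-03-28-daily-extracted-records-to-index.jsonl"))
print(parse_input_filename("alma-2023-01-13-full-extracted-records-to-index_17.xml"))
```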

tests/conftest.py (+30 −1)

@@ -51,7 +51,6 @@ def runner():
 
 @pytest.fixture
 def generic_transformer():
-
     class GenericTransformer(Transformer):
         def parse_source_file(self):
             pass
@@ -229,3 +228,33 @@ def timdex_record_all_fields_and_subfields():
         subjects=[timdex.Subject(value=["Stuff"], kind="LCSH")],
         summary=["This is data."],
     )
+
+
+# timdex parquet dataset ##########################
+
+
+@pytest.fixture
+def run_id():
+    return "run-abc-123"
+
+
+@pytest.fixture
+def empty_dataset_location(tmp_path):
+    return str(tmp_path / "dataset")
+
+
+@pytest.fixture
+def libguides_input_file():
+    return (
+        "tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml"
+    )
+
+
+@pytest.fixture
+def libguides_transformer(monkeypatch, run_id, libguides_input_file):
+    monkeypatch.setenv("ETL_VERSION", "2")
+    return Transformer.load(
+        "libguides",
+        libguides_input_file,
+        run_id=run_id,
+    )
tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml (+14 −0)

@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<records>
+<!-- valid record to index -->
+<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header><identifier>oai:libguides.com:guides/175846</identifier><datestamp>2024-02-27T18:27:05Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>Materials Science &amp; Engineering</dc:title><dc:creator>Ye Li</dc:creator><dc:subject>Engineering</dc:subject><dc:subject>Science</dc:subject><dc:description>Useful databases and other research tips for materials science.</dc:description><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-19 17:55:27</dc:date><dc:identifier>https://libguides.mit.edu/materials</dc:identifier></oai_dc:dc></metadata></record>
+
+<!-- deleted record -->
+<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header status="deleted"><identifier>oai:libguides.com:guides/175849</identifier><datestamp>2024-05-21T18:36:58Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>Country Data &amp; Analysis</dc:title><dc:creator>Nicholas Albaugh</dc:creator><dc:subject>Business &amp; management</dc:subject><dc:description>This is the subject guide for Country Data &amp; Analysis</dc:description><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-26 00:51:04</dc:date><dc:identifier>https://libguides.mit.edu/country</dc:identifier></oai_dc:dc></metadata></record>
+
+<!-- skipped record -->
+<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header><identifier>oai:libguides.com:guides/175853</identifier><datestamp>2024-03-26T20:15:38Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>News, Newspapers, and Current Events</dc:title><dc:creator>Tina Chan</dc:creator><dc:subject>Interdisciplinary</dc:subject><dc:description>This is the subject guide for News</dc:description><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-26 21:29:54</dc:date><dc:identifier>https://libguides.mit.edu/news</dc:identifier></oai_dc:dc></metadata></record>
+
+<!-- unhandled exception record -->
+<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header><identifier>oai:libguides.com:guides/175855</identifier><datestamp>2021-07-19T09:31:31Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>Biography</dc:title><dc:creator>Tina Chan</dc:creator><dc:subject>Interdisciplinary</dc:subject><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-26 22:05:13</dc:date><dc:identifier>https://libguides.mit.edu/biography</dc:identifier></oai_dc:dc></metadata></record>
+</records>

tests/sources/test_transformer.py (+79 −11)

@@ -1,10 +1,14 @@
-# ruff: noqa: PLR2004
-
-import uuid
+# ruff: noqa: PLR2004, SLF001, D202
+import datetime
+import json
+from unittest import mock
 
 import pytest
+from lxml import etree
+from timdex_dataset_api.record import DatasetRecord
 
 import transmogrifier.models as timdex
+from transmogrifier.exceptions import DeletedRecordEvent, SkippedRecordEvent
 from transmogrifier.sources.transformer import Transformer
 from transmogrifier.sources.xml.datacite import Datacite
 
@@ -79,13 +83,77 @@ def test_create_locations_from_spatial_subjects_success(timdex_record_required_f
     ]
 
 
-def test_transformer_run_id_explicitly_passed(generic_transformer):
-    run_id = "abc123"
-    transformer = generic_transformer("alma", [], run_id=run_id)
-    assert transformer.run_id == run_id
+def test_transformer_get_run_data_from_source_file_and_run_id(
+    libguides_transformer, libguides_input_file, run_id
+):
+    assert libguides_transformer.get_run_data(libguides_input_file, run_id) == {
+        "source": "libguides",
+        "run_date": "2024-06-03",
+        "run_type": "full",
+        "run_id": run_id,
+    }
+
+
+def test_transformer_next_iter_yields_dataset_records(libguides_transformer):
+    assert isinstance(next(libguides_transformer), DatasetRecord)
+
 
+def test_transform_next_iter_sets_valid_source_and_transformed_records(
+    libguides_transformer,
+):
+    record = next(libguides_transformer)
 
-def test_transformer_run_id_none_passed_generates_uuid(generic_transformer):
-    transformer = generic_transformer("alma", [], run_id=None)
-    assert transformer.run_id is not None
-    assert uuid.UUID(transformer.run_id)
+    # parse source record XML
+    assert isinstance(record.source_record, bytes)
+    source_record = etree.fromstring(record.source_record)
+    assert isinstance(source_record, etree._Element)
+
+    # parse transformed record
+    assert isinstance(record.transformed_record, bytes)
+    transformed_record = json.loads(record.transformed_record)
+    assert isinstance(transformed_record, dict)
+
+
+def test_transform_next_iter_uses_run_data_parsed_from_source_file(
+    libguides_transformer, libguides_input_file, run_id
+):
+    record = next(libguides_transformer)
+    run_data = libguides_transformer.get_run_data(libguides_input_file, run_id)
+    assert record.run_date == datetime.datetime.strptime(
+        run_data["run_date"], "%Y-%m-%d"
+    ).astimezone(datetime.UTC)
+    assert record.run_type == run_data["run_type"]
+    assert record.run_id == run_id
+
+
+@pytest.mark.parametrize(
+    ("transform_exception", "expected_action"),
+    [
+        (None, "index"),
+        (DeletedRecordEvent(timdex_record_id="libguides:guides-0"), "delete"),
+        (SkippedRecordEvent(source_record_id="guides-0"), "skip"),
+        (RuntimeError("totally unhandled event"), "error"),
+    ],
+)
+def test_transformer_action_column_based_on_transformation_exception_handling(
+    libguides_transformer, transform_exception, expected_action
+):
+    """While Transmogrifier is often considered just an application to transform a source
+    record into a TIMDEX record, it also serves the purpose of determining if a source
+    record should be indexed or deleted, or skipped (no action taken), or handling when a
+    record cannot be transformed (unhandled error). This 'action' that Transmogrifier
+    determines for each record is captured in the 'action' column in the TIMDEX parquet
+    dataset.
+
+    This test ensures that the 'action' column values are correct given behavior
+    (exception handling) during transformation of each record.
+    """
+
+    if transform_exception:
+        with mock.patch.object(libguides_transformer, "transform") as mocked_transform:
+            mocked_transform.side_effect = transform_exception
+            record = next(libguides_transformer)
+            assert mocked_transform.called
+    else:
+        record = next(libguides_transformer)
+    assert record.action == expected_action
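The parametrized test above encodes a mapping from transform outcome to the dataset's 'action' column. Roughly, the behavior under test looks like the following sketch — `determine_action` is a hypothetical stand-in for how the Transformer's iteration handles each record, not the repository's actual method:

```python
from transmogrifier.exceptions import DeletedRecordEvent, SkippedRecordEvent


def determine_action(transform, source_record) -> str:
    """Map transform outcomes to 'action' values, per the test's expectations:
    a clean transform is indexed; known events map to 'delete' or 'skip';
    any unhandled exception is recorded as an 'error'."""
    try:
        transform(source_record)
    except DeletedRecordEvent:
        return "delete"
    except SkippedRecordEvent:
        return "skip"
    except Exception:  # unhandled errors still produce a dataset row
        return "error"
    return "index"
```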

tests/test_cli.py (+14 −10)

@@ -13,7 +13,7 @@ def test_transform_no_sentry_not_verbose(caplog, monkeypatch, runner, tmp_path):
         [
             "-i",
             "tests/fixtures/datacite/datacite_records.xml",
-            "-o",
+            "--output-file",
             outfile,
             "-s",
             "jpal",
@@ -39,7 +39,7 @@ def test_transform_with_sentry_and_verbose(caplog, monkeypatch, runner, tmp_path
         [
             "-i",
             "tests/fixtures/datacite/datacite_records.xml",
-            "-o",
+            "--output-file",
             outfile,
             "-s",
             "jpal",
@@ -66,7 +66,7 @@ def test_transform_no_records(runner, tmp_path):
         [
             "-i",
             "tests/fixtures/no_records.xml",
-            "-o",
+            "--output-file",
             outfile,
             "-s",
             "dspace",
@@ -83,7 +83,7 @@ def test_transform_deleted_records(caplog, runner, tmp_path):
         [
             "-i",
             "tests/fixtures/record_deleted.xml",
-            "-o",
+            "--output-file",
             outfile,
             "-s",
             "jpal",
@@ -96,7 +96,8 @@ def test_transform_deleted_records(caplog, runner, tmp_path):
     ) in caplog.text
 
 
-def test_transform_run_id_argument_passed_and_used(caplog, runner, tmp_path):
+def test_transform_run_id_argument_passed_and_used(monkeypatch, caplog, runner, tmp_path):
+    monkeypatch.setenv("ETL_VERSION", "2")
     caplog.set_level("INFO")
     run_id = "abc123"
     with mock.patch(
@@ -112,15 +113,18 @@ def test_transform_run_id_argument_passed_and_used(caplog, runner, tmp_path):
             "-r",
             run_id,
             "-i",
-            "tests/fixtures/datacite/datacite_records.xml",
+            "tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml",
             "-o",
-            "/tmp/records.json",
+            "/tmp/dataset",
         ],
     )
    assert f"run_id set: '{run_id}'" in caplog.text
 
 
-def test_transform_run_id_argument_not_passed_and_uuid_minted(caplog, runner, tmp_path):
+def test_transform_run_id_argument_not_passed_and_uuid_minted(
+    monkeypatch, caplog, runner, tmp_path
+):
+    monkeypatch.setenv("ETL_VERSION", "2")
     caplog.set_level("INFO")
     with mock.patch(
         "transmogrifier.sources.transformer.Transformer.transform_and_write_output_files"
@@ -133,9 +137,9 @@ def test_transform_run_id_argument_not_passed_and_uuid_minted(caplog, runner, tm
             "-s",
             "alma",
             "-i",
-            "tests/fixtures/datacite/datacite_records.xml",
+            "tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml",
             "-o",
-            "/tmp/records.json",
+            "/tmp/dataset",
         ],
     )
     assert "explicit run_id not passed, minting new UUID" in caplog.text

tests/test_temporary_feature_flagging.py (+1 −1)

@@ -64,7 +64,7 @@ def test_cli_etl_version_v1_invokes_v1_code(
         [
             "-i",
             "/does/not/exist/alma-2023-01-13-full-extracted-records-to-index_01.xml",
-            "-o",
+            "--output-file",
             "/does/not/exist/libguides.json",
             "-s",
             "libguides",

transmogrifier/cli.py (+22 −5)

@@ -20,13 +20,23 @@
     "-i",
     "--input-file",
     required=True,
-    help="Filepath for harvested input records to transform",
+    help="Filepath of input records to transform. The filename must be in the format "
+    "<source>-<YYYY-MM-DD>-<run-type>-extracted-records-to-<action><index[optional]>"
+    ".<extension>. Examples: 'gisogm-2024-03-28-daily-extracted-records-to-index.jsonl' "
+    "or 'alma-2023-01-13-full-extracted-records-to-index_17.xml'.",
 )
+# NOTE: FEATURE FLAG: CLI arg '--output-file' will be removed after v2 work is complete
 @click.option(
-    "-o",
     "--output-file",
-    required=True,
-    help="Filepath to write output TIMDEX JSON records to",
+    required=False,
+    help="Filepath to write output TIMDEX JSON records to. NOTE: this option will be "
+    "removed when output to parquet is finalized.",
+)
+@click.option(
+    "-o",
+    "--output-location",
+    required=False,
+    help="Location of TIMDEX parquet dataset to write to.",
 )
 @click.option(
     "-s",
@@ -50,6 +60,7 @@ def main(
     source: str,
     input_file: str,
     output_file: str,
+    output_location: str,
     run_id: str,
     verbose: bool,  # noqa: FBT001
 ) -> None:
@@ -65,9 +76,15 @@ def main(
     etl_version = get_etl_version()
     match etl_version:
         case 1:
+            if output_file is None:
+                message = "--output-file must be set when using ETL_VERSION=1"
+                raise RuntimeError(message)
             transformer.transform_and_write_output_files(output_file)
         case 2:
-            transformer.write_to_parquet_dataset(output_file)
+            if output_location is None:
+                message = "-o / --output-location must be set when using ETL_VERSION=2"
+                raise RuntimeError(message)
+            transformer.write_to_parquet_dataset(output_location)
 
     logger.info(
         (
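Putting the feature flag together: the two ETL versions require different output arguments, and `main` now validates each. A hedged sketch of exercising both paths with click's test runner — the file paths here are placeholders, not repository fixtures:

```python
from click.testing import CliRunner

from transmogrifier.cli import main

runner = CliRunner()
input_file = "libguides-2024-06-03-full-extracted-records-to-index.xml"  # placeholder

# ETL_VERSION=1 (legacy): --output-file is required and TIMDEX JSON is written
result_v1 = runner.invoke(
    main,
    ["-s", "libguides", "-i", input_file, "--output-file", "/tmp/records.json"],
    env={"ETL_VERSION": "1"},
)

# ETL_VERSION=2: -o / --output-location is required and the parquet dataset is written
result_v2 = runner.invoke(
    main,
    ["-s", "libguides", "-i", input_file, "-o", "/tmp/dataset"],
    env={"ETL_VERSION": "2"},
)
```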
