
Conversation

Contributor

@ghukill ghukill commented Nov 5, 2024

Purpose and background context

This PR applies updates to support large runs, after some rough edges were discovered during a run of ~1.6k input files, resulting in ~4.1m distinct records (~8.2m A/B versions). As we get into runs this large, some of the functional-but-not-optimized code from early development needed adjustments.

Updates include:

  • TIMX 366: remove transformed files after they are added to the collated dataset (commit)
    • this introduces an optional PRESERVE_ARTIFACTS env var and config property that is used here and in later data cleanup as well; artifacts are deleted by default, but preserving them can be helpful for debugging purposes
    • this results in a very large reduction in space, where /transformed could be 75+ GB, but the final /records dataset may only be 7-8 GB
  • Improve record collation (commit)
    • add logging information during intermediate work for debugging/troubleshooting
    • be more precise with the columns used for intermediate CTE tables and joins
    • introduce abdiff_record_id as an unambiguous way to identify the A/B combination of a record throughout the run
  • Perform diff calculations via parallel processes (commit)
  • TIMX 370: create a final "records" dataset that combines the "diffs" and "metrics" datasets (commit)
    • webapp is updated to pull all information from the "records" dataset instead of cherry-picking from the "diffs" and "metrics" datasets

The net effect is improved creation and handling of datasets throughout the pipeline steps. If PRESERVE_ARTIFACTS is not set or is false, intermediate datasets are removed once they are no longer needed. The final, new "records" dataset contains the full A/B versions for all records and the sparse matrix of modified fields, which fully supports the webapp's needs for viewing.
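
As a rough illustration of how this gating behaves, here is a minimal sketch (the Config class and cleanup helper shown are illustrative stand-ins, not the exact code in this PR):

import os
import shutil


class Config:
    """Illustrative config object; the real Config class in abdiff may differ."""

    @property
    def preserve_artifacts(self) -> bool:
        # artifacts are removed unless PRESERVE_ARTIFACTS is explicitly "true"
        return os.environ.get("PRESERVE_ARTIFACTS", "false").strip().lower() == "true"


def cleanup_intermediate_datasets(run_directory: str, config: Config) -> None:
    """Remove intermediate run artifacts once they are no longer needed."""
    if config.preserve_artifacts:
        return
    for artifact in ("transformed", "collated", "diffs", "metrics"):
        path = os.path.join(run_directory, artifact)
        if os.path.exists(path):
            shutil.rmtree(path)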

The following shows an example where PRESERVE_ARTIFACTS=true:

├── 2024-11-04_19-45-24          <------------ WITH preserving artifacts
│   ├── collated
│   │   └── records-0.parquet
│   ├── diffs
│   │   └── has_diff=false
│   │       └── records-0.parquet
│   ├── metrics
│   │   └── source=libguides
│   │       └── records-0.parquet
│   ├── records
│   │   └── source=libguides
│   │       └── has_diff=false
│   │           └── records-0.parquet
│   ├── run.json
│   └── transformed
│       ├── a
│       │   ├── libguides-2024-04-03-full-transformed-records-to-index.json
│       │   └── libguides-2024-10-28-daily-transformed-records-to-index.json
│       ├── b
│       │   ├── libguides-2024-04-03-full-transformed-records-to-index.json
│       │   └── libguides-2024-10-28-daily-transformed-records-to-index.json
│       └── logs.txt

And the following shows the result when PRESERVE_ARTIFACTS=false or is not set (the default behavior):

└── 2024-11-04_19-45-59           <------------ WITHOUT preserving artifacts
    ├── records
    │   └── source=libguides
    │       └── has_diff=false
    │           └── records-0.parquet
    └── run.json

PR Reviewing NOTE: a fair number of files are touched in this PR, but if reviewed by commit, each commit is fairly small and self-contained.

How can a reviewer manually see the effects of these changes?

The following will allow running a job with a fair number of input files and records:

1- Set production AWS credentials

2- Set env var:

TIMDEX_BUCKET=timdex-extract-prod-300442551476

3- Init a job, produce a CSV file of input files (results in about 630), and run a diff. NOTE: this run will take roughly 13 minutes given the large number of input files, though each is fairly small. It could be sped up slightly by setting TRANSMOGRIFIER_MAX_WORKERS=9 (default is 6), but be aware this exerts a fair amount of pressure on the system given our 10 CPU core machines.

pipenv run abdiff --verbose init-job -d output/jobs/scale -a 008e20c -b 3f77c7c

pipenv run abdiff --verbose timdex-sources-csv -o output/jobs/scale/scale.csv -s libguides,researchdatabases,gismit,gisogm,aspace

pipenv run abdiff --verbose run-diff -d output/jobs/scale -i output/jobs/scale/scale.csv

The final result should look similar, with only the "records" dataset persisting at the end:

output/jobs/scale
├── job.json
├── runs
│   └── 2024-11-05_14-50-04
│       ├── records
│       │   ├── source=aspace
│       │   │   ├── has_diff=false
│       │   │   │   └── records-0.parquet
│       │   │   └── has_diff=true                # <------ only source that had diffs for this run
│       │   │       └── records-0.parquet
│       │   ├── source=gismit
│       │   │   └── has_diff=false
│       │   │       └── records-0.parquet
│       │   ├── source=gisogm
│       │   │   └── has_diff=false
│       │   │       ├── records-0.parquet
│       │   │       └── records-1.parquet
│       │   ├── source=libguides
│       │   │   └── has_diff=false
│       │   │       └── records-0.parquet
│       │   └── source=researchdatabases
│       │       └── has_diff=false
│       │           └── records-0.parquet
│       └── run.json

NOTE: If you were to run the CLI command view-job at this point, when attempting to view sample records, you will run into an error along the lines of

duckdb.duckdb.BinderException: Binder Error: Referenced column "has_diff" not found in FROM clause!
Candidate bindings: "record_diff_matrix.related_items", "record_diff_matrix.citation"

This will be resolved immediately by TIMX-385 (which is largely complete in local spike code), which removes the idea of record "samples" altogether by providing a filterable table of all records; the function throwing this error is removed entirely there.

Other notes from test run

  • The joining of A and B records during collate_ab_transforms was formerly somewhat slow, given the number of parquet files that DuckDB would scan but discard. This step is now considerably faster (see the sketch below).
  • While not observed in a dataset of this size, the de-duping process could formerly result in out-of-memory (OOM) errors. This is no longer the case, given an updated SQL query for de-duping in collate_ab_transforms.
  • The calculation of A/B diffs was also a slow process, given it was performed sequentially for all records. By running with true parallelism (multiple CPU cores), this is also considerably quicker.
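
For a sense of the more precise scanning mentioned above, here is a sketch of the idea using DuckDB from Python (paths, partition names, and columns are illustrative, not the exact query in collate_ab_transforms):

import duckdb

con = duckdb.connect()

# Previously (roughly): scan every parquet file in the transformed dataset and
# let a WHERE clause discard the irrelevant rows afterwards.
#
# Now (roughly): point DuckDB only at the partition(s) relevant to the current
# join, so unrelated parquet files are never scanned at all.
joined = con.execute(
    """
    SELECT
        a.timdex_record_id,
        a.transformed_record AS record_a,
        b.transformed_record AS record_b
    FROM read_parquet('transformed/a/libguides-2024-04-03-full*/*.parquet') AS a
    FULL OUTER JOIN read_parquet('transformed/b/libguides-2024-04-03-full*/*.parquet') AS b
        USING (timdex_record_id)
    """
).fetch_arrow_table()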

Includes new or updated dependencies?

NO

Changes expectations for external applications?

NO

What are the relevant tickets?

Developer

  • All new ENV is documented in README
  • All new ENV has been added to staging and production environments
  • All related Jira tickets are linked in commit message(s)
  • Stakeholder approval has been confirmed (or is not needed)

Code Reviewer(s)

  • The commit message is clear and follows our guidelines (not just this PR message)
  • There are appropriate tests covering any new functionality
  • The provided documentation is sufficient for understanding any new functionality introduced
  • Any manual tests have been performed or provided examples verified
  • New dependencies are appropriate or there were no changes

@ghukill ghukill force-pushed the TIMX-383-pipeline-tweaks-large-runs branch 2 times, most recently from 5077089 to faa3583 on November 5, 2024 15:38

coveralls commented Nov 5, 2024

Pull Request Test Coverage Report for Build 11800082877

Details

  • 98 of 133 (73.68%) changed or added relevant lines in 10 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-2.4%) to 85.265%

Changes missing coverage:

File | Covered Lines | Changed/Added Lines | %
abdiff/cli.py | 6 | 9 | 66.67%
abdiff/core/collate_ab_transforms.py | 28 | 31 | 90.32%
abdiff/core/create_final_records.py | 22 | 35 | 62.86%
abdiff/core/calc_ab_diffs.py | 20 | 36 | 55.56%

Totals:
Change from base Build 11728992114: -2.4%
Covered Lines: 787
Relevant Lines: 923

💛 - Coveralls

a.source,
a.run_date,
a.run_type,
COALESCE(a.action, b.action) AS action,
Contributor Author

Noting that this COALESCE(a.action, b.action) AS action remains non-deterministic. @jonavellecuerdo pointed out that two different versions of Transmogrifier could technically result in the same file getting interpreted as "to index" or "to delete", which would vary this action value.

It is proposed that we ignore this for now. It remains a very unlikely edge case, and more to the point, it actually exposes something that ABDiff is not equipped to handle yet: what if a timdex_record_id changes for the same intellectual record? Or, as in this case, what if the action changes?

This is getting into territory we should explore sometime -- what it means when Transmog varies something fundamental about the record, beyond just the metadata -- but I would propose it as out of scope for ABDiff (at this time), which merely shows us differences in metadata.
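
A toy example of the edge case being flagged, with hypothetical literal values (this is not the production query, just COALESCE behavior in isolation):

import duckdb

# If version A and version B ever disagree on action (e.g. 'index' vs 'delete'),
# COALESCE(a.action, b.action) silently keeps A's non-null value and the
# disagreement is lost.
result = duckdb.sql(
    """
    SELECT COALESCE(a.action, b.action) AS action
    FROM (SELECT 'index' AS action) a, (SELECT 'delete' AS action) b
    """
)
print(result)  # action = 'index'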

Comment on lines +96 to +99
if len(pending_futures) >= max_parallel_processes:
    idx, completed_future = pending_futures.pop(0)
    result = completed_future.result()
    logger.info(f"Yielding diffed batch: {idx}")
    yield result
Contributor Author
@ghukill ghukill Nov 5, 2024

Side note: I learned something I had not considered about parallel workers during this work.

I had thought that ProcessPoolExecutor(max_workers=X) was sufficient to reduce CPU and memory load, but that's only partially true. While only X workers will actively use the CPU at once, they will all be scheduled together up front, as will the arguments for each worker. If the arguments -- the data passed to the worker -- are large, this can put a large amount of data into memory before the worker even starts "working" on it.

As such, sometimes we need to throttle how quickly we even schedule workers. This code ensures that the number of "in flight" workers stays below a threshold, so we aren't scheduling too many at once.

Consider this example. We have 100 GB of data, but we'll only pass 1 GB to each worker to perform calculations on, and we'll only have ProcessPoolExecutor(max_workers=10). I had thought this meant only 10 workers x 1 GB = 10 GB in memory at any given time. But actually, we would schedule 100 workers, each with 1 GB of "arguments" (input data), and therefore 100 GB could be held in memory. Sure, only 10 workers would be "working" on the 1 GB they were each passed, but the waiting workers would still have their input data already in memory. This is particularly sneaky when the input arguments come from an iterator: it would "drain" the iterator completely (the full 100 GB) to schedule all the workers, even though only 10 at a time would be "working".

What we actually want is fewer than 10 workers even scheduled at once, thereby slowly consuming the 100 GB iterator and keeping memory pressure low.

This is probably poorly worded, and I'm happy to discuss more, but worth noting. FWIW, I have confirmed that this is memory safe as-is by following the pattern of throttling how many workers we have submitted to the ProcessPoolExecutor().
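
A minimal sketch of the throttling pattern described above, with illustrative names (the real get_diffed_batches_iter() differs in its details):

from concurrent.futures import ProcessPoolExecutor


def diff_batch(batch):
    # stand-in for the real per-batch A/B diff calculation
    return batch


def diffed_batches_iter(batches_iter, max_parallel_processes: int = 6):
    """Yield diffed batches while keeping at most max_parallel_processes futures
    scheduled, so the input iterator is consumed lazily and memory stays bounded."""
    pending_futures = []
    with ProcessPoolExecutor(max_workers=max_parallel_processes) as executor:
        for idx, batch in enumerate(batches_iter):
            pending_futures.append((idx, executor.submit(diff_batch, batch)))
            # throttle: wait on the oldest future before scheduling any more work
            if len(pending_futures) >= max_parallel_processes:
                _idx, completed_future = pending_futures.pop(0)
                yield completed_future.result()
        # drain whatever is still in flight at the end
        for _idx, completed_future in pending_futures:
            yield completed_future.result()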

Contributor

Informative!

Contributor Author

Update: I've made a personal note to revisit this. I know for a fact that this works and is memory safe at scale, so perhaps it's okay to proceed with it as-is for the moment. But I'm not 100% sure that what I've outlined above is accurate; I'm personally interested in digging into this a bit more and confirming whether workers can be scheduled with sizable amounts of data even before their processes formally begin.

@ghukill ghukill force-pushed the TIMX-383-pipeline-tweaks-large-runs branch 2 times, most recently from e5d1401 to 9d416e9 on November 7, 2024 18:32
Why these changes are being introduced:

At scale, a couple of issues emerged from the record collation
process:

1. it was unknown what stage of the work was being performed
2. record joining was steady, but slow with a large number of files
3. final deduping could cause OOM

How this addresses that need:

* additional logging introduced throughout
* intermediate, temporary datasets are removed explicitly once no longer needed
* join operation targets specific parquet files instead of scanning all
parquet files and filtering with a DuckDB WHERE clause; this improves
performance while keeping the same memory efficiency
* final deduping uses a minimal set of columns for ordering,
 then joins on the new 'abdiff_record_id' column introduced
 for the final output

In combination, these changes supported a run of 4.1m+ records,
totaling around 75 GB in transformed files.

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-383
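
A rough sketch of the deduping shape described above, using DuckDB from Python (the view name, ordering column, and dataset path are assumptions for illustration, not the actual query):

import duckdb

con = duckdb.connect()

# assumes a collated parquet dataset on disk; path is illustrative
con.execute(
    "CREATE VIEW collated AS SELECT * FROM read_parquet('collated/*.parquet')"
)

deduped = con.execute(
    """
    WITH ranked AS (
        -- rank duplicates using only the narrow columns needed for ordering
        SELECT
            abdiff_record_id,
            ROW_NUMBER() OVER (
                PARTITION BY timdex_record_id
                ORDER BY run_date DESC
            ) AS row_num
        FROM collated
    )
    -- then join back on abdiff_record_id to pull the wide record columns
    SELECT collated.*
    FROM collated
    JOIN ranked ON collated.abdiff_record_id = ranked.abdiff_record_id
    WHERE ranked.row_num = 1
    """
).fetch_arrow_table()
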
Why these changes are being introduced:

Once a collated dataset is prepared, we need to calculate diffs
for each record.  This was memory efficient, but slow, as we
sequentially worked through batches of records.

How this addresses that need:
* By having get_diffed_batches_iter() use parallel processes to
produce batches, the work is sped up considerably, while still
within memory bounds

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-383
Why these changes are being introduced:

The raw, original transformed files produced by Transmogrifier are
orders of magnitude larger than the parquet datasets created from the
same joined and deduped records.

Unless there is a need to re-use or analyze them, removing them as records
are prepared for collation frees up disk space as the process works.

How this addresses that need:
* Remove transformed files as individual A and B records are written to
the 'transformed' dataset
* Remove the '/transformed' directory entirely once all files are utilized
* Add new env var 'PRESERVE_ARTIFACTS' that, if set to 'true', will skip
this removal.  This optional env var will be used by other artifact removal
steps currently underway.  Default is to remove.

Side effects of this change:
* Reduced storage size of final run

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-366
Why these changes are being introduced:

With the full pipeline mostly formed, it was observed that we could
simplify the final artifacts produced by a run by combining the 'diffs'
and 'metrics' datasets into a final 'records' dataset.

This dovetails with the new optional env var 'PRESERVE_ARTIFACTS', whereby
every other artifact except this one is removed after successful creation.

How this addresses that need:
* Adds new core function 'create_final_records' that writes a final
dataset containing all records and diff information needed for statistical
and individual record analysis.

Side effects of this change:
* Fewer final disparate datasets for a run

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-370
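
A simplified sketch of the kind of combination create_final_records performs, with assumed paths and join key usage (not the actual implementation):

import duckdb

con = duckdb.connect()

# join the per-record diffs with the per-record metrics, then write a single
# "records" dataset partitioned the way the final tree shows (source, has_diff)
con.execute(
    """
    COPY (
        SELECT *
        FROM read_parquet('diffs/**/*.parquet', hive_partitioning = true)
        JOIN read_parquet('metrics/**/*.parquet', hive_partitioning = true)
            USING (abdiff_record_id)
    ) TO 'records' (FORMAT PARQUET, PARTITION_BY (source, has_diff))
    """
)
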
@ghukill ghukill force-pushed the TIMX-383-pipeline-tweaks-large-runs branch from 9d416e9 to 268edd1 on November 7, 2024 18:49
@ghukill ghukill marked this pull request as ready for review November 7, 2024 18:49
Contributor
@ehanson8 ehanson8 left a comment

Great updates that are clearly informed by the challenges encountered when scaling up the input! One question (that I assume has a good answer), so consider this an approval once @jonavellecuerdo gives the ✅

Contributor
@jonavellecuerdo jonavellecuerdo left a comment

As discussed, here are a few questions I have for you! Thank you again for providing a lot of information on the key changes / improvements introduced in this PR. Definitely learning a lot from this project.

Comment on lines 266 to 269
a.timdex_record_id,
a.source,
a.run_date,
a.run_type,
Contributor

Hmm, I think these columns also need to be wrapped in a COALESCE(). Isn't it true that if:

a record exists in version B but not in version A, then these columns would be set to NULL values

Unless something changed? 🤔

Contributor Author
@ghukill ghukill Nov 8, 2024

For all these values -- timdex_record_id, source, run_date, and run_type -- we are guaranteed they will be the same value per this join. As such, we can remove the unnecessary COALESCE() and just take the value arbitrarily from either a or b. I missed this on our first pass.

It's probably inconsequential, but at scale, removing 4 COALESCE() calls should be a bit quicker.

Contributor

Still not sure I'm 100% following. 🤔 Going back to our previous discussion where we discussed what happens if a record is present in version A but deleted in version B, isn't the code still reproducing the same situation described here where:

As we're joining records and looping through the list of transformed files, when we get to the "*_deleted.txt" file, table a will not have any records, but table b will have records. If we're only selecting the columns from a, then the joined output will look like the following, no? 🤔

abdiff_record_id | timdex_record_id | source | run_date | run_type | action | record_a | record_b
asdf12345 | NULL | NULL | NULL | NULL | delete | NULL | NULL

Contributor Author
@ghukill ghukill Nov 8, 2024

@jonavellecuerdo - even if a record is "deleted" in version B, it would still have column values for timdex_record_id, source, run_date, and run_type. At least it should: these can all be known from the filename of the <source>-<date>-<run-type>-to-delete.txt file that Transmogrifier produces.

If not, then you're right; it'd be dangerous to arbitrarily only take these columns from A or B.

And I think we have proof that these will always be set from here where we produce the base of the record in the intermediate "transformed" dataset, regardless of whether it came from a "to-index" or "to-delete" file produced by Transmog, ensuring these values are always present. These values come from the filename_details, which is produced by parse_timdex_filename().

Note that we still do use COALESCE() for action here. This could also be removed, and we could arbitrarily take a.action or b.action knowing they are both present (again, they come from the filename). I elected to leave it as a bit of a talisman for when we revisit the issue of them potentially being different, as outlined in this comment.
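
To make the filename point concrete, here is a hypothetical parser in the spirit of parse_timdex_filename() (this is not the actual function, just an illustration of why source, run_date, run_type, and action are always recoverable):

import re

# illustrative pattern only; the real parse_timdex_filename() may differ
FILENAME_PATTERN = re.compile(
    r"^(?P<source>[a-z]+)-"
    r"(?P<run_date>\d{4}-\d{2}-\d{2})-"
    r"(?P<run_type>[a-z]+)-"
    r"(?:transformed-records-)?to-(?P<action>index|delete)"
)


def parse_filename_details(filename: str) -> dict:
    """Extract source, run_date, run_type, and action from a transformed filename."""
    match = FILENAME_PATTERN.match(filename)
    if not match:
        raise ValueError(f"Unrecognized transformed filename: {filename}")
    return match.groupdict()


# parse_filename_details("libguides-2024-04-03-full-transformed-records-to-index.json")
# -> {"source": "libguides", "run_date": "2024-04-03", "run_type": "full", "action": "index"}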

Contributor Author

ghukill commented Nov 8, 2024

@jonavellecuerdo, @ehanson8 - discussions with @jonavellecuerdo revealed a couple of subtle bugs with the changes in this PR. Going to push another commit with tweaks to address them, but that may push this review into next week! Thanks for your patience and excellent feedback so far.

Why these changes are being introduced:

A bug was discovered in the function calc_record_diff() where if the
A and B records passed do not share any common root keys, the library
would throw an error when accessing diff.affected_root_keys, which is
precisely what we use for determining if any TIMDEX fields (root keys)
were modified.

How this addresses that need:
* Adds additional logic to handle the situation where A or B is empty, or
somehow they share no root keys

Side effects of this change:
* Successful record diff calculation for large runs.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-383
Why these changes are being introduced:

When joining the A and B record versions, a previous commit removed a handful
of COALESCE statements in the SQL query, assuming they were redundant given
that the A or B record would arbitrarily supply those shared values.  Code review
surfaced that if A or B is missing, then a COALESCE remains helpful to
capture those values.

Side effects of this change:
* Improved A/B record joining

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-383
Why these changes are being introduced:

When attempting to write the intermediate transformed files dataset,
the error 'OSError: Too many open files' was cropping up.  Because this
dataset is partitioned by transformed filename, which can range into the
thousands, the pyarrow dataset writing method was keeping a large number
of directories and files open during writing, exceeding the open file limit
on the operating system (aka 'ulimit' on Mac and Linux).

How this addresses that need:
* Utility function write_to_dataset() reads OS configuration to understand
the open file limit, then limits the concurrency of writes via the precisely
relevant 'max_open_files' argument.

Side effects of this change:
* If the operating system has an unusually low open files limit, writing
to dataset will be slower but still functional

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-383
Why these changes are being introduced:

It was suggested in a previous PR that we raise an exception if ANY
Transmogrifier containers fail.  This appeared to make sense from a data
integrity POV, ensuring that all input files were properly transformed
by Transmogrifier for analysis.  In retrospect, this fell short in two ways:

1. For very large runs, a very small number of containers failing may be
admissible given the run will still contain valuable data and should
be allowed to continue.

2. More subtly, a failed Transmogrifier container may not be an intermittent
bug, but potentially an indication that a code change in Transmog is
problematic.  The responsibility here should be for this application to
surface this in a meaningful way during analysis (out of scope here),
not to halt the run completely.

How this addresses that need:
* Creates new env var ALLOW_FAILED_TRANSMOGRIFIER_CONTAINERS, that defaults
to 'true' in the Config class, to allow failed containers
* Continue with run even if failed containers are present

Side effects of this change:
* Runs with failed Transmog containers will continue.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-383
Contributor Author

ghukill commented Nov 12, 2024

@jonavellecuerdo, @ehanson8 - the PR is ready for re-review. @ehanson8, I know that you gave an implicit approval, but I've re-tagged you here for review given some of these recent updates.

Thanks to @jonavellecuerdo for pushing on a couple of key areas, specifically:

  • the removal of COALESCE statements during A/B joining
  • related, the correctness and reliability of DeepDiff usage given unusual A or B records

I've tried to make the commits descriptive enough to stand on their own for review.

At this time, I still have not successfully performed a "full" run. Given these updates, I got very close over the weekend (I let it run), but encountered what appears to be an intermittent Docker engine failure. That said, I would like to continue with this PR, as I think it represents the best form of this application so far for successfully completing a "full" run (observing that a "full" run is probably not very common, but it serves as a good benchmark of performance and reliability to strive for).

# read open file limit and set max_open_files as a fraction
if not max_open_files:
    soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    max_open_files = max(int(soft_limit * 0.75), 50)
Contributor Author

This ratio of allowing max open files to be about 75% of the OS's limit is somewhat arbitrary, but based on some reading. 100% continued to throw errors, as the margins are too close. 50% worked, but is a decent performance degradation (i.e. half the possible speed of concurrent writes). 75% feels like a sweet spot for the moment: a safe buffer to not exceed the limit, while still pretty performant.
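
For reference, a hedged sketch of how that computed limit feeds into the pyarrow write (simplified; the real write_to_dataset() utility handles more than this, and the partition column name here is an assumption):

import resource

import pyarrow as pa
import pyarrow.dataset as ds


def write_to_dataset(table: pa.Table, base_dir: str, max_open_files: int | None = None) -> None:
    # derive a safe cap from the OS soft limit if one was not provided
    if not max_open_files:
        soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
        max_open_files = max(int(soft_limit * 0.75), 50)

    ds.write_dataset(
        table,
        base_dir,
        format="parquet",
        # hive-style partitioning by an (assumed) transformed filename column
        partitioning=ds.partitioning(
            pa.schema([("transformed_file_name", pa.string())]), flavor="hive"
        ),
        max_open_files=max_open_files,  # caps simultaneously open partition files
        existing_data_behavior="overwrite_or_ignore",
    )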

Contributor
@ehanson8 ehanson8 left a comment

Changes look good, one potential docstring typo. Again, consider this effectively an approval once @jonavellecuerdo signs off. Great work!

Contributor
@jonavellecuerdo jonavellecuerdo left a comment

Changes look good to me! Thank you for providing such detailed commits. ✨ Just had one question for ya'.

)

written_files = []
ds.write_dataset(
Contributor

@ghukill When is a file closed? I figured that when the max number of rows have been written to a file, then ds.write_dataset would close it automatically. Is that not the case? Does it wait until it hits the max_open_files limit before closing any files -- as this description seems to suggest:

If greater than 0 then this will limit the maximum number of files that can be left open. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your data into many small files

Contributor Author

The crux of the issue is partitioning. Because we write the transformed files such that each filename gets a partition, it effectively results in a dedicated partition (directory) for each input file, which could number in the thousands.

Normally, this many partitions is not ideal, and we may even want to return to this and re-evaluate. But it's helpful for joining, where we can limit by partition via a glob pattern. We could also do this with individual files.

I think the pyarrow dataset writer holds folders/files open for writing when it's partitioning. If it were not partitioning, I think it would happily output 2k parquet files (roughly the length of the iterator for the large job). But with partitions, I think it keeps those files open, as it may need to write to them again.

I think we're close to a good architecture, but we may want to revisit this. For example, even with the changes, it still consumes a lot of disk space. I'm at 100+ GB for transformed files alone. Granted, they will get deleted as they are joined... we could theoretically perform joins right when the A and B containers are finished and avoid having those transformed files accumulate fully just to be deleted shortly thereafter.

Which is a long way of saying: I think it's related to the partitioning during writing, and I think we could/should revisit at some point.

@ghukill ghukill merged commit e436d53 into main Nov 12, 2024
2 checks passed