[DEV-14238] - Transaction Loader Refactor #4577
zachflanders-frb wants to merge 22 commits into `qat` from `ftr/dev-14238-transaction-loader-refactor`
Conversation
…n / award keys for relations instead of lookup keys
…mmands to remove unused id columns
…d update awards loader
I left this step largely unchanged, except that I used the unique_award_key/generated_unique_award_id and unique_transaction_id to form the relationships between the awards and normalized transactions.
Co-authored-by: Andrew Guest <110476931+aguest-kc@users.noreply.github.com>
| table_exists = self.spark._jsparkSession.catalog().tableExists(f"int.awards") |
| if not table_exists: |
We could avoid creating a single-use variable here.
| table_exists = self.spark._jsparkSession.catalog().tableExists(f"int.awards") |
| if not table_exists: |
| if not self.spark._jsparkSession.catalog().tableExists(f"int.awards"): |
…ithub.com/fedspendingtransparency/usaspending-api into ftr/dev-14238-transaction-loader-refactor
…n_id back to long type
| super().load_transactions() |
| self.populate_award_ids() |
| self.populate_transaction_normalized_ids() |
| self.link_transactions_to_normalized() |
This involves four sequential executions of merge statements. I need to explore whether I can reduce this to two executions and whether that would be more performant.
…ithub.com/fedspendingtransparency/usaspending-api into ftr/dev-14238-transaction-loader-refactor
sethstoudenmier left a comment
An initial pass on the changes. Overall, I didn't see anything that I would block over. Will take another pass before approving once testing is done.
| @@ -62,7 +62,7 @@ jobs: |
|   with: |
|     cov-report-name: 'spark-load-transactions-fabs-fpds-tests' |
| cov-report-name: 'spark-load-transactions-fabs-fpds-tests' |
| cov-report-name: 'spark-load-transactions-tests' |
I believe we can actually remove the generation of these reports; however, to be consistent we should change the name for now.
Can we update the filename accordingly as well?
| ) |
|   |
| def handle(self, *args, **options): |
|     with self.prepare_spark(): |
Should this use the defined function in usaspending_api/etl/transaction_delta_loaders/context_managers.py instead of the separately defined method below?
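For reference, the shape of such a shared context manager is small. This is a generic sketch only, with a fake session class standing in for Spark; the real helper in usaspending_api/etl/transaction_delta_loaders/context_managers.py is not reproduced here and presumably also handles loader-specific bookkeeping:

```python
from contextlib import contextmanager


@contextmanager
def prepare_spark(session_factory):
    """Yield a session from session_factory, guaranteeing teardown on exit.

    Sketch under stated assumptions: the production helper likely also manages
    configuration and last-load-date bookkeeping, omitted here.
    """
    session = session_factory()
    try:
        yield session
    finally:
        session.stop()  # runs even if the load body raises


class FakeSession:
    """Stand-in for a Spark session, used only to demonstrate the lifecycle."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True
```

A command's `handle()` would then wrap its work in `with prepare_spark(factory) as spark: ...`, and teardown is guaranteed even when the load fails partway.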
| update_last_load_date("awards", next_last_load) |
|   |
| @contextmanager |
| def prepare_spark(self): |
Similar to my comment above; could this be removed and instead use the function in usaspending_api/etl/transaction_delta_loaders/context_managers.py?
| subquery = """ |
|     SELECT awards.generated_unique_award_id AS id_to_remove |
|     FROM int.awards |
|     LEFT JOIN int.transaction_normalized on awards.transaction_unique_id = transaction_normalized.transaction_unique_id |
This probably isn't too bad performance-wise, but it's possible that an EXISTS instead of a JOIN could help somewhat, since we don't care about returning any values from int.transaction_normalized in this query.
I won't add this comment anywhere else, but the same could hold true elsewhere. Being inside a subquery may negate the benefit of using EXISTS over a JOIN, so if performance is looking good, it's probably not worth exploring.
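Assuming the omitted tail of the subquery filters for unmatched rows (the usual `LEFT JOIN ... IS NULL` anti-join), the suggested rewrite would look roughly like this:

```sql
-- Sketch of the EXISTS form; assumes the original subquery keeps only awards
-- with no matching row in int.transaction_normalized.
SELECT awards.generated_unique_award_id AS id_to_remove
FROM int.awards
WHERE NOT EXISTS (
    SELECT 1
    FROM int.transaction_normalized
    WHERE transaction_normalized.transaction_unique_id = awards.transaction_unique_id
)
```

Many engines plan both forms as the same anti-join, which is consistent with the caveat that the benefit may not materialize; comparing the two query plans is the cheap way to check.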
|   |
| def delete_records_sql(self): |
|     id_col = "generated_unique_award_id" |
|     # TODO could do an outer join here to find awards that do not join to transaction fpds or transaction fabs |
Is this comment saying that you could move away from Transaction Normalized and use FPDS and FABS directly? If so, do you mind clarifying that in the comment?
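If the TODO does mean joining to the FPDS/FABS tables directly, the query might look like the following sketch; the table and column names are assumptions based on the keys mentioned elsewhere in this PR, not verified schema:

```sql
-- Hypothetical: find awards whose transaction exists in neither source table.
-- Table/column names (transaction_fpds/fabs, detached_award_proc_unique,
-- afa_generated_unique) are assumed for illustration.
SELECT awards.generated_unique_award_id
FROM int.awards
LEFT JOIN int.transaction_fpds AS fpds
    ON awards.transaction_unique_id = fpds.detached_award_proc_unique
LEFT JOIN int.transaction_fabs AS fabs
    ON awards.transaction_unique_id = fabs.afa_generated_unique
WHERE fpds.detached_award_proc_unique IS NULL
  AND fabs.afa_generated_unique IS NULL
```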
| /* NOTE: In Postgres, the default sorting order sorts NULLs as larger than all other values. |
|    However, in Spark, the default sorting order sorts NULLs as smaller than all other |
|    values. In the Postgres transaction loader the default sorting behavior was used, so to |
|    be consistent with the behavior of the previous loader, we need to reverse the default |
|    Spark NULL sorting behavior for any field that can be NULL. */ |
| /* NOTE: In Postgres, the default sorting order sorts NULLs as larger than all other values. |
|    However, in Spark, the default sorting order sorts NULLs as smaller than all other |
|    values. In the Postgres transaction loader the default sorting behavior was used, so to |
|    be consistent with the behavior of the previous loader, we need to reverse the default |
|    Spark NULL sorting behavior for any field that can be NULL. */ |
| -- NOTE: In Spark, the default sorting order sorts NULLs as smaller than all other values. |
Now that we are so far removed from the Postgres pipeline, I feel that this comment can be updated to include simply how Spark handles NULL values.
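Concretely, the difference only matters when a sort key can be NULL, and Spark SQL supports explicit `NULLS FIRST`/`NULLS LAST` modifiers that pin the behavior either way (`action_date` below is just an example column, not taken from this query):

```sql
-- Spark sorts NULLs as smaller than all other values (ASC => NULLS FIRST by
-- default); Postgres sorts them as larger (ASC => NULLS LAST by default).
-- Writing the modifier out makes the intent explicit regardless of engine:
ORDER BY action_date ASC NULLS LAST    -- Postgres-like ordering
ORDER BY action_date ASC NULLS FIRST   -- Spark's default, stated explicitly
```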
| SELECT |
|     latest.id, |
|     latest.unique_award_key, |
|     0 AS subaward_count, -- for consistency with Postgres table |
| 0 AS subaward_count, -- for consistency with Postgres table |
| 0 AS subaward_count, -- default value that is updated later |
Similar to my other suggestion: since we are now far removed from the Postgres pipeline, it would be good to make this comment more meaningful while we are here.
Description:
This PR refactors the transaction loader flow to remove the lookup tables.
Technical Details:
This PR makes the following changes to the transaction loader flow:

- Uses the transaction unique keys (`afa_generated_unique`/`detached_award_proc_unique`) for the merge with the transaction fabs/fpds/normalized tables.
- Merges the full tables rather than only records newer than `last_etl_date`. This ensures the entire tables are kept in sync.
- Uses `WHEN NOT MATCHED BY SOURCE` in the merge statement itself instead of having a separate delete function for transactions.
- Uses the `unique_award_key` column to form the relationships between awards and transactions instead of using the award lookup table (eliminates the need for the award lookup table).
- Splits `load_transactions_in_delta` into separate commands.

Requirements for PR Merge:
Explain N/A in above checklist: