Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3d4de9e
[DEV-13931] - break apart transaction types and use unique transactio…
zachflanders-frb Dec 11, 2025
ee11d91
[DEV-14238] - Add load_transactions_fabs_in_delta and update other co…
zachflanders-frb Jan 5, 2026
8e93414
[DEV-14238] - WIP
zachflanders-frb Jan 7, 2026
682253f
[DEV-14238] - WIP transaction flow refactor
zachflanders-frb Jan 12, 2026
b4212aa
[DEV-14238] - update load_table_to_delta to add hash column option an…
zachflanders-frb Jan 14, 2026
3772625
[DEV-14238] - remove last_etl_date from load_awards
zachflanders-frb Jan 14, 2026
bd8a1ab
[DEV-14238] - Adding tests for transaction loaders
zachflanders-frb Jan 15, 2026
d27f079
[DEV-14238] - flake8 fixes
zachflanders-frb Jan 15, 2026
1c45071
[DEV-14238] - Updating github actions
zachflanders-frb Jan 15, 2026
5324741
Update usaspending_api/common/spark/configs.py
zachflanders-frb Jan 15, 2026
fdcbe0b
[DEV-14238] - Adding external load types
zachflanders-frb Jan 15, 2026
00b898d
Merge branch 'ftr/dev-14238-transaction-loader-refactor' of https://g…
zachflanders-frb Jan 15, 2026
96b5326
[DEV-14238] - flake8 fix
zachflanders-frb Jan 15, 2026
1d4ba2b
[DEV-14238] - cast transaction id cols to string in vw_awards
zachflanders-frb Jan 15, 2026
475e50b
[DEV-14238] - add ids and transaction_ids
zachflanders-frb Jan 20, 2026
2e9ec24
[DEV-14238] - adding award_ids
zachflanders-frb Jan 21, 2026
a7abd6c
[DEV-14238] - updating latest_transaction_id and earliest_transactio…
zachflanders-frb Jan 21, 2026
3568600
[DEV-14238] - removing trailing white space
zachflanders-frb Jan 21, 2026
76ad7be
[DEV-14238] - moving spark context manager out of loader class
zachflanders-frb Jan 21, 2026
fcd5e30
Merge branch 'qat' into ftr/dev-14238-transaction-loader-refactor
zachflanders-frb Jan 21, 2026
180ca0c
[DEV-14238] - Remove extra logging
zachflanders-frb Jan 22, 2026
b1a00e7
Merge branch 'ftr/dev-14238-transaction-loader-refactor' of https://g…
zachflanders-frb Jan 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions .github/workflows/pull-request-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@ jobs:
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/test-spark-integration-load-transactions-fabs-fpds.yaml

Run-Spark-Integration-Load-Transactions-Lookup-Tests:
name: Run Spark Integration Load Transactions Lookup Tests
needs:
- Run-Code-Style-Checks
- Build-Broker-Docker-Image
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/test-spark-integration-load-transactions-lookup.yaml

Run-Spark-Integration-Load-To-From-Delta-Tests:
name: Run Spark Integration Load To From Delta Tests
needs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
with:
cov-report-name: 'spark-load-transactions-fabs-fpds-tests'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cov-report-name: 'spark-load-transactions-fabs-fpds-tests'
cov-report-name: 'spark-load-transactions-tests'

I believe we can actually remove the generation of these reports; however, to be consistent, we should change the name for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the filename accordingly as well?

include-glob: 'test_*.py *_test.py'
keyword: 'test_load_transactions_in_delta_fabs_fpds.py'
keyword: 'test_load_transactions.py'
marker: 'spark'
num-processes: 0
working-directory: ./usaspending-api

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/test-spark-integration-other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ jobs:
with:
cov-report-name: 'spark-other-tests'
include-glob: 'test_*.py *_test.py'
keyword: '(not test_load_to_from_delta.py and not test_load_transactions_in_delta_lookups.py and not test_load_transactions_in_delta_fabs_fpds.py)'
keyword: '(not test_load_to_from_delta.py and not test_load_transactions.py)'
marker: 'spark'
working-directory: ./usaspending-api
2 changes: 1 addition & 1 deletion usaspending_api/awards/delta_models/awards.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"fpds_parent_agency_id": "STRING",
"funding_agency_id": "INTEGER",
"generated_unique_award_id": "STRING NOT NULL",
"id": "LONG NOT NULL",
"id": "LONG",
"is_fpds": "BOOLEAN NOT NULL",
"last_modified_date": "DATE",
"latest_transaction_id": "LONG",
Expand Down
26 changes: 15 additions & 11 deletions usaspending_api/common/helpers/spark_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
from usaspending_api.common.helpers.aws_helpers import is_aws, get_aws_credentials
from usaspending_api.config import CONFIG
from usaspending_api.config.utils import parse_pg_uri, parse_http_url
from usaspending_api.transactions.delta_models import DETACHED_AWARD_PROCUREMENT_DELTA_COLUMNS, PUBLISHED_FABS_COLUMNS
from usaspending_api.transactions.delta_models import (
DETACHED_AWARD_PROCUREMENT_DELTA_COLUMNS,
PUBLISHED_FABS_DELTA_COLUMNS,
)
from usaspending_api.transactions.delta_models.transaction_fabs import (
TRANSACTION_FABS_COLUMN_INFO,
TRANSACTION_FABS_COLUMNS,
Expand Down Expand Up @@ -575,7 +578,7 @@ def load_dict_to_delta_table(spark, s3_data_bucket, table_schema, table_name, da
table_to_col_names_dict["awards"] = list(AWARDS_COLUMNS)
table_to_col_names_dict["financial_accounts_by_awards"] = list(FINANCIAL_ACCOUNTS_BY_AWARDS_COLUMNS)
table_to_col_names_dict["detached_award_procurement"] = list(DETACHED_AWARD_PROCUREMENT_DELTA_COLUMNS)
table_to_col_names_dict["published_fabs"] = list(PUBLISHED_FABS_COLUMNS)
table_to_col_names_dict["published_fabs"] = list(PUBLISHED_FABS_DELTA_COLUMNS)

table_to_col_info_dict = {}
for tbl_name, col_info in zip(
Expand All @@ -586,15 +589,16 @@ def load_dict_to_delta_table(spark, s3_data_bucket, table_schema, table_name, da
table_to_col_info_dict[tbl_name][col.dest_name] = col

# Make sure the table has been created first
call_command(
"create_delta_table",
"--destination-table",
table_name,
"--alt-db",
table_schema,
"--spark-s3-bucket",
s3_data_bucket,
)
if not spark.catalog.tableExists(table_name, table_schema):
call_command(
"create_delta_table",
"--destination-table",
table_name,
"--alt-db",
table_schema,
"--spark-s3-bucket",
s3_data_bucket,
)

if data:
insert_sql = f"INSERT {'OVERWRITE' if overwrite else 'INTO'} {table_schema}.{table_name} VALUES\n"
Expand Down
6 changes: 3 additions & 3 deletions usaspending_api/common/spark/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
# process is started from, even if started under the hood of a Py4J JavaGateway). With a "standalone" (not
# YARN or Mesos or Kubernetes) cluster manager, only client mode is supported.
"spark.submit.deployMode": "client",
# Default of 1g (1GiB) for Driver. Increase here if the Java process is crashing with memory errors
"spark.driver.memory": "1g",
"spark.executor.memory": "1g",
# Default of 4g (4GiB) for both Driver and Executor. Increase here if the Java process is crashing with memory errors
"spark.driver.memory": "4g",
"spark.executor.memory": "4g",
"spark.ui.enabled": "false", # Does the same as setting SPARK_TESTING=true env var
"spark.jars.packages": ",".join(SPARK_SESSION_JARS),
}
Expand Down
Loading