Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
01e1cfb
[DEV-13391] Create dataframe to populate transaction download
sethstoudenmier Jan 27, 2026
c74260a
[DEV-13391] handle merge conflict
sethstoudenmier Jan 27, 2026
8bff522
[DEV-13391] adjust variable names
sethstoudenmier Jan 27, 2026
6c84262
[DEV-13391] avoid multiple award_id columns
sethstoudenmier Jan 27, 2026
1ed04f2
[DEV-13391] removing url_encode and adding TODO
sethstoudenmier Jan 27, 2026
f791216
Merge branch 'qat' of https://github.com/fedspendingtransparency/usas…
sethstoudenmier Jan 28, 2026
dea5b37
[DEV-13391] add test
sethstoudenmier Jan 28, 2026
673500d
[DEV-13391] handle merge conflicts
sethstoudenmier Feb 3, 2026
78ac539
[DEV-13391] fix style checks
sethstoudenmier Feb 3, 2026
c62fcf3
[DEV-13391] Update date to timestamp
sethstoudenmier Feb 17, 2026
3c1aa40
[DEV-13391] Repartition the final df returned
sethstoudenmier Feb 18, 2026
b128a0c
[DEV-13391] filter out null award ids on FABA dataframe
sethstoudenmier Mar 6, 2026
891a26c
[DEV-13391] try creating file_c table to join
sethstoudenmier Mar 6, 2026
219fcfb
[DEV-13391] test repartitioning dataframes
sethstoudenmier Mar 10, 2026
766854e
[DEV-13391] Test approach with single dataframe to write
sethstoudenmier Mar 10, 2026
8c9a1fd
[DEV-13391] Cleanup
sethstoudenmier Mar 10, 2026
3e7c66c
Merge branch 'qat' of https://github.com/fedspendingtransparency/usas…
sethstoudenmier Mar 10, 2026
5bd2260
Merge branch 'qat' into ftr/dev-13391-populate-transaction-download-t…
sethstoudenmier Mar 11, 2026
8d8c408
Merge branch 'qat' into ftr/dev-13391-populate-transaction-download-t…
sethstoudenmier Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import expr
from pyspark.sql.types import (
Expand Down Expand Up @@ -124,7 +124,7 @@
)


def award_financial_df(spark: SparkSession):
def award_financial_df(spark: SparkSession) -> DataFrame:
faba = spark.table("int.financial_accounts_by_awards").alias("faba")
sa = spark.table("global_temp.submission_attributes").alias("sa")
taa = spark.table("global_temp.treasury_appropriation_account").alias("taa")
Expand Down Expand Up @@ -169,6 +169,7 @@ def award_financial_df(spark: SparkSession):
how="left",
)
.withColumn("submission_period", fy_quarter_period())
# TODO: Update to use url_encode Spark SQL function
.withColumn(
"usaspending_permalink",
sf.when(
Expand Down Expand Up @@ -347,7 +348,8 @@ def load_award_financial_incremental(
(
target.merge(
source,
"s.financial_accounts_by_awards_id = t.financial_accounts_by_awards_id and s.merge_hash_key = t.merge_hash_key",
"s.financial_accounts_by_awards_id = t.financial_accounts_by_awards_id"
" and s.merge_hash_key = t.merge_hash_key",
)
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
Expand Down
Empty file.

Large diffs are not rendered by default.

470 changes: 241 additions & 229 deletions usaspending_api/download/delta_models/transaction_download.py

Large diffs are not rendered by default.

91 changes: 69 additions & 22 deletions usaspending_api/etl/management/commands/load_query_to_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from argparse import ArgumentTypeError
from typing import Callable

from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandParser
from pyspark.sql import SparkSession

from usaspending_api.common.etl.spark import create_ref_temp_views
Expand Down Expand Up @@ -30,12 +30,18 @@
load_award_financial,
load_award_financial_incremental,
)
from usaspending_api.download.delta_models.dataframes.transaction_download import (
load_transaction_download,
load_transaction_download_incremental,
)
from usaspending_api.download.delta_models.object_class_program_activity_download import (
load_object_class_program_activity,
load_object_class_program_activity_incremental,
object_class_program_activity_schema,
)
from usaspending_api.download.delta_models.transaction_download import transaction_download_schema
from usaspending_api.download.delta_models.transaction_download import (
transaction_download_schema,
)
from usaspending_api.recipient.delta_models import (
RECIPIENT_LOOKUP_POSTGRES_COLUMNS,
RECIPIENT_PROFILE_POSTGRES_COLUMNS,
Expand All @@ -57,7 +63,10 @@
AWARD_SEARCH_POSTGRES_GOLD_COLUMNS,
award_search_create_sql_string,
)
from usaspending_api.search.delta_models.dataframes.award_search import load_award_search, load_award_search_incremental
from usaspending_api.search.delta_models.dataframes.award_search import (
load_award_search,
load_award_search_incremental,
)
from usaspending_api.search.delta_models.dataframes.transaction_search import (
load_transaction_search,
load_transaction_search_incremental,
Expand All @@ -69,7 +78,12 @@
subaward_search_create_sql_string,
subaward_search_load_sql_string,
)
from usaspending_api.search.models import AwardSearch, SubawardSearch, SummaryStateView, TransactionSearch
from usaspending_api.search.models import (
AwardSearch,
SubawardSearch,
SummaryStateView,
TransactionSearch,
)
from usaspending_api.settings import HOST
from usaspending_api.transactions.delta_models import (
SUMMARY_STATE_VIEW_COLUMNS,
Expand Down Expand Up @@ -276,8 +290,14 @@
"partition_keys": ["is_fpds"],
"partitioning_form": "LIST",
"partitions": [
{"table_suffix": "_fpds", "partitioning_clause": "FOR VALUES IN (TRUE)"},
{"table_suffix": "_fabs", "partitioning_clause": "FOR VALUES IN (FALSE)"},
{
"table_suffix": "_fpds",
"partitioning_clause": "FOR VALUES IN (TRUE)",
},
{
"table_suffix": "_fabs",
"partitioning_clause": "FOR VALUES IN (FALSE)",
},
],
},
"delta_table_create_partitions": None,
Expand Down Expand Up @@ -372,7 +392,10 @@
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
"delta_table_create_partitions": [
"reporting_fiscal_year",
"funding_toptier_agency_id",
],
},
"award_financial_download": {
"model": None,
Expand All @@ -395,7 +418,10 @@
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
"delta_table_create_partitions": [
"reporting_fiscal_year",
"funding_toptier_agency_id",
],
},
"object_class_program_activity_download": {
"model": None,
Expand All @@ -418,21 +444,24 @@
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
"delta_table_create_partitions": [
"reporting_fiscal_year",
"funding_toptier_agency_id",
],
},
"transaction_download": {
"model": None,
"is_from_broker": False,
"source_query": None,
"source_query_incremental": None,
"source_query": load_transaction_download,
"source_query_incremental": load_transaction_download_incremental,
"source_database": None,
"source_table": None,
"destination_database": "rpt",
"swap_table": None,
"swap_schema": None,
"partition_column": "transaction_id",
"partition_column_type": "numeric",
"is_partition_column_unique": False,
"is_partition_column_unique": True,
"delta_table_create_sql": transaction_download_schema,
"delta_table_create_options": {"delta.enableChangeDataFeed": True},
"source_schema": None,
Expand All @@ -441,7 +470,11 @@
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["awarding_agency_code", "is_fpds", "action_date_fiscal_year"],
"delta_table_create_partitions": [
"awarding_agency_code",
"is_fpds",
"action_date_fiscal_year",
],
},
}

Expand All @@ -458,7 +491,7 @@ class Command(BaseCommand):
destination_table_name: str
spark: SparkSession

def add_arguments(self, parser):
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--destination-table",
type=str,
Expand Down Expand Up @@ -486,7 +519,7 @@ def add_arguments(self, parser):
help="Whether or not the table will be updated incrementally",
)

def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
extra_conf = {
# Config for Delta Lake tables and SQL. Need these to keep Delta table metadata in the metastore
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
Expand All @@ -506,12 +539,20 @@ def handle(self, *args, **options):
# Resolve Parameters
destination_table = options["destination_table"]
table_spec = TABLE_SPEC[destination_table]
self.destination_database = options["alt_db"] or table_spec["destination_database"]
self.destination_table_name = options["alt_name"] or destination_table.split(".")[-1]
source_query_key = "source_query_incremental" if options["incremental"] else "source_query"
self.destination_database = (
options["alt_db"] or table_spec["destination_database"]
)
self.destination_table_name = (
options["alt_name"] or destination_table.split(".")[-1]
)
source_query_key = (
"source_query_incremental" if options["incremental"] else "source_query"
)
load_query = table_spec.get(source_query_key)
if load_query is None:
raise ArgumentTypeError(f"Invalid source query. `{source_query_key}` must be specified in the TABLE_SPEC.")
raise ArgumentTypeError(
f"Invalid source query. `{source_query_key}` must be specified in the TABLE_SPEC."
)

# Set the database that will be interacted with for all Delta Lake table Spark-based activity
logger.info(f"Using Spark Database: {self.destination_database}")
Expand All @@ -526,15 +567,19 @@ def handle(self, *args, **options):

if isinstance(load_query, list):
for index, query in enumerate(load_query):
logger.info(f"Running query number: {index + 1}\nPreview of query: {query[:100]}")
logger.info(
f"Running query number: {index + 1}\nPreview of query: {query[:100]}"
)
self.run_spark_sql(query)
else:
self.run_spark_sql(load_query)

if spark_created_by_command:
self.spark.stop()

def run_spark_sql(self, query: str | Callable[[SparkSession, str, str], None]):
def run_spark_sql(
self, query: str | Callable[[SparkSession, str, str], None]
) -> None:
if isinstance(query, str):
jdbc_conn_props = get_jdbc_connection_properties()
self.spark.sql(
Expand All @@ -551,4 +596,6 @@ def run_spark_sql(self, query: str | Callable[[SparkSession, str, str], None]):
elif isinstance(query, Callable):
query(self.spark, self.destination_database, self.destination_table_name)
else:
raise ArgumentTypeError(f"Invalid query. `{query}` must be a string or a Callable.")
raise ArgumentTypeError(
f"Invalid query. `{query}` must be a string or a Callable."
)
Loading
Loading