diff --git a/usaspending_api/download/delta_models/award_financial_download.py b/usaspending_api/download/delta_models/award_financial_download.py index ebcfede4fd..db58201a46 100644 --- a/usaspending_api/download/delta_models/award_financial_download.py +++ b/usaspending_api/download/delta_models/award_financial_download.py @@ -1,5 +1,5 @@ from delta.tables import DeltaTable -from pyspark.sql import SparkSession +from pyspark.sql import DataFrame, SparkSession from pyspark.sql import functions as sf from pyspark.sql.functions import expr from pyspark.sql.types import ( @@ -124,7 +124,7 @@ ) -def award_financial_df(spark: SparkSession): +def award_financial_df(spark: SparkSession) -> DataFrame: faba = spark.table("int.financial_accounts_by_awards").alias("faba") sa = spark.table("global_temp.submission_attributes").alias("sa") taa = spark.table("global_temp.treasury_appropriation_account").alias("taa") @@ -169,6 +169,7 @@ def award_financial_df(spark: SparkSession): how="left", ) .withColumn("submission_period", fy_quarter_period()) + # TODO: Update to use url_encode Spark SQL function .withColumn( "usaspending_permalink", sf.when( @@ -347,7 +348,8 @@ def load_award_financial_incremental( ( target.merge( source, - "s.financial_accounts_by_awards_id = t.financial_accounts_by_awards_id and s.merge_hash_key = t.merge_hash_key", + "s.financial_accounts_by_awards_id = t.financial_accounts_by_awards_id" + " and s.merge_hash_key = t.merge_hash_key", ) .whenNotMatchedInsertAll() .whenNotMatchedBySourceDelete() diff --git a/usaspending_api/download/delta_models/dataframes/__init__.py b/usaspending_api/download/delta_models/dataframes/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/usaspending_api/download/delta_models/dataframes/transaction_download.py b/usaspending_api/download/delta_models/dataframes/transaction_download.py new file mode 100644 index 0000000000..d5544519e1 --- /dev/null +++ b/usaspending_api/download/delta_models/dataframes/transaction_download.py @@ -0,0 +1,651 @@ +import logging + +from delta import DeltaTable +from django.contrib.postgres.aggregates import ArrayAgg +from django.utils.functional import cached_property +from pyspark.sql import DataFrame, SparkSession, column +from pyspark.sql import functions as sf +from pyspark.sql.types import DecimalType + +from usaspending_api.download.helpers.download_annotation_functions import AWARD_URL +from usaspending_api.references.models import DisasterEmergencyFundCode + +logger = logging.getLogger(__name__) + + +class TransactionDownload: + + def __init__(self, spark: SparkSession): + self.award_search = spark.table("rpt.award_search") + self.disaster_emergency_fund_code = spark.table("global_temp.disaster_emergency_fund_code") + self.federal_account = spark.table("global_temp.federal_account") + self.faba = spark.table("int.financial_accounts_by_awards").withColumnRenamed( + "award_id", "faba_award_id" + ) + self.object_class = spark.table("global_temp.object_class") + self.ref_program_activity = spark.table("global_temp.ref_program_activity") + self.submission_attributes = spark.table("global_temp.submission_attributes") + self.transaction_search = spark.table("rpt.transaction_search").withColumnRenamed( + "award_id", "transaction_award_id" + ) + self.treasury_appropriation_account = spark.table("global_temp.treasury_appropriation_account") + + self.spark = spark + + @cached_property + def defc_by_group(self) -> dict[str, list[str]]: + defc_groups = ( + DisasterEmergencyFundCode.objects.filter(group_name__isnull=False) + .values("group_name") + .annotate(code_list=ArrayAgg("code")) + .values("group_name", "code_list") + ) + return {val["group_name"]: val["code_list"] for val in defc_groups} + + @property + def iija_defc(self) -> list[str]: + return self.defc_by_group["infrastructure"] + + @property + def covid_defc(self) -> list[str]: + return self.defc_by_group["covid_19"] + + @property + def faba_aggs_df(self) -> DataFrame: + faba_aggs = { + "covid_19_obligated_amount": ( + sf.sum( + sf.when( + self.faba.disaster_emergency_fund_code.isin(self.covid_defc), + self.faba.transaction_obligated_amount, + ) + ).cast(DecimalType(23, 2)) + ), + "covid_19_outlayed_amount": ( + sf.sum( + sf.when( + self.faba.disaster_emergency_fund_code.isin(self.covid_defc) + & self.submission_attributes.is_final_balances_for_fy, + self.faba.gross_outlay_amount_by_award_cpe + + self.faba.ussgl487200_down_adj_pri_ppaid_undel_orders_oblig_refund_cpe + + self.faba.ussgl497200_down_adj_pri_paid_deliv_orders_oblig_refund_cpe, + ) + ).cast(DecimalType(23, 2)) + ), + "defc_for_overall_award": ( + sf.concat_ws( + ";", + sf.sort_array( + sf.collect_set( + sf.when( + self.faba.disaster_emergency_fund_code.isNotNull(), + sf.concat( + self.faba.disaster_emergency_fund_code, + sf.lit(": "), + self.disaster_emergency_fund_code.public_law, + ), + ) + ) + ), + ) + ), + "federal_accounts_funding_this_award": ( + sf.concat_ws(";", sf.sort_array(sf.collect_set(self.federal_account.federal_account_code))) + ), + "iija_obligated_amount": ( + sf.sum( + sf.when( + self.faba.disaster_emergency_fund_code.isin(self.iija_defc), + self.faba.transaction_obligated_amount, + ) + ).cast(DecimalType(23, 2)) + ), + "iija_outlayed_amount": ( + sf.sum( + sf.when( + self.faba.disaster_emergency_fund_code.isin(self.iija_defc) + & self.submission_attributes.is_final_balances_for_fy, + self.faba.gross_outlay_amount_by_award_cpe + + self.faba.ussgl487200_down_adj_pri_ppaid_undel_orders_oblig_refund_cpe + + self.faba.ussgl497200_down_adj_pri_paid_deliv_orders_oblig_refund_cpe, + ) + ).cast(DecimalType(23, 2)) + ), + "object_classes_funding_this_award": ( + sf.concat_ws( + ";", + sf.sort_array( + sf.collect_set( + sf.when( + self.submission_attributes.is_final_balances_for_fy + & self.faba.object_class_id.isNotNull(), + sf.concat( + self.object_class.object_class, sf.lit(": "), self.object_class.object_class_name + ), + ) + ) + ), + ) + ), + "program_activities_funding_this_award": ( + sf.concat_ws( + ";", + sf.sort_array( + sf.collect_set( + sf.when( + self.submission_attributes.is_final_balances_for_fy + & self.faba.program_activity_id.isNotNull(), + sf.concat( + self.ref_program_activity.program_activity_code, + sf.lit(": "), + self.ref_program_activity.program_activity_name, + ), + ) + ) + ), + ) + ), + "total_outlayed_amount_for_overall_award": ( + sf.sum( + sf.when( + self.submission_attributes.is_final_balances_for_fy + & self.faba.gross_outlay_amount_by_award_cpe.isNotNull() + & self.faba.ussgl487200_down_adj_pri_ppaid_undel_orders_oblig_refund_cpe.isNotNull() + & self.faba.ussgl497200_down_adj_pri_paid_deliv_orders_oblig_refund_cpe.isNotNull(), + self.faba.gross_outlay_amount_by_award_cpe + + self.faba.ussgl487200_down_adj_pri_ppaid_undel_orders_oblig_refund_cpe + + self.faba.ussgl497200_down_adj_pri_paid_deliv_orders_oblig_refund_cpe, + ) + ).cast(DecimalType(23, 2)) + ), + "treasury_accounts_funding_this_award": ( + sf.concat_ws( + ";", sf.sort_array(sf.collect_set(self.treasury_appropriation_account.tas_rendering_label)) + ) + ), + } + + df = ( + self.faba.filter(self.faba.faba_award_id.isNotNull()) + .join(self.submission_attributes, "submission_id") + .join( + self.disaster_emergency_fund_code, + self.faba.disaster_emergency_fund_code + == self.disaster_emergency_fund_code.code, + "left", + ) + .join( + self.treasury_appropriation_account, + self.faba.treasury_account_id + == self.treasury_appropriation_account.treasury_account_identifier, + ) + .join( + self.federal_account, self.treasury_appropriation_account.federal_account_id == self.federal_account.id + ) + .join(self.object_class, self.faba.object_class_id == self.object_class.id, "left") + .join( + self.ref_program_activity, + self.faba.program_activity_id == self.ref_program_activity.id, + "left", + ) + .groupBy(self.faba.faba_award_id) + .agg(*[agg.alias(name) for name, agg in faba_aggs.items()]) + ) + return df + + @property + def common_cols(self) -> list[column]: + return [ + # --- Award --- + self.award_search.award_id, + self.award_search.generated_unique_award_id, + self.award_search.description.alias("award_description"), + self.award_search.total_obligation, + # TODO: Update to use url_encode Spark SQL function + sf.when( + self.award_search.generated_unique_award_id.isNotNull(), + sf.concat( + sf.lit(AWARD_URL), + sf.expr( + "java_method('java.net.URLEncoder', 'encode', award_search.generated_unique_award_id, 'UTF-8')" + ), + sf.lit("/"), + ), + ) + .otherwise("") + .alias("usaspending_permalink"), + # --- Transaction --- + self.transaction_search.modification_number, + self.transaction_search.transaction_id, + sf.coalesce( + self.transaction_search.published_fabs_id, self.transaction_search.detached_award_procurement_id + ).alias("transaction_broker_id"), + sf.coalesce( + self.transaction_search.afa_generated_unique, self.transaction_search.detached_award_proc_unique + ).alias("transaction_unique_key"), + # Agencies + self.transaction_search.awarding_agency_code, + self.transaction_search.awarding_toptier_agency_name_raw, + self.transaction_search.awarding_sub_tier_agency_c, + self.transaction_search.awarding_subtier_agency_name_raw, + self.transaction_search.funding_agency_code, + self.transaction_search.funding_toptier_agency_name_raw, + self.transaction_search.funding_sub_tier_agency_co, + self.transaction_search.funding_subtier_agency_name_raw, + # Amounts + self.transaction_search.federal_action_obligation, + self.transaction_search.generated_pragmatic_obligation, + # Dates + self.transaction_search.action_date, + sf.year(self.transaction_search.fiscal_action_date).alias("action_date_fiscal_year"), + self.transaction_search.initial_report_date, + self.transaction_search.last_modified_date, + self.transaction_search.period_of_performance_current_end_date, + self.transaction_search.period_of_performance_start_date, + # Offices + self.transaction_search.awarding_office_code, + self.transaction_search.awarding_office_name, + self.transaction_search.funding_office_code, + self.transaction_search.funding_office_name, + # Place of Performance + sf.when( + self.transaction_search.pop_state_code.isNotNull() + & self.transaction_search.pop_congressional_code_current.isNotNull() + & (self.transaction_search.pop_state_code != ""), + sf.concat( + self.transaction_search.pop_state_code, + sf.lit("-"), + self.transaction_search.pop_congressional_code_current, + ), + ).alias("pop_cd_current"), + sf.when( + self.transaction_search.pop_state_code.isNotNull() + & self.transaction_search.pop_congressional_code.isNotNull() + & (self.transaction_search.pop_state_code != ""), + sf.concat( + self.transaction_search.pop_state_code, + sf.lit("-"), + self.transaction_search.pop_congressional_code, + ), + ).alias("pop_cd_original"), + self.transaction_search.pop_city_name, + self.transaction_search.pop_country_code, + self.transaction_search.pop_country_name, + self.transaction_search.pop_county_fips, + self.transaction_search.pop_county_name, + self.transaction_search.pop_state_fips, + self.transaction_search.pop_state_code, + self.transaction_search.pop_state_name, + self.transaction_search.place_of_performance_zip4a, + # Recipient + self.transaction_search.recipient_unique_id, + self.transaction_search.recipient_name, + self.transaction_search.recipient_name_raw, + self.transaction_search.recipient_uei, + self.transaction_search.parent_recipient_unique_id, + self.transaction_search.parent_recipient_name, + self.transaction_search.parent_recipient_name_raw, + self.transaction_search.parent_uei, + # Recipient Location + self.transaction_search.legal_entity_address_line1, + self.transaction_search.legal_entity_address_line2, + sf.when( + self.transaction_search.recipient_location_state_code.isNotNull() + & self.transaction_search.recipient_location_congressional_code_current.isNotNull() + & (self.transaction_search.recipient_location_state_code != ""), + sf.concat( + self.transaction_search.recipient_location_state_code, + sf.lit("-"), + self.transaction_search.recipient_location_congressional_code_current, + ), + ).alias("recipient_location_cd_current"), + sf.when( + self.transaction_search.recipient_location_state_code.isNotNull() + & self.transaction_search.recipient_location_congressional_code.isNotNull() + & (self.transaction_search.recipient_location_state_code != ""), + sf.concat( + self.transaction_search.recipient_location_state_code, + sf.lit("-"), + self.transaction_search.recipient_location_congressional_code, + ), + ).alias("recipient_location_cd_original"), + self.transaction_search.recipient_location_city_name, + self.transaction_search.recipient_location_country_code, + self.transaction_search.recipient_location_country_name, + self.transaction_search.recipient_location_county_fips, + self.transaction_search.recipient_location_county_name, + self.transaction_search.recipient_location_state_code, + self.transaction_search.recipient_location_state_fips, + self.transaction_search.recipient_location_state_name, + self.transaction_search.recipient_location_zip5, + self.transaction_search.legal_entity_zip_last4, + # Typing + self.transaction_search.action_type, + self.transaction_search.action_type_description, + self.transaction_search.is_fpds, + self.transaction_search.transaction_description, + ] + + @property + def fabs_cols(self) -> list[column]: + return [ + # --- Award --- + self.award_search.total_loan_value, + self.award_search.total_subsidy_cost, + self.transaction_search.non_federal_funding_amount.alias("total_non_federal_funding_amount"), + # --- self.Transaction --- + self.transaction_search.fain, + self.transaction_search.uri, + # Amounts + self.transaction_search.face_value_loan_guarantee, + self.transaction_search.indirect_federal_sharing, + self.transaction_search.non_federal_funding_amount, + self.transaction_search.original_loan_subsidy_cost, + # Place of Performance + self.transaction_search.place_of_performance_code, + self.transaction_search.place_of_performance_forei, + self.transaction_search.place_of_performance_scope, + # Recipient Location + self.transaction_search.legal_entity_foreign_city, + self.transaction_search.legal_entity_foreign_posta, + self.transaction_search.legal_entity_foreign_provi, + # Additional FABS Fields + self.transaction_search.business_funds_indicator, + self.transaction_search.business_funds_ind_desc, + self.transaction_search.business_types, + self.transaction_search.business_types_desc, + self.transaction_search.cfda_number, + self.transaction_search.cfda_title, + self.transaction_search.correction_delete_indicatr, + self.transaction_search.correction_delete_ind_desc, + self.transaction_search.funding_opportunity_number, + self.transaction_search.funding_opportunity_goals, + self.transaction_search.record_type, + self.transaction_search.record_type_description, + self.transaction_search.sai_number, + self.transaction_search.type, + self.transaction_search.type_description, + ] + + @property + def fpds_cols(self) -> list[column]: + return [ + # -- self.Transaction --- + self.transaction_search.piid, + self.transaction_search.transaction_number, + # Dates + self.transaction_search.ordering_period_end_date, + self.transaction_search.solicitation_date, + # Officer Amounts + self.transaction_search.officer_1_name, + self.transaction_search.officer_1_amount, + self.transaction_search.officer_2_name, + self.transaction_search.officer_2_amount, + self.transaction_search.officer_3_name, + self.transaction_search.officer_3_amount, + self.transaction_search.officer_4_name, + self.transaction_search.officer_4_amount, + self.transaction_search.officer_5_name, + self.transaction_search.officer_5_amount, + # Parent Award (for IDV) + self.transaction_search.referenced_idv_agency_iden, + self.transaction_search.referenced_idv_agency_desc, + self.transaction_search.parent_award_id, + self.transaction_search.referenced_idv_modificatio, + self.transaction_search.referenced_mult_or_single, + self.transaction_search.referenced_mult_or_si_desc, + self.transaction_search.referenced_idv_type, + self.transaction_search.referenced_idv_type_desc, + # Recipient + self.transaction_search.vendor_phone_number, + self.transaction_search.vendor_fax_number, + self.transaction_search.vendor_doing_as_business_n, + # Recipient Location + self.transaction_search.legal_entity_zip4, + # Additional FPDS Fields + self.transaction_search.a_76_fair_act_action, + self.transaction_search.a_76_fair_act_action_desc, + self.transaction_search.airport_authority, + self.transaction_search.alaskan_native_owned_corpo, + self.transaction_search.alaskan_native_servicing_i, + self.transaction_search.american_indian_owned_busi, + self.transaction_search.asian_pacific_american_own, + self.transaction_search.pulled_from, + self.transaction_search.base_and_all_options_value, + self.transaction_search.base_exercised_options_val, + self.transaction_search.black_american_owned_busin, + self.transaction_search.c1862_land_grant_college, + self.transaction_search.c1890_land_grant_college, + self.transaction_search.c1994_land_grant_college, + self.transaction_search.c8a_program_participant, + self.transaction_search.cage_code, + self.transaction_search.city_local_government, + self.transaction_search.clinger_cohen_act_planning, + self.transaction_search.clinger_cohen_act_pla_desc, + self.transaction_search.commercial_item_acqui_desc, + self.transaction_search.commercial_item_acquisitio, + self.transaction_search.commercial_item_test_progr, + self.transaction_search.commercial_item_test_desc, + self.transaction_search.community_developed_corpor, + self.transaction_search.community_development_corp, + self.transaction_search.consolidated_contract, + self.transaction_search.consolidated_contract_desc, + self.transaction_search.construction_wage_rat_desc, + self.transaction_search.construction_wage_rate_req, + self.transaction_search.contingency_humanitar_desc, + self.transaction_search.contingency_humanitarian_o, + self.transaction_search.contract_award_type, + self.transaction_search.contract_award_type_desc, + self.transaction_search.contract_bundling, + self.transaction_search.contract_bundling_descrip, + self.transaction_search.contract_financing, + self.transaction_search.contract_financing_descrip, + self.transaction_search.contracting_officers_desc, + self.transaction_search.contracting_officers_deter, + self.transaction_search.contracts, + self.transaction_search.corporate_entity_not_tax_e, + self.transaction_search.corporate_entity_tax_exemp, + self.transaction_search.cost_accounting_standards, + self.transaction_search.cost_accounting_stand_desc, + self.transaction_search.cost_or_pricing_data, + self.transaction_search.cost_or_pricing_data_desc, + self.transaction_search.council_of_governments, + self.transaction_search.country_of_product_or_serv, + self.transaction_search.country_of_product_or_desc, + self.transaction_search.county_local_government, + self.transaction_search.current_total_value_award, + self.transaction_search.program_system_or_equipmen, + self.transaction_search.program_system_or_equ_desc, + self.transaction_search.dod_claimant_program_code, + self.transaction_search.dod_claimant_prog_cod_desc, + self.transaction_search.domestic_or_foreign_entity, + self.transaction_search.domestic_or_foreign_e_desc, + self.transaction_search.domestic_shelter, + self.transaction_search.dot_certified_disadvantage, + self.transaction_search.economically_disadvantaged, + self.transaction_search.educational_institution, + self.transaction_search.emerging_small_business, + self.transaction_search.epa_designated_product, + self.transaction_search.epa_designated_produc_desc, + self.transaction_search.evaluated_preference, + self.transaction_search.evaluated_preference_desc, + self.transaction_search.extent_competed, + self.transaction_search.extent_compete_description, + self.transaction_search.fair_opportunity_limited_s, + self.transaction_search.fair_opportunity_limi_desc, + self.transaction_search.fed_biz_opps, + self.transaction_search.fed_biz_opps_description, + self.transaction_search.federal_agency, + self.transaction_search.federally_funded_research, + self.transaction_search.for_profit_organization, + self.transaction_search.foreign_funding, + self.transaction_search.foreign_funding_desc, + self.transaction_search.foreign_government, + self.transaction_search.foreign_owned_and_located, + self.transaction_search.foundation, + self.transaction_search.government_furnished_prope, + self.transaction_search.government_furnished_desc, + self.transaction_search.grants, + self.transaction_search.hispanic_american_owned_bu, + self.transaction_search.hispanic_servicing_institu, + self.transaction_search.historically_black_college, + self.transaction_search.historically_underutilized, + self.transaction_search.hospital_flag, + self.transaction_search.housing_authorities_public, + self.transaction_search.idv_type, + self.transaction_search.idv_type_description, + self.transaction_search.information_technology_com, + self.transaction_search.information_technolog_desc, + self.transaction_search.indian_tribe_federally_rec, + self.transaction_search.inherently_government_func, + self.transaction_search.inherently_government_desc, + self.transaction_search.inter_municipal_local_gove, + self.transaction_search.interagency_contracting_au, + self.transaction_search.interagency_contract_desc, + self.transaction_search.international_organization, + self.transaction_search.interstate_entity, + self.transaction_search.joint_venture_economically, + self.transaction_search.joint_venture_women_owned, + self.transaction_search.labor_standards, + self.transaction_search.labor_standards_descrip, + self.transaction_search.labor_surplus_area_firm, + self.transaction_search.limited_liability_corporat, + self.transaction_search.local_area_set_aside, + self.transaction_search.local_area_set_aside_desc, + self.transaction_search.local_government_owned, + self.transaction_search.major_program, + self.transaction_search.manufacturer_of_goods, + self.transaction_search.materials_supplies_article, + self.transaction_search.materials_supplies_descrip, + self.transaction_search.minority_institution, + self.transaction_search.minority_owned_business, + self.transaction_search.multi_year_contract, + self.transaction_search.multi_year_contract_desc, + self.transaction_search.multiple_or_single_award_i, + self.transaction_search.multiple_or_single_aw_desc, + self.transaction_search.municipality_local_governm, + self.transaction_search.naics_code, + self.transaction_search.naics_description, + self.transaction_search.national_interest_action, + self.transaction_search.national_interest_desc, + self.transaction_search.native_american_owned_busi, + self.transaction_search.native_hawaiian_owned_busi, + self.transaction_search.native_hawaiian_servicing, + self.transaction_search.nonprofit_organization, + self.transaction_search.number_of_actions, + self.transaction_search.number_of_offers_received, + self.transaction_search.organizational_type, + self.transaction_search.other_minority_owned_busin, + self.transaction_search.other_not_for_profit_organ, + self.transaction_search.other_statutory_authority, + self.transaction_search.other_than_full_and_open_c, + self.transaction_search.other_than_full_and_o_desc, + self.transaction_search.partnership_or_limited_lia, + self.transaction_search.performance_based_service, + self.transaction_search.performance_based_se_desc, + self.transaction_search.place_of_manufacture, + self.transaction_search.place_of_manufacture_desc, + self.transaction_search.planning_commission, + self.transaction_search.port_authority, + self.transaction_search.potential_total_value_awar, + self.transaction_search.price_evaluation_adjustmen, + self.transaction_search.private_university_or_coll, + self.transaction_search.program_acronym, + self.transaction_search.product_or_service_code, + self.transaction_search.product_or_service_description, + self.transaction_search.purchase_card_as_payment_m, + self.transaction_search.purchase_card_as_paym_desc, + self.transaction_search.recovered_materials_sustai, + self.transaction_search.recovered_materials_s_desc, + self.transaction_search.research, + self.transaction_search.research_description, + self.transaction_search.sam_exception, + self.transaction_search.sam_exception_description, + self.transaction_search.sba_certified_8_a_joint_ve, + self.transaction_search.school_district_local_gove, + self.transaction_search.school_of_forestry, + self.transaction_search.sea_transportation, + self.transaction_search.sea_transportation_desc, + self.transaction_search.self_certified_small_disad, + self.transaction_search.service_disabled_veteran_o, + self.transaction_search.small_agricultural_coopera, + self.transaction_search.small_business_competitive, + self.transaction_search.small_disadvantaged_busine, + self.transaction_search.sole_proprietorship, + self.transaction_search.solicitation_identifier, + self.transaction_search.state_controlled_instituti, + self.transaction_search.subchapter_s_corporation, + self.transaction_search.subcontinent_asian_asian_i, + self.transaction_search.subcontracting_plan, + self.transaction_search.subcontracting_plan_desc, + self.transaction_search.the_ability_one_program, + self.transaction_search.township_local_government, + self.transaction_search.transit_authority, + self.transaction_search.tribal_college, + self.transaction_search.tribally_owned_business, + self.transaction_search.type_of_contract_pricing, + self.transaction_search.type_of_contract_pric_desc, + self.transaction_search.type_of_idc, + self.transaction_search.type_of_idc_description, + self.transaction_search.type_set_aside, + self.transaction_search.type_set_aside_description, + self.transaction_search.undefinitized_action, + self.transaction_search.undefinitized_action_desc, + self.transaction_search.us_federal_government, + self.transaction_search.us_government_entity, + self.transaction_search.us_local_government, + self.transaction_search.us_state_government, + self.transaction_search.us_tribal_government, + self.transaction_search.veteran_owned_business, + self.transaction_search.veterinary_college, + self.transaction_search.veterinary_hospital, + self.transaction_search.woman_owned_business, + self.transaction_search.women_owned_small_business, + ] + + @property + def dataframe(self) -> DataFrame: + # Capturing number of processes to repartition dataframes for help with memory limits + num_partitions = self.spark.sparkContext.defaultParallelism + logger.info( + f"Repartitioning dataframe to {num_partitions} partitions" + ) + + faba_cols = self.faba_aggs_df.columns + faba_cols.remove("faba_award_id") + + df = self.transaction_search.join( + self.award_search, self.transaction_search.transaction_award_id == self.award_search.award_id + ) + df = df.join( + self.faba_aggs_df, + self.transaction_search.transaction_award_id == self.faba_aggs_df.faba_award_id, + "left", + ) + df = df.select(*self.common_cols, *faba_cols, *self.fabs_cols, *self.fpds_cols) + df = df.withColumn("merge_hash_key", sf.xxhash64("*")) + df = df.repartition(num_partitions) + + return df + + +def load_transaction_download(spark: SparkSession, destination_database: str, destination_table_name: str) -> None: + df = TransactionDownload(spark).dataframe + df.write.saveAsTable( + f"{destination_database}.{destination_table_name}", + mode="overwrite", + format="delta", + ) + + +def load_transaction_download_incremental( + spark: SparkSession, destination_database: str, destination_table_name: str +) -> None: + target = DeltaTable.forName(spark, f"{destination_database}.{destination_table_name}").alias("t") + source = TransactionDownload(spark).dataframe.alias("s") + ( + target.merge(source, "s.transaction_id = t.transaction_id and s.merge_hash_key = t.merge_hash_key") + .whenNotMatchedInsertAll() + .whenNotMatchedBySourceDelete() + .execute() + ) diff --git a/usaspending_api/download/delta_models/transaction_download.py b/usaspending_api/download/delta_models/transaction_download.py index 45869aebdf..a5b2a51032 100644 --- a/usaspending_api/download/delta_models/transaction_download.py +++ b/usaspending_api/download/delta_models/transaction_download.py @@ -7,151 +7,143 @@ StringType, StructField, StructType, + TimestampType, ) _common_schema = StructType( [ # --- Award --- StructField("award_id", LongType(), False), - StructField("award_unique_key", StringType(), False), + StructField("generated_unique_award_id", StringType(), False), StructField("award_description", StringType()), - StructField("total_obligated_amount", DecimalType(23, 2)), + StructField("total_obligation", DecimalType(23, 2)), StructField("usaspending_permalink", StringType(), False), - # --- File C --- - StructField("covid_19_obligated_amount", DecimalType(23, 2)), - StructField("covid_19_outlayed_amount", DecimalType(23, 2)), - StructField("disaster_emergency_fund_codes_for_overall_award", DecimalType(23, 2)), - StructField("federal_accounts_funding_this_award", StringType()), - StructField("iija_obligated_amount", DecimalType(23, 2)), - StructField("iija_outlayed_amount", DecimalType(23, 2)), - StructField("object_classes_funding_this_award", StringType()), - StructField("program_activities_funding_this_award", StringType()), - StructField("total_outlayed_amount_for_overall_award", DecimalType(23, 2)), - StructField("treasury_accounts_funding_this_award", StringType()), # --- Transaction --- StructField("modification_number", StringType()), StructField("transaction_id", LongType(), False), - StructField("transaction_raw_id", LongType(), False), + StructField("transaction_broker_id", IntegerType(), False), StructField("transaction_unique_key", StringType(), False), # Agencies StructField("awarding_agency_code", StringType()), - StructField("awarding_agency_name", StringType()), - StructField("awarding_sub_agency_code", StringType()), - StructField("awarding_sub_agency_name", StringType()), + StructField("awarding_toptier_agency_name_raw", StringType()), + StructField("awarding_sub_tier_agency_c", StringType()), + StructField("awarding_subtier_agency_name_raw", StringType()), StructField("funding_agency_code", StringType()), - StructField("funding_agency_name", StringType()), - StructField("funding_sub_agency_code", StringType()), - StructField("funding_sub_agency_name", StringType()), + StructField("funding_toptier_agency_name_raw", StringType()), + StructField("funding_sub_tier_agency_co", StringType()), + StructField("funding_subtier_agency_name_raw", StringType()), # Amounts StructField("federal_action_obligation", DecimalType(23, 2)), - StructField("generated_pragmatic_obligations", DecimalType(23, 2)), + StructField("generated_pragmatic_obligation", DecimalType(23, 2)), # Dates StructField("action_date", DateType()), StructField("action_date_fiscal_year", IntegerType()), - StructField("initial_report_date", DateType()), - StructField("last_modified_date", DateType()), + StructField("initial_report_date", TimestampType()), + StructField("last_modified_date", TimestampType()), StructField("period_of_performance_current_end_date", DateType()), StructField("period_of_performance_start_date", DateType()), - # Officer Amounts - StructField("highly_compensated_officer_1_name", StringType()), - StructField("highly_compensated_officer_1_amount", DecimalType(23, 2)), - StructField("highly_compensated_officer_2_name", StringType()), - StructField("highly_compensated_officer_2_amount", DecimalType(23, 2)), - StructField("highly_compensated_officer_3_name", StringType()), - StructField("highly_compensated_officer_3_amount", DecimalType(23, 2)), - StructField("highly_compensated_officer_4_name", StringType()), - StructField("highly_compensated_officer_4_amount", DecimalType(23, 2)), - StructField("highly_compensated_officer_5_name", StringType()), - StructField("highly_compensated_officer_5_amount", DecimalType(23, 2)), # Offices StructField("awarding_office_code", StringType()), StructField("awarding_office_name", StringType()), StructField("funding_office_code", StringType()), StructField("funding_office_name", StringType()), # Place of Performance - StructField("place_of_performance_cd_current", StringType()), - StructField("place_of_performance_cd_original", StringType()), - StructField("place_of_performance_city_name", StringType()), - StructField("place_of_performance_country_code", StringType()), - StructField("place_of_performance_country_name", StringType()), - StructField("place_of_performance_county_fips_code", StringType()), - StructField("place_of_performance_county_name", StringType()), - StructField("place_of_performance_state_fips_code", StringType()), - StructField("place_of_performance_state_code", StringType()), - StructField("place_of_performance_state_name", StringType()), - StructField("place_of_performance_zip_4a", StringType()), + StructField("pop_cd_current", StringType()), + StructField("pop_cd_original", StringType()), + StructField("pop_city_name", StringType()), + StructField("pop_country_code", StringType()), + StructField("pop_country_name", StringType()), + StructField("pop_county_fips", StringType()), + StructField("pop_county_name", StringType()), + StructField("pop_state_fips", StringType()), + StructField("pop_state_code", StringType()), + StructField("pop_state_name", StringType()), + StructField("place_of_performance_zip4a", StringType()), # Recipient - StructField("recipient_duns", StringType()), + StructField("recipient_unique_id", StringType()), StructField("recipient_name", StringType()), StructField("recipient_name_raw", StringType()), StructField("recipient_uei", StringType()), - StructField("parent_recipient_duns", StringType()), + StructField("parent_recipient_unique_id", StringType()), StructField("parent_recipient_name", StringType()), StructField("parent_recipient_name_raw", StringType()), - StructField("parent_recipient_uei", StringType()), + StructField("parent_uei", StringType()), # Recipient Location - StructField("recipient_location_address_line_1", StringType()), - StructField("recipient_location_address_line_2", StringType()), + StructField("legal_entity_address_line1", StringType()), + StructField("legal_entity_address_line2", StringType()), StructField("recipient_location_cd_current", StringType()), StructField("recipient_location_cd_original", StringType()), StructField("recipient_location_city_name", StringType()), StructField("recipient_location_country_code", StringType()), StructField("recipient_location_country_name", StringType()), - StructField("recipient_location_county_fips_code", StringType()), + StructField("recipient_location_county_fips", StringType()), StructField("recipient_location_county_name", StringType()), StructField("recipient_location_state_code", StringType()), - StructField("recipient_location_state_fips_code", StringType()), + StructField("recipient_location_state_fips", StringType()), StructField("recipient_location_state_name", StringType()), - StructField("recipient_location_zip_5", StringType()), - StructField("recipient_location_zip_last_4", StringType()), + StructField("recipient_location_zip5", StringType()), + StructField("legal_entity_zip_last4", StringType()), # Typing - StructField("action_type_code", StringType()), + StructField("action_type", StringType()), StructField("action_type_description", StringType()), - StructField("award_type", StringType()), - StructField("award_type_description", StringType()), StructField("is_fpds", BooleanType(), False), StructField("transaction_description", StringType()), ] ) +_faba_agg_schema = StructType( + [ + StructField("covid_19_obligated_amount", DecimalType(23, 2)), + StructField("covid_19_outlayed_amount", DecimalType(23, 2)), + StructField("defc_for_overall_award", StringType()), + StructField("federal_accounts_funding_this_award", StringType()), + StructField("iija_obligated_amount", DecimalType(23, 2)), + StructField("iija_outlayed_amount", DecimalType(23, 2)), + StructField("object_classes_funding_this_award", StringType()), + StructField("program_activities_funding_this_award", StringType()), + StructField("total_outlayed_amount_for_overall_award", DecimalType(23, 2)), + StructField("treasury_accounts_funding_this_award", StringType()), + ] +) + _fabs_specific_schema = StructType( [ # --- Award --- - StructField("total_face_value_of_loan", DecimalType(23, 2)), - StructField("total_loan_subsidy_cost", DecimalType(23, 2)), + StructField("total_loan_value", DecimalType(23, 2)), + StructField("total_subsidy_cost", DecimalType(23, 2)), StructField("total_non_federal_funding_amount", DecimalType(23, 2)), # --- Transaction --- StructField("fain", StringType()), StructField("uri", StringType()), # Amounts - StructField("face_value_of_loan", DecimalType(23, 2)), - StructField("indirect_cost_federal_share_amount", DecimalType(23, 2)), + StructField("face_value_loan_guarantee", DecimalType(23, 2)), + StructField("indirect_federal_sharing", DecimalType(23, 2)), StructField("non_federal_funding_amount", DecimalType(23, 2)), StructField("original_loan_subsidy_cost", DecimalType(23, 2)), # Place of Performance StructField("place_of_performance_code", StringType()), - StructField("place_of_performance_foreign_location", StringType()), + StructField("place_of_performance_forei", StringType()), StructField("place_of_performance_scope", StringType()), # Recipient Location - StructField("recipient_location_foreign_city_name", StringType()), - StructField("recipient_location_foreign_postal_code", StringType()), - StructField("recipient_location_foreign_province_name", StringType()), + StructField("legal_entity_foreign_city", StringType()), + StructField("legal_entity_foreign_posta", StringType()), + StructField("legal_entity_foreign_provi", StringType()), # Additional FABS Fields - StructField("assistance_type_code", StringType()), - StructField("assistance_type_description", StringType()), - StructField("business_funds_indicator_code", StringType()), - StructField("business_funds_indicator_description", StringType()), - StructField("business_types_code", StringType()), - StructField("business_types_description", StringType()), - StructField("cfda_program_code", StringType()), - StructField("cfda_program_description", StringType()), - StructField("correction_delete_indicator_code", StringType()), - StructField("correction_delete_indicator_description", StringType()), - StructField("funding_opportunity_code", StringType()), - StructField("funding_opportunity_description", StringType()), - StructField("record_type_code", StringType()), + StructField("business_funds_indicator", StringType()), + StructField("business_funds_ind_desc", StringType()), + StructField("business_types", StringType()), + StructField("business_types_desc", StringType()), + StructField("cfda_number", StringType()), + StructField("cfda_title", StringType()), + StructField("correction_delete_indicatr", StringType()), + StructField("correction_delete_ind_desc", StringType()), + StructField("funding_opportunity_number", StringType()), + StructField("funding_opportunity_goals", StringType()), + StructField("record_type", IntegerType()), StructField("record_type_description", StringType()), StructField("sai_number", StringType()), + StructField("type", StringType()), + StructField("type_description", StringType()), ] ) @@ -162,210 +154,222 @@ StructField("transaction_number", StringType()), # Dates StructField("ordering_period_end_date", StringType()), - StructField("solicitation_date", StringType()), + StructField("solicitation_date", DateType()), + # Officer Amounts + StructField("officer_1_name", StringType()), + StructField("officer_1_amount", DecimalType(23, 2)), + StructField("officer_2_name", StringType()), + StructField("officer_2_amount", DecimalType(23, 2)), + StructField("officer_3_name", StringType()), + StructField("officer_3_amount", DecimalType(23, 2)), + StructField("officer_4_name", StringType()), + StructField("officer_4_amount", DecimalType(23, 2)), + StructField("officer_5_name", StringType()), + StructField("officer_5_amount", DecimalType(23, 2)), # Parent Award (for IDV) - StructField("parent_award_agency_id", StringType()), - StructField("parent_award_agency_name", StringType()), - StructField("parent_award_id_piid", StringType()), - StructField("parent_award_modification_number", StringType()), - StructField("parent_award_single_or_multiple_code", StringType()), - StructField("parent_award_single_or_multiple_description", StringType()), - StructField("parent_award_type_code", StringType()), - StructField("parent_award_type_description", StringType()), + StructField("referenced_idv_agency_iden", StringType()), + StructField("referenced_idv_agency_desc", StringType()), + StructField("parent_award_id", StringType()), + StructField("referenced_idv_modificatio", StringType()), + StructField("referenced_mult_or_single", StringType()), + StructField("referenced_mult_or_si_desc", StringType()), + StructField("referenced_idv_type", StringType()), + StructField("referenced_idv_type_desc", StringType()), # Recipient - StructField("recipient_phone_number", StringType()), - StructField("recipient_fax_number", StringType()), - StructField("recipient_doing_business_as_name", StringType()), + StructField("vendor_phone_number", StringType()), + StructField("vendor_fax_number", StringType()), + StructField("vendor_doing_as_business_n", StringType()), # Recipient Location - StructField("recipient_location_zip_4", StringType()), + StructField("legal_entity_zip4", StringType()), # Additional FPDS Fields - StructField("1862_land_grant_college", BooleanType()), - StructField("1890_land_grant_college", BooleanType()), - StructField("1994_land_grant_college", BooleanType()), - StructField("a76_fair_act_action_code", StringType()), - StructField("a76_fair_act_action_description", StringType()), + StructField("a_76_fair_act_action", StringType()), + StructField("a_76_fair_act_action_desc", StringType()), StructField("airport_authority", BooleanType()), - StructField("alaskan_native_corporation_owned_firm", BooleanType()), - StructField("alaskan_native_servicing_institution", BooleanType()), - StructField("american_indian_owned_business", BooleanType()), - StructField("asian_pacific_american_owned_business", BooleanType()), - StructField("award_or_idv_flag", StringType()), + StructField("alaskan_native_owned_corpo", BooleanType()), + StructField("alaskan_native_servicing_i", BooleanType()), + StructField("american_indian_owned_busi", BooleanType()), + StructField("asian_pacific_american_own", BooleanType()), + StructField("pulled_from", StringType()), StructField("base_and_all_options_value", StringType()), - StructField("base_and_exercised_options_value", StringType()), - StructField("black_american_owned_business", BooleanType()), + StructField("base_exercised_options_val", StringType()), + StructField("black_american_owned_busin", BooleanType()), + StructField("c1862_land_grant_college", BooleanType()), + StructField("c1890_land_grant_college", BooleanType()), + StructField("c1994_land_grant_college", BooleanType()), StructField("c8a_program_participant", BooleanType()), StructField("cage_code", StringType()), StructField("city_local_government", BooleanType()), - StructField("clinger_cohen_act_planning_code", StringType()), - StructField("clinger_cohen_act_planning_description", StringType()), - StructField("commercial_item_acquisition_procedures_code", StringType()), - StructField("commercial_item_acquisition_procedures_description", StringType()), - StructField("community_developed_corporation_owned_firm", BooleanType()), - StructField("community_development_corporation", BooleanType()), - StructField("consolidated_contract_code", StringType()), - StructField("consolidated_contract_description", StringType()), - StructField("construction_wage_rate_requirements_code", StringType()), - StructField("construction_wage_rate_requirements_description", StringType()), - StructField("contingency_humanitarian_or_peacekeeping_operation_code", StringType()), - StructField("contingency_humanitarian_or_peacekeeping_operation_description", StringType()), - StructField("contract_bundling_code", StringType()), - StructField("contract_bundling_description", StringType()), - StructField("contract_financing_code", StringType()), - StructField("contract_financing_description", StringType()), - StructField("contracting_officers_determination_of_business_size_code", StringType()), - StructField("contracting_officers_determination_of_business_size_description", StringType()), - StructField("corporate_entity_not_tax_exempt", BooleanType()), - StructField("corporate_entity_tax_exempt", BooleanType()), - StructField("cost_accounting_standards_clause_code", StringType()), - StructField("cost_accounting_standards_clause_description", StringType()), - StructField("cost_or_pricing_data_code", StringType()), - StructField("cost_or_pricing_data_description", StringType()), + StructField("clinger_cohen_act_planning", StringType()), + StructField("clinger_cohen_act_pla_desc", StringType()), + StructField("commercial_item_acqui_desc", StringType()), + StructField("commercial_item_acquisitio", StringType()), + StructField("commercial_item_test_progr", StringType()), + StructField("commercial_item_test_desc", StringType()), + StructField("community_developed_corpor", BooleanType()), + StructField("community_development_corp", BooleanType()), + StructField("consolidated_contract", StringType()), + StructField("consolidated_contract_desc", StringType()), + StructField("construction_wage_rat_desc", StringType()), + StructField("construction_wage_rate_req", StringType()), + StructField("contingency_humanitar_desc", StringType()), + StructField("contingency_humanitarian_o", StringType()), + StructField("contract_award_type", StringType()), + StructField("contract_award_type_desc", StringType()), + StructField("contract_bundling", StringType()), + StructField("contract_bundling_descrip", StringType()), + StructField("contract_financing", StringType()), + StructField("contract_financing_descrip", StringType()), + StructField("contracting_officers_desc", StringType()), + StructField("contracting_officers_deter", StringType()), + StructField("contracts", BooleanType()), + StructField("corporate_entity_not_tax_e", BooleanType()), + StructField("corporate_entity_tax_exemp", BooleanType()), + StructField("cost_accounting_standards", StringType()), + StructField("cost_accounting_stand_desc", StringType()), + StructField("cost_or_pricing_data", StringType()), + StructField("cost_or_pricing_data_desc", StringType()), StructField("council_of_governments", BooleanType()), - StructField("country_of_product_or_service_origin_code", StringType()), - StructField("country_of_product_or_service_origin_description", StringType()), + StructField("country_of_product_or_serv", StringType()), + StructField("country_of_product_or_desc", StringType()), StructField("county_local_government", BooleanType()), - StructField("current_total_value_of_award", StringType()), - StructField("dod_acquisition_program_code", StringType()), - StructField("dod_acquisition_program_description", StringType()), + StructField("current_total_value_award", StringType()), + StructField("program_system_or_equipmen", StringType()), + StructField("program_system_or_equ_desc", StringType()), StructField("dod_claimant_program_code", StringType()), - StructField("dod_claimant_program_description", StringType()), - StructField("domestic_or_foreign_entity_code", StringType()), - StructField("domestic_or_foreign_entity_description", StringType()), + StructField("dod_claimant_prog_cod_desc", StringType()), + StructField("domestic_or_foreign_entity", StringType()), + StructField("domestic_or_foreign_e_desc", StringType()), StructField("domestic_shelter", BooleanType()), StructField("dot_certified_disadvantage", BooleanType()), - StructField("economically_disadvantaged_women_owned_small_business", BooleanType()), + StructField("economically_disadvantaged", BooleanType()), StructField("educational_institution", BooleanType()), StructField("emerging_small_business", BooleanType()), - StructField("epa_designated_product_code", StringType()), - StructField("epa_designated_product_description", StringType()), - StructField("evaluated_preference_code", StringType()), - StructField("evaluated_preference_description", StringType()), - StructField("extent_competed_code", StringType()), - StructField("extent_competed_description", StringType()), - StructField("fair_opportunity_limited_sources_code", StringType()), - StructField("fair_opportunity_limited_sources_description", StringType()), - StructField("fed_biz_opps_code", StringType()), + StructField("epa_designated_product", StringType()), + StructField("epa_designated_produc_desc", StringType()), + StructField("evaluated_preference", StringType()), + StructField("evaluated_preference_desc", StringType()), + StructField("extent_competed", StringType()), + StructField("extent_compete_description", StringType()), + StructField("fair_opportunity_limited_s", StringType()), + StructField("fair_opportunity_limi_desc", StringType()), + StructField("fed_biz_opps", StringType()), StructField("fed_biz_opps_description", StringType()), StructField("federal_agency", BooleanType()), - StructField("federally_funded_research_and_development_corp", BooleanType()), + StructField("federally_funded_research", BooleanType()), StructField("for_profit_organization", BooleanType()), - StructField("foreign_funding_code", StringType()), - StructField("foreign_funding_description", StringType()), + StructField("foreign_funding", StringType()), + StructField("foreign_funding_desc", StringType()), StructField("foreign_government", BooleanType()), - StructField("foreign_owned", BooleanType()), + StructField("foreign_owned_and_located", BooleanType()), StructField("foundation", BooleanType()), - StructField("government_furnished_property_code", StringType()), - StructField("government_furnished_property_description", StringType()), - StructField("hispanic_american_owned_business", BooleanType()), - StructField("hispanic_servicing_institution", BooleanType()), + StructField("government_furnished_prope", StringType()), + StructField("government_furnished_desc", StringType()), + StructField("grants", BooleanType()), + StructField("hispanic_american_owned_bu", BooleanType()), + StructField("hispanic_servicing_institu", BooleanType()), StructField("historically_black_college", BooleanType()), - StructField("historically_underutilized_business_zone_hubzone_firm", BooleanType()), + StructField("historically_underutilized", BooleanType()), StructField("hospital_flag", BooleanType()), - StructField("housing_authorities_public_tribal", BooleanType()), - StructField("idv_type_code", StringType()), + StructField("housing_authorities_public", BooleanType()), + StructField("idv_type", StringType()), StructField("idv_type_description", StringType()), - StructField("information_technology_commercial_item_category_code", StringType()), - StructField("information_technology_commercial_item_category_description", StringType()), - StructField("indian_tribe_federally_recognized", BooleanType()), - StructField("inherently_governmental_functions_code", StringType()), - StructField("inherently_governmental_functions_description", StringType()), - StructField("inter_municipal_local_government", BooleanType()), - StructField("interagency_contracting_authority_code", StringType()), - StructField("interagency_contracting_authority_description", StringType()), + StructField("information_technology_com", StringType()), + StructField("information_technolog_desc", StringType()), + StructField("indian_tribe_federally_rec", BooleanType()), + StructField("inherently_government_func", StringType()), + StructField("inherently_government_desc", StringType()), + StructField("inter_municipal_local_gove", BooleanType()), + StructField("interagency_contracting_au", StringType()), + StructField("interagency_contract_desc", StringType()), StructField("international_organization", BooleanType()), StructField("interstate_entity", BooleanType()), - StructField("joint_venture_economic_disadvantaged_women_owned_small_bus", BooleanType()), - StructField("joint_venture_women_owned_small_business", BooleanType()), - StructField("labor_standards_code", StringType()), - StructField("labor_standards_description", StringType()), + StructField("joint_venture_economically", BooleanType()), + StructField("joint_venture_women_owned", BooleanType()), + StructField("labor_standards", StringType()), + StructField("labor_standards_descrip", StringType()), StructField("labor_surplus_area_firm", BooleanType()), - StructField("limited_liability_corporation", BooleanType()), - StructField("local_area_set_aside_code", StringType()), - StructField("local_area_set_aside_description", StringType()), + StructField("limited_liability_corporat", BooleanType()), + StructField("local_area_set_aside", StringType()), + StructField("local_area_set_aside_desc", StringType()), StructField("local_government_owned", BooleanType()), StructField("major_program", StringType()), StructField("manufacturer_of_goods", BooleanType()), - StructField("materials_supplies_articles_equipment_code", StringType()), - StructField("materials_supplies_articles_equipment_description", StringType()), + StructField("materials_supplies_article", StringType()), + StructField("materials_supplies_descrip", StringType()), StructField("minority_institution", BooleanType()), StructField("minority_owned_business", BooleanType()), - StructField("multi_year_contract_code", StringType()), - StructField("multi_year_contract_description", StringType()), - StructField("multiple_or_single_award_idv_code", StringType()), - StructField("multiple_or_single_award_idv_description", StringType()), - StructField("municipality_local_government", BooleanType()), + StructField("multi_year_contract", StringType()), + StructField("multi_year_contract_desc", StringType()), + StructField("multiple_or_single_award_i", StringType()), + StructField("multiple_or_single_aw_desc", StringType()), + StructField("municipality_local_governm", BooleanType()), StructField("naics_code", StringType()), StructField("naics_description", StringType()), - StructField("national_interest_action_code", StringType()), - StructField("national_interest_action_description", StringType()), - StructField("native_american_owned_business", BooleanType()), - StructField("native_hawaiian_organization_owned_firm", BooleanType()), - StructField("native_hawaiian_servicing_institution", BooleanType()), + StructField("national_interest_action", StringType()), + StructField("national_interest_desc", StringType()), + StructField("native_american_owned_busi", BooleanType()), + StructField("native_hawaiian_owned_busi", BooleanType()), + StructField("native_hawaiian_servicing", BooleanType()), StructField("nonprofit_organization", BooleanType()), StructField("number_of_actions", StringType()), StructField("number_of_offers_received", StringType()), StructField("organizational_type", StringType()), - StructField("other_minority_owned_business", BooleanType()), - StructField("other_not_for_profit_organization", BooleanType()), + StructField("other_minority_owned_busin", BooleanType()), + StructField("other_not_for_profit_organ", BooleanType()), StructField("other_statutory_authority", StringType()), - StructField("other_than_full_and_open_competition_code", StringType()), - StructField("other_than_full_and_open_competition_description", StringType()), - StructField("partnership_or_limited_liability_partnership", BooleanType()), - StructField("performance_based_service_acquisition_code", StringType()), - StructField("performance_based_service_acquisition_description", StringType()), - StructField("place_of_manufacture_code", StringType()), - StructField("place_of_manufacture_description", StringType()), + StructField("other_than_full_and_open_c", StringType()), + StructField("other_than_full_and_o_desc", StringType()), + StructField("partnership_or_limited_lia", BooleanType()), + StructField("performance_based_service", StringType()), + StructField("performance_based_se_desc", StringType()), + StructField("place_of_manufacture", StringType()), + StructField("place_of_manufacture_desc", StringType()), StructField("planning_commission", BooleanType()), StructField("port_authority", BooleanType()), - StructField("potential_total_value_of_award", StringType()), - StructField("price_evaluation_adjustment_preference_percent_difference", StringType()), - StructField("private_university_or_college", BooleanType()), + StructField("potential_total_value_awar", StringType()), + StructField("price_evaluation_adjustmen", StringType()), + StructField("private_university_or_coll", BooleanType()), StructField("program_acronym", StringType()), StructField("product_or_service_code", StringType()), - StructField("product_or_service_code_description", StringType()), - StructField("purchase_card_as_payment_method_code", StringType()), - StructField("purchase_card_as_payment_method_description", StringType()), - StructField("receives_contracts", BooleanType()), - StructField("receives_contracts_and_financial_assistance", BooleanType()), - StructField("receives_financial_assistance", BooleanType()), - StructField("recovered_materials_sustainability_code", StringType()), - StructField("recovered_materials_sustainability_description", StringType()), - StructField("research_code", StringType()), + StructField("product_or_service_description", StringType()), + StructField("purchase_card_as_payment_m", StringType()), + StructField("purchase_card_as_paym_desc", StringType()), + StructField("recovered_materials_sustai", StringType()), + StructField("recovered_materials_s_desc", StringType()), + StructField("research", StringType()), StructField("research_description", StringType()), - StructField("sam_exception_code", StringType()), + StructField("sam_exception", StringType()), StructField("sam_exception_description", StringType()), - StructField("sba_certified_8a_joint_venture", BooleanType()), - StructField("school_district_local_government", BooleanType()), + StructField("sba_certified_8_a_joint_ve", BooleanType()), + StructField("school_district_local_gove", BooleanType()), StructField("school_of_forestry", BooleanType()), - StructField("sea_transportation_code", StringType()), - StructField("sea_transportation_description", StringType()), - StructField("self_certified_small_disadvantaged_business", BooleanType()), - StructField("service_disabled_veteran_owned_business", BooleanType()), - StructField("simplified_procedures_for_certain_commercial_items_code", StringType()), - StructField("simplified_procedures_for_certain_commercial_items_description", StringType()), - StructField("small_agricultural_cooperative", BooleanType()), - StructField("small_business_competitiveness_demonstration_program", BooleanType()), - StructField("small_disadvantaged_business", BooleanType()), + StructField("sea_transportation", StringType()), + StructField("sea_transportation_desc", StringType()), + StructField("self_certified_small_disad", BooleanType()), + StructField("service_disabled_veteran_o", BooleanType()), + StructField("small_agricultural_coopera", BooleanType()), + StructField("small_business_competitive", BooleanType()), + StructField("small_disadvantaged_busine", BooleanType()), StructField("sole_proprietorship", BooleanType()), StructField("solicitation_identifier", StringType()), - StructField("state_controlled_institution_of_higher_learning", BooleanType()), + StructField("state_controlled_instituti", BooleanType()), StructField("subchapter_s_corporation", BooleanType()), - StructField("subcontinent_asian_asian_indian_american_owned_business", BooleanType()), - StructField("subcontracting_plan_code", StringType()), - StructField("subcontracting_plan_description", StringType()), + StructField("subcontinent_asian_asian_i", BooleanType()), + StructField("subcontracting_plan", StringType()), + StructField("subcontracting_plan_desc", StringType()), StructField("the_ability_one_program", BooleanType()), StructField("township_local_government", BooleanType()), StructField("transit_authority", BooleanType()), StructField("tribal_college", BooleanType()), - StructField("tribally_owned_firm", BooleanType()), - StructField("type_of_contract_pricing_code", StringType()), - StructField("type_of_contract_pricing_description", StringType()), - StructField("type_of_idc_code", StringType()), + StructField("tribally_owned_business", BooleanType()), + StructField("type_of_contract_pricing", StringType()), + StructField("type_of_contract_pric_desc", StringType()), + StructField("type_of_idc", StringType()), StructField("type_of_idc_description", StringType()), - StructField("type_of_set_aside_code", StringType()), - StructField("type_of_set_aside_description", StringType()), - StructField("undefinitized_action_code", StringType()), - StructField("undefinitized_action_description", StringType()), + StructField("type_set_aside", StringType()), + StructField("type_set_aside_description", StringType()), + StructField("undefinitized_action", StringType()), + StructField("undefinitized_action_desc", StringType()), StructField("us_federal_government", BooleanType()), StructField("us_government_entity", BooleanType()), StructField("us_local_government", BooleanType()), @@ -379,4 +383,12 @@ ] ) -transaction_download_schema = StructType([*_common_schema, *_fabs_specific_schema, *_fpds_specific_schema]) +transaction_download_schema = StructType( + [ + *_common_schema, + *_faba_agg_schema, + *_fabs_specific_schema, + *_fpds_specific_schema, + StructField("merge_hash_key", LongType()), + ] +) diff --git a/usaspending_api/etl/management/commands/load_query_to_delta.py b/usaspending_api/etl/management/commands/load_query_to_delta.py index aa58341cd9..5404dc7b01 100644 --- a/usaspending_api/etl/management/commands/load_query_to_delta.py +++ b/usaspending_api/etl/management/commands/load_query_to_delta.py @@ -2,7 +2,7 @@ from argparse import ArgumentTypeError from typing import Callable -from django.core.management.base import BaseCommand +from django.core.management.base import BaseCommand, CommandParser from pyspark.sql import SparkSession from usaspending_api.common.etl.spark import create_ref_temp_views @@ -30,12 +30,18 @@ load_award_financial, load_award_financial_incremental, ) +from usaspending_api.download.delta_models.dataframes.transaction_download import ( + load_transaction_download, + load_transaction_download_incremental, +) from usaspending_api.download.delta_models.object_class_program_activity_download import ( load_object_class_program_activity, load_object_class_program_activity_incremental, object_class_program_activity_schema, ) -from usaspending_api.download.delta_models.transaction_download import transaction_download_schema +from usaspending_api.download.delta_models.transaction_download import ( + transaction_download_schema, +) from usaspending_api.recipient.delta_models import ( RECIPIENT_LOOKUP_POSTGRES_COLUMNS, RECIPIENT_PROFILE_POSTGRES_COLUMNS, @@ -57,7 +63,10 @@ AWARD_SEARCH_POSTGRES_GOLD_COLUMNS, award_search_create_sql_string, ) -from usaspending_api.search.delta_models.dataframes.award_search import load_award_search, load_award_search_incremental +from usaspending_api.search.delta_models.dataframes.award_search import ( + load_award_search, + load_award_search_incremental, +) from usaspending_api.search.delta_models.dataframes.transaction_search import ( load_transaction_search, load_transaction_search_incremental, @@ -69,7 +78,12 @@ subaward_search_create_sql_string, subaward_search_load_sql_string, ) -from usaspending_api.search.models import AwardSearch, SubawardSearch, SummaryStateView, TransactionSearch +from usaspending_api.search.models import ( + AwardSearch, + SubawardSearch, + SummaryStateView, + TransactionSearch, +) from usaspending_api.settings import HOST from usaspending_api.transactions.delta_models import ( SUMMARY_STATE_VIEW_COLUMNS, @@ -276,8 +290,14 @@ "partition_keys": ["is_fpds"], "partitioning_form": "LIST", "partitions": [ - {"table_suffix": "_fpds", "partitioning_clause": "FOR VALUES IN (TRUE)"}, - {"table_suffix": "_fabs", "partitioning_clause": "FOR VALUES IN (FALSE)"}, + { + "table_suffix": "_fpds", + "partitioning_clause": "FOR VALUES IN (TRUE)", + }, + { + "table_suffix": "_fabs", + "partitioning_clause": "FOR VALUES IN (FALSE)", + }, ], }, "delta_table_create_partitions": None, @@ -372,7 +392,10 @@ "postgres_seq_name": None, "tsvectors": None, "postgres_partition_spec": None, - "delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"], + "delta_table_create_partitions": [ + "reporting_fiscal_year", + "funding_toptier_agency_id", + ], }, "award_financial_download": { "model": None, @@ -395,7 +418,10 @@ "postgres_seq_name": None, "tsvectors": None, "postgres_partition_spec": None, - "delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"], + "delta_table_create_partitions": [ + "reporting_fiscal_year", + "funding_toptier_agency_id", + ], }, "object_class_program_activity_download": { "model": None, @@ -418,13 +444,16 @@ "postgres_seq_name": None, "tsvectors": None, "postgres_partition_spec": None, - "delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"], + "delta_table_create_partitions": [ + "reporting_fiscal_year", + "funding_toptier_agency_id", + ], }, "transaction_download": { "model": None, "is_from_broker": False, - "source_query": None, - "source_query_incremental": None, + "source_query": load_transaction_download, + "source_query_incremental": load_transaction_download_incremental, "source_database": None, "source_table": None, "destination_database": "rpt", @@ -432,7 +461,7 @@ "swap_schema": None, "partition_column": "transaction_id", "partition_column_type": "numeric", - "is_partition_column_unique": False, + "is_partition_column_unique": True, "delta_table_create_sql": transaction_download_schema, "delta_table_create_options": {"delta.enableChangeDataFeed": True}, "source_schema": None, @@ -441,7 +470,11 @@ "postgres_seq_name": None, "tsvectors": None, "postgres_partition_spec": None, - "delta_table_create_partitions": ["awarding_agency_code", "is_fpds", "action_date_fiscal_year"], + "delta_table_create_partitions": [ + "awarding_agency_code", + "is_fpds", + "action_date_fiscal_year", + ], }, } @@ -458,7 +491,7 @@ class Command(BaseCommand): destination_table_name: str spark: SparkSession - def add_arguments(self, parser): + def add_arguments(self, parser: CommandParser) -> None: parser.add_argument( "--destination-table", type=str, @@ -486,7 +519,7 @@ def add_arguments(self, parser): help="Whether or not the table will be updated incrementally", ) - def handle(self, *args, **options): + def handle(self, *args, **options) -> None: extra_conf = { # Config for Delta Lake tables and SQL. Need these to keep Dela table metadata in the metastore "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", @@ -506,12 +539,20 @@ def handle(self, *args, **options): # Resolve Parameters destination_table = options["destination_table"] table_spec = TABLE_SPEC[destination_table] - self.destination_database = options["alt_db"] or table_spec["destination_database"] - self.destination_table_name = options["alt_name"] or destination_table.split(".")[-1] - source_query_key = "source_query_incremental" if options["incremental"] else "source_query" + self.destination_database = ( + options["alt_db"] or table_spec["destination_database"] + ) + self.destination_table_name = ( + options["alt_name"] or destination_table.split(".")[-1] + ) + source_query_key = ( + "source_query_incremental" if options["incremental"] else "source_query" + ) load_query = table_spec.get(source_query_key) if load_query is None: - raise ArgumentTypeError(f"Invalid source query. `{source_query_key}` must be specified in the TABLE_SPEC.") + raise ArgumentTypeError( + f"Invalid source query. `{source_query_key}` must be specified in the TABLE_SPEC." + ) # Set the database that will be interacted with for all Delta Lake table Spark-based activity logger.info(f"Using Spark Database: {self.destination_database}") @@ -526,7 +567,9 @@ def handle(self, *args, **options): if isinstance(load_query, list): for index, query in enumerate(load_query): - logger.info(f"Running query number: {index + 1}\nPreview of query: {query[:100]}") + logger.info( + f"Running query number: {index + 1}\nPreview of query: {query[:100]}" + ) self.run_spark_sql(query) else: self.run_spark_sql(load_query) @@ -534,7 +577,9 @@ def handle(self, *args, **options): if spark_created_by_command: self.spark.stop() - def run_spark_sql(self, query: str | Callable[[SparkSession, str, str], None]): + def run_spark_sql( + self, query: str | Callable[[SparkSession, str, str], None] + ) -> None: if isinstance(query, str): jdbc_conn_props = get_jdbc_connection_properties() self.spark.sql( @@ -551,4 +596,6 @@ def run_spark_sql(self, query: str | Callable[[SparkSession, str, str], None]): elif isinstance(query, Callable): query(self.spark, self.destination_database, self.destination_table_name) else: - raise ArgumentTypeError(f"Invalid query. `{query}` must be a string or a Callable.") + raise ArgumentTypeError( + f"Invalid query. `{query}` must be a string or a Callable." + ) diff --git a/usaspending_api/etl/tests/integration/test_load_to_from_delta.py b/usaspending_api/etl/tests/integration/test_load_to_from_delta.py index dcd0b38ab8..1b71c7aa04 100644 --- a/usaspending_api/etl/tests/integration/test_load_to_from_delta.py +++ b/usaspending_api/etl/tests/integration/test_load_to_from_delta.py @@ -19,6 +19,9 @@ from pyspark.sql import SparkSession from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string +from usaspending_api.download.delta_models.dataframes.transaction_download import ( + TransactionDownload, +) from usaspending_api.etl.award_helpers import update_awards from usaspending_api.etl.broker_etl_helpers import dictfetchall from usaspending_api.etl.management.commands.create_delta_table import ( @@ -1488,3 +1491,170 @@ def test_load_account_balances_download( load_command="load_query_to_delta", dummy_data=[], ) + + +@pytest.mark.django_db( + databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True +) +def test_load_transaction_download( + spark, + s3_unittest_data_bucket, + populate_usas_data_and_recipients_from_broker, + hive_unittest_metastore_db, +): + load_delta_table_from_postgres("published_fabs", s3_unittest_data_bucket) + load_delta_table_from_postgres( + "detached_award_procurement", s3_unittest_data_bucket + ) + + tables_to_load = [ + "awards", + "financial_accounts_by_awards", + "recipient_lookup", + "recipient_profile", + "sam_recipient", + "transaction_current_cd_lookup", + "transaction_fabs", + "transaction_fpds", + "transaction_normalized", + "zips", + ] + + create_and_load_all_delta_tables(spark, s3_unittest_data_bucket, tables_to_load) + verify_delta_table_loaded_to_delta( + spark, + "award_search", + s3_unittest_data_bucket, + load_command="load_query_to_delta", + ) + verify_delta_table_loaded_to_delta( + spark, + "transaction_search", + s3_unittest_data_bucket, + load_command="load_query_to_delta", + ignore_fields=["award_update_date", "etl_update_date"], + ) + + tables_to_load = [ + "award_search", + "transaction_search", + ] + create_and_load_all_delta_tables(spark, s3_unittest_data_bucket, tables_to_load) + + call_command( + "create_delta_table", + "--destination-table=transaction_download", + f"--spark-s3-bucket={s3_unittest_data_bucket}", + ) + + expected_data = [ + { + "transaction_id": 1, + "award_id": 1, + "covid_19_obligated_amount": 2.0, + "covid_19_outlayed_amount": 2.0, + "defc_for_overall_award": "L: DEFC L Public Law;M: DEFC M Public Law", + "federal_accounts_funding_this_award": "123-4567", + "iija_obligated_amount": None, + "iija_outlayed_amount": None, + "object_classes_funding_this_award": "", + "program_activities_funding_this_award": "0001: OFFICE OF THE SECRETARY;0002: OPERATIONS AND MAINTENANCE", + "total_outlayed_amount_for_overall_award": 2.0, + "treasury_accounts_funding_this_award": "123-X-4567-000", + }, + { + "transaction_id": 2, + "award_id": 1, + "covid_19_obligated_amount": 2.0, + "covid_19_outlayed_amount": 2.0, + "defc_for_overall_award": "L: DEFC L Public Law;M: DEFC M Public Law", + "federal_accounts_funding_this_award": "123-4567", + "iija_obligated_amount": None, + "iija_outlayed_amount": None, + "object_classes_funding_this_award": "", + "program_activities_funding_this_award": "0001: OFFICE OF THE SECRETARY;0002: OPERATIONS AND MAINTENANCE", + "total_outlayed_amount_for_overall_award": 2.0, + "treasury_accounts_funding_this_award": "123-X-4567-000", + }, + { + "transaction_id": 3, + "award_id": 2, + "covid_19_obligated_amount": None, + "covid_19_outlayed_amount": None, + "defc_for_overall_award": "Q: DEFC Q Public Law", + "federal_accounts_funding_this_award": "123-4567", + "iija_obligated_amount": None, + "iija_outlayed_amount": None, + "object_classes_funding_this_award": "", + "program_activities_funding_this_award": "0003: TRAINING AND RECRUITING", + "total_outlayed_amount_for_overall_award": 3.0, + "treasury_accounts_funding_this_award": "123-X-4567-000", + }, + { + "transaction_id": 4, + "award_id": 2, + "covid_19_obligated_amount": None, + "covid_19_outlayed_amount": None, + "defc_for_overall_award": "Q: DEFC Q Public Law", + "federal_accounts_funding_this_award": "123-4567", + "iija_obligated_amount": None, + "iija_outlayed_amount": None, + "object_classes_funding_this_award": "", + "program_activities_funding_this_award": "0003: TRAINING AND RECRUITING", + "total_outlayed_amount_for_overall_award": 3.0, + "treasury_accounts_funding_this_award": "123-X-4567-000", + }, + { + "transaction_id": 5, + "award_id": 4, + "covid_19_obligated_amount": None, + "covid_19_outlayed_amount": None, + "defc_for_overall_award": None, + "federal_accounts_funding_this_award": None, + "iija_obligated_amount": None, + "iija_outlayed_amount": None, + "object_classes_funding_this_award": None, + "program_activities_funding_this_award": None, + "total_outlayed_amount_for_overall_award": None, + "treasury_accounts_funding_this_award": None, + }, + { + "transaction_id": 434, + "award_id": 3, + "covid_19_obligated_amount": None, + "covid_19_outlayed_amount": None, + "defc_for_overall_award": None, + "federal_accounts_funding_this_award": None, + "iija_obligated_amount": None, + "iija_outlayed_amount": None, + "object_classes_funding_this_award": None, + "program_activities_funding_this_award": None, + "total_outlayed_amount_for_overall_award": None, + "treasury_accounts_funding_this_award": None, + }, + ] + + all_columns = set(TransactionDownload(spark).dataframe.columns) + expected_columns = { + "transaction_id", + "award_id", + "covid_19_obligated_amount", + "covid_19_outlayed_amount", + "defc_for_overall_award", + "federal_accounts_funding_this_award", + "iija_obligated_amount", + "iija_outlayed_amount", + "object_classes_funding_this_award", + "program_activities_funding_this_award", + "total_outlayed_amount_for_overall_award", + "treasury_accounts_funding_this_award", + } + + verify_delta_table_loaded_to_delta( + spark, + "transaction_download", + s3_unittest_data_bucket, + load_command="load_query_to_delta", + dummy_data=expected_data, + ignore_fields=list(all_columns - expected_columns), + ) diff --git a/usaspending_api/tests/conftest_spark.py b/usaspending_api/tests/conftest_spark.py index 60e2a87a9e..7c212bfe1d 100644 --- a/usaspending_api/tests/conftest_spark.py +++ b/usaspending_api/tests/conftest_spark.py @@ -468,19 +468,33 @@ def _build_usas_data_for_spark(): defc_l = baker.make( "references.DisasterEmergencyFundCode", code="L", + title="DEFC L", group_name="covid_19", + public_law="DEFC L Public Law", _fill_optional=True, ) defc_m = baker.make( "references.DisasterEmergencyFundCode", code="M", + title="DEFC M", group_name="covid_19", + public_law="DEFC M Public Law", _fill_optional=True, ) defc_q = baker.make( "references.DisasterEmergencyFundCode", code="Q", + title="DEFC Q", group_name=None, + public_law="DEFC Q Public Law", + _fill_optional=True, + ) + baker.make( + "references.DisasterEmergencyFundCode", + code="Z", + title="DEFC Z", + group_name="infrastructure", + public_law="DEFC Z Public Law", _fill_optional=True, ) rpa_1 = baker.make( @@ -506,11 +520,13 @@ def _build_usas_data_for_spark(): federal_account = baker.make( "accounts.FederalAccount", parent_toptier_agency=funding_toptier_agency, + federal_account_code="123-4567", _fill_optional=True, ) tas = baker.make( "accounts.TreasuryAppropriationAccount", federal_account=federal_account, + tas_rendering_label="123-X-4567-000", allocation_transfer_agency_id=None, _fill_optional=True, )