diff --git a/scripts/helpers/athena_helpers.py b/scripts/helpers/athena_helpers.py index 2512bf8be..4149d041f 100644 --- a/scripts/helpers/athena_helpers.py +++ b/scripts/helpers/athena_helpers.py @@ -276,6 +276,29 @@ def empty_s3_partition(s3_table_output_location: str) -> None: logger.info(f"S3 partition at {s3_local_for_physical_date} emptied successfully.") +def table_exists(database_name: str, table_name: str) -> bool: + """Check if a table exists in the Glue catalog. + + Parameters + ---------- + database_name : str + the name of the database in which the table resides + table_name : str + the name of the table to check for existence + + Returns + ------- + bool + True if the table exists, False otherwise + """ + glue_client = boto3.client("glue") + try: + glue_client.get_table(DatabaseName=database_name, Name=table_name) + return True + except glue_client.exceptions.EntityNotFoundException: + return False + + def create_or_update_table( sql_query_body: str, database_name: str, @@ -286,26 +309,42 @@ def create_or_update_table( ) -> None: """ Creates or updates an Athena table using the provided SQL query. + Uses INSERT INTO for updating the table with new data or CTAS for creating a new + table. """ - create_table_header = f""" - CREATE TABLE "{database_name}"."{table_name}" - WITH ( - format = 'PARQUET', - write_compression = 'SNAPPY', - external_location = '{s3_table_output_location}', - partitioned_by = ARRAY['import_year', 'import_month', 'import_day', 'import_date'] - ) AS - """ - full_query_ctas = create_table_header + sql_query_body - - run_query_on_athena( - query=full_query_ctas, - database_name=database_name, - output_location=s3_temp_path_for_cache, - fetch_results=False, - kms_key=kms_key, - ) - logger.info("Table created or updated successfully.") + if table_exists(database_name, table_name): + insert_query = f""" + INSERT INTO `{database_name}`.`{table_name}` + {sql_query_body} + """ + run_query_on_athena( + query=insert_query, + database_name=database_name, + output_location=s3_temp_path_for_cache, + fetch_results=False, + kms_key=kms_key, + ) + logger.info("Data inserted into the table successfully.") + else: + create_table_header = f""" + CREATE TABLE "{database_name}"."{table_name}" + WITH ( + format = 'PARQUET', + write_compression = 'SNAPPY', + external_location = '{s3_table_output_location}', + partitioned_by = ARRAY['import_year', 'import_month', 'import_day', 'import_date'] + ) AS + """ + full_query_ctas = create_table_header + sql_query_body + + run_query_on_athena( + query=full_query_ctas, + database_name=database_name, + output_location=s3_temp_path_for_cache, + fetch_results=False, + kms_key=kms_key, + ) + logger.info("New table created successfully.") def repair_table( @@ -355,7 +394,6 @@ def create_update_table_with_partition( "-prod-", f"-{environment}-" ) try: - drop_table(database_name, table_name, s3_temp_path_for_cache) empty_s3_partition(s3_table_output_location) create_or_update_table( query_on_athena, @@ -365,7 +403,6 @@ def create_update_table_with_partition( s3_temp_path_for_cache, kms_key, ) - repair_table(database_name, table_name, s3_temp_path_for_cache) except Exception as e: logger.error(f"An error occurred while executing the query: {e}") raise