Feature/rally features main merged #71

Draft: wants to merge 77 commits into base: main
Changes from all commits (77 commits)
8754e17
Move disk_to_s3 and some Slack callbacks from Rally; overload xcom_pu…
jayckaiser Dec 1, 2023
6a5460c
Move Sharefile Airflow objects from Rally.
jayckaiser Dec 1, 2023
3957079
Whitespace cleanup in s3_to_postgres.py.
jayckaiser Dec 1, 2023
6c84ccf
Refactor project to move Python callables into their own location; ad…
jayckaiser Dec 1, 2023
0963da6
Move JSONL and snake_case functionality from Rally.
jayckaiser Dec 1, 2023
60adfd5
Rename callables.airflow to callables.variable to prevent a name-coll…
jayckaiser Dec 1, 2023
f5c86e2
Migrate S3-to-Snowflake from Rally.
jayckaiser Dec 1, 2023
2bf9306
Update google hook import to current iteration (untested).
jayckaiser Dec 1, 2023
114a951
Clean up code in callables.snowflake.
jayckaiser Dec 1, 2023
1608081
Add type-hints to s3 callables where missing; minor code clean-up.
jayckaiser Dec 11, 2023
7c0e92d
Move slack callbacks into callables subdirectory.
jayckaiser Dec 11, 2023
6552c88
Force line endings to LF.
jayckaiser Jan 2, 2024
c71e2b6
Overload Slack alert typing to accept exception strings or exceptions.
jayckaiser Jan 3, 2024
4e7dad7
Add FTP and ZIP util callables (TODO: test).
jayckaiser Jan 3, 2024
9cf28ca
Add Airflow and SQL helpers.
jayckaiser Jan 3, 2024
56ebab0
Fix import errors.
jayckaiser Jan 4, 2024
133dfb2
Refactor project to alias original submodule pathing to new location …
jayckaiser Jan 4, 2024
f8063cc
Fix revised xcom_pull_template logic to check the subclass of task_ids.
jayckaiser Jan 4, 2024
5bbff9c
Add custom EA SFTP Hook to project.
jayckaiser Jan 4, 2024
5f3588d
Minor cleanup before first attempted run.
jayckaiser Jan 4, 2024
cc5ac4c
Move DBT run-operation operator into providers submodule.
jayckaiser Jan 4, 2024
6cb4905
Move s3 Loop operator into providers submodule.
jayckaiser Jan 4, 2024
3d3c1ce
Move SSM helper class into new callables file.
jayckaiser Jan 4, 2024
718f127
Use lazy approach for rerouting deprecated imports (untested).
jayckaiser Jan 5, 2024
7e53d40
Fix DBT operator import.
jayckaiser Jan 6, 2024
2290cd5
Fix S3-to-Snowflake DAG import error.
jayckaiser Jan 6, 2024
e566242
Fix gspread type errors.
jayckaiser Jan 6, 2024
c7fa7ba
Fix gspread type errors (missed one).
jayckaiser Jan 6, 2024
8c7c92a
Attempt refactor of deprecated imports mapping.
jayckaiser Jan 6, 2024
d87af8d
Fix bug in path-renames.
jayckaiser Jan 7, 2024
970e799
Add sharefile list_sharefile_objects callable.
jayckaiser Jan 7, 2024
946b851
Update sharefile.list_sharefile_objects to return only the folder name.
jayckaiser Jan 7, 2024
eb831df
Add error var where missing; move errors to end of all Slack messages.
jayckaiser Jan 13, 2024
bcda835
Clean up all slack callbacks (todo: inject http_conn_id)
jayckaiser Jan 13, 2024
f3f77ae
Add type hints to all slack callbacks.
jayckaiser Jan 13, 2024
38d7ce9
Clean up DAG inits across repo.
jayckaiser Jan 13, 2024
e9b0597
Add optional slack_conn_id to all DAGs.
jayckaiser Jan 13, 2024
41fd014
Populate all slack alerts with slack_conn_id-get from UDMs.
jayckaiser Jan 13, 2024
08f19e5
Fix bugs in DBT Run-Operation Operator and DBT DAG.
jayckaiser Jan 13, 2024
e945036
Fix argument-naming bug in DBT opt-swap.
jayckaiser Jan 14, 2024
07d5641
Fix bug in DBTRunOperationOperator.
jayckaiser Jan 14, 2024
7f4c9ef
Fix bug in DBTRunOperationOperator where CMD pieces were misordered.
jayckaiser Jan 14, 2024
b3fc6c8
Revert latest arguments swap; add try-except wrapper to force the tas…
jayckaiser Jan 19, 2024
d617d74
Revert try-except code update.
jayckaiser Jan 19, 2024
42696fb
Add views runs after opt-swap.
jayckaiser Jan 19, 2024
0e4ac24
Fix bug where DBT task IDs were duplicated.
jayckaiser Jan 19, 2024
7c32220
Fix bug where DBT task IDs were duplicated.
jayckaiser Jan 19, 2024
aa75b66
Fix bug in DBT Run Operation Operator.
jayckaiser Jan 23, 2024
10fb4bc
Fix bug in DBT Run Operation Operator.
jayckaiser Jan 23, 2024
d89e8da
Make DbtRunOperationOperator arguments a dictionary.
jayckaiser Jan 23, 2024
a60e1d0
Add cleanup and TODOs across project.
jayckaiser Jan 23, 2024
77f002b
Fix f-string bug.
jayckaiser Jan 23, 2024
faca28f
Update credentials-extraction logic to use new built-ins.
jayckaiser Jan 25, 2024
728c6e5
Revert code to try to circumvent permissions error.
jayckaiser Jan 26, 2024
059d371
Revert to non-public _get_field() method.
jayckaiser Jan 26, 2024
ba2e69e
Fix bug in argument name when calling Slack insert failure alert.
jayckaiser Jan 26, 2024
bebfb00
Migrate new DBT changes from feature/rally_features to be merged into…
jayckaiser Jan 30, 2024
136eef0
Add missing init file.
jayckaiser Jan 30, 2024
23f222b
Rename csv_path to local_path in jsonl helper.
jayckaiser Jan 30, 2024
16e5c3f
Increment version to 0.3.0.
jayckaiser Jan 30, 2024
a14bc52
Update imports to use new callables subfolder; add callables.
jayckaiser Jan 30, 2024
eef3ea8
Add Sharefile provider; add callables for gsheets, s3, snowflake, sna…
jayckaiser Jan 30, 2024
a0813d4
Merge all updates from Rally testing into original rally_features bra…
jayckaiser Feb 7, 2024
4873124
Re-remove deprecated dag_util files.
jayckaiser Feb 7, 2024
46d511e
Update CHANGELOG.md (still incomplete)
jayckaiser Feb 7, 2024
ec6fa86
Add missing package calls in init.
jayckaiser Feb 7, 2024
0661f4c
Fix callable routing in new-old DAG.
jayckaiser Feb 7, 2024
813388c
Allow deletion with specified where clause
susanxiong Jan 28, 2025
e04aac1
Pass in a list for sum
susanxiong Jan 28, 2025
5e87c29
Fix sum logic to use 0 if None
susanxiong Feb 24, 2025
f3855db
Merge pull request #62 from edanalytics/feature/rally_features__snowf…
susanxiong Mar 3, 2025
ded21d5
Update to main branch version of run_dbt dag and ea_custom_dag
susanxiong Apr 25, 2025
b6ddecd
Merge branch 'main' into feature/rally_features_main_merged
susanxiong Apr 25, 2025
58d307b
Delete snake_case.py (unused)
susanxiong Apr 25, 2025
e3fd72c
Remove references to slack_conn_id
susanxiong Apr 25, 2025
8880bad
Replace references to source_org with tenant/tenant_code
susanxiong Apr 25, 2025
76eca24
Sync files with main
susanxiong Apr 25, 2025
ea_airflow_util/callables/snowflake.py (252 additions, 0 deletions)

@@ -46,3 +46,255 @@ def snowflake_to_disk(

    conn.close()
    return local_path


### Imported from Rally
def _run_table_clear_query(
    snowflake_conn: Connection,
    dest_table: str,
    truncate: bool = False,
    delete_tenants: Optional[set] = None,
    delete_where_clause: Optional[str] = None
):
    """
    Isolated logic for truncating or deleting from a table.
    """
    # Coerce each option to a boolean so the mutual-exclusivity check cannot fail on a non-empty set.
    if sum([bool(truncate), bool(delete_tenants), delete_where_clause is not None]) > 1:
        raise ValueError(f'!!! Only specify one of (truncate, delete, delete_where_clause) during Snowflake import to `{dest_table}`!')

    # Truncate only
    if truncate and not delete_tenants and delete_where_clause is None:
        logging.info(f'Truncating table `{dest_table}`')
        with snowflake_conn.cursor() as cur:
            cur.execute(f"truncate {dest_table};")

    # Delete the given tenants only
    elif delete_tenants and not truncate and delete_where_clause is None:
        # Create a comma-separated, quoted string from the tenant set.
        delete_tenants = "('" + "','".join(delete_tenants) + "')"
        logging.info(f'Deleting {delete_tenants} from `{dest_table}`')

        with snowflake_conn.cursor() as cur:
            delete_qry = f"""
                delete from {dest_table}
                where tenant_code in {delete_tenants}
            """
            cur.execute(delete_qry)

    # Delete with where clause only
    elif delete_where_clause is not None and not delete_tenants and not truncate:
        logging.info(f'Deleting where {delete_where_clause} from `{dest_table}`')

        with snowflake_conn.cursor() as cur:
            delete_qry = f"""
                delete from {dest_table}
                where {delete_where_clause}
            """
            cur.execute(delete_qry)


def _run_table_import_query(
    # S3 parameters
    s3_hook: S3Hook,
    s3_key: str,

    # Snowflake parameters
    snowflake_conn: Connection,
    dest_table: str,
    stage: str,
    column_customization: str,
    column_customization_dtype: str,
    metadata: str,
    file_format: str,

    # Meta-parameters
    slack_on_failure: bool,
    row_hash: bool,
    **context
):
    """
    Isolated logic for completing an import of data from S3 to Snowflake.
    """
    # Collect the credentials to pass to the copy query.
    # (This is completed here to avoid passing sensitive information between functions.)
    s3_creds = s3_hook.get_connection(s3_hook.aws_conn_id)

    # TODO: the key in this method is a single file, not a directory.
    # could consider splitting into multiple files in disk to s3,
    # then use pattern recognition in copy statements (non-row hashing)
    # to take advantage of parallel operations (see below):
    # https://docs.snowflake.com/en/user-guide/data-load-considerations-load.html#options-for-selecting-staged-data-files

    # Create the various column strings needed for the (complicated) merge.
    if file_format == 'json_default':
        # TODO: is the tenant always the directory immediately above the file itself?
        select_cols_str = ", ".join(
            f"${num}::variant {name}"
            for num, name in enumerate(column_customization.split(","), start=1)
        )

    else:
        # TODO: is the tenant always the directory immediately above the file itself?
        select_cols_str = ", ".join(
            f"${num}::{column_customization_dtype} {name}"
            for num, name in enumerate(column_customization.split(","), start=1)
        )

    select_cols_str += ", " + (
        metadata
        .replace('tenant_code', "split_part(metadata$filename, '/', -2) as tenant_code")
        .replace('file_path', 'metadata$filename as file_path')
    )

    # Add a trailing forward slash to the s3_key so that staged files are found correctly.
    if not s3_key.endswith(('csv', '.gz', 'jsonl')):
        s3_key = s3_key + '/'

    # Pull the raw database out of the destination table to use the correct database for the external stage (S3).
    raw_db = dest_table.split('.', 1)[0]

    # Build a copy query SQL string (logic differs if row hashing is used).
    if row_hash:
        # TODO: will this need to be split(", ") (i.e., include a space)?
        hashed_cols_str = ", ".join(
            f"${num}::{column_customization_dtype}"
            for num, name in enumerate(column_customization.split(","), start=1)
        )

        # Additional column strings required by the merge statement.
        insert_cols_string = ", ".join([column_customization, metadata, 'row_md5_hash'])

        values_cols_str = "{}, {}, source.row_md5_hash".format(
            ", ".join(f"source.{col}" for col in column_customization.split(",")),
            ", ".join(f"source.{col}" for col in metadata.split(","))
        )

        # TODO: change the external path?
        copy_query = f"""
            merge into {dest_table} raw
            using (
                select
                    {select_cols_str},
                    md5(array_to_string(array_construct({hashed_cols_str}), ',')) as row_md5_hash
                from @{raw_db}.public.{stage}/{s3_key} (file_format => {file_format})
            ) source
            on raw.row_md5_hash = source.row_md5_hash
            when not matched then
                insert (
                    {insert_cols_string}
                ) values (
                    {values_cols_str}
                );
        """

    else:
        copy_query = f"""
            copy into {dest_table} ({column_customization}, {metadata})
            from (
                select {select_cols_str}
                from @{raw_db}.public.{stage}/{s3_key} (file_format => {file_format})
            )
            force = true
            on_error = skip_file;
        """

    # Perform the actual execution of the copy query.
    logging.info(f'Beginning insert to `{dest_table}`')
    logging.info(f'Using query: `{copy_query}`')
    with snowflake_conn.cursor(DictCursor) as cur:
        try:
            cur.execute(copy_query)
            return_value = cur.rowcount if row_hash else cur.fetchone()
            # Log the outcome if the copy succeeded.
            if row_hash:
                logging.info(f'Number of rows inserted to `{dest_table}`: {return_value}')
                # TODO: this will offer the same info as the previous line;
                #  want to capture any errors for individual files.
                logging.info(cur.fetchone())
            else:
                # TODO: capture errors for individual files (the copy will not fully error because on_error is set to skip_file).
                logging.info(return_value)
        except Exception as e:
            logging.error(e)

            if slack_on_failure and (slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id")):
                slack.slack_alert_insert_failure(
                    context=context, http_conn_id=slack_conn_id,
                    file_key=s3_key, table=dest_table, error=str(e).splitlines()[0]
                )

    snowflake_conn.commit()


def import_s3_to_snowflake(
    # S3 parameters
    s3_conn_id: str,
    s3_bucket: str,
    s3_key: str,

    # Snowflake parameters
    snowflake_conn_id: str,
    dest_table: str,
    stage: str,
    column_customization: Optional[str] = None,
    column_customization_dtype: str = "string",

    # Table clear parameters
    truncate: bool = False,
    delete: bool = False,
    delete_where_clause: Optional[str] = None,
    metadata: Optional[str] = None,

    # Meta-parameters
    slack_on_failure: bool = True,
    row_hash: bool = False,
    **context
):
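    """
    Copy files from S3 into a Snowflake table, optionally clearing the table first
    (truncate, per-tenant delete, or delete with a custom where clause).
    """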
    # Establish connections to S3 and to Snowflake.
    s3_hook = S3Hook(s3_conn_id)
    hook = SnowflakeHook(snowflake_conn_id)
    snowflake_conn = hook.get_conn()

    s3_subkeys = s3._list_s3_keys(s3_hook, s3_bucket, s3_key)
    logging.info("Attempting to load these files: " + '\n'.join(s3_subkeys))

    # Check the file type (this determines the file format for Snowflake loading).
    # TODO: should this check across all of the files? Not strictly necessary.
    file_format = None
    if s3_subkeys[0].endswith(('.csv', '.gz')):
        file_format = 'csv_enclosed'
    if s3_subkeys[0].endswith('.jsonl'):
        file_format = 'json_default'

    # Extract the tenants from the S3 folder structure, and apply them to SQL queries if necessary.
    if delete:
        delete_tenants = set([pathlib.PurePath(key).parent.name for key in s3_subkeys])
        logging.info(f'Deleting tenants = {delete_tenants}')
    else:
        delete_tenants = None

    # Apply table truncations or deletes if specified.
    _run_table_clear_query(
        snowflake_conn,
        dest_table,
        truncate=truncate,
        delete_tenants=delete_tenants,
        delete_where_clause=delete_where_clause
    )

    _run_table_import_query(
        s3_hook=s3_hook,
        s3_key=s3_key,

        snowflake_conn=snowflake_conn,
        dest_table=dest_table,
        stage=stage,
        column_customization=column_customization,
        column_customization_dtype=column_customization_dtype,
        metadata=metadata,
        file_format=file_format,

        slack_on_failure=slack_on_failure,
        row_hash=row_hash,
        **context
    )
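

As a reviewer aid, here is a minimal sketch of how the new import_s3_to_snowflake callable might be wired into a DAG through a standard PythonOperator. The connection IDs, bucket, key, table, stage, and column names below are placeholders for illustration, not values taken from this PR, and the DAG arguments assume Airflow 2.x.

# Hypothetical usage sketch: all connection IDs, names, and paths below are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from ea_airflow_util.callables.snowflake import import_s3_to_snowflake

with DAG(
    dag_id="example_s3_to_snowflake",   # placeholder DAG name
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    load_to_snowflake = PythonOperator(
        task_id="import_s3_to_snowflake",
        python_callable=import_s3_to_snowflake,
        op_kwargs={
            "s3_conn_id": "data_lake_s3",              # placeholder Airflow connection
            "s3_bucket": "example-bucket",
            "s3_key": "raw/example_source",
            "snowflake_conn_id": "snowflake_raw",      # placeholder Airflow connection
            "dest_table": "raw_db.public.example_table",
            "stage": "example_stage",
            "column_customization": "id,name,value",   # comma-separated destination columns
            "metadata": "tenant_code,file_path",
            "delete": True,                            # clear matching tenants before loading
        },
    )

Because the callable accepts **context, the PythonOperator passes the full Airflow context through to it, which is what the Slack-on-failure path reads when it looks up slack_conn_id in the DAG's user-defined macros.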