Commit 932c527

Updates for dlt not in main (#235)
* Rewrite daily loan run
* Add a DAG that fetches a variable directly from AWS Secrets Manager using the secrets manager hook
* dlt updates

1 parent 9e01c87 · commit 932c527

File tree

5 files changed: +72 −38 lines changed

.github/workflows/10_integrate_airflow_changes.yml

Lines changed: 1 addition & 2 deletions

```diff
@@ -19,8 +19,7 @@ jobs:
   airflow:
     name: Pull Request Airflow Tests
     runs-on: ubuntu-latest
-    # container: datacoves/ci-airflow-dbt-snowflake:3.3
-    container: datacoves/ci-airflow-dbt-snowflake:3.4.202505062131-6a05965e
+    container: datacoves/ci-airflow-dbt-snowflake:3.3
 
     env:
       AIRFLOW__CORE__DAGS_FOLDER: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/orchestrate/dags
```

orchestrate/dags/daily_loan_run.py

Lines changed: 22 additions & 35 deletions

```diff
@@ -6,7 +6,9 @@
 from airflow.decorators import dag, task, task_group
 from orchestrate.utils import datacoves_utils
 
-from datahub_airflow_plugin.entities import Dataset
+from fivetran_provider_async.operators import FivetranOperator
+from fivetran_provider_async.sensors import FivetranSensor
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
 
 @dag(
     doc_md = __doc__,
@@ -28,47 +30,32 @@ def daily_loan_run():
         tooltip="Airbyte Extract and Load"
     )
     def extract_and_load_airbyte():
-
         # Extact and load
-        @task
-        def sync_airbyte():
-            from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
-            return AirbyteTriggerSyncOperator(
-                task_id="country_populations_datacoves_snowflake",
-                connection_id="ac02ea96-58a1-4061-be67-78900bb5aaf6",
-                airbyte_conn_id="airbyte_connection",
-            ).execute({})
-
-        sync_airbyte()
+        sync_airbyte = AirbyteTriggerSyncOperator(
+            task_id="country_populations_datacoves_snowflake",
+            connection_id="ac02ea96-58a1-4061-be67-78900bb5aaf6",
+            airbyte_conn_id="airbyte_connection",
+        )
 
 
     @task_group(
         group_id="extract_and_load_fivetran",
         tooltip="Fivetran Extract and Load"
     )
     def extract_and_load_fivetran():
-
-        @task
-        def trigger_fivetran():
-            from fivetran_provider_async.operators import FivetranOperator
-            return FivetranOperator(
-                task_id="datacoves_snowflake_google_analytics_4_trigger",
-                fivetran_conn_id="fivetran_connection",
-                connector_id="speak_menial",
-                wait_for_completion=False,
-            ).execute({})
-
-        @task
-        def sensor_fivetran():
-            from fivetran_provider_async.sensors import FivetranSensor
-            return FivetranSensor(
-                task_id="datacoves_snowflake_google_analytics_4_sensor",
-                fivetran_conn_id="fivetran_connection",
-                connector_id="speak_menial",
-                poke_interval=60,
-            ).poke({})
-
-        trigger_fivetran() >> sensor_fivetran()
+        trigger_fivetran = FivetranOperator(
+            task_id="datacoves_snowflake_google_analytics_4_trigger",
+            fivetran_conn_id="fivetran_connection",
+            connector_id="speak_menial",
+            wait_for_completion=False,
+        )
+        sensor_fivetran = FivetranSensor(
+            task_id="datacoves_snowflake_google_analytics_4_sensor",
+            fivetran_conn_id="fivetran_connection",
+            connector_id="speak_menial",
+            poke_interval=60,
+        )
+        trigger_fivetran >> sensor_fivetran
 
 
     @task_group(
@@ -93,7 +80,7 @@ def load_loans_data():
         connection_id="main"
     )
     def transform():
-        return "dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+ -t prd'"
+        return "dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+'"
 
 
     # Post transformation tasks
```
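Two aspects of this rewrite are worth calling out. The old version wrapped each operator in a TaskFlow `@task` and invoked `.execute({})` / `.poke({})` by hand, which runs the operator inside another task and sidesteps Airflow's own lifecycle handling (per-task retries, sensor rescheduling, accurate UI status). Instantiating the operators directly inside the `@task_group` lets the scheduler own them and keeps the `trigger >> sensor` dependency declarative. Separately, `transform` drops the explicit `-t prd` flag, so dbt builds against the default target in `profiles.yml`. A minimal sketch of the operator pattern, with placeholder connection and connector IDs rather than this project's real ones:

```python
from pendulum import datetime

from airflow.decorators import dag, task_group
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def fivetran_pattern_example():

    @task_group(group_id="extract_and_load")
    def extract_and_load():
        # Instantiating operators directly registers them as tasks in the
        # enclosing group; Airflow then manages retries and rescheduling.
        trigger = FivetranOperator(
            task_id="trigger_sync",
            fivetran_conn_id="fivetran_connection",  # placeholder connection
            connector_id="my_connector",             # placeholder connector id
            wait_for_completion=False,
        )
        sensor = FivetranSensor(
            task_id="wait_for_sync",
            fivetran_conn_id="fivetran_connection",
            connector_id="my_connector",
            poke_interval=60,
        )
        trigger >> sensor  # declarative dependency, no manual .execute()

    extract_and_load()


fivetran_pattern_example()
```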

orchestrate/dags/dbt_dag.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -17,7 +17,7 @@
 
     schedule = datacoves_utils.set_schedule("0 0 1 */12 *"),
     description="Sample DAG demonstrating how to run dbt in airflow",
-    tags=["transform", "version_2"],
+    tags=["transform"],
 )
 def dbt_dag():
 
```

Lines changed: 35 additions & 0 deletions (new file)

```diff
@@ -0,0 +1,35 @@
+"""
+## Sample DAG using variables
+This DAG is a sample using the Datacoves decorators with variable from AWS.
+"""
+
+from airflow.decorators import dag, task
+from pendulum import datetime
+from airflow.providers.amazon.aws.hooks.secrets_manager import SecretsManagerHook
+
+@dag(
+    doc_md = __doc__,
+    catchup = False,
+
+    default_args={
+        "start_date": datetime(2024, 1, 1),
+        "owner": "Noel Gomez",
+        "email": "[email protected]",
+        "email_on_failure": True,
+        "retries": 3,
+    },
+    tags=["sample"],
+    description="Testing task decorators",
+    schedule_interval="0 0 1 */12 *",
+)
+def variable_usage():
+
+    @task.datacoves_bash
+    def aws_var():
+        secrets_manager_hook = SecretsManagerHook(aws_conn_id='aws_secrets_manager')
+        var = secrets_manager_hook.get_secret("airflow/variables/aws_ngtest")
+        return f"export MY_VAR={var} && echo $MY_VAR"
+
+    aws_var()
+
+variable_usage()
```
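The new DAG reads the secret directly with the AWS provider's `SecretsManagerHook`, so it works without configuring an Airflow secrets backend. For comparison: if the deployment also enabled the AWS Secrets Manager secrets backend, the same `airflow/variables/aws_ngtest` entry would resolve through a plain `Variable.get`, because the backend prepends the `airflow/variables/` prefix. A sketch of that alternative; the backend configuration shown is an assumption about the deployment, not something this commit sets up:

```python
# Assumed deployment configuration (environment variables), NOT part of this commit:
#   AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
#   AIRFLOW__SECRETS__BACKEND_KWARGS={"variables_prefix": "airflow/variables"}

from airflow.models import Variable

# With the backend enabled, Airflow looks up "airflow/variables/aws_ngtest"
# in Secrets Manager before falling back to the metadata database.
value = Variable.get("aws_ngtest")
```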

orchestrate/utils/datacoves_utils.py

Lines changed: 13 additions & 0 deletions

```diff
@@ -21,6 +21,19 @@
 # Environment Variables
 ############################################
 
+def generate_env_exports(env_vars):
+    """
+    Generates bash export statements for environment variables.
+
+    Args:
+        env_vars (dict): Dictionary of environment variables
+
+    Returns:
+        str: Bash commands to export all environment variables
+    """
+    return "; ".join([f'export {k}="{v}"' for k, v in env_vars.items()])
+
+
 def set_dlt_env_vars(dlt_connections):# = {"sources": {}, "destinations": {}}):
     all_vars = {}
     vars = {}
```
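A quick usage sketch for the new helper; the variable names and values below are illustrative, not taken from this repo:

```python
from orchestrate.utils.datacoves_utils import generate_env_exports

# Illustrative inputs
env_vars = {
    "DLT_SOURCE_KEY": "abc123",
    "DLT_DESTINATION": "snowflake",
}

# Prints: export DLT_SOURCE_KEY="abc123"; export DLT_DESTINATION="snowflake"
print(generate_env_exports(env_vars))
```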
