Commit e084464

Feature/datacoves3.3 (#229)
* Updating daily_loan
* Update Airflow API example
* Fivetran refactor
* Change examples to use decorators
* Change airflow connection name
* Testing instance aware dags for airflow.
* Testing is_development_environment
* Move get_schedule to utils and add airflow aware dag
* Testing key pair airflow service connection
* updating versions and adding overrides
* Change airbyte connection id
* Add note for constants file
* update decorator with env
* update dataset outlet
* Update DAG to use >> dependency management
* Fix dlt
* Remove dbt out of airflow github action
* remove variable import

---------

Co-authored-by: Noel Gomez
Co-authored-by: Mayra Pena
1 parent d7be73e commit e084464

25 files changed: +389, -360 lines
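
The recurring pattern in this commit is the move from explicit Datacoves operators (DatacovesBashOperator / DatacovesDbtOperator) to the @task.datacoves_bash / @task.datacoves_dbt decorators, with dependencies expressed via >>. A minimal sketch of the new style, with illustrative names; it assumes the Datacoves Airflow integration that provides these decorators (seen in the diffs below), so it will not run on vanilla Airflow:

import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime.datetime(2024, 1, 1), catchup=False)
def decorator_style_sketch():

    # New style: the returned string is executed as a bash command by the
    # Datacoves decorator; env/append_env mirror the arguments used in the diffs.
    @task.datacoves_bash(env={"UV_CACHE_DIR": "/tmp/uv_cache"}, append_env=True)
    def load_data():
        return "cd load/dlt && ./loans_data.py"

    load_data()


dag = decorator_style_sketch()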

.github/workflows/10_integrate_airflow_changes.yml

Lines changed: 0 additions & 1 deletion
@@ -26,7 +26,6 @@ jobs:
       AIRFLOW__CORE__DAGS_FOLDER: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/orchestrate/dags
       AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 300
       AIRFLOW__ARTIFACTS_PATH: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/orchestrate
-      DBT_PROFILES_DIR: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/automate/dbt
       DATACOVES__DBT_HOME: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/transform
       DATACOVES__REPO_PATH: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}
       PYTHONPATH: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}

.github/workflows/10_integrate_dbt_changes.yml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ jobs:
     # environment: PR_ENV
 
     # most people should use this one
-    container: datacoves/ci-basic-dbt-snowflake:3.2
+    container: datacoves/ci-basic-dbt-snowflake:3.3
 
     defaults:
       run:

.github/workflows/20_drop_integration_db.yml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ jobs:
     # Alternatively, You can define multiple ENV for different workflows.
     # https://github.com/<org>/<repo>/settings/environments
     # environment: PR_ENV
-    container: datacoves/ci-basic-dbt-snowflake:3.2
+    container: datacoves/ci-basic-dbt-snowflake:3.3
 
     defaults:
       run:

.github/workflows/30_deploy_changes_to_production.yml

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ jobs:
     # Alternatively, You can define multiple ENV for different workflows.
     # https://github.com/<org>/<repo>/settings/environments
     # environment: PR_ENV
-    container: datacoves/ci-basic-dbt-snowflake:3.2
+    container: datacoves/ci-basic-dbt-snowflake:3.3
 
     defaults:
       run:

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@ files: ^transform/models/
 
 repos:
   - repo: https://github.com/dbt-checkpoint/dbt-checkpoint
-    rev: v2.0.5
+    rev: v2.0.6
 
     hooks:
       - id: check-source-table-has-description
@@ -27,7 +27,7 @@ repos:
       additional_dependencies:
         [
           "sqlfluff-templater-dbt==3.1.1",
-          "dbt-core==1.8.7",
+          "dbt-core==1.8.8",
           "dbt-snowflake==1.8.4",
         ]
       args: [--config, transform/.sqlfluff]
Lines changed: 15 additions & 14 deletions
@@ -1,14 +1,11 @@
 import datetime
-
-from airflow.decorators import dag, task_group
+from airflow.decorators import dag, task, task_group
 from airflow.models import Variable
-from operators.datacoves.bash import DatacovesBashOperator
 
-# This is Here to show what NOT to do. When done this way, Aiflow will
-# query for this variable on every parse (every 30 secs). This can be
-# bad if using an external secrets manager like AWS Secrets Manager.
-# Doing this will incur significant AWS charges
-# The proper way to get a value is to do this in a method with the @task decorator
+
+# ❌ BAD PRACTICE: Fetching a variable at the top level
+# This will cause Airflow to query for this variable on EVERY DAG PARSE (every 30 seconds),
+# which can be costly when using an external secrets manager (e.g., AWS Secrets Manager).
 bad_used_variable = Variable.get("bad_used_variable", "default_value")
 
 @dag(
@@ -19,20 +16,24 @@
         "email_on_failure": True,
         "retries": 3
     },
-    description="Sample DAG for dbt build",
+    description="Sample DAG demonstrating bad variable usage",
     schedule="0 0 1 */12 *",
     tags=["extract_and_load"],
     catchup=False,
 )
 def bad_variable_usage():
+
     @task_group(group_id="extract_and_load_dlt", tooltip="dlt Extract and Load")
     def extract_and_load_dlt():
-        load_us_population = DatacovesBashOperator(
-            task_id="load_us_population",
-            bash_command="cd load/dlt && ./loans_data.py",
-        )
+        """Task group for DLT extract and load process"""
+
+        @task.datacoves_bash(env={"BAD_VAR": bad_used_variable})  # ✅ Passing the bad variable to the task
+        def load_us_population():
+            return "cd load/dlt && ./loans_data.py"
 
-    tg_extract_and_load_dlt = extract_and_load_dlt()
+        load_us_population()
 
+    extract_and_load_dlt()
 
+# Invoke DAG
 dag = bad_variable_usage()
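
For contrast with the anti-pattern above, a minimal sketch of what the removed comment recommends: reading the Variable inside a @task-decorated function so it is fetched only at run time, not on every DAG parse. DAG and task names here are illustrative, not from the repo:

import datetime

from airflow.decorators import dag, task
from airflow.models import Variable


@dag(schedule=None, start_date=datetime.datetime(2024, 1, 1), catchup=False)
def good_variable_usage():

    @task
    def use_variable():
        # Fetched only when the task actually runs, so an external secrets
        # backend (e.g. AWS Secrets Manager) is not hit on every parse.
        value = Variable.get("bad_used_variable", "default_value")
        print(f"value: {value}")

    use_variable()


dag = good_variable_usage()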

orchestrate/dags/daily_loan_run.py

Lines changed: 75 additions & 62 deletions
@@ -1,12 +1,6 @@
 import datetime
-
-from airflow.decorators import dag, task_group
-from airflow.providers.airbyte.operators.airbyte import \
-    AirbyteTriggerSyncOperator
-from fivetran_provider_async.operators import FivetranOperator
-from fivetran_provider_async.sensors import FivetranSensor
-from operators.datacoves.bash import DatacovesBashOperator
-from operators.datacoves.dbt import DatacovesDbtOperator
+from airflow.decorators import dag, task, task_group
+from datahub_airflow_plugin.entities import Dataset
 
 @dag(
     default_args={"start_date": datetime.datetime(2024, 1, 1, 0, 0), "retries": 3},
@@ -16,79 +10,98 @@
     catchup=False,
 )
 def daily_loan_run():
+
     @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load")
     def extract_and_load_airbyte():
-        country_populations_datacoves_snowflake = AirbyteTriggerSyncOperator(
-            task_id="country_populations_datacoves_snowflake",
-            connection_id="ac02ea96-58a1-4061-be67-78900bb5aaf6",
-            airbyte_conn_id="airbyte_connection",
-        )
+        @task
+        def sync_airbyte():
+            from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+            return AirbyteTriggerSyncOperator(
+                task_id="country_populations_datacoves_snowflake",
+                connection_id="b293aaea-6557-4506-8cfb-6b621ec4c6ef",
+                airbyte_conn_id="airbyte_connection",
+            ).execute({})
+
+        sync_airbyte()
 
     tg_extract_and_load_airbyte = extract_and_load_airbyte()
 
-    @task_group(
-        group_id="extract_and_load_fivetran", tooltip="Fivetran Extract and Load"
-    )
+    @task_group(group_id="extract_and_load_fivetran", tooltip="Fivetran Extract and Load")
     def extract_and_load_fivetran():
-        datacoves_snowflake_google_analytics_4_trigger = FivetranOperator(
-            task_id="datacoves_snowflake_google_analytics_4_trigger",
-            fivetran_conn_id="fivetran_connection",
-            connector_id="speak_menial",
-            wait_for_completion=False,
-        )
-        datacoves_snowflake_google_analytics_4_sensor = FivetranSensor(
-            task_id="datacoves_snowflake_google_analytics_4_sensor",
-            fivetran_conn_id="fivetran_connection",
-            connector_id="speak_menial",
-            poke_interval=60,
-        )
-        (
-            datacoves_snowflake_google_analytics_4_trigger
-            >> datacoves_snowflake_google_analytics_4_sensor
-        )
+
+        @task
+        def trigger_fivetran():
+            from fivetran_provider_async.operators import FivetranOperator
+            return FivetranOperator(
+                task_id="datacoves_snowflake_google_analytics_4_trigger",
+                fivetran_conn_id="fivetran_connection",
+                connector_id="speak_menial",
+                wait_for_completion=False,
+            ).execute({})
+
+        @task
+        def sensor_fivetran():
+            from fivetran_provider_async.sensors import FivetranSensor
+            return FivetranSensor(
+                task_id="datacoves_snowflake_google_analytics_4_sensor",
+                fivetran_conn_id="fivetran_connection",
+                connector_id="speak_menial",
+                poke_interval=60,
+            ).poke({})
+
+        trigger = trigger_fivetran()
+        sensor = sensor_fivetran()
+
+        trigger >> sensor
+        return sensor  # Return last task in the group
 
     tg_extract_and_load_fivetran = extract_and_load_fivetran()
 
     @task_group(group_id="extract_and_load_dlt", tooltip="dlt Extract and Load")
     def extract_and_load_dlt():
-        load_us_population = DatacovesBashOperator(
-            task_id="load_loads_data",
-            bash_command="""
-            cd load/dlt \
-            && ./loans_data.py
-            """,
+        @task.datacoves_bash(
+            outlets=[Dataset("snowflake", "raw.loans_data.loans_data")],
            env={
                "UV_CACHE_DIR": "/tmp/uv_cache",
-                "EXTRACT__NEXT_ITEM_MODE":"fifo",
-                "EXTRACT__MAX_PARALLEL_ITEMS":"1",
-                "EXTRACT__WORKERS":"1",
-                "NORMALIZE__WORKERS":"1",
-                "LOAD__WORKERS":"1",
+                "EXTRACT__NEXT_ITEM_MODE": "fifo",
+                "EXTRACT__MAX_PARALLEL_ITEMS": "1",
+                "EXTRACT__WORKERS": "1",
+                "NORMALIZE__WORKERS": "1",
+                "LOAD__WORKERS": "1",
            },
-            append_env=True,
+            append_env=True
        )
+        def load_loans_data():
+            return "cd load/dlt && ./loans_data.py"
+        load_loans_data()
 
    tg_extract_and_load_dlt = extract_and_load_dlt()
-    transform = DatacovesDbtOperator(
-        task_id="transform",
-        bash_command="dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+'",
-    )
-    transform.set_upstream(
-        [
-            tg_extract_and_load_airbyte,
-            tg_extract_and_load_dlt,
-            tg_extract_and_load_fivetran,
+
+    @task.datacoves_dbt(
+        connection_id="main",
+        inlets=[
+            Dataset("snowflake", "raw.loans_data.loans_data"),
+            Dataset("snowflake", "raw.google_analytics_4.engagement_events_report")
        ]
    )
-    marketing_automation = DatacovesBashOperator(
-        task_id="marketing_automation",
-        bash_command="echo 'send data to marketing tool'",
-    )
-    marketing_automation.set_upstream([transform])
-    update_catalog = DatacovesBashOperator(
-        task_id="update_catalog", bash_command="echo 'refresh data catalog'"
-    )
-    update_catalog.set_upstream([transform])
+    def transform():
+        return "dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+ -t prd'"
+
+    @task.datacoves_bash
+    def marketing_automation():
+        return "echo 'send data to marketing tool'"
+
+    @task.datacoves_bash
+    def update_catalog():
+        return "echo 'refresh data catalog'"
+
+
+    transform_task = transform()
+    marketing_automation_task = marketing_automation()
+    update_catalog_task = update_catalog()
 
+    [tg_extract_and_load_airbyte, tg_extract_and_load_dlt, tg_extract_and_load_fivetran] >> transform_task
+    transform_task >> [marketing_automation_task, update_catalog_task]
 
+# Invoke DAG
 dag = daily_loan_run()
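
The dependency wiring above replaces transform.set_upstream([...]) with >> chaining over lists: [a, b, c] >> d makes d wait on all three, and d >> [e, f] fans out. A self-contained sketch of the same fan-in / fan-out pattern with plain @task functions (illustrative names; runs on vanilla Airflow):

import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime.datetime(2024, 1, 1), catchup=False)
def fan_in_fan_out_sketch():

    @task
    def extract_a():
        return "a"

    @task
    def extract_b():
        return "b"

    @task
    def transform():
        return "transformed"

    @task
    def publish():
        return "published"

    @task
    def refresh_catalog():
        return "refreshed"

    transform_task = transform()

    # Fan-in: transform waits for every task in the list.
    [extract_a(), extract_b()] >> transform_task
    # Fan-out: both downstream tasks run after transform.
    transform_task >> [publish(), refresh_catalog()]


dag = fan_in_fan_out_sketch()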

orchestrate/dags/data_aware/consumer.py

Lines changed: 3 additions & 3 deletions
@@ -2,7 +2,7 @@
 
 from airflow.decorators import dag, task
 from airflow.datasets import Dataset
-from operators.datacoves.bash import DatacovesBashOperator
+
 
 MY_SOURCE = Dataset("upstream_data")
 
@@ -20,10 +20,10 @@
 )
 def data_aware_consumer_dag():
     @task
-    def run_dbt():
+    def run_consumer():
         print("I'm the consumer")
 
-    run_dbt()
+    run_consumer()
 
 
 dag = data_aware_consumer_dag()

orchestrate/dags/data_aware/producer.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 
 from airflow.decorators import dag, task
 from airflow.datasets import Dataset
-from operators.datacoves.bash import DatacovesBashOperator
+
 
 # A dataset can be anything, it will be a poiner in the Airflow db.
 # If you need to access url like s3://my_bucket/my_file.txt then you can set
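
The producer/consumer pair above uses airflow.datasets.Dataset purely as a pointer: the producer declares it as an outlet and the consumer schedules on it. A minimal sketch of that handshake (names are illustrative; the actual repo files are only partially shown in this diff):

import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

UPSTREAM_DATA = Dataset("upstream_data")  # same pointer name the consumer listens on


@dag(schedule="@daily", start_date=datetime.datetime(2024, 1, 1), catchup=False)
def producer_sketch():

    @task(outlets=[UPSTREAM_DATA])
    def produce():
        # Completing successfully marks the Dataset as updated in the Airflow db.
        print("produced upstream_data")

    produce()


@dag(schedule=[UPSTREAM_DATA], start_date=datetime.datetime(2024, 1, 1), catchup=False)
def consumer_sketch():

    @task
    def consume():
        print("I'm the consumer")

    consume()


producer_dag = producer_sketch()
consumer_dag = consumer_sketch()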

orchestrate/dags/ng_test.py

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+import datetime
+from airflow.decorators import dag, task
+from airflow.models import Variable
+from datahub_airflow_plugin.entities import Dataset
+
+
+# ❌ BAD PRACTICE: Fetching a variable at the top level
+# This will cause Airflow to query for this variable on EVERY DAG PARSE (every 30 seconds),
+# which can be costly when using an external secrets manager (e.g., AWS Secrets Manager).
+bad_used_variable = Variable.get("bad_used_variable", "default_value")
+
+@dag(
+    default_args={
+        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
+        "owner": "Noel Gomez",
+        "email": "[email protected]",
+        "email_on_failure": True,
+        "retries": 3
+    },
+    description="Sample DAG demonstrating bad variable usage",
+    schedule="0 0 1 */12 *",
+    tags=["extract_and_load","transform"],
+    catchup=False,
+)
+def ng_test():
+
+    # @task.datacoves_dbt(connection_id="main")
+    # def show_env_value():
+    #     return """
+    #     echo dbt_home: && echo $DATACOVES__DBT_HOME &&
+    #     echo repo_path: && echo $DATACOVES__REPO_PATH &&
+    #     echo cwd: && pwd
+    #     """
+    # show_env_value()
+
+    @task.datacoves_bash(
+        outlets=[Dataset("snowflake", "raw.us_population.us_population")],
+        env={
+            "UV_CACHE_DIR": "/tmp/uv_cache",
+            "EXTRACT__NEXT_ITEM_MODE":"fifo",
+            "EXTRACT__MAX_PARALLEL_ITEMS":"1",
+            "EXTRACT__WORKERS":"1",
+            "NORMALIZE__WORKERS":"1",
+            "LOAD__WORKERS":"1",
+        },
+        append_env=True
+    )
+    def load_us_population():
+        return "cd load/dlt/ && ./us_population.py"
+
+
+    @task.datacoves_dbt(
+        connection_id="main",
+        inlets=[
+            Dataset("snowflake", "raw.us_population.us_population"),
+            Dataset("snowflake", "raw.google_analytics_4.engagement_events_report")
+        ]
+    )
+    def run_dbt():
+        return "dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+ -t prd'"
+
+    load_us_population() >> run_dbt()
+
+# Invoke DAG
+dag = ng_test()
