
Commit 7acde54

Refactor dlt in dag and model names (#227)
* update dlt script
* change progress bar to log for Airflow
* remove target from dbt run in DAG
* fix tag on DAG and add info in example on how to check whether change tracking is enabled
* add instructions on how to use dbt-osmosis and update project with refactored descriptions and column types
* rename L1 models with stg_ prefix
1 parent 7dac4e8 commit 7acde54

File tree

67 files changed: +856 −183 lines


load/dlt/loans_data.py

Lines changed: 3 additions & 5 deletions

```diff
@@ -1,4 +1,4 @@
-#!/usr/bin/env -S uv run --verbose --cache-dir /tmp/.uv_cache
+#!/usr/bin/env -S uv run
 # /// script
 # dependencies = [
 #     "dlt[snowflake, parquet]==1.5.0",
@@ -28,7 +28,7 @@ def zip_coordinates():
 
 @dlt.source
 def loans_data():
-    return personal_loans, zip_coordinates
+    return [personal_loans, zip_coordinates]
 
 if __name__ == "__main__":
     datacoves_snowflake = dlt.destinations.snowflake(
@@ -46,8 +46,6 @@ def loans_data():
         dataset_name="loans"
     )
 
-    load_info = pipeline.run([
-        loans_data()
-    ])
+    load_info = pipeline.run(loans_data())
 
     print(load_info)
```
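
The two changes belong together: the `@dlt.source` function now returns a plain list of resources, and the source is passed straight to `pipeline.run()`, making the old list wrapper redundant. A minimal runnable sketch of the same pattern, with made-up rows and a local DuckDB destination standing in for the repo's Snowflake destination (both are assumptions):

```python
import dlt

@dlt.resource
def personal_loans():
    # Illustrative row; the real resource pulls loan data.
    yield {"loan_id": 1, "amount": 12_000}

@dlt.resource
def zip_coordinates():
    yield {"zip": "94105", "lat": 37.79, "lon": -122.39}

@dlt.source
def loans_data():
    # A source may return a list of resources; dlt loads each as its own table.
    return [personal_loans, zip_coordinates]

if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="loans",
        destination="duckdb",  # stand-in for the Snowflake destination in the repo
        dataset_name="loans",
    )
    # run() accepts the source directly, so no list wrapper is needed.
    print(pipeline.run(loans_data()))
```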

load/dlt/us_population.py

Lines changed: 3 additions & 3 deletions

```diff
@@ -1,4 +1,4 @@
-#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache
+#!/usr/bin/env -S uv run
 # /// script
 # dependencies = [
 #     "dlt[snowflake, parquet]==1.5.0",
@@ -30,7 +30,7 @@ def us_population_source():
     )
 
     pipeline = dlt.pipeline(
-        progress = "enlighten",
+        progress = "log",
         pipeline_name = "loans",
         destination = datacoves_snowflake,
         pipelines_dir = pipelines_dir,
@@ -40,7 +40,7 @@ def us_population_source():
     )
 
     load_info = pipeline.run([
-        us_population()
+        us_population_source()
     ])
 
     print(load_info)
```
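
The `progress` argument selects dlt's progress collector. `enlighten` draws terminal progress bars, which turn into noise in Airflow task logs; `log` emits periodic log lines instead. A hedged sketch of the relevant knob, again with a DuckDB stand-in destination:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="loans",
    destination="duckdb",   # illustration only; the script targets Snowflake
    dataset_name="us_population",
    progress="log",         # other collectors include "enlighten" and "tqdm"
)
```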

observe/osmosis/.env.sample

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+OPENAI_API_KEY=abc....
```
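
A hedged usage note: copy the sample into place and fill in a real key before running the readme's commands, e.g. `cp observe/osmosis/.env.sample observe/osmosis/.env`; the path matches the `source` line in the readme below.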

observe/osmosis/readme.md

Lines changed: 22 additions & 0 deletions

```diff
@@ -0,0 +1,22 @@
+# dbt-osmosis
+
+## Set the OpenAI key from the .env file
+`set -a && source /config/workspace/observe/osmosis/.env && set +a`
+
+## Run refactor
+Runs organize, which syncs yaml files with the database schema.
+This also uses OpenAI to add descriptions to columns.
+
+`uvx --with=dbt-snowflake~=1.8.0 --from 'dbt-osmosis[openai]' dbt-osmosis yaml refactor --synthesize`
+
+Run without OpenAI:
+
+`uvx --with=dbt-snowflake~=1.8.0 dbt-osmosis yaml refactor`
+
+You can pass `--fqn` with the name of a folder in `models/` to limit the refactor:
+
+`dbt-osmosis yaml refactor --fqn L3_coves`
+
+Separate sub-folders with a `.`:
+
+`dbt-osmosis yaml refactor --fqn L3_coves.loan_analytics`
```

orchestrate/dags/daily_loan_run.py

Lines changed: 15 additions & 3 deletions

```diff
@@ -52,14 +52,26 @@ def extract_and_load_fivetran():
     @task_group(group_id="extract_and_load_dlt", tooltip="dlt Extract and Load")
     def extract_and_load_dlt():
         load_us_population = DatacovesBashOperator(
-            task_id="load_us_population",
-            bash_command="cd load/dlt && ./loans_data.py",
+            task_id="load_loads_data",
+            bash_command="""
+                cd load/dlt \
+                && ./loans_data.py
+            """,
+            env={
+                "UV_CACHE_DIR": "/tmp/uv_cache",
+                "EXTRACT__NEXT_ITEM_MODE": "fifo",
+                "EXTRACT__MAX_PARALLEL_ITEMS": "1",
+                "EXTRACT__WORKERS": "1",
+                "NORMALIZE__WORKERS": "1",
+                "LOAD__WORKERS": "1",
+            },
+            append_env=True,
         )
 
     tg_extract_and_load_dlt = extract_and_load_dlt()
     transform = DatacovesDbtOperator(
         task_id="transform",
-        bash_command="dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+ -t prd'",
+        bash_command="dbt build -s 'tag:daily_run_airbyte+ tag:daily_run_fivetran+'",
     )
     transform.set_upstream(
         [
```
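
The `env`/`append_env` pair is the interesting part. dlt reads configuration from environment variables of the form `SECTION__KEY` (double underscore), so `EXTRACT__WORKERS=1` maps to `extract.workers` and pins each pipeline stage to a single worker; `UV_CACHE_DIR` pins uv's cache location from the DAG instead of the shebang's removed `--cache-dir` flag. A hedged sketch of the same idea with the stock `BashOperator` (assuming `DatacovesBashOperator` wraps it with the same semantics):

```python
from airflow.operators.bash import BashOperator

load_loans_data = BashOperator(
    task_id="load_loans_data",
    bash_command="cd load/dlt && ./loans_data.py",
    env={
        "UV_CACHE_DIR": "/tmp/uv_cache",  # keep uv's cache on writable tmp storage
        "EXTRACT__WORKERS": "1",          # dlt config: extract.workers
        "NORMALIZE__WORKERS": "1",        # dlt config: normalize.workers
        "LOAD__WORKERS": "1",             # dlt config: load.workers
    },
    # Merge env into the inherited environment instead of replacing it,
    # so PATH, credentials, etc. still reach the script.
    append_env=True,
)
```
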
Lines changed: 20 additions & 0 deletions

```diff
@@ -0,0 +1,20 @@
+from airflow.decorators import dag
+from orchestrate.utils.default_args import default_args
+from operators.datacoves.dbt import DatacovesDbtOperator
+
+
+@dag(
+    default_args=default_args,
+    description="Daily dbt run",
+    schedule="0 12 * * *",
+    tags=["transform"],
+    catchup=False,
+
+)
+def default_args_dag():
+    run_dbt = DatacovesDbtOperator(
+        task_id="run_dbt", bash_command="dbt run -s country_codes"
+    )
+
+
+dag = default_args_dag()
```

orchestrate/utils/default_args.py

Lines changed: 11 additions & 0 deletions

```diff
@@ -0,0 +1,11 @@
+# utils/default_args.py
+from datetime import datetime, timedelta
+
+default_args = {
+    'owner': '[email protected]',
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 3,
+    'retry_delay': timedelta(minutes=5),
+    'start_date': datetime(2023, 12, 1),
+}
```
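
Airflow applies each `default_args` key to every task in a DAG unless the task overrides it, which is why the new DAG above only needs to pass the shared dict. A minimal sketch of a per-task override (the DAG and task here are hypothetical):

```python
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from orchestrate.utils.default_args import default_args

@dag(default_args=default_args, schedule=None, catchup=False)
def override_example():
    # Task-level arguments win over default_args: this task never retries,
    # while sibling tasks would still inherit retries=3 from the shared dict.
    EmptyOperator(task_id="noop", retries=0)

dag = override_example()
```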

transform/.dbt_coves/config.yml

Lines changed: 2 additions & 2 deletions

```diff
@@ -4,8 +4,8 @@ generate:
   # schemas: # List of schema names where to look for source tables
   #   - RAW
   sources_destination: "models/L1_inlets/{{schema}}/_{{schema}}.yml" # Where sources yml files will be generated
-  models_destination: "models/L1_inlets/{{schema}}/{{relation}}.sql" # Where models sql files will be generated
-  model_props_destination: "models/L1_inlets/{{schema}}/{{relation}}.yml" # Where models yml files will be generated
+  models_destination: "models/L1_inlets/{{schema}}/stg_{{relation}}.sql" # Where models sql files will be generated
+  model_props_destination: "models/L1_inlets/{{schema}}/stg_{{relation}}.yml" # Where models yml files will be generated
   update_strategy: update # Action to perform when a property file exists. Options: update, recreate, fail, ask
   templates_folder: ".dbt_coves/templates" # Folder where source generation jinja templates are located.
   flatten_json_fields: "no" # Action to perform when a VARIANT / JSON field is encountered
```
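
For illustration, assuming a schema `loans` with a relation `personal_loans`: `dbt-coves generate sources` would now write `models/L1_inlets/loans/stg_personal_loans.sql` and `stg_personal_loans.yml` where it previously wrote `personal_loans.sql`/`personal_loans.yml`, lining the generated L1 models up with the stg_ renames elsewhere in this commit.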

transform/.dbt_coves/templates/staging_model_props.yml

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,7 +1,7 @@
 version: 2
 
 models:
-  - name: {{model}}
+  - name: stg_{{ model | lower }}
     description: ''
     columns:
 {%- for cols in nested.values() %}
```
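
The template now both prefixes the generated model name and lowercases it. A hypothetical one-liner showing the rendered result, using jinja2 directly (dbt-coves renders these templates with jinja; the model name here is made up):

```python
from jinja2 import Template

# "model" is the variable dbt-coves feeds into the template.
print(Template("- name: stg_{{ model | lower }}").render(model="PERSONAL_LOANS"))
# -> - name: stg_personal_loans
```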

transform/dbt_project.yml

Lines changed: 3 additions & 1 deletion

```diff
@@ -43,6 +43,7 @@ seeds:
     +persist_docs:
       relation: true
       columns: true
+    +dbt-osmosis: "{model}.yml"
 
 snapshots:
   +enabled: "{{ True if target.database == 'balboa' else False }}"
@@ -56,6 +57,7 @@ snapshots:
 models:
   # elementary:
   #   +schema: "elementary"
+  +dbt-osmosis: "{model}.yml"
 
   balboa:
     +materialized: view
@@ -126,7 +128,7 @@ models:
       - "{{ dbt_snow_mask.apply_masking_policy('models') if target.name == 'prd_pii' }}"
       # This macro creates a non_versioned view of a versioned dbt model
       # Useful if you want to have a versioned mart but don't want BI tool to break if version
-      # is updated.
+      # is updated.
       # - "{{ create_latest_version_view() }}"
 
 # Snowflake Defaults
```
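
The two added `+dbt-osmosis: "{model}.yml"` lines set the yaml routing dbt-osmosis reads when it refactors: for every model and seed under those sections, properties are expected in a yml file named after the model, next to its sql file, which is the layout the `dbt-osmosis yaml refactor` commands in `observe/osmosis/readme.md` above produce.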
