Skip to content

Commit 8f27f60

Browse files
authored
Merge pull request #252 from microsoft/v1.9.0
V1.9.0 release
2 parents 4bd7ea9 + 2ac5e23 commit 8f27f60

File tree

10 files changed

+1142
-111
lines changed

10 files changed

+1142
-111
lines changed

dbt/adapters/fabric/fabric_adapter.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def valid_incremental_strategies(self):
166166
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
167167
Not used to validate custom strategies defined by end users.
168168
"""
169-
return ["append", "delete+insert", "merge", "insert_overwrite"]
169+
return ["append", "delete+insert", "microbatch"]
170170

171171
# This is for use in the test suite
172172
def run_sql_for_tests(self, sql, fetch, conn):

dbt/include/fabric/macros/adapters/columns.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{% macro fabric__get_empty_subquery_sql(select_sql, select_sql_header=none) %}
2-
{% if sql.strip().lower().startswith('with') %}
2+
{% if select_sql.strip().lower().startswith('with') %}
33
{{ select_sql }}
44
{% else -%}
55
select * from (

dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{% macro fabric__get_incremental_default_sql(arg_dict) %}
22

33
{% if arg_dict["unique_key"] %}
4+
-- Delete + Insert Strategy, calls get_delete_insert_merge_sql
45
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
56
{% else %}
67
-- Incremental Append will insert data into target table.

dbt/include/fabric/macros/materializations/models/incremental/merge.sql

+32
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,35 @@
5858
from {{ source }}
5959
){{ query_label }}
6060
{% endmacro %}
61+
62+
{% macro fabric__get_incremental_microbatch_sql(arg_dict) %}
63+
{%- set target = arg_dict["target_relation"] -%}
64+
{%- set source = arg_dict["temp_relation"] -%}
65+
{%- set dest_columns = arg_dict["dest_columns"] -%}
66+
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
67+
68+
{#-- Add additional incremental_predicates to filter for batch --#}
69+
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
70+
{{ log("incremenal append event start time > DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") }}
71+
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
72+
{% endif %}
73+
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
74+
{{ log("incremenal append event end time < DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") }}
75+
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
76+
{% endif %}
77+
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
78+
79+
delete DBT_INTERNAL_TARGET from {{ target }} AS DBT_INTERNAL_TARGET
80+
where (
81+
{% for predicate in incremental_predicates %}
82+
{%- if not loop.first %}and {% endif -%} {{ predicate }}
83+
{% endfor %}
84+
);
85+
86+
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
87+
insert into {{ target }} ({{ dest_cols_csv }})
88+
(
89+
select {{ dest_cols_csv }}
90+
from {{ source }}
91+
)
92+
{% endmacro %}

dbt/include/fabric/macros/materializations/snapshots/helpers.sql

+101-88
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818

1919
{% macro fabric__build_snapshot_table(strategy, relation) %}
20-
20+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
2121
select *,
22-
{{ strategy.scd_id }} as dbt_scd_id,
23-
{{ strategy.updated_at }} as dbt_updated_at,
24-
{{ strategy.updated_at }} as dbt_valid_from,
25-
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
22+
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
23+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
24+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
25+
{{ get_dbt_valid_to_current(strategy, columns) }}
26+
{%- if strategy.hard_deletes == 'new_record' -%}
27+
, 'False' as {{ columns.dbt_is_deleted }}
28+
{% endif -%}
2629
from (
2730
select * from {{ relation }}
2831
) sbq
@@ -31,115 +34,125 @@
3134

3235
{% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%}
3336

34-
with snapshot_query as (
37+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
3538

39+
with snapshot_query as (
3640
select * from {{ temp_snapshot_relation }}
37-
3841
),
39-
4042
snapshotted_data as (
41-
4243
select *,
43-
{{ strategy.unique_key }} as dbt_unique_key
44-
44+
{{ unique_key_fields(strategy.unique_key) }}
4545
from {{ target_relation }}
46-
where dbt_valid_to is null
47-
46+
where
47+
{% if config.get('dbt_valid_to_current') %}
48+
{# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #}
49+
( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null)
50+
{% else %}
51+
{{ columns.dbt_valid_to }} is null
52+
{% endif %}
4853
),
49-
5054
insertions_source_data as (
51-
52-
select
53-
*,
54-
{{ strategy.unique_key }} as dbt_unique_key,
55-
{{ strategy.updated_at }} as dbt_updated_at,
56-
{{ strategy.updated_at }} as dbt_valid_from,
57-
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
58-
{{ strategy.scd_id }} as dbt_scd_id
59-
55+
select *,
56+
{{ unique_key_fields(strategy.unique_key) }},
57+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
58+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
59+
{{ get_dbt_valid_to_current(strategy, columns) }},
60+
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
6061
from snapshot_query
6162
),
62-
6363
updates_source_data as (
64-
65-
select
66-
*,
67-
{{ strategy.unique_key }} as dbt_unique_key,
68-
{{ strategy.updated_at }} as dbt_updated_at,
69-
{{ strategy.updated_at }} as dbt_valid_from,
70-
{{ strategy.updated_at }} as dbt_valid_to
71-
72-
from snapshot_query
73-
),
74-
75-
{%- if strategy.invalidate_hard_deletes %}
76-
77-
deletes_source_data as (
78-
79-
select
80-
*,
81-
{{ strategy.unique_key }} as dbt_unique_key
64+
select *,
65+
{{ unique_key_fields(strategy.unique_key) }},
66+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
67+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
68+
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
8269
from snapshot_query
8370
),
71+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
72+
deletes_source_data as (
73+
select *, {{ unique_key_fields(strategy.unique_key) }}
74+
from snapshot_query
75+
),
8476
{% endif %}
85-
8677
insertions as (
87-
88-
select
89-
'insert' as dbt_change_type,
90-
source_data.*
91-
78+
select 'insert' as dbt_change_type, source_data.*
79+
{%- if strategy.hard_deletes == 'new_record' -%}
80+
,'False' as {{ columns.dbt_is_deleted }}
81+
{%- endif %}
9282
from insertions_source_data as source_data
93-
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
94-
where snapshotted_data.dbt_unique_key is null
95-
or (
96-
snapshotted_data.dbt_unique_key is not null
97-
and (
98-
{{ strategy.row_changed }}
99-
)
100-
)
101-
83+
left outer join snapshotted_data
84+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
85+
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
86+
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }}))
10287
),
103-
10488
updates as (
105-
106-
select
107-
'update' as dbt_change_type,
108-
source_data.*,
109-
snapshotted_data.dbt_scd_id
110-
89+
select 'update' as dbt_change_type, source_data.*,
90+
snapshotted_data.{{ columns.dbt_scd_id }}
91+
{%- if strategy.hard_deletes == 'new_record' -%}
92+
, snapshotted_data.{{ columns.dbt_is_deleted }}
93+
{%- endif %}
11194
from updates_source_data as source_data
112-
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
113-
where (
114-
{{ strategy.row_changed }}
115-
)
95+
join snapshotted_data
96+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
97+
where ({{ strategy.row_changed }})
11698
)
117-
118-
{%- if strategy.invalidate_hard_deletes -%}
119-
,
120-
121-
deletes as (
122-
123-
select
124-
'delete' as dbt_change_type,
99+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
100+
,
101+
deletes as (
102+
select 'delete' as dbt_change_type,
125103
source_data.*,
126-
{{ snapshot_get_time() }} as dbt_valid_from,
127-
{{ snapshot_get_time() }} as dbt_updated_at,
128-
{{ snapshot_get_time() }} as dbt_valid_to,
129-
snapshotted_data.dbt_scd_id
130-
131-
from snapshotted_data
132-
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
133-
where source_data.dbt_unique_key is null
134-
)
104+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
105+
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
106+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
107+
snapshotted_data.{{ columns.dbt_scd_id }}
108+
{%- if strategy.hard_deletes == 'new_record' -%}
109+
, snapshotted_data.{{ columns.dbt_is_deleted }}
110+
{%- endif %}
111+
from snapshotted_data
112+
left join deletes_source_data as source_data
113+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
114+
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
115+
)
135116
{%- endif %}
117+
{%- if strategy.hard_deletes == 'new_record' %}
118+
{%set source_query = "select * from "~temp_snapshot_relation%}
119+
{% set source_sql_cols = get_column_schema_from_query(source_query) %}
120+
,
121+
deletion_records as (
136122

123+
select
124+
'insert' as dbt_change_type,
125+
{%- for col in source_sql_cols -%}
126+
snapshotted_data.{{ adapter.quote(col.column) }},
127+
{% endfor -%}
128+
{%- if strategy.unique_key | is_list -%}
129+
{%- for key in strategy.unique_key -%}
130+
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
131+
{% endfor -%}
132+
{%- else -%}
133+
snapshotted_data.dbt_unique_key as dbt_unique_key,
134+
{% endif -%}
135+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
136+
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
137+
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
138+
snapshotted_data.{{ columns.dbt_scd_id }},
139+
'True' as {{ columns.dbt_is_deleted }}
140+
from snapshotted_data
141+
left join deletes_source_data as source_data
142+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
143+
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
144+
)
145+
{%- endif %}
137146
select * from insertions
138147
union all
139148
select * from updates
140-
{%- if strategy.invalidate_hard_deletes %}
141-
union all
142-
select * from deletes
149+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
150+
union all
151+
select * from deletes
152+
{%- endif %}
153+
{%- if strategy.hard_deletes == 'new_record' %}
154+
union all
155+
select * from deletion_records
143156
{%- endif %}
144157

145158
{%- endmacro %}

dbt/include/fabric/macros/materializations/snapshots/snapshot.sql

+15-11
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
{% if not target_relation_exists %}
4141

4242
{% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %}
43+
{% set build_or_select_sql = build_sql %}
4344

4445
-- naming a temp relation
4546
{% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
@@ -51,40 +52,43 @@
5152

5253
{% else %}
5354

54-
{{ adapter.valid_snapshot_target(target_relation) }}
55+
{% set columns = config.get("snapshot_meta_column_names") or get_snapshot_table_column_names() %}
56+
{{ adapter.valid_snapshot_target(target_relation, columns) }}
57+
{% set build_or_select_sql = snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
5558
{% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
5659
-- this may no-op if the database does not require column expansion
5760
{% do adapter.expand_target_column_types(from_relation=staging_table,
5861
to_relation=target_relation) %}
62+
63+
{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
64+
{% if unique_key | is_list %}
65+
{% for key in strategy.unique_key %}
66+
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
67+
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
68+
{% endfor %}
69+
{% endif %}
5970
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
60-
| rejectattr('name', 'equalto', 'dbt_change_type')
61-
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
62-
| rejectattr('name', 'equalto', 'dbt_unique_key')
63-
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
71+
| rejectattr('name', 'in', remove_columns)
6472
| list %}
6573
{% if missing_columns|length > 0 %}
6674
{{log("Missing columns length is: "~ missing_columns|length)}}
6775
{% do create_columns(target_relation, missing_columns) %}
6876
{% endif %}
6977
{% set source_columns = adapter.get_columns_in_relation(staging_table)
70-
| rejectattr('name', 'equalto', 'dbt_change_type')
71-
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
72-
| rejectattr('name', 'equalto', 'dbt_unique_key')
73-
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
78+
| rejectattr('name', 'in', remove_columns)
7479
| list %}
7580
{% set quoted_source_columns = [] %}
7681
{% for column in source_columns %}
7782
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
7883
{% endfor %}
79-
8084
{% set final_sql = snapshot_merge_sql(
8185
target = target_relation,
8286
source = staging_table,
8387
insert_cols = quoted_source_columns
8488
)
8589
%}
8690
{% endif %}
87-
91+
{{ check_time_data_types(build_or_select_sql) }}
8892
{% call statement('main') %}
8993
{{ final_sql }}
9094
{% endcall %}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{% macro fabric__snapshot_merge_sql(target, source, insert_cols) %}
22

33
{%- set insert_cols_csv = insert_cols | join(', ') -%}
4+
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
45
{%- set target_table = target.include(database=False) -%}
56
{%- set source_table = source.include(database=False) -%}
67
{% set target_columns_list = [] %}
@@ -9,17 +10,21 @@
910
{% endfor %}
1011
{%- set target_columns = target_columns_list | join(', ') -%}
1112

12-
UPDATE DBT_INTERNAL_DEST
13-
SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
14-
FROM {{ target_table }} as DBT_INTERNAL_DEST
15-
INNER JOIN {{ source_table }} as DBT_INTERNAL_SOURCE
16-
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
17-
WHERE DBT_INTERNAL_DEST.dbt_valid_to is null
18-
AND DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
13+
update DBT_INTERNAL_DEST
14+
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
15+
from {{ target_table }} as DBT_INTERNAL_DEST
16+
inner join {{ source_table }} as DBT_INTERNAL_SOURCE
17+
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}
18+
where DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
19+
{% if config.get("dbt_valid_to_current") %}
20+
and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null)
21+
{% else %}
22+
and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
23+
{% endif %}
1924
{{ apply_label() }}
2025

21-
INSERT INTO {{ target_table }} ({{ insert_cols_csv }})
22-
SELECT {{target_columns}} FROM {{ source_table }} as DBT_INTERNAL_SOURCE
23-
WHERE DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
26+
insert into {{ target_table }} ({{ insert_cols_csv }})
27+
select {{target_columns}} from {{ source_table }} as DBT_INTERNAL_SOURCE
28+
where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
2429
{{ apply_label() }}
2530
{% endmacro %}

0 commit comments

Comments
 (0)