Skip to content

Commit 28cda29

Browse files
committed
🎯 feat: add databricks/scd1-soft-delete template.
1 parent ec36c8f commit 28cda29

File tree

6 files changed

+141
-39
lines changed

6 files changed

+141
-39
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ AND tgt.col02 = src.col02
6666
WHEN MATCHED THEN UPDATE
6767
SET target.col01 = source.col01
6868
, target.col02 = source.col02
69-
, target.updt_prcs_nm = 'SOURCE_FOO'
70-
, target.updt_prcs_ld_id = 1
71-
, target.updt_asat_dt = to_timestamp('20250201', 'yyyyMMdd')
69+
, target.updt_load_src = 'SOURCE_FOO'
70+
, target.updt_load_id = 1
71+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
7272
WHEN NOT MATCHED THEN INSERT
7373
(
74-
col01, col02, pk_col, start_dt, end_dt, delete_f, prcs_nm, prcs_ld_id, asat_dt, updt_prcs_nm, updt_prcs_ld_id, updt_asat_dt
74+
col01, col02, pk_col, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
7575
)
7676
VALUES (
7777
source.col01,

src/sqlplate/core.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515

1616

1717
class SQLPlate:
18-
"""A SQL object for render any SQL template that prepare by Jinja package."""
18+
"""A SQLPlate object for render any SQL template that prepare by Jinja
19+
template engine.
20+
21+
This object can pass an option with a dot-chaining pattern, similar to functional programming.
22+
"""
1923

2024
def __init__(self, name: str, path: Path) -> None:
2125
self.name: str = name
@@ -47,10 +51,17 @@ def template(self, name: str) -> 'SQLPlate':
4751
return self
4852

4953
def option(self, key: str, value: Any) -> 'SQLPlate':
54+
"""Pass an option key-value pair before generate template."""
5055
self._option[key] = value
5156
return self
5257

5358
def options(self, values: dict[str, Any]) -> 'SQLPlate':
59+
"""Pass an option mapping with multiple key-value pairs before generate
60+
the template.
61+
62+
Args:
63+
values (dict[str, Any]): A mapping of multiple key-value pairs.
64+
"""
5465
self._option = self._option | values
5566
return self
5667

templates/databricks/etl.delta.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{% include "utils/etl_vars.jinja" %}
2+
{{ raise_undefined('pk') if pk is undefined }}
23
{% import "macros/delta.jinja" as delta %}
3-
{%- set scd2_columns = ['start_dt', 'end_dt', 'delete_f', 'prcs_nm', 'prcs_ld_id', 'asat_dt', 'updt_prcs_nm', 'updt_prcs_ld_id', 'updt_asat_dt'] -%}
4+
{%- set etl_columns = ['load_src', 'load_id', 'load_date', 'updt_load_src', 'updt_load_id', 'updt_load_date'] -%}
45
{% if pk is iterable and pk is not string and pk is not mapping %}
56
{%- set pk_list = pk -%}
67
{% else %}
@@ -13,7 +14,7 @@
1314
{% else %}
1415
{{ raise_undefined('source|query') }}
1516
{% endif %}
16-
{%- set all_columns = columns + pk_list + scd2_columns -%}
17+
{%- set all_columns = columns + pk_list + etl_columns -%}
1718
{%- set data_columns = columns + pk_list -%}
1819
MERGE INTO {{ catalog }}.{{ schema }}.{{ table }} AS target
1920
USING (
Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,62 @@
11
{% include "utils/etl_vars.jinja" %}
2+
{{ raise_undefined('pk') if pk is undefined }}
3+
{% import "macros/delta.jinja" as delta %}
4+
{%- set etl_columns = ['load_src', 'load_id', 'load_date', 'updt_load_src', 'updt_load_id', 'updt_load_date'] -%}
5+
{%- set scd1_columns = ['delete_f'] + etl_columns -%}
6+
{% if pk is iterable and pk is not string and pk is not mapping %}
7+
{%- set pk_list = pk -%}
8+
{% else %}
9+
{%- set pk_list = [pk] -%}
10+
{% endif %}
11+
{% if source is defined %}
12+
{%- set source_query = source|trim -%}
13+
{% elif query is defined %}
14+
{%- set source_query = '( {} )'.format(query) -%}
15+
{% else %}
16+
{{ raise_undefined('source|query') }}
17+
{% endif %}
18+
{%- set all_columns = columns + pk_list + etl_columns -%}
19+
{%- set data_columns = columns + pk_list -%}
220
MERGE INTO {{ catalog }}.{{ schema }}.{{ table }} AS target
321
USING (
422
WITH change_query AS (
523
SELECT
624
src.*,
7-
CASE WHEN tgt.es_id IS NULL THEN 99
8-
WHEN hash({_p_col_without_pk_src_str}) <> hash({_p_col_without_pk_tgt_str}) THEN 1
25+
CASE WHEN tgt.{{ pk_list | first }} IS NULL THEN 99
26+
WHEN hash({{ columns | map_fmt('src.{0}') | join(', ') }}) <> hash({{ columns | map_fmt('tgt.{0}') | join(', ') }}) THEN 1
927
ELSE 0 END AS data_change
10-
FROM ( {query} ) AS src
28+
FROM {{ source_query }} AS src
1129
LEFT JOIN {{ catalog }}.{{ schema }}.{{ table }} AS tgt
12-
ON {' AND '.join(_p_pk_cols_pairs_sub_query)}
30+
ON {{ columns | map_fmt("tgt.{0} = src.{0}") | join('\n\t\t\tAND ') }}
1331
)
1432
SELECT * FROM change_query
1533
) AS source
16-
ON {' AND '.join(_p_pk_cols_pairs)}
34+
ON {{ pk_list | map_fmt('target.{0} = source.{0}') | join('\n\tAND ') }}
1735
WHEN MATCHED AND data_change = 1
1836
THEN UPDATE
1937
SET {', '.join(_p_col_update)}
2038
, target.delete_f = 0
21-
, target.prcs_nm = '{p_process_name}'
22-
, target.prcs_ld_id = {p_process_load_id}
23-
, target.asat_dt = {p_asat_dt}
24-
, target.updt_prcs_nm = '{p_process_name}'
25-
, target.updt_prcs_ld_id = {p_process_load_id}
26-
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')
39+
{{ delta.sys_update_match(load_src, load_id, load_date) }}
2740
WHEN MATCHED AND data_change = 0 AND target.delete_f = 1
2841
THEN UPDATE
2942
SET target.delete_f = 0
30-
, target.updt_prcs_nm = '{p_process_name}'
31-
, target.updt_prcs_ld_id = {p_process_load_id}
32-
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')
43+
{{ delta.sys_update_match(load_src, load_id, load_date) }}
3344
WHEN NOT MATCHED AND data_change = 99
3445
THEN INSERT
35-
( {', '.join(i.name for i in rs_col_all)} )
46+
(
47+
{{ all_columns | join(', ') }}
48+
)
3649
VALUES (
37-
{', '.join('source.' + i.name for i in rs_col_real)},
38-
0, '{p_process_name}', {p_process_load_id}, {p_asat_dt},
39-
'{p_process_name}', {p_process_load_id}, to_timestamp('{p_asat_dt}', 'yyyyMMdd')
50+
{{ data_columns | map_fmt('source.{0}') | join(',\n\t\t') }},
51+
0,
52+
'{{ load_src }}',
53+
{{ load_id }},
54+
{{ load_date | dt_fmt('%Y%m%d') }},
55+
'{{ load_src }}',
56+
{{ load_id }},
57+
to_timestamp('{{ load_date | dt_fmt('%Y%m%d') }}', 'yyyyMMdd')
4058
)
4159
WHEN NOT MATCHED BY SOURCE AND target.delete_f = 0
4260
THEN UPDATE
4361
SET target.delete_f = 1
44-
, target.updt_prcs_nm = '{p_process_name}'
45-
, target.updt_prcs_ld_id = {p_process_load_id}
46-
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')
62+
{{ delta.sys_update_match(load_src, load_id, load_date) }}

templates/macros/delta.jinja

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{% macro sys_update_match(load_src, load_id, load_date) -%}
2-
, target.updt_prcs_nm = '{{ load_src }}'
3-
, target.updt_prcs_ld_id = {{ load_id }}
4-
, target.updt_asat_dt = to_timestamp('{{ load_date | dt_fmt('%Y%m%d') }}', 'yyyyMMdd')
2+
, target.updt_load_src = '{{ load_src }}'
3+
, target.updt_load_id = {{ load_id }}
4+
, target.updt_load_date = to_timestamp('{{ load_date | dt_fmt('%Y%m%d') }}', 'yyyyMMdd')
55
{%- endmacro %}

tests/test_databricks.py

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,12 @@ def test_sql_delta(template_path):
9191
WHEN MATCHED THEN UPDATE
9292
SET target.col01= source.col01
9393
,target.col02= source.col02
94-
, target.updt_prcs_nm = 'SOURCE_FOO'
95-
, target.updt_prcs_ld_id = 1
96-
, target.updt_asat_dt = to_timestamp('20250201', 'yyyyMMdd')
94+
, target.updt_load_src = 'SOURCE_FOO'
95+
, target.updt_load_id = 1
96+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
9797
WHEN NOT MATCHED THEN INSERT
9898
(
99-
col01, col02, pk_col, start_dt, end_dt, delete_f, prcs_nm, prcs_ld_id, asat_dt, updt_prcs_nm, updt_prcs_ld_id, updt_asat_dt
99+
col01, col02, pk_col, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
100100
)
101101
VALUES (
102102
source.col01,
@@ -138,12 +138,12 @@ def test_sql_delta(template_path):
138138
WHEN MATCHED THEN UPDATE
139139
SET target.col01= source.col01
140140
,target.col02= source.col02
141-
, target.updt_prcs_nm = 'SOURCE_FOO'
142-
, target.updt_prcs_ld_id = 1
143-
, target.updt_asat_dt = to_timestamp('20250201', 'yyyyMMdd')
141+
, target.updt_load_src = 'SOURCE_FOO'
142+
, target.updt_load_id = 1
143+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
144144
WHEN NOT MATCHED THEN INSERT
145145
(
146-
col01, col02, pk_col01, pk_col02, start_dt, end_dt, delete_f, prcs_nm, prcs_ld_id, asat_dt, updt_prcs_nm, updt_prcs_ld_id, updt_asat_dt
146+
col01, col02, pk_col01, pk_col02, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
147147
)
148148
VALUES (
149149
source.col01,
@@ -158,3 +158,77 @@ def test_sql_delta(template_path):
158158
to_timestamp('20250201', 'yyyyMMdd')
159159
)
160160
""").strip('\n')
161+
162+
163+
def test_sql_scd1_soft_delete(template_path):
164+
select_sql: SQLPlate = (
165+
SQLPlate.system('databricks', path=template_path)
166+
.template('etl.scd1-soft-delete')
167+
.option('catalog', 'catalog-name')
168+
.option('schema', 'schema-name')
169+
.option('table', 'table-name')
170+
.option('pk', 'pk_col')
171+
.option('load_src', 'SOURCE_FOO')
172+
.option('load_id', 1)
173+
.option('load_date', datetime(2025, 2, 1, 10))
174+
)
175+
statement: str = (
176+
select_sql
177+
.option('columns', ['col01', 'col02'])
178+
.option('query', 'SELECT * FROM catalog-name.schema-name.source-name')
179+
.load()
180+
)
181+
assert prepare_statement(statement) == dedent("""
182+
MERGE INTO catalog-name.schema-name.table-name AS target
183+
USING (
184+
WITH change_query AS (
185+
SELECT
186+
src.*,
187+
CASE WHEN tgt.pk_col IS NULL THEN 99
188+
WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
189+
ELSE 0 END AS data_change
190+
FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
191+
LEFT JOIN catalog-name.schema-name.table-name AS tgt
192+
ON tgt.col01 = src.col01
193+
AND tgt.col02 = src.col02
194+
)
195+
SELECT * FROM change_query
196+
) AS source
197+
ON target.pk_col = source.pk_col
198+
WHEN MATCHED AND data_change = 1
199+
THEN UPDATE
200+
SET {', '.join(_p_col_update)}
201+
, target.delete_f = 0
202+
, target.updt_load_src = 'SOURCE_FOO'
203+
, target.updt_load_id = 1
204+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
205+
WHEN MATCHED AND data_change = 0 AND target.delete_f = 1
206+
THEN UPDATE
207+
SET target.delete_f = 0
208+
, target.updt_load_src = 'SOURCE_FOO'
209+
, target.updt_load_id = 1
210+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
211+
WHEN NOT MATCHED AND data_change = 99
212+
THEN INSERT
213+
(
214+
col01, col02, pk_col, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
215+
)
216+
VALUES (
217+
source.col01,
218+
source.col02,
219+
source.pk_col,
220+
0,
221+
'SOURCE_FOO',
222+
1,
223+
20250201,
224+
'SOURCE_FOO',
225+
1,
226+
to_timestamp('20250201', 'yyyyMMdd')
227+
)
228+
WHEN NOT MATCHED BY SOURCE AND target.delete_f = 0
229+
THEN UPDATE
230+
SET target.delete_f = 1
231+
, target.updt_load_src = 'SOURCE_FOO'
232+
, target.updt_load_id = 1
233+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
234+
""").strip('\n')

0 commit comments

Comments
 (0)