Skip to content

Commit 75e9d30

Browse files
committed
⭐ hl: add databricks/etl.scd2-transaction template.
1 parent d46ee6c commit 75e9d30

12 files changed

+360
-305
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ default_language_version:
44

55
repos:
66
- repo: https://github.com/korawica/clishelf
7-
rev: v0.2.17
7+
rev: v0.2.19
88
hooks:
99
- id: shelf-commit-msg

src/sqlplate/params.py

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/sqlplate/sqlplate.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from __future__ import annotations
77

88
from pathlib import Path
9-
from typing import Any, Iterator
9+
from typing import Any, Iterator, Optional
1010

1111
from jinja2 import Template
1212

@@ -31,13 +31,13 @@ def __init__(self, name: str, path: Path) -> None:
3131
self.path: Path = path
3232

3333
# NOTE: Make default arguments.
34-
self._template_name: str | None = None
35-
self._template_type: str | None = None
36-
self._template: Template | None = None
34+
self._template_name: Optional[str] = None
35+
self._template_type: Optional[str] = None
36+
self._template: Optional[Template] = None
3737
self._option: dict[str, Any] = {}
3838

3939
@classmethod
40-
def format(cls, name: str, path: Path | None = None) -> 'SQLPlate':
40+
def format(cls, name: str, path: Optional[Path] = None) -> 'SQLPlate':
4141
"""Construction this class from a system value name.
4242
4343
Args:

templates/base.jinja

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
{% if not only_main %}
2-
{% block revert_statement %}
3-
{% endblock revert_statement %}
4-
{% endif %}
1+
{% if not only_main -%}
2+
{% block revert_statement %}
3+
{% endblock revert_statement %}
4+
{%- endif %}
55
{% block statement %}
66
{% endblock statement -%}

templates/databricks/etl.delta.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
{% extends "base.jinja" %}
2+
23
{% include "utils/etl_vars.jinja" with context %}
34
{{ raise_undefined('pk') if pk is undefined }}
5+
46
{% import "databricks/macros/delta.jinja" as delta %}
57
{% from "databricks/macros/utils.jinja" import hash, prepare_source %}
68

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
{% extends "base.jinja" %}
2+
{% include "utils/etl_vars.jinja" with context %}
3+
{% from "databricks/macros/utils.jinja" import prepare_source %}
4+
5+
{%- set all_columns = columns + scd2_columns -%}
6+
7+
{% block revert_statement %}
8+
DELETE FROM {{ catalog }}.{{ schema }}.{{ table }}
9+
WHERE load_src = '{{ load_src }}'
10+
AND load_date = {{ load_date | dt_fmt('%Y%m%d') }}
11+
;
12+
{% endblock revert_statement %}
13+
14+
{% block statement %}
15+
INSERT INTO {{ catalog }}.{{ schema }}.{{ table }}
16+
PARTITION ( load_date = {{ load_date | dt_fmt('%Y%m%d') }} )
17+
( {% for col in all_columns -%}
18+
{%- if col != 'load_date' -%}
19+
{%- if not loop.last -%}
20+
{{ '{}, '.format(col) }}
21+
{%- else -%}
22+
{{ '{}'.format(col) }}
23+
{%- endif -%}
24+
{%- endif -%}
25+
{%- endfor %} )
26+
SELECT
27+
{{ columns | join('\n\t,') }}
28+
, to_timestamp('{{ load_date | dt_fmt('%Y%m%d') }}', 'yyyyMMdd') AS start_date
29+
, to_timestamp('9999-12-31', 'yyyy-MM-dd') AS end_date
30+
, '{{ load_src }}' AS load_src
31+
, {{ load_id }} AS load_id
32+
, '{{ load_src }}' AS updt_load_src
33+
, {{ load_id }} AS updt_load_id
34+
, to_timestamp('{{ load_date | dt_fmt('%Y%m%d') }}', 'yyyyMMdd') AS updt_load_date
35+
FROM {{ prepare_source(source, query) }} AS sub_query
36+
;
37+
{% endblock statement %}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
MERGE INTO catalog-name.schema-name.table-name AS target
2+
USING (
3+
WITH change_query AS (
4+
SELECT
5+
src.*,
6+
CASE WHEN tgt.pk_col IS NULL THEN 99
7+
WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
8+
ELSE 0 END AS data_change
9+
FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
10+
LEFT JOIN catalog-name.schema-name.table-name AS tgt
11+
ON tgt.col01 = src.col01
12+
AND tgt.col02 = src.col02
13+
)
14+
SELECT * FROM change_query
15+
) AS source
16+
ON target.pk_col = source.pk_col
17+
WHEN MATCHED AND data_change = 1
18+
THEN UPDATE
19+
SET target.col01= source.col01
20+
,target.col02= source.col02
21+
, target.delete_f = 0
22+
, target.updt_load_src = 'SOURCE_FOO'
23+
, target.updt_load_id = 1
24+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
25+
WHEN MATCHED AND data_change = 0 AND target.delete_f = 1
26+
THEN UPDATE
27+
SET target.delete_f = 0
28+
, target.updt_load_src = 'SOURCE_FOO'
29+
, target.updt_load_id = 1
30+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
31+
WHEN NOT MATCHED AND data_change = 99
32+
THEN INSERT
33+
(
34+
col01, col02, pk_col, delete_f, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
35+
)
36+
VALUES (
37+
source.col01,
38+
source.col02,
39+
source.pk_col,
40+
0,
41+
'SOURCE_FOO',
42+
1,
43+
20250201,
44+
'SOURCE_FOO',
45+
1,
46+
to_timestamp('20250201', 'yyyyMMdd')
47+
)
48+
WHEN NOT MATCHED BY SOURCE AND target.delete_f = 0
49+
THEN UPDATE
50+
SET target.delete_f = 1
51+
, target.updt_load_src = 'SOURCE_FOO'
52+
, target.updt_load_id = 1
53+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
54+
;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
DELETE FROM catalog-name.schema-name.table-name
2+
WHERE
3+
load_date >= 20250201,
4+
AND load_src = 'SOURCE_FOO'
5+
;
6+
UPDATE catalog-name.schema-name.table-name
7+
SET delete_f = 0
8+
, updt_load_src = 'SOURCE_FOO'
9+
, updt_load_id = 1
10+
, updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
11+
WHERE
12+
delete_f = 1
13+
AND updt_load_src = 'SOURCE_FOO'
14+
AND updt_load_date >= to_timestamp('20250201', 'yyyyMMdd')
15+
;
16+
MERGE INTO catalog-name.schema-name.table-name AS target
17+
USING (
18+
WITH change_query AS (
19+
SELECT
20+
src.*,
21+
CASE WHEN tgt.pk_col IS NULL THEN 99
22+
WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
23+
ELSE 0 END AS data_change
24+
FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
25+
LEFT JOIN catalog-name.schema-name.table-name AS tgt
26+
ON tgt.col01 = src.col01
27+
AND tgt.col02 = src.col02
28+
)
29+
SELECT * FROM change_query
30+
) AS source
31+
ON target.pk_col = source.pk_col
32+
WHEN MATCHED AND data_change = 1
33+
THEN UPDATE
34+
SET target.col01= source.col01
35+
,target.col02= source.col02
36+
, target.delete_f = 0
37+
, target.updt_load_src = 'SOURCE_FOO'
38+
, target.updt_load_id = 1
39+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
40+
WHEN MATCHED AND data_change = 0 AND target.delete_f = 1
41+
THEN UPDATE
42+
SET target.delete_f = 0
43+
, target.updt_load_src = 'SOURCE_FOO'
44+
, target.updt_load_id = 1
45+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
46+
WHEN NOT MATCHED AND data_change = 99
47+
THEN INSERT
48+
(
49+
col01, col02, pk_col, delete_f, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
50+
)
51+
VALUES (
52+
source.col01,
53+
source.col02,
54+
source.pk_col,
55+
0,
56+
'SOURCE_FOO',
57+
1,
58+
20250201,
59+
'SOURCE_FOO',
60+
1,
61+
to_timestamp('20250201', 'yyyyMMdd')
62+
)
63+
WHEN NOT MATCHED BY SOURCE AND target.delete_f = 0
64+
THEN UPDATE
65+
SET target.delete_f = 1
66+
, target.updt_load_src = 'SOURCE_FOO'
67+
, target.updt_load_id = 1
68+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
69+
;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
DELETE FROM catalog-name.schema-name.table-name
2+
WHERE
3+
load_date >= 20250201,
4+
AND load_src = 'SOURCE_FOO'
5+
;
6+
UPDATE catalog-name.schema-name.table-name
7+
SET end_dt = '9999-12-31'
8+
, delete_f = 0
9+
, updt_load_src = 'SOURCE_FOO'
10+
, updt_load_id = 1
11+
, updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
12+
WHERE
13+
end_dt = DATEADD(DAY, -1, to_timestamp('20250201', 'yyyyMMdd'))
14+
AND updt_load_src = 'SOURCE_FOO'
15+
AND updt_load_date >= to_timestamp('20250201', 'yyyyMMdd')
16+
;
17+
MERGE INTO catalog-name.schema-name.table-name AS target
18+
USING (
19+
WITH change_query AS (
20+
SELECT
21+
src.*,
22+
CASE WHEN tgt.pk_col IS NULL THEN 99
23+
WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
24+
ELSE 0 END AS data_change
25+
FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
26+
LEFT JOIN catalog-name.schema-name.{p_table_name} AS tgt
27+
ON tgt.end_dt = '9999-12-31'
28+
AND tgt.col01 = src.col01
29+
AND tgt.col02 = src.col02
30+
)
31+
SELECT pk_col AS merge_pk_col, * FROM change_query WHERE data_change == 1
32+
UNION ALL
33+
SELECT null AS merge_pk_col, * FROM change_query WHERE data_change = (1, 99)
34+
) AS source
35+
ON target.pk_col = source.merge_pk_col
36+
WHEN MATCHED AND source.data_change = 1
37+
THEN UPDATE
38+
SET target.col01= source.col01
39+
,target.col02= source.col02
40+
, target.end_dt = DATEADD(DAY, -1, to_timestamp('20250201', 'yyyyMMdd'))
41+
, target.updt_load_src = 'SOURCE_FOO'
42+
, target.updt_load_id = 1
43+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
44+
WHEN NOT MATCHED AND source.data_change IN (1, 99)
45+
THEN INSERT
46+
(
47+
col01, col02, pk_col, start_date, end_date, delete_f, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
48+
)
49+
VALUES (
50+
source.col01
51+
,source.col02
52+
,source.pk_col
53+
, to_timestamp('20250201', 'yyyyMMdd')
54+
, to_timestamp('9999-12-31', 'yyyy-MM-dd')
55+
, 0
56+
, 'SOURCE_FOO'
57+
, 1
58+
, 20250201
59+
, 'SOURCE_FOO'
60+
, 1
61+
, to_timestamp('20250201', 'yyyyMMdd')
62+
)
63+
WHEN NOT MATCHED BY SOURCE
64+
AND target.end_dt = to_timestamp('9999-12-31', 'yyyy-MM-dd')
65+
AND target.prcs_nm = '{p_process_name}'
66+
THEN UPDATE
67+
SET target.delete_f = 1
68+
, target.end_dt = DATEADD(DAY, -1, to_timestamp('20250201', 'yyyyMMdd'))
69+
, target.updt_load_src = 'SOURCE_FOO'
70+
, target.updt_load_id = 1
71+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
72+
;
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
DELETE FROM catalog-name.schema-name.table-name
2+
WHERE
3+
load_date >= 20250201,
4+
AND load_src = 'SOURCE_FOO'
5+
;
6+
UPDATE catalog-name.schema-name.table-name
7+
SET end_dt = '9999-12-31'
8+
, delete_f = 0
9+
, updt_load_src = 'SOURCE_FOO'
10+
, updt_load_id = 1
11+
, updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
12+
WHERE
13+
end_dt = DATEADD(DAY, -1, to_timestamp('20250201', 'yyyyMMdd'))
14+
AND updt_load_src = 'SOURCE_FOO'
15+
AND updt_load_date >= to_timestamp('20250201', 'yyyyMMdd')
16+
;
17+
MERGE INTO catalog-name.schema-name.table-name AS target
18+
USING (
19+
WITH change_query AS (
20+
SELECT
21+
src.*,
22+
CASE WHEN tgt.pk_col IS NULL THEN 99
23+
WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
24+
ELSE 0 END AS data_change
25+
FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
26+
LEFT JOIN catalog-name.schema-name.{p_table_name} AS tgt
27+
ON tgt.end_dt = '9999-12-31'
28+
AND tgt.col01 = src.col01
29+
AND tgt.col02 = src.col02
30+
)
31+
SELECT pk_col AS merge_pk_col, * FROM change_query WHERE data_change == 1
32+
UNION ALL
33+
SELECT null AS merge_pk_col, * FROM change_query WHERE data_change = (1, 99)
34+
) AS source
35+
ON target.pk_col = source.merge_pk_col
36+
WHEN MATCHED AND source.data_change = 1
37+
THEN UPDATE
38+
SET target.col01= source.col01
39+
,target.col02= source.col02
40+
, target.end_dt = DATEADD(DAY, -1, to_timestamp('20250201', 'yyyyMMdd'))
41+
, target.updt_load_src = 'SOURCE_FOO'
42+
, target.updt_load_id = 1
43+
, target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
44+
WHEN NOT MATCHED AND source.data_change IN (1, 99)
45+
THEN INSERT
46+
(
47+
col01, col02, pk_col, start_date, end_date, delete_f, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
48+
)
49+
VALUES (
50+
source.col01
51+
,source.col02
52+
,source.pk_col
53+
, to_timestamp('20250201', 'yyyyMMdd')
54+
, to_timestamp('9999-12-31', 'yyyy-MM-dd')
55+
, 0
56+
, 'SOURCE_FOO'
57+
, 1
58+
, 20250201
59+
, 'SOURCE_FOO'
60+
, 1
61+
, to_timestamp('20250201', 'yyyyMMdd')
62+
)
63+
;

0 commit comments

Comments
 (0)