Skip to content

Commit ec36c8f

Browse files
committed
🎯 feat: add macros and utils for dynamic template generator.
1 parent f94001c commit ec36c8f

File tree

18 files changed

+298
-82
lines changed

18 files changed

+298
-82
lines changed

README.md

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ statement.
88
```text
99
📂templates/
1010
├─ 📂databricks/
11-
│ ├─ 📜 etl-delta.sql
12-
│ ╰─ 📜 etl-scd2.sql
13-
╰─ 📂synapse-dedicate/
14-
╰─ 📜 delta.sql
11+
│ ├─ 📜 etl.delta.sql
12+
│ ├─ 📜 etl.scd2.sql
13+
│ ╰─ 📜 select.sql
14+
├─ 📂synapse-dedicate/
15+
│ ╰─ 📜 etl.delta.sql
16+
╰─ 📂utils/
17+
╰─ 📜 etl_vars.jinja
1518
```
1619

1720
## :package: Installation
@@ -24,32 +27,63 @@ pip install -U sqlplate
2427

2528
```python
2629
from datetime import datetime
27-
from sqlplate import SQL
30+
from sqlplate import SQLPlate
2831

2932
statement: str = (
30-
SQL.system('databricks')
31-
.template('etl.scd2')
33+
SQLPlate.system('databricks')
34+
.template('etl.delta')
3235
.option('catalog', 'catalog-name')
3336
.option('schema', 'schema-name')
3437
.option('table', 'table-name')
35-
.option('pk', ['pk_col_name'])
36-
.option('columns', ['pk_col_name', 'col_01', 'col_02', 'col_03'])
37-
.option('load_date', f"{datetime.now():%Y-%m-%d %H:%M:%S}")
38-
.option('load_src', 'SOURCE_SYS_FOO')
38+
.option('pk', 'pk_col')
39+
.option('columns', ['col01', 'col02'])
40+
.option('query', 'SELECT * FROM catalog-name.schema-name.source-name')
41+
.option('load_src', 'SOURCE_FOO')
42+
.option('load_id', 1)
43+
.option('load_date', datetime(2025, 2, 1, 10))
3944
.load()
4045
)
41-
print(statement)
46+
print(statement.strip().strip('\n'))
4247
```
4348

4449
```sql
4550
MERGE INTO catalog-name.schema-name.table-name AS target
4651
USING (
47-
...
52+
WITH change_query AS (
53+
SELECT
54+
src.*,
55+
CASE WHEN tgt.pk_col IS NULL THEN 99
56+
WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
57+
ELSE 0 END AS data_change
58+
FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
59+
LEFT JOIN catalog-name.schema-name.table-name AS tgt
60+
ON tgt.col01 = src.col01
61+
AND tgt.col02 = src.col02
62+
)
63+
SELECT * EXCEPT( data_change ) FROM change_query WHERE data_change IN (99, 1)
4864
) AS source
49-
ON target.pk_col_name = source.pk_col_name
50-
WHEN MATCH AND source.data_change = 1
51-
THEN UPDATE
52-
SET ...
65+
ON target.pk_col = source.pk_col
66+
WHEN MATCHED THEN UPDATE
67+
SET target.col01 = source.col01
68+
, target.col02 = source.col02
69+
, target.updt_prcs_nm = 'SOURCE_FOO'
70+
, target.updt_prcs_ld_id = 1
71+
, target.updt_asat_dt = to_timestamp('20250201', 'yyyyMMdd')
72+
WHEN NOT MATCHED THEN INSERT
73+
(
74+
col01, col02, pk_col, start_dt, end_dt, delete_f, prcs_nm, prcs_ld_id, asat_dt, updt_prcs_nm, updt_prcs_ld_id, updt_asat_dt
75+
)
76+
VALUES (
77+
source.col01,
78+
source.col02,
79+
source.pk_col,
80+
'SOURCE_FOO',
81+
1,
82+
20250201,
83+
'SOURCE_FOO',
84+
1,
85+
to_timestamp('20250201', 'yyyyMMdd')
86+
)
5387
```
5488

5589
## Systems

src/sqlplate/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from .core import SQL
1+
from .core import SQLPlate

src/sqlplate/conf.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# ------------------------------------------------------------------------------
2+
# Copyright (c) 2022 Korawich Anuttra. All rights reserved.
3+
# Licensed under the MIT License. See LICENSE in the project root for
4+
# license information.
5+
# ------------------------------------------------------------------------------
6+
from __future__ import annotations
7+
8+
9+
class BaseConf:
10+
scd2_columns: list[str] = [
11+
'start_dt',
12+
'end_dt',
13+
'delete_f',
14+
'prcs_nm',
15+
'prcs_ld_id',
16+
'asat_dt',
17+
'updt_prcs_nm',
18+
'updt_prcs_ld_id',
19+
'updt_asat_dt',
20+
]

src/sqlplate/core.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
from .exceptions import TemplateNotSet
1515

1616

17-
class SQL:
17+
class SQLPlate:
1818
"""A SQL object for render any SQL template that prepare by Jinja package."""
1919

20-
def __init__(self, name: str, path: Path):
20+
def __init__(self, name: str, path: Path) -> None:
2121
self.name: str = name
2222

2323
if not path.exists():
@@ -28,29 +28,29 @@ def __init__(self, name: str, path: Path):
2828
self._option: dict[str, Any] = {}
2929

3030
@classmethod
31-
def system(cls, name: str, path: Path | None = None) -> 'SQL':
31+
def system(cls, name: str, path: Path | None = None) -> 'SQLPlate':
3232
"""Construction this class from a system value name.
3333
3434
Args:
35-
name (str): A system name of the SQL template.
35+
name (str): A system name of the SQLPlate template.
3636
path (Path | None): A template path.
3737
"""
3838
if path is None:
3939
path: Path = Path('./templates')
4040
return cls(name=name, path=path)
4141

42-
def template(self, name: str) -> 'SQL':
42+
def template(self, name: str) -> 'SQLPlate':
4343
"""Create template object attribute on this instance."""
4444
self._template: Template = (
4545
get_env(self.path).get_template(f'{self.name}/{name}.sql')
4646
)
4747
return self
4848

49-
def option(self, key: str, value: Any) -> 'SQL':
49+
def option(self, key: str, value: Any) -> 'SQLPlate':
5050
self._option[key] = value
5151
return self
5252

53-
def options(self, values: dict[str, Any]) -> 'SQL':
53+
def options(self, values: dict[str, Any]) -> 'SQLPlate':
5454
self._option = self._option | values
5555
return self
5656

src/sqlplate/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,9 @@
1+
# ------------------------------------------------------------------------------
2+
# Copyright (c) 2022 Korawich Anuttra. All rights reserved.
3+
# Licensed under the MIT License. See LICENSE in the project root for
4+
# license information.
5+
# ------------------------------------------------------------------------------
6+
from __future__ import annotations
7+
8+
19
class TemplateNotSet(Exception): ...

src/sqlplate/params.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# ------------------------------------------------------------------------------
2+
# Copyright (c) 2022 Korawich Anuttra. All rights reserved.
3+
# Licensed under the MIT License. See LICENSE in the project root for
4+
# license information.
5+
# ------------------------------------------------------------------------------
6+
from __future__ import annotations
7+
18
from dataclasses import dataclass
29

310

src/sqlplate/utils.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# ------------------------------------------------------------------------------
66
from __future__ import annotations
77

8+
from datetime import datetime
89
from pathlib import Path
910

1011
from jinja2 import Environment, PackageLoader
@@ -21,17 +22,23 @@ def map_fmt(value: list[str], fmt: str) -> list[str]:
2122
return [fmt.format(i) for i in value]
2223

2324

24-
def raise_undefined(value: str):
25+
def raise_undefined(value: str) -> None:
26+
"""Raise with UndefinedError for a needed variable on the Jinja template."""
2527
if len(value.split('|')) > 1:
2628
value: str = "' or '".join(value.split('|'))
2729
raise UndefinedError(f"The '{value}' is undefined")
2830

2931

32+
def dt_fmt(value: datetime, fmt: str) -> str:
33+
"""Format a datetime object to string value."""
34+
return value.strftime(fmt)
35+
36+
3037
def get_env(
3138
path: Path,
3239
trim_blocks: bool = True,
3340
lstrip_blocks: bool = True,
34-
):
41+
) -> Environment:
3542
"""Get jinja environment object for the SQL template files.
3643
3744
Args:
@@ -48,5 +55,6 @@ def get_env(
4855
lstrip_blocks=lstrip_blocks
4956
)
5057
env.filters['map_fmt'] = map_fmt
58+
env.filters['dt_fmt'] = dt_fmt
5159
env.globals['raise_undefined'] = raise_undefined
5260
return env

templates/databricks/etl.delta.bk.sql

Lines changed: 0 additions & 33 deletions
This file was deleted.

templates/databricks/etl.delta.sql

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,48 @@
1-
{{ raise_undefined('columns') if columns is undefined }}
1+
{% include "utils/etl_vars.jinja" %}
2+
{% import "macros/delta.jinja" as delta %}
3+
{%- set scd2_columns = ['start_dt', 'end_dt', 'delete_f', 'prcs_nm', 'prcs_ld_id', 'asat_dt', 'updt_prcs_nm', 'updt_prcs_ld_id', 'updt_asat_dt'] -%}
4+
{% if pk is iterable and pk is not string and pk is not mapping %}
5+
{%- set pk_list = pk -%}
6+
{% else %}
7+
{%- set pk_list = [pk] -%}
8+
{% endif %}
9+
{% if source is defined %}
10+
{%- set source_query = source|trim -%}
11+
{% elif query is defined %}
12+
{%- set source_query = '( {} )'.format(query) -%}
13+
{% else %}
14+
{{ raise_undefined('source|query') }}
15+
{% endif %}
16+
{%- set all_columns = columns + pk_list + scd2_columns -%}
17+
{%- set data_columns = columns + pk_list -%}
218
MERGE INTO {{ catalog }}.{{ schema }}.{{ table }} AS target
319
USING (
420
WITH change_query AS (
521
SELECT
622
src.*,
7-
CASE WHEN tgt.{% if pk is iterable and pk is not string and pk is not mapping %}{{ pk | first }}{% else %}{{ pk }}{% endif %} IS NULL THEN 99
23+
CASE WHEN tgt.{{ pk_list | first }} IS NULL THEN 99
824
WHEN hash({{ columns | map_fmt('src.{0}') | join(', ') }}) <> hash({{ columns | map_fmt('tgt.{0}') | join(', ') }}) THEN 1
925
ELSE 0 END AS data_change
10-
FROM {% if source is defined %}{{ source | trim }}{% elif query is defined %}{{ '( {} )'.format(query) }}{% else %}{{ raise_undefined('source|query') }}{% endif %} AS src
26+
FROM {{ source_query }} AS src
1127
LEFT JOIN {{ catalog }}.{{ schema }}.{{ table }} AS tgt
12-
ON {{ columns | map_fmt("tgt.{0} = src.{0}") | join(' AND ') }}
28+
ON {{ columns | map_fmt("tgt.{0} = src.{0}") | join('\n\t\t\tAND ') }}
1329
)
1430
SELECT * EXCEPT( data_change ) FROM change_query WHERE data_change IN (99, 1)
1531
) AS source
16-
ON {% if pk is iterable and pk is not string and pk is not mapping %}{{ pk | map_fmt('target.{0} = source.{0}') | join(' AND ') }}{% else %}{{ 'target.{0} = source.{0}'.format(pk) }}{% endif %}
32+
ON {{ pk_list | map_fmt('target.{0} = source.{0}') | join('\n\tAND ') }}
1733
WHEN MATCHED THEN UPDATE
18-
SET {', '.join(_p_col_update)}
19-
, target.updt_prcs_nm = '{{ load_src }}'
20-
, target.updt_prcs_ld_id = {{ load_id }}
21-
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')
34+
SET {{ columns | map_fmt("target.{0}\t\t\t= source.{0}") | join('\n\t,\t') }}
35+
{{ delta.sys_update_match(load_src, load_id, load_date) }}
2236
WHEN NOT MATCHED THEN INSERT
2337
(
24-
{', '.join(i.name for i in rs_col_all)}
38+
{{ all_columns | join(', ') }}
2539
)
2640
VALUES (
27-
{', '.join('source.' + i.name for i in rs_col_real)},
41+
{{ data_columns | map_fmt('source.{0}') | join(',\n\t\t') }},
2842
'{{ load_src }}',
2943
{{ load_id }},
30-
{p_asat_dt},
44+
{{ load_date | dt_fmt('%Y%m%d') }},
3145
'{{ load_src }}',
3246
{{ load_id }},
33-
to_timestamp('{p_asat_dt}', 'yyyyMMdd')
47+
to_timestamp('{{ load_date | dt_fmt('%Y%m%d') }}', 'yyyyMMdd')
3448
)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
{% include "utils/etl_vars.jinja" %}
2+
MERGE INTO {{ catalog }}.{{ schema }}.{{ table }} AS target
3+
USING (
4+
WITH change_query AS (
5+
SELECT
6+
src.*,
7+
CASE WHEN tgt.es_id IS NULL THEN 99
8+
WHEN hash({_p_col_without_pk_src_str}) <> hash({_p_col_without_pk_tgt_str}) THEN 1
9+
ELSE 0 END AS data_change
10+
FROM ( {query} ) AS src
11+
LEFT JOIN {{ catalog }}.{{ schema }}.{{ table }} AS tgt
12+
ON {' AND '.join(_p_pk_cols_pairs_sub_query)}
13+
)
14+
SELECT * FROM change_query
15+
) AS source
16+
ON {' AND '.join(_p_pk_cols_pairs)}
17+
WHEN MATCHED AND data_change = 1
18+
THEN UPDATE
19+
SET {', '.join(_p_col_update)}
20+
, target.delete_f = 0
21+
, target.prcs_nm = '{p_process_name}'
22+
, target.prcs_ld_id = {p_process_load_id}
23+
, target.asat_dt = {p_asat_dt}
24+
, target.updt_prcs_nm = '{p_process_name}'
25+
, target.updt_prcs_ld_id = {p_process_load_id}
26+
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')
27+
WHEN MATCHED AND data_change = 0 AND target.delete_f = 1
28+
THEN UPDATE
29+
SET target.delete_f = 0
30+
, target.updt_prcs_nm = '{p_process_name}'
31+
, target.updt_prcs_ld_id = {p_process_load_id}
32+
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')
33+
WHEN NOT MATCHED AND data_change = 99
34+
THEN INSERT
35+
( {', '.join(i.name for i in rs_col_all)} )
36+
VALUES (
37+
{', '.join('source.' + i.name for i in rs_col_real)},
38+
0, '{p_process_name}', {p_process_load_id}, {p_asat_dt},
39+
'{p_process_name}', {p_process_load_id}, to_timestamp('{p_asat_dt}', 'yyyyMMdd')
40+
)
41+
WHEN NOT MATCHED BY SOURCE AND target.delete_f = 0
42+
THEN UPDATE
43+
SET target.delete_f = 1
44+
, target.updt_prcs_nm = '{p_process_name}'
45+
, target.updt_prcs_ld_id = {p_process_load_id}
46+
, target.updt_asat_dt = to_timestamp('{p_asat_dt}', 'yyyyMMdd')

0 commit comments

Comments
 (0)