11from datetime import datetime
22from textwrap import dedent
3+ from typing import Callable
4+ from functools import partial
35
46import pytest
57from jinja2 .exceptions import UndefinedError
68from src .sqlplate import SQLPlate
79
8- from .utils import prepare_statement
10+ from tests .utils import prepare_statement
11+
12+
@pytest.fixture(scope='module')
def respect_databricks(respect_sql: Callable[[str, str], str]) -> Callable[[str], str]:
    """Databricks-bound variant of the ``respect_sql`` fixture.

    Wraps the module-scoped ``respect_sql`` helper so tests only supply the
    template name; the SQL dialect is pinned to ``'databricks'``.

    Returns:
        A one-argument callable equivalent to
        ``respect_sql(name, fmt='databricks')``.
    """
    def _respect(template_name: str) -> str:
        # Delegate to the shared fixture with the format fixed.
        return respect_sql(template_name, fmt='databricks')

    return _respect
916
1017
1118def test_sql_select (template_path ):
1219 select_sql : SQLPlate = (
13- SQLPlate .system ('databricks' , path = template_path )
20+ SQLPlate .format ('databricks' , path = template_path )
1421 .template ('select' )
1522 .option ('schema' , 'schema-name' )
1623 .option ('table' , 'table-name' )
@@ -49,9 +56,9 @@ def test_sql_select(template_path):
4956 )
5057
5158
52- def test_sql_delta (template_path ):
59+ def test_sql_delta (template_path , respect_databricks ):
5360 select_sql : SQLPlate = (
54- SQLPlate .system ('databricks' , path = template_path )
61+ SQLPlate .format ('databricks' , path = template_path )
5562 .template ('etl.delta' )
5663 .option ('catalog' , 'catalog-name' )
5764 .option ('schema' , 'schema-name' )
@@ -71,110 +78,20 @@ def test_sql_delta(template_path):
7178 .option ('query' , 'SELECT * FROM catalog-name.schema-name.source-name' )
7279 .load ()
7380 )
74- assert prepare_statement (statement ) == dedent ("""
75- DELETE FROM catalog-name.schema-name.table-name
76- WHERE
77- load_src = 'SOURCE_FOO'
78- AND load_date = 20250201
79- ;
80- MERGE INTO catalog-name.schema-name.table-name AS target
81- USING (
82- WITH change_query AS (
83- SELECT
84- src.*,
85- CASE WHEN tgt.pk_col IS NULL THEN 99
86- WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
87- ELSE 0 END AS data_change
88- FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
89- LEFT JOIN catalog-name.schema-name.table-name AS tgt
90- ON tgt.col01 = src.col01
91- AND tgt.col02 = src.col02
92- )
93- SELECT * EXCEPT( data_change ) FROM change_query WHERE data_change IN (99, 1)
94- ) AS source
95- ON target.pk_col = source.pk_col
96- WHEN MATCHED THEN UPDATE
97- SET target.col01= source.col01
98- ,target.col02= source.col02
99- , target.updt_load_src = 'SOURCE_FOO'
100- , target.updt_load_id = 1
101- , target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
102- WHEN NOT MATCHED THEN INSERT
103- (
104- col01, col02, pk_col, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
105- )
106- VALUES (
107- source.col01,
108- source.col02,
109- source.pk_col,
110- 'SOURCE_FOO',
111- 1,
112- 20250201,
113- 'SOURCE_FOO',
114- 1,
115- to_timestamp('20250201', 'yyyyMMdd')
116- )
117- ;
118- """ ).strip ('\n ' )
81+ assert prepare_statement (statement ) == respect_databricks ('etl.delta.query' )
11982
12083 statement : str = (
12184 select_sql
12285 .option ('pk' , ['pk_col01' , 'pk_col02' ])
12386 .option ('source' , 'catalog-name.schema-name.source-name' )
12487 .load ()
12588 )
126- assert prepare_statement (statement ) == dedent ("""
127- DELETE FROM catalog-name.schema-name.table-name
128- WHERE
129- load_src = 'SOURCE_FOO'
130- AND load_date = 20250201
131- ;
132- MERGE INTO catalog-name.schema-name.table-name AS target
133- USING (
134- WITH change_query AS (
135- SELECT
136- src.*,
137- CASE WHEN tgt.pk_col01 IS NULL THEN 99
138- WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
139- ELSE 0 END AS data_change
140- FROM catalog-name.schema-name.source-name AS src
141- LEFT JOIN catalog-name.schema-name.table-name AS tgt
142- ON tgt.col01 = src.col01
143- AND tgt.col02 = src.col02
144- )
145- SELECT * EXCEPT( data_change ) FROM change_query WHERE data_change IN (99, 1)
146- ) AS source
147- ON target.pk_col01 = source.pk_col01
148- AND target.pk_col02 = source.pk_col02
149- WHEN MATCHED THEN UPDATE
150- SET target.col01= source.col01
151- ,target.col02= source.col02
152- , target.updt_load_src = 'SOURCE_FOO'
153- , target.updt_load_id = 1
154- , target.updt_load_date = to_timestamp('20250201', 'yyyyMMdd')
155- WHEN NOT MATCHED THEN INSERT
156- (
157- col01, col02, pk_col01, pk_col02, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
158- )
159- VALUES (
160- source.col01,
161- source.col02,
162- source.pk_col01,
163- source.pk_col02,
164- 'SOURCE_FOO',
165- 1,
166- 20250201,
167- 'SOURCE_FOO',
168- 1,
169- to_timestamp('20250201', 'yyyyMMdd')
170- )
171- ;
172- """ ).strip ('\n ' )
89+ assert prepare_statement (statement ) == respect_databricks ('etl.delta' )
17390
17491
17592def test_sql_scd1_soft_delete (template_path ):
17693 select_sql : SQLPlate = (
177- SQLPlate .system ('databricks' , path = template_path )
94+ SQLPlate .format ('databricks' , path = template_path )
17895 .template ('etl.scd1-soft-delete' )
17996 .option ('catalog' , 'catalog-name' )
18097 .option ('schema' , 'schema-name' )
@@ -330,7 +247,7 @@ def test_sql_scd1_soft_delete(template_path):
330247
331248def test_sql_scd2 (template_path ):
332249 select_sql : SQLPlate = (
333- SQLPlate .system ('databricks' , path = template_path )
250+ SQLPlate .format ('databricks' , path = template_path )
334251 .template ('etl.scd2' )
335252 .option ('catalog' , 'catalog-name' )
336253 .option ('schema' , 'schema-name' )
@@ -415,7 +332,7 @@ def test_sql_scd2(template_path):
415332
416333def test_sql_scd2_delete_src (template_path ):
417334 select_sql : SQLPlate = (
418- SQLPlate .system ('databricks' , path = template_path )
335+ SQLPlate .format ('databricks' , path = template_path )
419336 .template ('etl.scd2-delete-src' )
420337 .option ('catalog' , 'catalog-name' )
421338 .option ('schema' , 'schema-name' )
@@ -509,7 +426,7 @@ def test_sql_scd2_delete_src(template_path):
509426
510427def test_sql_transaction (template_path ):
511428 select_sql : SQLPlate = (
512- SQLPlate .system ('databricks' , path = template_path )
429+ SQLPlate .format ('databricks' , path = template_path )
513430 .template ('etl.transaction' )
514431 .option ('catalog' , 'catalog-name' )
515432 .option ('schema' , 'schema-name' )
@@ -547,7 +464,7 @@ def test_sql_transaction(template_path):
547464
548465def test_sql_full_dump (template_path ):
549466 select_sql : SQLPlate = (
550- SQLPlate .system ('databricks' , path = template_path )
467+ SQLPlate .format ('databricks' , path = template_path )
551468 .template ('etl.fulldump' )
552469 .option ('catalog' , 'catalog-name' )
553470 .option ('schema' , 'schema-name' )
0 commit comments