Skip to content

Commit 0595a9d

Browse files
Feat: Add support for Restatements on SCD Type 2 models
1 parent 2006664 commit 0595a9d

File tree

12 files changed

+392
-11
lines changed

12 files changed

+392
-11
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,6 +1514,7 @@ def _scd_type_2(
15141514
unique_key: t.Sequence[exp.Expression],
15151515
valid_from_col: exp.Column,
15161516
valid_to_col: exp.Column,
1517+
start: TimeLike,
15171518
execution_time: t.Union[TimeLike, exp.Column],
15181519
invalidate_hard_deletes: bool = True,
15191520
updated_at_col: t.Optional[exp.Column] = None,
@@ -1710,6 +1711,22 @@ def remove_managed_columns(
17101711
)
17111712
if truncate:
17121713
existing_rows_query = existing_rows_query.limit(0)
1714+
else:
1715+
# If truncate is false it is not the first insert
1716+
# Determine the cleanup timestamp for restatement or a regular incremental run
1717+
cleanup_ts = to_time_column(start, time_data_type, self.dialect, nullable=True)
1718+
1719+
# Delete records that were created after the cleanup point (strict `>` comparison)
1720+
self.delete_from(table_name=target_table, where=valid_from_col > cleanup_ts)
1721+
1722+
# "Re-open" records that were closed after the cleanup point (strict `>` comparison)
1723+
self.update_table(
1724+
table_name=target_table,
1725+
properties={valid_to_col.name: exp.Null()},
1726+
where=exp.and_(
1727+
valid_to_col > cleanup_ts,
1728+
),
1729+
)
17131730

17141731
with source_queries[0] as source_query:
17151732
prefixed_columns_to_types = []

sqlmesh/core/engine_adapter/trino.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def _scd_type_2(
256256
unique_key: t.Sequence[exp.Expression],
257257
valid_from_col: exp.Column,
258258
valid_to_col: exp.Column,
259+
start: TimeLike,
259260
execution_time: t.Union[TimeLike, exp.Column],
260261
invalidate_hard_deletes: bool = True,
261262
updated_at_col: t.Optional[exp.Column] = None,
@@ -277,6 +278,7 @@ def _scd_type_2(
277278
unique_key,
278279
valid_from_col,
279280
valid_to_col,
281+
start,
280282
execution_time,
281283
invalidate_hard_deletes,
282284
updated_at_col,

sqlmesh/core/model/kind.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ def full_history_restatement_only(self) -> bool:
140140
self.is_incremental_unmanaged
141141
or self.is_incremental_by_unique_key
142142
or self.is_incremental_by_partition
143-
or self.is_scd_type_2
144143
or self.is_managed
145144
or self.is_full
146145
or self.is_view

sqlmesh/core/snapshot/definition.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,21 @@ def get_removal_interval(
795795

796796
removal_interval = expanded_removal_interval
797797

798+
# SCD Type 2 validation: the requested end date must match the end of the latest interval
799+
if not is_preview and self.is_scd_type_2 and self.intervals:
800+
requested_start, requested_end = removal_interval
801+
latest_end = max(interval[1] for interval in self.intervals)
802+
if requested_end != latest_end:
803+
from sqlmesh.core.console import get_console
804+
805+
get_console().log_warning(
806+
f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n"
807+
f"Requested end date [{to_ts(requested_end)}] doesn't match latest interval end date [{to_ts(latest_end)}].\n"
808+
f"You can set the start date, but the end date must be the end of the latest interval, so the latest interval end is used instead."
809+
)
810+
811+
removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict)
812+
798813
return removal_interval
799814

800815
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1788,6 +1788,7 @@ def insert(
17881788
table_description=model.description,
17891789
column_descriptions=model.column_descriptions,
17901790
truncate=is_first_insert,
1791+
start=kwargs["start"],
17911792
)
17921793
elif isinstance(model.kind, SCDType2ByColumnKind):
17931794
self.adapter.scd_type_2_by_column(
@@ -1805,6 +1806,7 @@ def insert(
18051806
table_description=model.description,
18061807
column_descriptions=model.column_descriptions,
18071808
truncate=is_first_insert,
1809+
start=kwargs["start"],
18081810
)
18091811
else:
18101812
raise SQLMeshError(

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext):
744744
columns_to_types=input_schema,
745745
table_format=ctx.default_table_format,
746746
truncate=True,
747+
start="2022-01-01 00:00:00",
747748
)
748749
results = ctx.get_metadata_results()
749750
assert len(results.views) == 0
@@ -807,6 +808,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext):
807808
columns_to_types=input_schema,
808809
table_format=ctx.default_table_format,
809810
truncate=False,
811+
start="2022-01-01 00:00:00",
810812
)
811813
results = ctx.get_metadata_results()
812814
assert len(results.views) == 0
@@ -899,6 +901,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext):
899901
execution_time_as_valid_from=False,
900902
columns_to_types=ctx.columns_to_types,
901903
truncate=True,
904+
start="2023-01-01",
902905
)
903906
results = ctx.get_metadata_results()
904907
assert len(results.views) == 0
@@ -970,6 +973,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext):
970973
execution_time_as_valid_from=False,
971974
columns_to_types=ctx.columns_to_types,
972975
truncate=False,
976+
start="2023-01-01",
973977
)
974978
results = ctx.get_metadata_results()
975979
assert len(results.views) == 0

tests/core/engine_adapter/test_base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,6 +1222,7 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable):
12221222
"test_valid_to": exp.DataType.build("TIMESTAMP"),
12231223
},
12241224
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1225+
start=datetime(2020, 1, 1, 0, 0, 0),
12251226
)
12261227

12271228
assert (
@@ -1421,6 +1422,7 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte
14211422
"test_valid_to": exp.DataType.build("TIMESTAMP"),
14221423
},
14231424
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1425+
start=datetime(2020, 1, 1, 0, 0, 0),
14241426
)
14251427

14261428
assert (
@@ -1609,6 +1611,7 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable):
16091611
"test_valid_to": exp.DataType.build("TIMESTAMPTZ"),
16101612
},
16111613
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1614+
start=datetime(2020, 1, 1, 0, 0, 0),
16121615
)
16131616

16141617
assert (
@@ -1790,6 +1793,7 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
17901793
"test_valid_to": exp.DataType.build("TIMESTAMP"),
17911794
},
17921795
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1796+
start=datetime(2020, 1, 1, 0, 0, 0),
17931797
extra_col_ignore="testing",
17941798
)
17951799

@@ -1972,6 +1976,7 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab
19721976
"test_valid_to": exp.DataType.build("TIMESTAMP"),
19731977
},
19741978
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1979+
start=datetime(2020, 1, 1, 0, 0, 0),
19751980
)
19761981

19771982
assert (
@@ -2164,6 +2169,7 @@ def test_scd_type_2_truncate(make_mocked_engine_adapter: t.Callable):
21642169
},
21652170
execution_time=datetime(2020, 1, 1, 0, 0, 0),
21662171
truncate=True,
2172+
start=datetime(2020, 1, 1, 0, 0, 0),
21672173
)
21682174

21692175
assert (
@@ -2346,6 +2352,7 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable)
23462352
"test_valid_to": exp.DataType.build("TIMESTAMP"),
23472353
},
23482354
execution_time=datetime(2020, 1, 1, 0, 0, 0),
2355+
start=datetime(2020, 1, 1, 0, 0, 0),
23492356
)
23502357

23512358
assert (
@@ -2541,6 +2548,7 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap
25412548
"test_valid_to": exp.DataType.build("TIMESTAMP"),
25422549
},
25432550
execution_time=datetime(2020, 1, 1, 0, 0, 0),
2551+
start=datetime(2020, 1, 1, 0, 0, 0),
25442552
)
25452553

25462554
assert (

tests/core/engine_adapter/test_clickhouse.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,8 @@ def test_scd_type_2_by_time(
606606
"test_valid_to": exp.DataType.build("TIMESTAMP"),
607607
},
608608
execution_time=datetime(2020, 1, 1, 0, 0, 0),
609+
start=datetime(2020, 1, 1, 0, 0, 0),
610+
truncate=True,
609611
)
610612

611613
assert to_sql_calls(adapter)[4] == parse_one(
@@ -637,7 +639,7 @@ def test_scd_type_2_by_time(
637639
TRUE AS "_exists"
638640
FROM ""__temp_target_efgh""
639641
WHERE
640-
NOT "test_valid_to" IS NULL
642+
NOT "test_valid_to" IS NULL LIMIT 0
641643
), "latest" AS (
642644
SELECT
643645
"id",
@@ -649,7 +651,7 @@ def test_scd_type_2_by_time(
649651
TRUE AS "_exists"
650652
FROM ""__temp_target_efgh""
651653
WHERE
652-
"test_valid_to" IS NULL
654+
"test_valid_to" IS NULL LIMIT 0
653655
), "deleted" AS (
654656
SELECT
655657
"static"."id",
@@ -811,6 +813,8 @@ def test_scd_type_2_by_column(
811813
"test_valid_to": exp.DataType.build("TIMESTAMP"),
812814
},
813815
execution_time=datetime(2020, 1, 1, 0, 0, 0),
816+
start=datetime(2020, 1, 1, 0, 0, 0),
817+
truncate=True,
814818
)
815819

816820
assert to_sql_calls(adapter)[4] == parse_one(
@@ -840,7 +844,7 @@ def test_scd_type_2_by_column(
840844
TRUE AS "_exists"
841845
FROM "__temp_target_efgh"
842846
WHERE
843-
NOT "test_valid_to" IS NULL
847+
NOT ("test_valid_to" IS NULL) LIMIT 0
844848
), "latest" AS (
845849
SELECT
846850
"id",
@@ -851,7 +855,7 @@ def test_scd_type_2_by_column(
851855
TRUE AS "_exists"
852856
FROM "__temp_target_efgh"
853857
WHERE
854-
"test_valid_to" IS NULL
858+
"test_valid_to" IS NULL LIMIT 0
855859
), "deleted" AS (
856860
SELECT
857861
"static"."id",
@@ -907,7 +911,7 @@ def test_scd_type_2_by_column(
907911
COALESCE("joined"."t_id", "joined"."id") AS "id",
908912
COALESCE("joined"."t_name", "joined"."name") AS "name",
909913
COALESCE("joined"."t_price", "joined"."price") AS "price",
910-
COALESCE("t_test_VALID_from", CAST('2020-01-01 00:00:00' AS Nullable(DateTime64(6)))) AS "test_VALID_from",
914+
COALESCE("t_test_VALID_from", CAST('1970-01-01 00:00:00' AS Nullable(DateTime64(6)))) AS "test_VALID_from",
911915
CASE
912916
WHEN "joined"."_exists" IS NULL
913917
OR (

tests/core/engine_adapter/test_spark.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,8 @@ def check_table_exists(table_name: exp.Table) -> bool:
569569
"test_valid_to": exp.DataType.build("TIMESTAMP"),
570570
},
571571
execution_time=datetime(2020, 1, 1, 0, 0, 0),
572+
start=datetime(2020, 1, 1, 0, 0, 0),
573+
truncate=True,
572574
)
573575

574576
assert to_sql_calls(adapter) == [
@@ -613,7 +615,7 @@ def check_table_exists(table_name: exp.Table) -> bool:
613615
TRUE AS `_exists`
614616
FROM `db`.`temp_target_abcdefgh`
615617
WHERE
616-
NOT `test_valid_to` IS NULL
618+
NOT `test_valid_to` IS NULL LIMIT 0
617619
), `latest` AS (
618620
SELECT
619621
`id`,
@@ -625,7 +627,7 @@ def check_table_exists(table_name: exp.Table) -> bool:
625627
TRUE AS `_exists`
626628
FROM `db`.`temp_target_abcdefgh`
627629
WHERE
628-
`test_valid_to` IS NULL
630+
`test_valid_to` IS NULL LIMIT 0
629631
), `deleted` AS (
630632
SELECT
631633
`static`.`id`,

0 commit comments

Comments
 (0)