Commit d0f69c3

Feat: Add support for Restatements on SCD Type 2 models (#4814)
1 parent 7057df7 commit d0f69c3

14 files changed, +752 -343 lines changed

docs/concepts/models/model_kinds.md

Lines changed: 40 additions & 8 deletions
@@ -935,7 +935,13 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod
 
 Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large.
 
-**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default.
+**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations:
+
+- **Full restatements**: The entire table will be recreated from scratch when no start date is specified
+- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify
+- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported
+
+Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration.
 
 There are two ways to tracking changes: By Time (Recommended) or By Column.
 
@@ -1283,11 +1289,11 @@ This is the most accurate representation of the menu based on the source data pr
 
 ### Processing Source Table with Historical Data
 
-The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
+The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
 In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time.
 In this case, the default setting of `None` for `batch_size` is the best option.
 
-Another use case though is processing a source table that already has history in it.
+Another use case though is processing a source table that already has history in it.
 A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day.
 If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order.
 That way the historical records will be properly captured in the SCD Type 2 table.
@@ -1433,11 +1439,14 @@ GROUP BY
   id
 ```
 
-### Reset SCD Type 2 Model (clearing history)
+### SCD Type 2 Restatements
 
 SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost.
 However, there are cases where you may want to clear the history and start fresh.
-For this use use case you will want to start by setting `disable_restatement` to `false` in the model definition.
+
+#### Enabling Restatements
+
+To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition:
 
 ```sql linenums="1" hl_lines="5"
 MODEL (
@@ -1449,16 +1458,39 @@ MODEL (
 );
 ```
 
-Plan/apply this change to production.
-Then you will want to [restate the model](../plans.md#restatement-plans).
+#### Full Restatements (Clearing All History)
+
+To clear all history and recreate the entire table from scratch:
 
 ```bash
 sqlmesh plan --restate-model db.menu_items
 ```
 
 !!! warning
 
-    This will remove the historical data on the model which in most situations cannot be recovered.
+    This will remove **all** historical data on the model which in most situations cannot be recovered.
+
+#### Partial Restatements (From a Specific Date)
+
+You can restate data from a specific start date onwards. This will:
+- Delete all records with `valid_from >= start_date`
+- Reprocess the data from the start date to the latest interval
+
+```bash
+sqlmesh plan --restate-model db.menu_items --start "2023-01-15"
+```
+
+!!! note
+
+    If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date.
+
+    ```bash
+    # This end date will be ignored and set to the latest interval
+    sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20"
+    ```
+
+
+#### Re-enabling Protection
 
 Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss.
 

sqlmesh/core/engine_adapter/base.py

Lines changed: 38 additions & 2 deletions
@@ -1514,6 +1514,7 @@ def _scd_type_2(
         unique_key: t.Sequence[exp.Expression],
         valid_from_col: exp.Column,
         valid_to_col: exp.Column,
+        start: TimeLike,
         execution_time: t.Union[TimeLike, exp.Column],
         invalidate_hard_deletes: bool = True,
         updated_at_col: t.Optional[exp.Column] = None,
@@ -1708,8 +1709,14 @@ def remove_managed_columns(
         existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
             target_table
         )
+
+        cleanup_ts = None
         if truncate:
             existing_rows_query = existing_rows_query.limit(0)
+        else:
+            # If truncate is false it is not the first insert
+            # Determine the cleanup timestamp for restatement or a regular incremental run
+            cleanup_ts = to_time_column(start, time_data_type, self.dialect, nullable=True)
 
         with source_queries[0] as source_query:
             prefixed_columns_to_types = []
@@ -1747,12 +1754,41 @@ def remove_managed_columns(
                 # Historical Records that Do Not Change
                 .with_(
                     "static",
-                    existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
+                    existing_rows_query.where(valid_to_col.is_(exp.Null()).not_())
+                    if truncate
+                    else existing_rows_query.where(
+                        exp.and_(
+                            valid_to_col.is_(exp.Null().not_()),
+                            valid_to_col < cleanup_ts,
+                        ),
+                    ),
                 )
                 # Latest Records that can be updated
                 .with_(
                     "latest",
-                    existing_rows_query.where(valid_to_col.is_(exp.Null())),
+                    existing_rows_query.where(valid_to_col.is_(exp.Null()))
+                    if truncate
+                    else exp.select(
+                        *(
+                            to_time_column(
+                                exp.null(), time_data_type, self.dialect, nullable=True
+                            ).as_(col)
+                            if col == valid_to_col.name
+                            else exp.column(col)
+                            for col in columns_to_types
+                        ),
+                        exp.true().as_("_exists"),
+                    )
+                    .from_(target_table)
+                    .where(
+                        exp.and_(
+                            valid_from_col <= cleanup_ts,
+                            exp.or_(
+                                valid_to_col.is_(exp.null()),
+                                valid_to_col >= cleanup_ts,
+                            ),
+                        )
+                    ),
                 )
                 # Deleted records which can be used to determine `valid_from` for undeleted source records
                 .with_(
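
The non-truncate branches of the `static` and `latest` CTEs above can be read as splitting the existing rows around `cleanup_ts`. The following is a minimal sketch in plain Python, not SQLMesh code: `Row` and `split_existing_rows` are hypothetical names introduced only for illustration, and the two predicates are transcribed from the filters in this hunk.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for one record of an SCD Type 2 table."""

    id: int
    valid_from: datetime
    valid_to: Optional[datetime]


def split_existing_rows(
    rows: List[Row], cleanup_ts: datetime
) -> Tuple[List[Row], List[Row]]:
    """Mirror the non-truncate `static` and `latest` predicates from the diff."""
    # static: closed records that ended strictly before the cleanup timestamp.
    static = [r for r in rows if r.valid_to is not None and r.valid_to < cleanup_ts]
    # latest: records in effect at the cleanup timestamp, selected with valid_to as NULL.
    latest = [
        replace(r, valid_to=None)
        for r in rows
        if r.valid_from <= cleanup_ts and (r.valid_to is None or r.valid_to >= cleanup_ts)
    ]
    return static, latest


history = [
    Row(1, datetime(2023, 1, 1), datetime(2023, 1, 10)),
    Row(1, datetime(2023, 1, 10), datetime(2023, 1, 20)),
    Row(1, datetime(2023, 1, 20), None),
]
static, latest = split_existing_rows(history, cleanup_ts=datetime(2023, 1, 15))
```

With a cleanup timestamp of 2023-01-15, the first row falls into `static`, the second is picked up by `latest` with its `valid_to` cleared, and the third (which starts after the cleanup timestamp) matches neither predicate.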

sqlmesh/core/engine_adapter/trino.py

Lines changed: 2 additions & 0 deletions
@@ -256,6 +256,7 @@ def _scd_type_2(
         unique_key: t.Sequence[exp.Expression],
         valid_from_col: exp.Column,
         valid_to_col: exp.Column,
+        start: TimeLike,
         execution_time: t.Union[TimeLike, exp.Column],
         invalidate_hard_deletes: bool = True,
         updated_at_col: t.Optional[exp.Column] = None,
@@ -277,6 +278,7 @@ def _scd_type_2(
             unique_key,
             valid_from_col,
             valid_to_col,
+            start,
             execution_time,
             invalidate_hard_deletes,
             updated_at_col,

sqlmesh/core/model/kind.py

Lines changed: 0 additions & 1 deletion
@@ -140,7 +140,6 @@ def full_history_restatement_only(self) -> bool:
             self.is_incremental_unmanaged
             or self.is_incremental_by_unique_key
             or self.is_incremental_by_partition
-            or self.is_scd_type_2
             or self.is_managed
             or self.is_full
             or self.is_view

sqlmesh/core/plan/builder.py

Lines changed: 1 addition & 1 deletion
@@ -405,7 +405,7 @@ def _build_restatements(
             elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement:
                 self._console.log_warning(
                     f"Cannot restate model '{snapshot.name}'. "
-                    "Restatement is disabled for this model to prevent possible data loss."
+                    "Restatement is disabled for this model to prevent possible data loss. "
                     "If you want to restate this model, change the model's `disable_restatement` setting to `false`."
                 )
                 continue

sqlmesh/core/snapshot/definition.py

Lines changed: 15 additions & 0 deletions
@@ -797,6 +797,21 @@ def get_removal_interval(
 
             removal_interval = expanded_removal_interval
 
+        # SCD Type 2 validation that end date is the latest interval if it was provided
+        if not is_preview and self.is_scd_type_2 and self.intervals:
+            requested_start, requested_end = removal_interval
+            latest_end = self.intervals[-1][1]
+            if requested_end < latest_end:
+                from sqlmesh.core.console import get_console
+
+                get_console().log_warning(
+                    f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n"
+                    f"Requested end date [{to_ts(requested_end)}] is less than the latest interval end date.\n"
+                    f"The requested end date will be ignored. Using the latest interval end instead: [{to_ts(latest_end)}]"
+                )
+
+                removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict)
+
         return removal_interval
 
     @property
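
The end-date handling added to `get_removal_interval` amounts to a clamp on the requested interval. Below is a minimal sketch under stated assumptions: intervals are `(start, end)` pairs of integer timestamps sorted in ascending order, and `clamp_scd_type_2_interval` is a hypothetical helper, not a SQLMesh function. The actual method also logs a warning and passes the result through `self.inclusive_exclusive(...)`; both are omitted here.

```python
from typing import List, Tuple

Interval = Tuple[int, int]  # (start, end) timestamps, e.g. epoch milliseconds


def clamp_scd_type_2_interval(requested: Interval, intervals: List[Interval]) -> Interval:
    """Extend a requested removal interval to the end of the latest interval."""
    if not intervals:
        # No processed intervals yet, so there is nothing to clamp against.
        return requested
    requested_start, requested_end = requested
    latest_end = intervals[-1][1]
    if requested_end < latest_end:
        # SCD Type 2 restatements always run through the latest interval,
        # so an earlier requested end is replaced.
        return (requested_start, latest_end)
    return requested


processed = [(0, 100), (100, 200), (200, 300)]
clamped = clamp_scd_type_2_interval((100, 200), processed)
unchanged = clamp_scd_type_2_interval((100, 300), processed)
```

Here a request ending at `200` is extended to `300`, the end of the latest processed interval, while a request that already ends there is returned as-is.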

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
@@ -1788,6 +1788,7 @@ def insert(
                 table_description=model.description,
                 column_descriptions=model.column_descriptions,
                 truncate=is_first_insert,
+                start=kwargs["start"],
             )
         elif isinstance(model.kind, SCDType2ByColumnKind):
             self.adapter.scd_type_2_by_column(
@@ -1805,6 +1806,7 @@ def insert(
                 table_description=model.description,
                 column_descriptions=model.column_descriptions,
                 truncate=is_first_insert,
+                start=kwargs["start"],
             )
         else:
             raise SQLMeshError(

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 4 additions & 0 deletions
@@ -744,6 +744,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext):
         columns_to_types=input_schema,
         table_format=ctx.default_table_format,
         truncate=True,
+        start="2022-01-01 00:00:00",
     )
     results = ctx.get_metadata_results()
     assert len(results.views) == 0
@@ -807,6 +808,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext):
         columns_to_types=input_schema,
         table_format=ctx.default_table_format,
         truncate=False,
+        start="2022-01-01 00:00:00",
     )
     results = ctx.get_metadata_results()
     assert len(results.views) == 0
@@ -899,6 +901,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext):
         execution_time_as_valid_from=False,
         columns_to_types=ctx.columns_to_types,
         truncate=True,
+        start="2023-01-01",
     )
     results = ctx.get_metadata_results()
     assert len(results.views) == 0
@@ -970,6 +973,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext):
         execution_time_as_valid_from=False,
         columns_to_types=ctx.columns_to_types,
         truncate=False,
+        start="2023-01-01",
     )
     results = ctx.get_metadata_results()
     assert len(results.views) == 0
