Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions docs/concepts/models/model_kinds.md
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,13 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod

Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large.

**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default.
**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations:

- **Full restatements**: The entire table will be recreated from scratch when no start date is specified
- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify
- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported

Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration.

There are two ways to tracking changes: By Time (Recommended) or By Column.

Expand Down Expand Up @@ -1283,11 +1289,11 @@ This is the most accurate representation of the menu based on the source data pr

### Processing Source Table with Historical Data

The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time.
In this case, the default setting of `None` for `batch_size` is the best option.

Another use case though is processing a source table that already has history in it.
Another use case though is processing a source table that already has history in it.
A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day.
If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order.
That way the historical records will be properly captured in the SCD Type 2 table.
Expand Down Expand Up @@ -1433,11 +1439,14 @@ GROUP BY
id
```

### Reset SCD Type 2 Model (clearing history)
### SCD Type 2 Restatements

SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost.
However, there are cases where you may want to clear the history and start fresh.
For this use use case you will want to start by setting `disable_restatement` to `false` in the model definition.

#### Enabling Restatements

To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition:

```sql linenums="1" hl_lines="5"
MODEL (
Expand All @@ -1449,16 +1458,39 @@ MODEL (
);
```

Plan/apply this change to production.
Then you will want to [restate the model](../plans.md#restatement-plans).
#### Full Restatements (Clearing All History)

To clear all history and recreate the entire table from scratch:

```bash
sqlmesh plan --restate-model db.menu_items
```

!!! warning

This will remove the historical data on the model which in most situations cannot be recovered.
This will remove **all** historical data on the model which in most situations cannot be recovered.

#### Partial Restatements (From a Specific Date)

You can restate data from a specific start date onwards. This will:
- Delete all records with `valid_from >= start_date`
- Reprocess the data from the start date to the latest interval

```bash
sqlmesh plan --restate-model db.menu_items --start "2023-01-15"
```

!!! note

If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date.

```bash
# This end date will be ignored and set to the latest interval
sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20"
```


#### Re-enabling Protection

Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss.

Expand Down
40 changes: 38 additions & 2 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,7 @@ def _scd_type_2(
unique_key: t.Sequence[exp.Expression],
valid_from_col: exp.Column,
valid_to_col: exp.Column,
start: TimeLike,
execution_time: t.Union[TimeLike, exp.Column],
invalidate_hard_deletes: bool = True,
updated_at_col: t.Optional[exp.Column] = None,
Expand Down Expand Up @@ -1708,8 +1709,14 @@ def remove_managed_columns(
existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
target_table
)

cleanup_ts = None
if truncate:
existing_rows_query = existing_rows_query.limit(0)
else:
# If truncate is false it is not the first insert
# Determine the cleanup timestamp for restatement or a regular incremental run
cleanup_ts = to_time_column(start, time_data_type, self.dialect, nullable=True)

with source_queries[0] as source_query:
prefixed_columns_to_types = []
Expand Down Expand Up @@ -1747,12 +1754,41 @@ def remove_managed_columns(
# Historical Records that Do Not Change
.with_(
"static",
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_())
if truncate
else existing_rows_query.where(
exp.and_(
valid_to_col.is_(exp.Null().not_()),
valid_to_col < cleanup_ts,
),
),
)
# Latest Records that can be updated
.with_(
"latest",
existing_rows_query.where(valid_to_col.is_(exp.Null())),
existing_rows_query.where(valid_to_col.is_(exp.Null()))
if truncate
else exp.select(
*(
to_time_column(
exp.null(), time_data_type, self.dialect, nullable=True
).as_(col)
if col == valid_to_col.name
else exp.column(col)
for col in columns_to_types
),
exp.true().as_("_exists"),
)
.from_(target_table)
.where(
exp.and_(
valid_from_col <= cleanup_ts,
exp.or_(
valid_to_col.is_(exp.null()),
valid_to_col >= cleanup_ts,
),
)
),
)
# Deleted records which can be used to determine `valid_from` for undeleted source records
.with_(
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def _scd_type_2(
unique_key: t.Sequence[exp.Expression],
valid_from_col: exp.Column,
valid_to_col: exp.Column,
start: TimeLike,
execution_time: t.Union[TimeLike, exp.Column],
invalidate_hard_deletes: bool = True,
updated_at_col: t.Optional[exp.Column] = None,
Expand All @@ -277,6 +278,7 @@ def _scd_type_2(
unique_key,
valid_from_col,
valid_to_col,
start,
execution_time,
invalidate_hard_deletes,
updated_at_col,
Expand Down
1 change: 0 additions & 1 deletion sqlmesh/core/model/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def full_history_restatement_only(self) -> bool:
self.is_incremental_unmanaged
or self.is_incremental_by_unique_key
or self.is_incremental_by_partition
or self.is_scd_type_2
or self.is_managed
or self.is_full
or self.is_view
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def _build_restatements(
elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement:
self._console.log_warning(
f"Cannot restate model '{snapshot.name}'. "
"Restatement is disabled for this model to prevent possible data loss."
"Restatement is disabled for this model to prevent possible data loss. "
"If you want to restate this model, change the model's `disable_restatement` setting to `false`."
)
continue
Expand Down
15 changes: 15 additions & 0 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,21 @@ def get_removal_interval(

removal_interval = expanded_removal_interval

# SCD Type 2 validation that end date is the latest interval if it was provided
if not is_preview and self.is_scd_type_2 and self.intervals:
requested_start, requested_end = removal_interval
latest_end = self.intervals[-1][1]
if requested_end < latest_end:
from sqlmesh.core.console import get_console

get_console().log_warning(
f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n"
f"Requested end date [{to_ts(requested_end)}] is less than the latest interval end date.\n"
f"The requested end date will be ignored. Using the latest interval end instead: [{to_ts(latest_end)}]"
)

removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict)

return removal_interval

@property
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1788,6 +1788,7 @@ def insert(
table_description=model.description,
column_descriptions=model.column_descriptions,
truncate=is_first_insert,
start=kwargs["start"],
)
elif isinstance(model.kind, SCDType2ByColumnKind):
self.adapter.scd_type_2_by_column(
Expand All @@ -1805,6 +1806,7 @@ def insert(
table_description=model.description,
column_descriptions=model.column_descriptions,
truncate=is_first_insert,
start=kwargs["start"],
)
else:
raise SQLMeshError(
Expand Down
4 changes: 4 additions & 0 deletions tests/core/engine_adapter/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext):
columns_to_types=input_schema,
table_format=ctx.default_table_format,
truncate=True,
start="2022-01-01 00:00:00",
)
results = ctx.get_metadata_results()
assert len(results.views) == 0
Expand Down Expand Up @@ -807,6 +808,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext):
columns_to_types=input_schema,
table_format=ctx.default_table_format,
truncate=False,
start="2022-01-01 00:00:00",
)
results = ctx.get_metadata_results()
assert len(results.views) == 0
Expand Down Expand Up @@ -899,6 +901,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext):
execution_time_as_valid_from=False,
columns_to_types=ctx.columns_to_types,
truncate=True,
start="2023-01-01",
)
results = ctx.get_metadata_results()
assert len(results.views) == 0
Expand Down Expand Up @@ -970,6 +973,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext):
execution_time_as_valid_from=False,
columns_to_types=ctx.columns_to_types,
truncate=False,
start="2023-01-01",
)
results = ctx.get_metadata_results()
assert len(results.views) == 0
Expand Down
Loading