diff --git a/docs/concepts/models/model_kinds.md b/docs/concepts/models/model_kinds.md index d01cc738a6..e748313c81 100644 --- a/docs/concepts/models/model_kinds.md +++ b/docs/concepts/models/model_kinds.md @@ -935,7 +935,13 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large. -**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default. +**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations: + +- **Full restatements**: The entire table will be recreated from scratch when no start date is specified +- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify +- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported + +Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration. There are two ways to tracking changes: By Time (Recommended) or By Column. 
@@ -1283,11 +1289,11 @@ This is the most accurate representation of the menu based on the source data pr ### Processing Source Table with Historical Data -The most common case for SCD Type 2 is creating history for a table that it doesn't have it already. +The most common case for SCD Type 2 is creating history for a table that doesn't already have it. In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time. In this case, the default setting of `None` for `batch_size` is the best option. -Another use case though is processing a source table that already has history in it. +Another use case though is processing a source table that already has history in it. A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day. If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order. That way the historical records will be properly captured in the SCD Type 2 table. @@ -1433,11 +1439,14 @@ GROUP BY id ``` -### Reset SCD Type 2 Model (clearing history) +### SCD Type 2 Restatements SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost. However, there are cases where you may want to clear the history and start fresh. -For this use use case you will want to start by setting `disable_restatement` to `false` in the model definition. + +#### Enabling Restatements + +To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition: ```sql linenums="1" hl_lines="5" MODEL ( @@ -1449,8 +1458,9 @@ MODEL ( ); ``` -Plan/apply this change to production. -Then you will want to [restate the model](../plans.md#restatement-plans). 
+#### Full Restatements (Clearing All History) + +To clear all history and recreate the entire table from scratch: ```bash sqlmesh plan --restate-model db.menu_items @@ -1458,7 +1468,29 @@ sqlmesh plan --restate-model db.menu_items !!! warning - This will remove the historical data on the model which in most situations cannot be recovered. + This will remove **all** historical data on the model which in most situations cannot be recovered. + +#### Partial Restatements (From a Specific Date) + +You can restate data from a specific start date onwards. This will: +- Delete all records with `valid_from >= start_date` +- Reprocess the data from the start date to the latest interval + +```bash +sqlmesh plan --restate-model db.menu_items --start "2023-01-15" +``` + +!!! note + + If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date. + +```bash +# This end date will be ignored and set to the latest interval +sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20" +``` + + +#### Re-enabling Protection Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss. 
diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index a317008b1a..924aca8c99 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1514,6 +1514,7 @@ def _scd_type_2( unique_key: t.Sequence[exp.Expression], valid_from_col: exp.Column, valid_to_col: exp.Column, + start: TimeLike, execution_time: t.Union[TimeLike, exp.Column], invalidate_hard_deletes: bool = True, updated_at_col: t.Optional[exp.Column] = None, @@ -1708,8 +1709,14 @@ def remove_managed_columns( existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_( target_table ) + + cleanup_ts = None if truncate: existing_rows_query = existing_rows_query.limit(0) + else: + # If truncate is false it is not the first insert + # Determine the cleanup timestamp for restatement or a regular incremental run + cleanup_ts = to_time_column(start, time_data_type, self.dialect, nullable=True) with source_queries[0] as source_query: prefixed_columns_to_types = [] @@ -1747,12 +1754,41 @@ def remove_managed_columns( # Historical Records that Do Not Change .with_( "static", - existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()), + existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()) + if truncate + else existing_rows_query.where( + exp.and_( + valid_to_col.is_(exp.Null()).not_(), + valid_to_col < cleanup_ts, + ), + ), ) # Latest Records that can be updated .with_( "latest", - existing_rows_query.where(valid_to_col.is_(exp.Null())), + existing_rows_query.where(valid_to_col.is_(exp.Null())) + if truncate + else exp.select( + *( + to_time_column( + exp.null(), time_data_type, self.dialect, nullable=True + ).as_(col) + if col == valid_to_col.name + else exp.column(col) + for col in columns_to_types + ), + exp.true().as_("_exists"), + ) + .from_(target_table) + .where( + exp.and_( + valid_from_col <= cleanup_ts, + exp.or_( + valid_to_col.is_(exp.null()), + valid_to_col >= cleanup_ts, + ), + ) + ), ) # Deleted 
records which can be used to determine `valid_from` for undeleted source records .with_( diff --git a/sqlmesh/core/engine_adapter/trino.py b/sqlmesh/core/engine_adapter/trino.py index df8e45b520..06d693e11c 100644 --- a/sqlmesh/core/engine_adapter/trino.py +++ b/sqlmesh/core/engine_adapter/trino.py @@ -256,6 +256,7 @@ def _scd_type_2( unique_key: t.Sequence[exp.Expression], valid_from_col: exp.Column, valid_to_col: exp.Column, + start: TimeLike, execution_time: t.Union[TimeLike, exp.Column], invalidate_hard_deletes: bool = True, updated_at_col: t.Optional[exp.Column] = None, @@ -277,6 +278,7 @@ def _scd_type_2( unique_key, valid_from_col, valid_to_col, + start, execution_time, invalidate_hard_deletes, updated_at_col, diff --git a/sqlmesh/core/model/kind.py b/sqlmesh/core/model/kind.py index 86eb6e665c..4a15023f2f 100644 --- a/sqlmesh/core/model/kind.py +++ b/sqlmesh/core/model/kind.py @@ -140,7 +140,6 @@ def full_history_restatement_only(self) -> bool: self.is_incremental_unmanaged or self.is_incremental_by_unique_key or self.is_incremental_by_partition - or self.is_scd_type_2 or self.is_managed or self.is_full or self.is_view diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index f3f78e1714..ff953c75a2 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -405,7 +405,7 @@ def _build_restatements( elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement: self._console.log_warning( f"Cannot restate model '{snapshot.name}'. " - "Restatement is disabled for this model to prevent possible data loss." + "Restatement is disabled for this model to prevent possible data loss. " "If you want to restate this model, change the model's `disable_restatement` setting to `false`." 
) continue diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index af6641b5c0..ecd547664f 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -797,6 +797,21 @@ def get_removal_interval( removal_interval = expanded_removal_interval + # SCD Type 2 validation that end date is the latest interval if it was provided + if not is_preview and self.is_scd_type_2 and self.intervals: + requested_start, requested_end = removal_interval + latest_end = self.intervals[-1][1] + if requested_end < latest_end: + from sqlmesh.core.console import get_console + + get_console().log_warning( + f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n" + f"Requested end date [{to_ts(requested_end)}] is less than the latest interval end date.\n" + f"The requested end date will be ignored. Using the latest interval end instead: [{to_ts(latest_end)}]" + ) + + removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict) + return removal_interval @property diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index d33748630d..baac96f64a 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1788,6 +1788,7 @@ def insert( table_description=model.description, column_descriptions=model.column_descriptions, truncate=is_first_insert, + start=kwargs["start"], ) elif isinstance(model.kind, SCDType2ByColumnKind): self.adapter.scd_type_2_by_column( @@ -1805,6 +1806,7 @@ def insert( table_description=model.description, column_descriptions=model.column_descriptions, truncate=is_first_insert, + start=kwargs["start"], ) else: raise SQLMeshError( diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index e48bea318f..ee839d7593 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ 
b/tests/core/engine_adapter/integration/test_integration.py @@ -744,6 +744,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext): columns_to_types=input_schema, table_format=ctx.default_table_format, truncate=True, + start="2022-01-01 00:00:00", ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -807,6 +808,7 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext): columns_to_types=input_schema, table_format=ctx.default_table_format, truncate=False, + start="2022-01-01 00:00:00", ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -899,6 +901,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext): execution_time_as_valid_from=False, columns_to_types=ctx.columns_to_types, truncate=True, + start="2023-01-01", ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -970,6 +973,7 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext): execution_time_as_valid_from=False, columns_to_types=ctx.columns_to_types, truncate=False, + start="2023-01-01", ) results = ctx.get_metadata_results() assert len(results.views) == 0 diff --git a/tests/core/engine_adapter/test_base.py b/tests/core/engine_adapter/test_base.py index 8ab15ffdca..faf1386877 100644 --- a/tests/core/engine_adapter/test_base.py +++ b/tests/core/engine_adapter/test_base.py @@ -1222,10 +1222,11 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable): "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( - adapter.cursor.execute.call_args[0][0] + parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ CREATE OR REPLACE TABLE "target" AS @@ -1254,8 +1255,7 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable): "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE - NOT "test_valid_to" IS NULL + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' 
AS TIMESTAMP) ), "latest" AS ( SELECT "id", @@ -1263,11 +1263,11 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable): "price", "test_UPDATED_at", "test_valid_from", - "test_valid_to", + CAST(NULL AS TIMESTAMP) AS "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE - "test_valid_to" IS NULL + WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) ), "deleted" AS ( SELECT "static"."id", @@ -1421,10 +1421,11 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( - adapter.cursor.execute.call_args[0][0] + parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ CREATE OR REPLACE TABLE "target" AS @@ -1453,8 +1454,7 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE - NOT "test_valid_to" IS NULL + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) ), "latest" AS ( SELECT "id", @@ -1462,11 +1462,11 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte "price", "test_updated_at", "test_valid_from", - "test_valid_to", + CAST(NULL AS TIMESTAMP) AS "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE - "test_valid_to" IS NULL + WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) ), "deleted" AS ( SELECT "static"."id", @@ -1609,35 +1609,37 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "test_valid_to": exp.DataType.build("TIMESTAMPTZ"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( - 
adapter.cursor.execute.call_args[0][0] + parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ -CREATE OR REPLACE TABLE "target" AS -WITH "source" AS ( - SELECT DISTINCT ON ("id1", "id2") + CREATE OR REPLACE TABLE "target" AS + WITH "source" AS ( + SELECT DISTINCT ON ("id1", "id2") TRUE AS "_exists", "id1", "id2", "name", "price", CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at" - FROM ( + FROM ( SELECT CAST("id1" AS INT) AS "id1", CAST("id2" AS INT) AS "id2", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", - CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at", + CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at" FROM (VALUES (1, 4, 'muffins', 4.0, '2020-01-01 10:00:00'), (2, 5, 'chips', 5.0, '2020-01-02 15:00:00'), - (3, 6, 'soda', 6.0, '2020-01-03 12:00:00')) AS "t"("id1", "id2", "name", "price", "test_updated_at") - ) AS "raw_source" -), "static" AS ( - SELECT + (3, 6, 'soda', 6.0, '2020-01-03 12:00:00') + ) AS "t"("id1", "id2", "name", "price", "test_updated_at") + ) AS "raw_source" + ), "static" AS ( + SELECT "id1", "id2", "name", @@ -1646,24 +1648,23 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "test_valid_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - NOT "test_valid_to" IS NULL -), "latest" AS ( - SELECT + FROM "target" + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ) + ), "latest" AS ( + SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", - "test_valid_to", + CAST(NULL AS TIMESTAMPTZ) AS "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - "test_valid_to" IS NULL -), "deleted" AS ( - SELECT + FROM "target" + WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ)) + ), "deleted" AS ( + SELECT "static"."id1", "static"."id2", 
"static"."name", @@ -1671,23 +1672,20 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "static"."test_updated_at", "static"."test_valid_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON "static"."id1" = "latest"."id1" AND "static"."id2" = "latest"."id2" - WHERE - "latest"."test_valid_to" IS NULL -), "latest_deleted" AS ( - SELECT + WHERE "latest"."test_valid_to" IS NULL + ), "latest_deleted" AS ( + SELECT TRUE AS "_exists", "id1" AS "_key0", "id2" AS "_key1", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY - "id1", - "id2" -), "joined" AS ( - SELECT + FROM "deleted" + GROUP BY "id1", "id2" + ), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id1" AS "t_id1", "latest"."id2" AS "t_id2", @@ -1701,11 +1699,11 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "source"."name" AS "name", "source"."price" AS "price", "source"."test_updated_at" AS "test_updated_at" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON "latest"."id1" = "source"."id1" AND "latest"."id2" = "source"."id2" - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id1" AS "t_id1", "latest"."id2" AS "t_id2", @@ -1719,13 +1717,12 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "source"."name" AS "name", "source"."price" AS "price", "source"."test_updated_at" AS "test_updated_at" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON "latest"."id1" = "source"."id1" AND "latest"."id2" = "source"."id2" - WHERE - "latest"."_exists" IS NULL -), "updated_rows" AS ( - SELECT + WHERE "latest"."_exists" IS NULL + ), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id1", "joined"."id1") AS "id1", COALESCE("joined"."t_id2", "joined"."id2") AS "id2", COALESCE("joined"."t_name", "joined"."name") AS "name", @@ -1734,9 +1731,9 @@ def 
test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): CASE WHEN "t_test_valid_from" IS NULL AND NOT "latest_deleted"."_exists" IS NULL THEN CASE - WHEN "latest_deleted"."test_valid_to" > "test_updated_at" - THEN "latest_deleted"."test_valid_to" - ELSE "test_updated_at" + WHEN "latest_deleted"."test_valid_to" > "test_updated_at" + THEN "latest_deleted"."test_valid_to" + ELSE "test_updated_at" END WHEN "t_test_valid_from" IS NULL THEN CAST('1970-01-01 00:00:00+00:00' AS TIMESTAMPTZ) @@ -1749,12 +1746,11 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): THEN CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" - ON "joined"."id1" = "latest_deleted"."_key0" - AND "joined"."id2" = "latest_deleted"."_key1" -), "inserted_rows" AS ( - SELECT + FROM "joined" + LEFT JOIN "latest_deleted" + ON "joined"."id1" = "latest_deleted"."_key0" AND "joined"."id2" = "latest_deleted"."_key1" + ), "inserted_rows" AS ( + SELECT "id1", "id2", "name", @@ -1762,12 +1758,23 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "test_updated_at", "test_updated_at" AS "test_valid_from", CAST(NULL AS TIMESTAMPTZ) AS "test_valid_to" - FROM "joined" - WHERE - "joined"."test_updated_at" > "joined"."t_test_updated_at" -) -SELECT CAST("id1" AS INT) AS "id1", CAST("id2" AS INT) AS "id2", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at", CAST("test_valid_from" AS TIMESTAMPTZ) AS "test_valid_from", CAST("test_valid_to" AS TIMESTAMPTZ) AS "test_valid_to" FROM (SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "static" UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", 
"test_valid_to" FROM "inserted_rows") AS "_subquery" -""" + FROM "joined" + WHERE "joined"."test_updated_at" > "joined"."t_test_updated_at" + ) + SELECT + CAST("id1" AS INT) AS "id1", + CAST("id2" AS INT) AS "id2", + CAST("name" AS VARCHAR) AS "name", + CAST("price" AS DOUBLE) AS "price", + CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at", + CAST("test_valid_from" AS TIMESTAMPTZ) AS "test_valid_from", + CAST("test_valid_to" AS TIMESTAMPTZ) AS "test_valid_to" + FROM ( + SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "static" + UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "updated_rows" + UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "inserted_rows" + ) AS "_subquery" + """ ).sql() ) @@ -1790,71 +1797,71 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), extra_col_ignore="testing", ) assert ( - adapter.cursor.execute.call_args[0][0] + parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ -CREATE OR REPLACE TABLE "target" AS -WITH "source" AS ( - SELECT DISTINCT ON ("id") + CREATE OR REPLACE TABLE "target" AS + WITH "source" AS ( + SELECT DISTINCT ON ("id") TRUE AS "_exists", "id", "name", "price" - FROM ( + FROM ( SELECT "id", "name", "price" FROM "source" - ) AS "raw_source" -), "static" AS ( - SELECT + ) AS "raw_source" + ), "static" AS ( + SELECT "id", "name", "price", "test_VALID_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - NOT "test_valid_to" IS NULL -), "latest" AS ( - SELECT + FROM "target" + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) + ), "latest" AS ( + SELECT "id", "name", "price", "test_VALID_from", - "test_valid_to", + 
CAST(NULL AS TIMESTAMP) AS "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - "test_valid_to" IS NULL -), "deleted" AS ( - SELECT + FROM "target" + WHERE "test_VALID_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) + ), "deleted" AS ( + SELECT "static"."id", "static"."name", "static"."price", "static"."test_VALID_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON "static"."id" = "latest"."id" - WHERE + WHERE "latest"."test_valid_to" IS NULL -), "latest_deleted" AS ( - SELECT + ), "latest_deleted" AS ( + SELECT TRUE AS "_exists", "id" AS "_key0", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY + FROM "deleted" + GROUP BY "id" -), "joined" AS ( - SELECT + ), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -1864,11 +1871,11 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON "latest"."id" = "source"."id" - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -1878,13 +1885,13 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON "latest"."id" = "source"."id" - WHERE + WHERE "latest"."_exists" IS NULL -), "updated_rows" AS ( - SELECT + ), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id", "joined"."id") AS "id", COALESCE("joined"."t_name", "joined"."name") AS "name", COALESCE("joined"."t_price", "joined"."price") AS "price", @@ -1892,63 +1899,73 @@ def 
test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): CASE WHEN "joined"."_exists" IS NULL OR ( - ( - NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL + ( + NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL + ) + AND ( + "joined"."name" <> "joined"."t_name" + OR ( + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL ) - AND ( - "joined"."name" <> "joined"."t_name" - OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL - ) - OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL - ) - OR "joined"."price" <> "joined"."t_price" - OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL - ) - OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL - ) + OR ( + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + ) + OR "joined"."price" <> "joined"."t_price" + OR ( + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ) + OR ( + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL ) ) + ) THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" + FROM "joined" + LEFT JOIN "latest_deleted" ON "joined"."id" = "latest_deleted"."_key0" -), "inserted_rows" AS ( - SELECT + ), "inserted_rows" AS ( + SELECT "id", "name", "price", CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_VALID_from", CAST(NULL AS TIMESTAMP) AS "test_valid_to" - FROM "joined" - WHERE + FROM "joined" + WHERE ( NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL ) AND ( "joined"."name" <> "joined"."t_name" OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL ) OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL ) OR "joined"."price" <> "joined"."t_price" OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + "joined"."t_price" IS NULL AND NOT "joined"."price" IS 
NULL ) OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL ) ) -) -SELECT CAST("id" AS INT) AS "id", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" - """ + ) + SELECT + CAST("id" AS INT) AS "id", + CAST("name" AS VARCHAR) AS "name", + CAST("price" AS DOUBLE) AS "price", + CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", + CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" + FROM ( + SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" + UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" + UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows" + ) AS "_subquery" + """ ).sql() ) @@ -1972,30 +1989,30 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), ) - assert ( - adapter.cursor.execute.call_args[0][0] + parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ -CREATE OR REPLACE TABLE "target" AS -WITH "source" AS ( - SELECT DISTINCT ON (CONCAT("id_a", "id_b")) + CREATE OR REPLACE TABLE "target" AS + WITH "source" AS ( + SELECT DISTINCT ON (CONCAT("id_a", "id_b")) TRUE AS "_exists", "id_a", "id_b", "name", - "price", - FROM ( + "price" + FROM ( SELECT "id_a", "id_b", "name", "price" FROM "source" - ) AS "raw_source" -), "static" AS ( - SELECT + ) AS "raw_source" + 
), "static" AS ( + SELECT "id_a", "id_b", "name", @@ -2003,44 +2020,41 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "test_VALID_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - NOT "test_valid_to" IS NULL -), "latest" AS ( - SELECT + FROM "target" + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) + ), "latest" AS ( + SELECT "id_a", "id_b", "name", "price", "test_VALID_from", - "test_valid_to", + CAST(NULL AS TIMESTAMP) AS "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - "test_valid_to" IS NULL -), "deleted" AS ( - SELECT + FROM "target" + WHERE "test_VALID_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) + ), "deleted" AS ( + SELECT "static"."id_a", "static"."id_b", "static"."name", "static"."price", "static"."test_VALID_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON CONCAT("static"."id_a", "static"."id_b") = CONCAT("latest"."id_a", "latest"."id_b") - WHERE - "latest"."test_valid_to" IS NULL -), "latest_deleted" AS ( - SELECT + WHERE "latest"."test_valid_to" IS NULL + ), "latest_deleted" AS ( + SELECT TRUE AS "_exists", CONCAT("id_a", "id_b") AS "_key0", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY - CONCAT("id_a", "id_b") -), "joined" AS ( - SELECT + FROM "deleted" + GROUP BY CONCAT("id_a", "id_b") + ), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id_a" AS "t_id_a", "latest"."id_b" AS "t_id_b", @@ -2052,11 +2066,11 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "source"."id_b" AS "id_b", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b") - UNION ALL - 
SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id_a" AS "t_id_a", "latest"."id_b" AS "t_id_b", @@ -2068,13 +2082,12 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "source"."id_b" AS "id_b", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b") - WHERE - "latest"."_exists" IS NULL -), "updated_rows" AS ( - SELECT + WHERE "latest"."_exists" IS NULL + ), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id_a", "joined"."id_a") AS "id_a", COALESCE("joined"."t_id_b", "joined"."id_b") AS "id_b", COALESCE("joined"."t_name", "joined"."name") AS "name", @@ -2083,64 +2096,55 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab CASE WHEN "joined"."_exists" IS NULL OR ( - ( - NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL - ) + (NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL) AND ( - "joined"."name" <> "joined"."t_name" - OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL - ) - OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL - ) - OR "joined"."price" <> "joined"."t_price" - OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL - ) - OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL - ) + "joined"."name" <> "joined"."t_name" + OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) + OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) + OR "joined"."price" <> "joined"."t_price" + OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) + OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) ) ) THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" + FROM "joined" + LEFT JOIN 
"latest_deleted" ON CONCAT("joined"."id_a", "joined"."id_b") = "latest_deleted"."_key0" -), "inserted_rows" AS ( - SELECT + ), "inserted_rows" AS ( + SELECT "id_a", "id_b", "name", "price", CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_VALID_from", CAST(NULL AS TIMESTAMP) AS "test_valid_to" - FROM "joined" - WHERE - ( - NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL - ) + FROM "joined" + WHERE + (NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL) AND ( "joined"."name" <> "joined"."t_name" - OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL - ) - OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL - ) + OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) + OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) OR "joined"."price" <> "joined"."t_price" - OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL - ) - OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL - ) + OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) + OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) ) -) -SELECT CAST("id_a" AS VARCHAR) AS "id_a", CAST("id_b" AS VARCHAR) AS "id_b", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" - """ + ) + SELECT + CAST("id_a" AS VARCHAR) AS "id_a", + CAST("id_b" AS VARCHAR) AS "id_b", + CAST("name" AS VARCHAR) AS "name", + CAST("price" AS DOUBLE) AS "price", + CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", + CAST("test_valid_to" AS 
TIMESTAMP) AS "test_valid_to" + FROM ( + SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" + UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" + UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows" + ) AS "_subquery" + """ ).sql() ) @@ -2164,6 +2168,7 @@ def test_scd_type_2_truncate(make_mocked_engine_adapter: t.Callable): }, execution_time=datetime(2020, 1, 1, 0, 0, 0), truncate=True, + start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( @@ -2346,70 +2351,69 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( - adapter.cursor.execute.call_args[0][0] + parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ -CREATE OR REPLACE TABLE "target" AS -WITH "source" AS ( - SELECT DISTINCT ON ("id") + CREATE OR REPLACE TABLE "target" AS + WITH "source" AS ( + SELECT DISTINCT ON ("id") TRUE AS "_exists", "id", "name", "price" - FROM ( + FROM ( SELECT "id", "name", "price" FROM "source" - ) AS "raw_source" -), "static" AS ( - SELECT + ) AS "raw_source" + ), "static" AS ( + SELECT "id", "name", "price", "test_valid_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - NOT "test_valid_to" IS NULL -), "latest" AS ( - SELECT + FROM "target" + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) + ), "latest" AS ( + SELECT "id", "name", "price", "test_valid_from", - "test_valid_to", + CAST(NULL AS TIMESTAMP) AS "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE - "test_valid_to" IS NULL -), "deleted" AS ( - SELECT + FROM "target" + WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS 
TIMESTAMP)) + ), "deleted" AS ( + SELECT "static"."id", "static"."name", "static"."price", "static"."test_valid_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON "static"."id" = "latest"."id" - WHERE - "latest"."test_valid_to" IS NULL -), "latest_deleted" AS ( - SELECT + WHERE "latest"."test_valid_to" IS NULL + ), "latest_deleted" AS ( + SELECT TRUE AS "_exists", "id" AS "_key0", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY + FROM "deleted" + GROUP BY "id" -), "joined" AS ( - SELECT + ), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -2419,11 +2423,11 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON "latest"."id" = "source"."id" - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -2433,13 +2437,12 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON "latest"."id" = "source"."id" - WHERE - "latest"."_exists" IS NULL -), "updated_rows" AS ( - SELECT + WHERE "latest"."_exists" IS NULL + ), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id", "joined"."id") AS "id", COALESCE("joined"."t_name", "joined"."name") AS "name", COALESCE("joined"."t_price", "joined"."price") AS "price", @@ -2447,77 +2450,59 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) CASE WHEN "joined"."_exists" IS NULL OR ( - ( - NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL - ) - AND ( - "joined"."id" <> "joined"."t_id" - OR ( - "joined"."t_id" IS NULL AND NOT 
"joined"."id" IS NULL - ) - OR ( - NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL - ) - OR "joined"."name" <> "joined"."t_name" - OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL - ) - OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL - ) - OR "joined"."price" <> "joined"."t_price" - OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL - ) - OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL - ) - ) + (NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) + AND ( + "joined"."id" <> "joined"."t_id" + OR ("joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) + OR (NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL) + OR "joined"."name" <> "joined"."t_name" + OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) + OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) + OR "joined"."price" <> "joined"."t_price" + OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) + OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) + ) ) THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" + FROM "joined" + LEFT JOIN "latest_deleted" ON "joined"."id" = "latest_deleted"."_key0" -), "inserted_rows" AS ( - SELECT + ), "inserted_rows" AS ( + SELECT "id", "name", "price", CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_valid_from", CAST(NULL AS TIMESTAMP) AS "test_valid_to" - FROM "joined" - WHERE - ( - NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL - ) + FROM "joined" + WHERE + (NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) AND ( "joined"."id" <> "joined"."t_id" - OR ( - "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL - ) - OR ( - NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL - ) + OR ("joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) + OR (NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL) OR "joined"."name" <> "joined"."t_name" - OR ( - 
"joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL - ) - OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL - ) + OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) + OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) OR "joined"."price" <> "joined"."t_price" - OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL - ) - OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL - ) + OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) + OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) ) -) -SELECT CAST("id" AS INT) AS "id", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_valid_from" AS TIMESTAMP) AS "test_valid_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "static" UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" - """ + ) + SELECT + CAST("id" AS INT) AS "id", + CAST("name" AS VARCHAR) AS "name", + CAST("price" AS DOUBLE) AS "price", + CAST("test_valid_from" AS TIMESTAMP) AS "test_valid_from", + CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" + FROM ( + SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "static" + UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "updated_rows" + UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "inserted_rows" + ) AS "_subquery" + """ ).sql() ) @@ -2541,10 +2526,11 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( - adapter.cursor.execute.call_args[0][0] + 
parse_one(adapter.cursor.execute.call_args[0][0]).sql() == parse_one( """ CREATE OR REPLACE TABLE "target" AS @@ -2570,19 +2556,18 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE - NOT "test_valid_to" IS NULL + WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) ), "latest" AS ( SELECT "id", "name", "price", "test_valid_from", - "test_valid_to", + CAST(NULL AS TIMESTAMP) AS "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE - "test_valid_to" IS NULL + WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) + AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) ), "deleted" AS ( SELECT "static"."id", diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index 9d63d9400e..1665239e36 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -606,6 +606,8 @@ def test_scd_type_2_by_time( "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), + truncate=True, ) assert to_sql_calls(adapter)[4] == parse_one( @@ -637,7 +639,7 @@ def test_scd_type_2_by_time( TRUE AS "_exists" FROM ""__temp_target_efgh"" WHERE - NOT "test_valid_to" IS NULL + NOT "test_valid_to" IS NULL LIMIT 0 ), "latest" AS ( SELECT "id", @@ -649,7 +651,7 @@ def test_scd_type_2_by_time( TRUE AS "_exists" FROM ""__temp_target_efgh"" WHERE - "test_valid_to" IS NULL + "test_valid_to" IS NULL LIMIT 0 ), "deleted" AS ( SELECT "static"."id", @@ -811,6 +813,8 @@ def test_scd_type_2_by_column( "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), + truncate=True, ) assert to_sql_calls(adapter)[4] == parse_one( @@ -840,7 +844,7 @@ def 
test_scd_type_2_by_column( TRUE AS "_exists" FROM "__temp_target_efgh" WHERE - NOT "test_valid_to" IS NULL + NOT ("test_valid_to" IS NULL) LIMIT 0 ), "latest" AS ( SELECT "id", @@ -851,7 +855,7 @@ def test_scd_type_2_by_column( TRUE AS "_exists" FROM "__temp_target_efgh" WHERE - "test_valid_to" IS NULL + "test_valid_to" IS NULL LIMIT 0 ), "deleted" AS ( SELECT "static"."id", @@ -907,7 +911,7 @@ def test_scd_type_2_by_column( COALESCE("joined"."t_id", "joined"."id") AS "id", COALESCE("joined"."t_name", "joined"."name") AS "name", COALESCE("joined"."t_price", "joined"."price") AS "price", - COALESCE("t_test_VALID_from", CAST('2020-01-01 00:00:00' AS Nullable(DateTime64(6)))) AS "test_VALID_from", + COALESCE("t_test_VALID_from", CAST('1970-01-01 00:00:00' AS Nullable(DateTime64(6)))) AS "test_VALID_from", CASE WHEN "joined"."_exists" IS NULL OR ( diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index f1c658a23a..8a455c47a3 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -569,6 +569,8 @@ def check_table_exists(table_name: exp.Table) -> bool: "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), + start=datetime(2020, 1, 1, 0, 0, 0), + truncate=True, ) assert to_sql_calls(adapter) == [ @@ -613,7 +615,7 @@ def check_table_exists(table_name: exp.Table) -> bool: TRUE AS `_exists` FROM `db`.`temp_target_abcdefgh` WHERE - NOT `test_valid_to` IS NULL + NOT `test_valid_to` IS NULL LIMIT 0 ), `latest` AS ( SELECT `id`, @@ -625,7 +627,7 @@ def check_table_exists(table_name: exp.Table) -> bool: TRUE AS `_exists` FROM `db`.`temp_target_abcdefgh` WHERE - `test_valid_to` IS NULL + `test_valid_to` IS NULL LIMIT 0 ), `deleted` AS ( SELECT `static`.`id`, diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 81e8d2dcb4..766a788ac8 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ 
-6467,3 +6467,329 @@ def plan_with_output(ctx: Context, environment: str): for environment in ["dev", "prod"]: context_diff = ctx._context_diff(environment) assert context_diff.environment == environment + + +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_scd_type_2_restatement(init_and_plan_context: t.Callable): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + raw_employee_status = d.parse(""" + MODEL ( + name memory.hr_system.raw_employee_status, + kind FULL + ); + + SELECT + 1001 AS employee_id, + 'engineering' AS department, + 'EMEA' AS region, + '2023-01-08 15:00:00 UTC' AS last_modified; + """) + + # Create SCD Type 2 model for employee history tracking + employee_history = d.parse(""" + MODEL ( + name memory.hr_system.employee_history, + kind SCD_TYPE_2_BY_TIME ( + unique_key employee_id, + updated_at_name last_modified, + disable_restatement false + ), + owner hr_analytics, + cron '*/5 * * * *', + grain employee_id, + description 'Historical tracking of employee status changes' + ); + + SELECT + employee_id::INT AS employee_id, + department::TEXT AS department, + region::TEXT AS region, + last_modified AS last_modified + FROM + memory.hr_system.raw_employee_status; + """) + + raw_employee_status_model = load_sql_based_model(raw_employee_status) + employee_history_model = load_sql_based_model(employee_history) + context.upsert_model(raw_employee_status_model) + context.upsert_model(employee_history_model) + + # Initial plan and apply + plan = context.plan_builder("prod", skip_tests=True).build() + context.apply(plan) + + query = "SELECT employee_id, department, region, valid_from, valid_to FROM memory.hr_system.employee_history ORDER BY employee_id, valid_from" + initial_data = context.engine_adapter.fetchdf(query) + + assert len(initial_data) == 1 + assert initial_data["valid_to"].isna().all() + assert initial_data["department"].tolist() == ["engineering"] + assert initial_data["region"].tolist() == ["EMEA"] + + 
# Apply a future plan with source changes + with time_machine.travel("2023-01-08 15:10:00 UTC"): + # Update source model, employee 1001 changed region + raw_employee_status_v2 = d.parse(""" + MODEL ( + name memory.hr_system.raw_employee_status, + kind FULL + ); + + SELECT + 1001 AS employee_id, + 'engineering' AS department, + 'AMER' AS region, + '2023-01-08 15:10:00 UTC' AS last_modified; + """) + raw_employee_status_v2_model = load_sql_based_model(raw_employee_status_v2) + context.upsert_model(raw_employee_status_v2_model) + context.plan( + auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full() + ) + + with time_machine.travel("2023-01-08 15:20:00 UTC"): + context.run() + data_after_change = context.engine_adapter.fetchdf(query) + + # Validate the SCD2 history for employee 1001 + assert len(data_after_change) == 2 + assert data_after_change.iloc[0]["employee_id"] == 1001 + assert data_after_change.iloc[0]["department"] == "engineering" + assert data_after_change.iloc[0]["region"] == "EMEA" + assert str(data_after_change.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" + assert str(data_after_change.iloc[0]["valid_to"]) == "2023-01-08 15:10:00" + assert data_after_change.iloc[1]["employee_id"] == 1001 + assert data_after_change.iloc[1]["department"] == "engineering" + assert data_after_change.iloc[1]["region"] == "AMER" + assert str(data_after_change.iloc[1]["valid_from"]) == "2023-01-08 15:10:00" + assert pd.isna(data_after_change.iloc[1]["valid_to"]) + + # Update source model, employee 1001 changed region again and department + raw_employee_status_v2 = d.parse(""" + MODEL ( + name memory.hr_system.raw_employee_status, + kind FULL + ); + + SELECT + 1001 AS employee_id, + 'sales' AS department, + 'ANZ' AS region, + '2023-01-08 15:26:00 UTC' AS last_modified; + """) + raw_employee_status_v2_model = load_sql_based_model(raw_employee_status_v2) + context.upsert_model(raw_employee_status_v2_model) + context.plan( + auto_apply=True, 
no_prompts=True, categorizer_config=CategorizerConfig.all_full() + ) + + with time_machine.travel("2023-01-08 15:35:00 UTC"): + context.run() + data_after_change = context.engine_adapter.fetchdf(query) + + # Validate the SCD2 history for employee 1001 after second change + assert len(data_after_change) == 3 + assert data_after_change.iloc[0]["employee_id"] == 1001 + assert data_after_change.iloc[0]["department"] == "engineering" + assert data_after_change.iloc[0]["region"] == "EMEA" + assert str(data_after_change.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" + assert str(data_after_change.iloc[0]["valid_to"]) == "2023-01-08 15:10:00" + assert data_after_change.iloc[1]["employee_id"] == 1001 + assert data_after_change.iloc[1]["department"] == "engineering" + assert data_after_change.iloc[1]["region"] == "AMER" + assert str(data_after_change.iloc[1]["valid_from"]) == "2023-01-08 15:10:00" + assert str(data_after_change.iloc[1]["valid_to"]) == "2023-01-08 15:26:00" + assert data_after_change.iloc[2]["employee_id"] == 1001 + assert data_after_change.iloc[2]["department"] == "sales" + assert data_after_change.iloc[2]["region"] == "ANZ" + assert str(data_after_change.iloc[2]["valid_from"]) == "2023-01-08 15:26:00" + assert pd.isna(data_after_change.iloc[2]["valid_to"]) + + # Now test restatement cleanup by restating from 15:10 (first change) + with time_machine.travel("2023-01-08 15:38:00 UTC"): + plan = context.plan_builder( + "prod", + skip_tests=True, + restate_models=["memory.hr_system.employee_history"], + start="2023-01-08 15:09:00", + ).build() + context.apply(plan) + restated_data = context.engine_adapter.fetchdf(query) + + # Validate the SCD2 history after restatement + assert len(restated_data) == 2 + assert restated_data.iloc[0]["employee_id"] == 1001 + assert restated_data.iloc[0]["department"] == "engineering" + assert restated_data.iloc[0]["region"] == "EMEA" + assert str(restated_data.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" + assert 
str(restated_data.iloc[0]["valid_to"]) == "2023-01-08 15:26:00" + assert restated_data.iloc[1]["employee_id"] == 1001 + assert restated_data.iloc[1]["department"] == "sales" + assert restated_data.iloc[1]["region"] == "ANZ" + assert str(restated_data.iloc[1]["valid_from"]) == "2023-01-08 15:26:00" + assert pd.isna(restated_data.iloc[1]["valid_to"]) + + +@time_machine.travel("2020-01-01 00:00:00 UTC") +def test_scd_type_2_full_restatement_no_start_date(init_and_plan_context: t.Callable): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + # Initial product catalog of 3 products + raw_products = d.parse(""" + MODEL ( + name memory.store.raw_products, + kind FULL + ); + + SELECT * FROM VALUES + (101, 'Laptop Pro', 1299.99, 'Electronics', '2020-01-01 00:00:00'::TIMESTAMP), + (102, 'Wireless Mouse', 49.99, 'Electronics', '2020-01-01 00:00:00'::TIMESTAMP), + (103, 'Office Chair', 199.99, 'Furniture', '2020-01-01 00:00:00'::TIMESTAMP) + AS t(product_id, product_name, price, category, last_updated); + """) + + # SCD Type 2 model for product history tracking + product_history = d.parse(""" + MODEL ( + name memory.store.product_history, + kind SCD_TYPE_2_BY_TIME ( + unique_key product_id, + updated_at_name last_updated, + disable_restatement false + ), + owner catalog_team, + cron '0 */6 * * *', + grain product_id, + description 'Product catalog change history' + ); + + SELECT + product_id::INT AS product_id, + product_name::TEXT AS product_name, + price::DECIMAL(10,2) AS price, + category::TEXT AS category, + last_updated AS last_updated + FROM + memory.store.raw_products; + """) + + raw_products_model = load_sql_based_model(raw_products) + product_history_model = load_sql_based_model(product_history) + context.upsert_model(raw_products_model) + context.upsert_model(product_history_model) + + # Initial plan and apply + plan = context.plan_builder("prod", skip_tests=True).build() + context.apply(plan) + + query = "SELECT product_id, 
product_name, price, category, last_updated, valid_from, valid_to FROM memory.store.product_history ORDER BY product_id, valid_from" +    initial_data = context.engine_adapter.fetchdf(query) + +    # Validate initial state of 3 products all active +    assert len(initial_data) == 3 +    assert initial_data["valid_to"].isna().all() +    initial_product_names = set(initial_data["product_name"].tolist()) +    assert initial_product_names == {"Laptop Pro", "Wireless Mouse", "Office Chair"} + +    # Price update and category change +    with time_machine.travel("2020-01-15 12:00:00 UTC"): +        raw_products_v2 = d.parse(""" +        MODEL ( +            name memory.store.raw_products, +            kind FULL +        ); + +        SELECT * FROM VALUES +            (101, 'Laptop Pro', 1199.99, 'Electronics', '2020-01-15 00:00:00'::TIMESTAMP), +            (102, 'Wireless Mouse', 49.99, 'Electronics', '2020-01-01 00:00:00'::TIMESTAMP), +            (103, 'Ergonomic Office Chair', 229.99, 'Office Furniture', '2020-01-15 00:00:00'::TIMESTAMP) +        AS t(product_id, product_name, price, category, last_updated); +        """) +        raw_products_v2_model = load_sql_based_model(raw_products_v2) +        context.upsert_model(raw_products_v2_model) +        context.plan( +            auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full() +        ) +        context.run() + +        data_after_first_change = context.engine_adapter.fetchdf(query) + +        # Should have 5 records (2 original rows closed, 2 new active, 1 unchanged) +        assert len(data_after_first_change) == 5 + +    # Second change +    with time_machine.travel("2020-02-01 10:00:00 UTC"): +        raw_products_v3 = d.parse(""" +        MODEL ( +            name memory.store.raw_products, +            kind FULL +        ); + +        SELECT * FROM VALUES +            (101, 'Laptop Pro Max', 1399.99, 'Electronics', '2020-02-01 00:00:00'::TIMESTAMP), +            (103, 'Ergonomic Office Chair', 229.99, 'Office Furniture', '2020-01-15 00:00:00'::TIMESTAMP), +            (102, 'Wireless Mouse', 49.99, 'Electronics', '2020-01-01 00:00:00'::TIMESTAMP) +        AS t(product_id, product_name, price, category, last_updated); +        """) +        raw_products_v3_model = 
load_sql_based_model(raw_products_v3) + context.upsert_model(raw_products_v3_model) + context.plan( + auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full() + ) + context.run() + data_after_second_change = context.engine_adapter.fetchdf(query) + assert len(data_after_second_change) == 6 + + # Store the current state before full restatement + data_before_full_restatement = data_after_second_change.copy() + + # Perform full restatement (no start date provided) + with time_machine.travel("2020-02-01 15:00:00 UTC"): + plan = context.plan_builder( + "prod", skip_tests=True, restate_models=["memory.store.product_history"] + ).build() + context.apply(plan) + data_after_full_restatement = context.engine_adapter.fetchdf(query) + assert len(data_after_full_restatement) == 3 + + # Check that all currently active products before restatement are still active after restatement + active_before = data_before_full_restatement[ + data_before_full_restatement["valid_to"].isna() + ] + active_after = data_after_full_restatement + assert set(active_before["product_id"]) == set(active_after["product_id"]) + + expected_products = { + 101: { + "product_name": "Laptop Pro Max", + "price": 1399.99, + "category": "Electronics", + "last_updated": "2020-02-01", + }, + 102: { + "product_name": "Wireless Mouse", + "price": 49.99, + "category": "Electronics", + "last_updated": "2020-01-01", + }, + 103: { + "product_name": "Ergonomic Office Chair", + "price": 229.99, + "category": "Office Furniture", + "last_updated": "2020-01-15", + }, + } + for _, row in data_after_full_restatement.iterrows(): + pid = row["product_id"] + assert pid in expected_products + expected = expected_products[pid] + assert row["product_name"] == expected["product_name"] + assert float(row["price"]) == expected["price"] + assert row["category"] == expected["category"] + + # valid_from should be the epoch, valid_to should be NaT + assert str(row["valid_from"]) == "1970-01-01 00:00:00" + assert 
pd.isna(row["valid_to"]) diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 8d16c9422b..4fc82875d7 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -9906,9 +9906,9 @@ def test_signal_always_true(batch, arg1, arg2): def test_scd_type_2_full_history_restatement(): - assert ModelKindName.SCD_TYPE_2.full_history_restatement_only is True - assert ModelKindName.SCD_TYPE_2_BY_TIME.full_history_restatement_only is True - assert ModelKindName.SCD_TYPE_2_BY_COLUMN.full_history_restatement_only is True + assert ModelKindName.SCD_TYPE_2.full_history_restatement_only is False + assert ModelKindName.SCD_TYPE_2_BY_TIME.full_history_restatement_only is False + assert ModelKindName.SCD_TYPE_2_BY_COLUMN.full_history_restatement_only is False assert ModelKindName.INCREMENTAL_BY_TIME_RANGE.full_history_restatement_only is False diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index d131e6aa95..3704c192bd 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1986,6 +1986,7 @@ def test_insert_into_scd_type_2_by_time( column_descriptions={}, updated_at_as_valid_from=False, truncate=truncate, + start="2020-01-01", ) adapter_mock.columns.assert_called_once_with(snapshot.table_name()) @@ -2158,6 +2159,7 @@ def test_insert_into_scd_type_2_by_column( table_description=None, column_descriptions={}, truncate=truncate, + start="2020-01-01", ) adapter_mock.columns.assert_called_once_with(snapshot.table_name())