From 7094da350391de84d8f518f8c6ddbaa13e139406 Mon Sep 17 00:00:00 2001 From: Petar Date: Thu, 2 Apr 2026 23:51:11 +0200 Subject: [PATCH 1/6] fix(snapshot): support dbt_valid_to_current config in snapshot staging macros --- CHANGELOG.md | 3 + .../macros/materializations/snapshot.sql | 22 +- .../test_snapshot_dbt_valid_to_current.py | 215 ++++++++++++++++++ 3 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f92c599d..05d75088 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ #### Improvements * Starting with this release the `dbt-clickhouse` packages will be published to PyPI using Github Actions as a [Trusted Publisher](https://docs.pypi.org/trusted-publishers/). This will improve both the usability and the security of the release process ([#614](https://github.com/ClickHouse/dbt-clickhouse/pull/614)). +#### Bugs +* Fix `dbt_valid_to_current` snapshot configuration being ignored in the ClickHouse adapter. The snapshot macros for both timestamp and check strategies now correctly read and apply the `dbt_valid_to_current` config value, matching dbt-core's expected behavior. Previously, snapshots configured with `dbt_valid_to_current` would produce duplicate rows on subsequent runs because the adapter always filtered current records with `WHERE dbt_valid_to IS NULL`, missing rows that had the configured sentinel value. + ### Release [1.10.0], 2026-02-16 diff --git a/dbt/include/clickhouse/macros/materializations/snapshot.sql b/dbt/include/clickhouse/macros/materializations/snapshot.sql index 327b00ae..5e5a1ff6 100644 --- a/dbt/include/clickhouse/macros/materializations/snapshot.sql +++ b/dbt/include/clickhouse/macros/materializations/snapshot.sql @@ -99,6 +99,8 @@ {% macro clickhouse__snapshot_staging_table_check_strategy(strategy, source_sql, target_relation) -%} + {% set dbt_valid_to_current = config.get('dbt_valid_to_current') %} + with snapshot_time as ( select {{ strategy.updated_at }} as ts -- Single timestamp ), @@ -114,7 +116,12 @@ {{ strategy.unique_key }} as dbt_unique_key from {{ target_relation }} - where dbt_valid_to is null + where + {% if dbt_valid_to_current %} + ( dbt_valid_to = {{ dbt_valid_to_current }} or dbt_valid_to is null ) + {% else %} + dbt_valid_to is null + {% endif %} ), @@ -125,7 +132,7 @@ {{ strategy.unique_key }} as dbt_unique_key, snapshot_time.ts as dbt_updated_at, snapshot_time.ts as dbt_valid_from, - nullif(snapshot_time.ts, snapshot_time.ts) as dbt_valid_to, + coalesce(nullif(snapshot_time.ts, snapshot_time.ts), {{ dbt_valid_to_current or 'null' }}) as dbt_valid_to, {{ strategy.scd_id }} as dbt_scd_id from snapshot_query, snapshot_time @@ -217,6 +224,8 @@ {% macro clickhouse__snapshot_staging_table_timestamp_strategy(strategy, source_sql, target_relation) -%} + {% set dbt_valid_to_current = config.get('dbt_valid_to_current') %} + with snapshot_query as ( {{ source_sql }} @@ -229,7 +238,12 @@ {{ strategy.unique_key }} as dbt_unique_key from {{ target_relation }} - where dbt_valid_to is null + where + {% if dbt_valid_to_current %} + ( dbt_valid_to = {{ dbt_valid_to_current }} or dbt_valid_to is null ) + {% else %} + dbt_valid_to is null + {% endif %} ), @@ -240,7 +254,7 @@ {{ strategy.unique_key }} as dbt_unique_key, {{ strategy.updated_at }} as dbt_updated_at, {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, + coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{ dbt_valid_to_current or 'null' }}) as dbt_valid_to, {{ strategy.scd_id }} as dbt_scd_id from snapshot_query diff --git a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py new file mode 100644 index 00000000..0e30dc45 --- /dev/null +++ b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py @@ -0,0 +1,215 @@ +import pytest + +from dbt.tests.util import run_dbt, relation_from_name + + +seeds_base_csv = """ +id,name,some_date +1,Easton,1981-05-20T06:46:51 +2,Lillian,1978-09-03T18:10:33 +3,Jeremiah,1982-03-11T03:59:51 +4,Nolan,1976-05-06T20:21:35 +5,Hannah,1982-06-23T05:41:26 +6,Eleanor,1991-08-10T23:12:21 +7,Lily,1971-03-29T14:58:02 +8,Jonathan,1988-02-26T02:55:24 +9,Adrian,1994-02-09T13:14:23 +10,Nora,1976-03-01T16:51:39 +""".lstrip() + +seeds_added_csv = ( + seeds_base_csv + + """11,Mateo,2014-09-07T17:04:27 +12,Julian,2000-02-04T11:48:30 +""".lstrip() +) + +# Timestamp strategy snapshot with dbt_valid_to_current configured +ts_snapshot_valid_to_current_sql = """ +{% snapshot ts_snapshot %} + {{ config( + strategy='timestamp', + unique_key='id', + updated_at='some_date', + target_database=database, + target_schema=schema, + dbt_valid_to_current="toDateTime('9999-12-31 00:00:00')", + )}} + select * from {{ ref(var('seed_name', 'base')) }} +{% endsnapshot %} +""".strip() + +# Check strategy snapshot with dbt_valid_to_current configured +cc_snapshot_valid_to_current_sql = """ +{% snapshot cc_snapshot %} + {{ config( + strategy='check', + unique_key='id', + check_cols='all', + target_database=database, + target_schema=schema, + dbt_valid_to_current="toDateTime('9999-12-31 00:00:00')", + )}} + select * from {{ ref(var('seed_name', 'base')) }} +{% endsnapshot %} +""".strip() + + +def get_row_count(project, snapshot_name): + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql(f"select count(*) from {relation}", fetch="one") + return result[0] + + +def get_valid_to_values(project, snapshot_name): + """Return list of dbt_valid_to values for all rows, ordered by id and dbt_valid_from.""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select id, dbt_valid_to from {relation} order by id, dbt_valid_from", + fetch="all", + ) + return result + + +def get_current_row_count(project, snapshot_name): + """Count rows where dbt_valid_to equals the configured current value.""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select count(*) from {relation} where dbt_valid_to = toDateTime('9999-12-31 00:00:00')", + fetch="one", + ) + return result[0] + + +def get_expired_row_count(project, snapshot_name): + """Count rows where dbt_valid_to is a real timestamp (not the current sentinel).""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select count(*) from {relation} where dbt_valid_to != toDateTime('9999-12-31 00:00:00')", + fetch="one", + ) + return result[0] + + +class TestSnapshotTimestampDbtValidToCurrent: + @pytest.fixture(scope="class") + def seeds(self): + return { + "base.csv": seeds_base_csv, + "added.csv": seeds_added_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "ts_snapshot.sql": ts_snapshot_valid_to_current_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"name": "snapshot_valid_to_current_timestamp"} + + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + def test_snapshot_valid_to_current_timestamp(self, project): + # Seed the base data (10 rows) + results = run_dbt(["seed"]) + assert len(results) == 2 + + # --- First snapshot run --- + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Should have 10 rows, all with dbt_valid_to = '9999-12-31' (not NULL) + assert get_row_count(project, "ts_snapshot") == 10 + assert get_current_row_count(project, "ts_snapshot") == 10 + assert get_expired_row_count(project, "ts_snapshot") == 0 + + # --- Second snapshot run (no changes) --- + # This is the critical test: without the fix, the second run would + # fail to find current records (because it looks for dbt_valid_to IS NULL + # but they have '9999-12-31'), causing duplicate inserts. + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Should still have exactly 10 rows - no duplicates + assert get_row_count(project, "ts_snapshot") == 10 + assert get_current_row_count(project, "ts_snapshot") == 10 + assert get_expired_row_count(project, "ts_snapshot") == 0 + + # --- Third snapshot run with new data --- + # Point at the "added" seed so the snapshot sees 2 new rows + results = run_dbt(["snapshot", "--vars", "seed_name: added"]) + assert len(results) == 1 + + # Should now have 12 rows (10 original + 2 new), all current + assert get_row_count(project, "ts_snapshot") == 12 + assert get_current_row_count(project, "ts_snapshot") == 12 + assert get_expired_row_count(project, "ts_snapshot") == 0 + + +class TestSnapshotCheckDbtValidToCurrent: + @pytest.fixture(scope="class") + def seeds(self): + return { + "base.csv": seeds_base_csv, + "added.csv": seeds_added_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "cc_snapshot.sql": cc_snapshot_valid_to_current_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"name": "snapshot_valid_to_current_check"} + + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + def test_snapshot_valid_to_current_check(self, project): + # Seed the base data (10 rows) + results = run_dbt(["seed"]) + assert len(results) == 2 + + # --- First snapshot run --- + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Should have 10 rows, all with dbt_valid_to = '9999-12-31' (not NULL) + assert get_row_count(project, "cc_snapshot") == 10 + assert get_current_row_count(project, "cc_snapshot") == 10 + assert get_expired_row_count(project, "cc_snapshot") == 0 + + # --- Second snapshot run (no changes) --- + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Should still have exactly 10 rows - no duplicates + assert get_row_count(project, "cc_snapshot") == 10 + assert get_current_row_count(project, "cc_snapshot") == 10 + assert get_expired_row_count(project, "cc_snapshot") == 0 + + # --- Third snapshot run with new data --- + results = run_dbt(["snapshot", "--vars", "seed_name: added"]) + assert len(results) == 1 + + # Should now have 12 rows (10 original + 2 new), all current + assert get_row_count(project, "cc_snapshot") == 12 + assert get_current_row_count(project, "cc_snapshot") == 12 + assert get_expired_row_count(project, "cc_snapshot") == 0 From 29aed2fdedb32534982aa7457370d0dbf10a801b Mon Sep 17 00:00:00 2001 From: Petar Salinovic <5ar.salinovic@gmail.com> Date: Mon, 20 Apr 2026 21:58:01 +0200 Subject: [PATCH 2/6] lint issue fix --- .../adapter/basic/test_snapshot_dbt_valid_to_current.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py index 0e30dc45..82e2d6d2 100644 --- a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py +++ b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py @@ -1,6 +1,6 @@ import pytest -from dbt.tests.util import run_dbt, relation_from_name +from dbt.tests.util import relation_from_name, run_dbt seeds_base_csv = """ From d5203489fde70842659546dceaa412b43cba8b8f Mon Sep 17 00:00:00 2001 From: Petar Salinovic <5ar.salinovic@gmail.com> Date: Tue, 21 Apr 2026 16:10:25 +0200 Subject: [PATCH 3/6] added hard_deletes test case --- .../test_snapshot_dbt_valid_to_current.py | 154 +++++++++++++++++- 1 file changed, 147 insertions(+), 7 deletions(-) diff --git a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py index 82e2d6d2..442309f8 100644 --- a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py +++ b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py @@ -24,6 +24,20 @@ """.lstrip() ) +# Seed with one row deleted (id=10 removed) +seeds_deleted_csv = """ +id,name,some_date +1,Easton,1981-05-20T06:46:51 +2,Lillian,1978-09-03T18:10:33 +3,Jeremiah,1982-03-11T03:59:51 +4,Nolan,1976-05-06T20:21:35 +5,Hannah,1982-06-23T05:41:26 +6,Eleanor,1991-08-10T23:12:21 +7,Lily,1971-03-29T14:58:02 +8,Jonathan,1988-02-26T02:55:24 +9,Adrian,1994-02-09T13:14:23 +""".lstrip() + # Timestamp strategy snapshot with dbt_valid_to_current configured ts_snapshot_valid_to_current_sql = """ {% snapshot ts_snapshot %} @@ -33,7 +47,7 @@ updated_at='some_date', target_database=database, target_schema=schema, - dbt_valid_to_current="toDateTime('9999-12-31 00:00:00')", + dbt_valid_to_current="toDateTime('2100-01-01 00:00:00')", )}} select * from {{ ref(var('seed_name', 'base')) }} {% endsnapshot %} @@ -48,7 +62,23 @@ check_cols='all', target_database=database, target_schema=schema, - dbt_valid_to_current="toDateTime('9999-12-31 00:00:00')", + dbt_valid_to_current="toDateTime('2100-01-01 00:00:00')", + )}} + select * from {{ ref(var('seed_name', 'base')) }} +{% endsnapshot %} +""".strip() + +# Check strategy snapshot with dbt_valid_to_current AND hard_deletes='invalidate' +cc_snapshot_valid_to_current_hard_deletes_sql = """ +{% snapshot cc_snapshot_hd %} + {{ config( + strategy='check', + unique_key='id', + check_cols='all', + target_database=database, + target_schema=schema, + dbt_valid_to_current="toDateTime('2100-01-01 00:00:00')", + hard_deletes='invalidate', )}} select * from {{ ref(var('seed_name', 'base')) }} {% endsnapshot %} @@ -75,7 +105,7 @@ def get_current_row_count(project, snapshot_name): """Count rows where dbt_valid_to equals the configured current value.""" relation = relation_from_name(project.adapter, snapshot_name) result = project.run_sql( - f"select count(*) from {relation} where dbt_valid_to = toDateTime('9999-12-31 00:00:00')", + f"select count(*) from {relation} where dbt_valid_to = toDateTime('2100-01-01 00:00:00')", fetch="one", ) return result[0] @@ -85,12 +115,32 @@ def get_expired_row_count(project, snapshot_name): """Count rows where dbt_valid_to is a real timestamp (not the current sentinel).""" relation = relation_from_name(project.adapter, snapshot_name) result = project.run_sql( - f"select count(*) from {relation} where dbt_valid_to != toDateTime('9999-12-31 00:00:00')", + f"select count(*) from {relation} where dbt_valid_to != toDateTime('2100-01-01 00:00:00')", + fetch="one", + ) + return result[0] + + +def get_deleted_row_count(project, snapshot_name): + """Count rows for id=10 (the deleted row) to verify it was invalidated.""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select count(*) from {relation} where id = 10", fetch="one", ) return result[0] +def get_deleted_row_valid_to(project, snapshot_name): + """Get dbt_valid_to for the deleted row (id=10) to verify it's not the sentinel.""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select dbt_valid_to from {relation} where id = 10", + fetch="one", + ) + return result[0] if result else None + + class TestSnapshotTimestampDbtValidToCurrent: @pytest.fixture(scope="class") def seeds(self): @@ -127,7 +177,7 @@ def test_snapshot_valid_to_current_timestamp(self, project): results = run_dbt(["snapshot"]) assert len(results) == 1 - # Should have 10 rows, all with dbt_valid_to = '9999-12-31' (not NULL) + # Should have 10 rows, all with dbt_valid_to = '2100-01-01' (not NULL) assert get_row_count(project, "ts_snapshot") == 10 assert get_current_row_count(project, "ts_snapshot") == 10 assert get_expired_row_count(project, "ts_snapshot") == 0 @@ -135,7 +185,7 @@ def test_snapshot_valid_to_current_timestamp(self, project): # --- Second snapshot run (no changes) --- # This is the critical test: without the fix, the second run would # fail to find current records (because it looks for dbt_valid_to IS NULL - # but they have '9999-12-31'), causing duplicate inserts. + # but they have '2100-01-01'), causing duplicate inserts. results = run_dbt(["snapshot"]) assert len(results) == 1 @@ -191,7 +241,7 @@ def test_snapshot_valid_to_current_check(self, project): results = run_dbt(["snapshot"]) assert len(results) == 1 - # Should have 10 rows, all with dbt_valid_to = '9999-12-31' (not NULL) + # Should have 10 rows, all with dbt_valid_to = '2100-01-01' (not NULL) assert get_row_count(project, "cc_snapshot") == 10 assert get_current_row_count(project, "cc_snapshot") == 10 assert get_expired_row_count(project, "cc_snapshot") == 0 @@ -213,3 +263,93 @@ def test_snapshot_valid_to_current_check(self, project): assert get_row_count(project, "cc_snapshot") == 12 assert get_current_row_count(project, "cc_snapshot") == 12 assert get_expired_row_count(project, "cc_snapshot") == 0 + + +class TestSnapshotCheckDbtValidToCurrentWithHardDeletes: + """Test hard_deletes='invalidate' with dbt_valid_to_current. + + This tests the combination from issue #481 where hard_deletes: 'invalidate' + is used together with dbt_valid_to_current. + + NOTE: This test is skipped by default because it requires + join_use_nulls=1 to pass. ClickHouse's default join_use_nulls=0 causes + LEFT JOINs to return default values (0, '') instead of NULL for missing + keys. The snapshot deletes CTE uses 'WHERE x IS NULL' to detect deleted + rows, which never matches when default values are returned instead of + NULL. This is a known ClickHouse behavior that affects all snapshots + using hard_deletes='invalidate', not specific to the dbt_valid_to_current + fix (see issues #271, #291). + + To run this test locally, add 'join_use_nulls': 1 to custom_settings in + your dbt profile connection config. + """ + + @pytest.fixture(scope="class") + def seeds(self): + return { + "base.csv": seeds_base_csv, + "deleted.csv": seeds_deleted_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "cc_snapshot_hd.sql": cc_snapshot_valid_to_current_hard_deletes_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"name": "snapshot_valid_to_current_hard_deletes"} + + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + @pytest.mark.skip( + reason=( + "Requires join_use_nulls=1 to pass (see class docstring). " + "ClickHouse's default join_use_nulls=0 returns default values " + "(0, '') instead of NULL for missing LEFT JOIN keys, preventing " + "the snapshot deletes CTE from detecting deleted rows. " + "Not specific to dbt_valid_to_current - affects all snapshots " + "using hard_deletes='invalidate' (issues #271, #291)." + ) + ) + def test_snapshot_valid_to_current_with_hard_deletes(self, project): + # Seed the base data (10 rows) + results = run_dbt(["seed"]) + assert len(results) == 2 + + # --- First snapshot run --- + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Should have 10 rows, all with dbt_valid_to = '2100-01-01' + assert get_row_count(project, "cc_snapshot_hd") == 10 + assert get_current_row_count(project, "cc_snapshot_hd") == 10 + assert get_expired_row_count(project, "cc_snapshot_hd") == 0 + + # --- Second snapshot run with deleted data --- + # Point at the "deleted" seed so id=10 is now missing from source + results = run_dbt(["snapshot", "--vars", "seed_name: deleted"]) + assert len(results) == 1 + + # Should still have 10 rows (9 current + 1 invalidated) + assert get_row_count(project, "cc_snapshot_hd") == 10 + + # 9 rows should still be current (dbt_valid_to = sentinel) + assert get_current_row_count(project, "cc_snapshot_hd") == 9 + + # 1 row should be expired (the deleted row id=10) + assert get_expired_row_count(project, "cc_snapshot_hd") == 1 + + # Verify the deleted row (id=10) exists but is invalidated + assert get_deleted_row_count(project, "cc_snapshot_hd") == 1 + deleted_valid_to = get_deleted_row_valid_to(project, "cc_snapshot_hd") + # The deleted row should have a real timestamp, not the sentinel + assert deleted_valid_to != "2100-01-01 00:00:00" From a66290b3731f01f88d857fc0fd012024931a08db Mon Sep 17 00:00:00 2001 From: Petar Salinovic <5ar.salinovic@gmail.com> Date: Tue, 21 Apr 2026 16:25:20 +0200 Subject: [PATCH 4/6] added a test case for updated record --- .../test_snapshot_dbt_valid_to_current.py | 105 +++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py index 442309f8..30c95041 100644 --- a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py +++ b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py @@ -1,3 +1,5 @@ +from datetime import datetime + import pytest from dbt.tests.util import relation_from_name, run_dbt @@ -38,6 +40,21 @@ 9,Adrian,1994-02-09T13:14:23 """.lstrip() +# Seed with one row updated (id=1 name changed, some_date changed for timestamp strategy) +seeds_updated_csv = """ +id,name,some_date +1,Easton_updated,2020-01-01T00:00:00 +2,Lillian,1978-09-03T18:10:33 +3,Jeremiah,1982-03-11T03:59:51 +4,Nolan,1976-05-06T20:21:35 +5,Hannah,1982-06-23T05:41:26 +6,Eleanor,1991-08-10T23:12:21 +7,Lily,1971-03-29T14:58:02 +8,Jonathan,1988-02-26T02:55:24 +9,Adrian,1994-02-09T13:14:23 +10,Nora,1976-03-01T16:51:39 +""".lstrip() + # Timestamp strategy snapshot with dbt_valid_to_current configured ts_snapshot_valid_to_current_sql = """ {% snapshot ts_snapshot %} @@ -141,12 +158,33 @@ def get_deleted_row_valid_to(project, snapshot_name): return result[0] if result else None +def get_rows_for_id(project, snapshot_name, row_id): + """Return all rows for a specific id, ordered by dbt_valid_from (oldest first).""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select id, name, dbt_valid_from, dbt_valid_to from {relation} where id = {row_id} order by dbt_valid_from", + fetch="all", + ) + return result + + +def get_row_count_for_id(project, snapshot_name, row_id): + """Count rows for a specific id.""" + relation = relation_from_name(project.adapter, snapshot_name) + result = project.run_sql( + f"select count(*) from {relation} where id = {row_id}", + fetch="one", + ) + return result[0] + + class TestSnapshotTimestampDbtValidToCurrent: @pytest.fixture(scope="class") def seeds(self): return { "base.csv": seeds_base_csv, "added.csv": seeds_added_csv, + "updated.csv": seeds_updated_csv, } @pytest.fixture(scope="class") @@ -171,7 +209,7 @@ def clean_up(self, project): def test_snapshot_valid_to_current_timestamp(self, project): # Seed the base data (10 rows) results = run_dbt(["seed"]) - assert len(results) == 2 + assert len(results) == 3 # --- First snapshot run --- results = run_dbt(["snapshot"]) @@ -204,6 +242,37 @@ def test_snapshot_valid_to_current_timestamp(self, project): assert get_current_row_count(project, "ts_snapshot") == 12 assert get_expired_row_count(project, "ts_snapshot") == 0 + # --- Fourth snapshot run with updated data --- + # Point at the "updated" seed so id=1 has a new name and later some_date + results = run_dbt( + ["--no-partial-parse", "snapshot", "--vars", "seed_name: updated"] + ) + assert len(results) == 1 + + # Should now have 13 rows (12 + 1 new version for updated id=1) + assert get_row_count(project, "ts_snapshot") == 13 + + # 12 rows should still be current (dbt_valid_to = sentinel) + assert get_current_row_count(project, "ts_snapshot") == 12 + + # 1 row should be expired (the old version of id=1) + assert get_expired_row_count(project, "ts_snapshot") == 1 + + # Verify id=1 has exactly 2 rows: old expired + new current + id_1_rows = get_rows_for_id(project, "ts_snapshot", 1) + assert len(id_1_rows) == 2, f"Expected 2 rows for id=1, got {len(id_1_rows)}" + + # First row (oldest) should be the original version, now expired + old_row = id_1_rows[0] + assert old_row[1] == "Easton" # original name + # dbt_valid_to should be a real timestamp (not the sentinel) + assert old_row[3] != datetime(2100, 1, 1, 0, 0) + + # Second row (newest) should be the updated version, still current + new_row = id_1_rows[1] + assert new_row[1] == "Easton_updated" # updated name + assert new_row[3] == datetime(2100, 1, 1, 0, 0) # dbt_valid_to = sentinel + class TestSnapshotCheckDbtValidToCurrent: @pytest.fixture(scope="class") @@ -211,6 +280,7 @@ def seeds(self): return { "base.csv": seeds_base_csv, "added.csv": seeds_added_csv, + "updated.csv": seeds_updated_csv, } @pytest.fixture(scope="class") @@ -235,7 +305,7 @@ def clean_up(self, project): def test_snapshot_valid_to_current_check(self, project): # Seed the base data (10 rows) results = run_dbt(["seed"]) - assert len(results) == 2 + assert len(results) == 3 # --- First snapshot run --- results = run_dbt(["snapshot"]) @@ -264,6 +334,37 @@ def test_snapshot_valid_to_current_check(self, project): assert get_current_row_count(project, "cc_snapshot") == 12 assert get_expired_row_count(project, "cc_snapshot") == 0 + # --- Fourth snapshot run with updated data --- + # Point at the "updated" seed so id=1 has a changed name + results = run_dbt( + ["--no-partial-parse", "snapshot", "--vars", "seed_name: updated"] + ) + assert len(results) == 1 + + # Should now have 13 rows (12 + 1 new version for updated id=1) + assert get_row_count(project, "cc_snapshot") == 13 + + # 12 rows should still be current (dbt_valid_to = sentinel) + assert get_current_row_count(project, "cc_snapshot") == 12 + + # 1 row should be expired (the old version of id=1) + assert get_expired_row_count(project, "cc_snapshot") == 1 + + # Verify id=1 has exactly 2 rows: old expired + new current + id_1_rows = get_rows_for_id(project, "cc_snapshot", 1) + assert len(id_1_rows) == 2, f"Expected 2 rows for id=1, got {len(id_1_rows)}" + + # First row (oldest) should be the original version, now expired + old_row = id_1_rows[0] + assert old_row[1] == "Easton" # original name + # dbt_valid_to should be a real timestamp (not the sentinel) + assert old_row[3] != datetime(2100, 1, 1, 0, 0) + + # Second row (newest) should be the updated version, still current + new_row = id_1_rows[1] + assert new_row[1] == "Easton_updated" # updated name + assert new_row[3] == datetime(2100, 1, 1, 0, 0) # dbt_valid_to = sentinel + class TestSnapshotCheckDbtValidToCurrentWithHardDeletes: """Test hard_deletes='invalidate' with dbt_valid_to_current. From ec7927fedf66dc6082a597ab111cc58c3fa03e6b Mon Sep 17 00:00:00 2001 From: Petar Salinovic <5ar.salinovic@gmail.com> Date: Tue, 21 Apr 2026 16:48:07 +0200 Subject: [PATCH 5/6] unused functions --- .../test_snapshot_dbt_valid_to_current.py | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py index 30c95041..e426da53 100644 --- a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py +++ b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py @@ -108,16 +108,6 @@ def get_row_count(project, snapshot_name): return result[0] -def get_valid_to_values(project, snapshot_name): - """Return list of dbt_valid_to values for all rows, ordered by id and dbt_valid_from.""" - relation = relation_from_name(project.adapter, snapshot_name) - result = project.run_sql( - f"select id, dbt_valid_to from {relation} order by id, dbt_valid_from", - fetch="all", - ) - return result - - def get_current_row_count(project, snapshot_name): """Count rows where dbt_valid_to equals the configured current value.""" relation = relation_from_name(project.adapter, snapshot_name) @@ -168,16 +158,6 @@ def get_rows_for_id(project, snapshot_name, row_id): return result -def get_row_count_for_id(project, snapshot_name, row_id): - """Count rows for a specific id.""" - relation = relation_from_name(project.adapter, snapshot_name) - result = project.run_sql( - f"select count(*) from {relation} where id = {row_id}", - fetch="one", - ) - return result[0] - - class TestSnapshotTimestampDbtValidToCurrent: @pytest.fixture(scope="class") def seeds(self): From 67f9348f30950578e165016b283be669b9d7e45c Mon Sep 17 00:00:00 2001 From: Petar Salinovic <5ar.salinovic@gmail.com> Date: Tue, 21 Apr 2026 16:48:32 +0200 Subject: [PATCH 6/6] compare datetime objects instead of datetime and string --- .../adapter/basic/test_snapshot_dbt_valid_to_current.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py index e426da53..deff3838 100644 --- a/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py +++ b/tests/integration/adapter/basic/test_snapshot_dbt_valid_to_current.py @@ -433,4 +433,4 @@ def test_snapshot_valid_to_current_with_hard_deletes(self, project): assert get_deleted_row_count(project, "cc_snapshot_hd") == 1 deleted_valid_to = get_deleted_row_valid_to(project, "cc_snapshot_hd") # The deleted row should have a real timestamp, not the sentinel - assert deleted_valid_to != "2100-01-01 00:00:00" + assert deleted_valid_to != datetime(2100, 1, 1, 0, 0)