Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit 8272eb2

Browse files
Avinash-1394nicor88Jrmyy
authored
fix: Fix snapshot valid to value (#149)
Co-authored-by: nicor88 <[email protected]> Co-authored-by: Jérémy Guiselin <[email protected]>
1 parent a96179d commit 8272eb2

File tree

3 files changed

+118
-105
lines changed

3 files changed

+118
-105
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ By default, the materialization keeps the last 4 table versions, you can change
234234
In case high performances are needed consider bucketing instead of partitions
235235
* By default, Glue "duplicate" the versions internally, so the last 2 versions of a table point to the same location
236236
* It's recommended to have versions_to_keep>= 4, as this will avoid to have the older location removed
237+
* The macro athena__end_of_time needs to be overwritten by the user if using Athena v3 since it requires a precision parameter for timestamps
237238
238239
239240
### Snapshots

dbt/include/athena/macros/materializations/snapshots/snapshot.sql

Lines changed: 101 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
{%- endif -%}
2323

2424
{% set sql -%}
25-
SELECT * FROM {{ source }};
25+
select * from {{ source }};
2626
{%- endset -%}
2727

2828
{{ create_table_as(False, target_relation, sql) }}
@@ -33,58 +33,58 @@
3333
%}
3434

3535
{% macro build_snapshot_table(strategy, source_sql) %}
36-
SELECT *
37-
, {{ strategy.unique_key }} AS dbt_unique_key
38-
, {{ strategy.updated_at }} AS dbt_valid_from
39-
, {{ strategy.scd_id }} AS dbt_scd_id
40-
, 'insert' AS dbt_change_type
41-
, CAST('9999-01-01' as timestamp) AS dbt_valid_to
42-
, True AS is_current_record
43-
, {{ current_timestamp() }} AS dbt_snapshot_at
44-
FROM ({{ source_sql }}) source;
36+
select *
37+
, {{ strategy.unique_key }} as dbt_unique_key
38+
, {{ strategy.updated_at }} as dbt_valid_from
39+
, {{ strategy.scd_id }} as dbt_scd_id
40+
, 'insert' as dbt_change_type
41+
, {{ end_of_time() }} as dbt_valid_to
42+
, True as is_current_record
43+
, {{ strategy.updated_at }} as dbt_snapshot_at
44+
from ({{ source_sql }}) source;
4545
{% endmacro %}
4646

4747
{%
4848
Identify records that needs to be upserted or deleted into the snapshot table
4949
%}
5050

5151
{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}
52-
WITH snapshot_query AS (
52+
with snapshot_query as (
5353
{{ source_sql }}
5454
)
55-
, snapshotted_data_base AS (
56-
SELECT *
55+
, snapshotted_data_base as (
56+
select *
5757
, ROW_NUMBER() OVER (
5858
PARTITION BY dbt_unique_key
5959
ORDER BY dbt_valid_from DESC
60-
) AS dbt_snapshot_rn
61-
FROM {{ target_relation }}
60+
) as dbt_snapshot_rn
61+
from {{ target_relation }}
6262
)
63-
, snapshotted_data AS (
64-
SELECT *
65-
FROM snapshotted_data_base
66-
WHERE dbt_snapshot_rn = 1
63+
, snapshotted_data as (
64+
select *
65+
from snapshotted_data_base
66+
where dbt_snapshot_rn = 1
6767
AND dbt_change_type != 'delete'
6868
)
69-
, source_data AS (
70-
SELECT *
71-
, {{ strategy.unique_key }} AS dbt_unique_key
72-
, {{ strategy.updated_at }} AS dbt_valid_from
73-
, {{ strategy.scd_id }} AS dbt_scd_id
74-
FROM snapshot_query
69+
, source_data as (
70+
select *
71+
, {{ strategy.unique_key }} as dbt_unique_key
72+
, {{ strategy.updated_at }} as dbt_valid_from
73+
, {{ strategy.scd_id }} as dbt_scd_id
74+
from snapshot_query
7575
)
76-
, upserts AS (
77-
SELECT source_data.*
78-
, CASE
79-
WHEN snapshotted_data.dbt_unique_key IS NULL THEN 'insert'
80-
ELSE 'update'
81-
END as dbt_change_type
82-
, CAST('9999-01-01' as timestamp) AS dbt_valid_to
83-
, True AS is_current_record
84-
FROM source_data
85-
LEFT JOIN snapshotted_data
86-
ON snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
87-
WHERE snapshotted_data.dbt_unique_key IS NULL
76+
, upserts as (
77+
select source_data.*
78+
, case
79+
when snapshotted_data.dbt_unique_key IS NULL THEN 'insert'
80+
else 'update'
81+
end as dbt_change_type
82+
, {{ end_of_time() }} as dbt_valid_to
83+
, True as is_current_record
84+
from source_data
85+
LEFT join snapshotted_data
86+
on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
87+
where snapshotted_data.dbt_unique_key IS NULL
8888
OR (
8989
snapshotted_data.dbt_unique_key IS NOT NULL
9090
AND (
@@ -93,32 +93,22 @@
9393
)
9494
)
9595
{%- if strategy.invalidate_hard_deletes -%}
96-
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
97-
, deletes AS (
98-
SELECT
99-
{% for column in target_columns if not column.name == 'dbt_snapshot_at' %}
100-
{% if column.name == 'dbt_valid_from' %}
101-
{{ current_timestamp() }} AS dbt_valid_from {%- if not loop.last -%},{%- endif -%}
102-
{% elif column.name == 'dbt_change_type' %}
103-
'delete' AS dbt_change_type {%- if not loop.last -%},{%- endif -%}
104-
{% elif column.name == 'dbt_valid_to' %}
105-
CAST('9999-01-01' as timestamp) AS dbt_valid_to {%- if not loop.last -%},{%- endif -%}
106-
{% elif column.name == 'is_current_record' %}
107-
True AS is_current_record {%- if not loop.last -%},{%- endif -%}
108-
{% else %}
109-
snapshotted_data.{{ column.name }} {%- if not loop.last -%},{%- endif -%}
110-
{% endif %}
111-
{% endfor %}
112-
FROM snapshotted_data
113-
LEFT JOIN source_data
114-
ON snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
115-
WHERE source_data.dbt_unique_key IS NULL
116-
)
117-
SELECT * FROM upserts
118-
UNION ALL
119-
SELECT * FROM deletes;
96+
, deletes as (
97+
select
98+
source_data.*
99+
, 'delete' as dbt_change_type
100+
, {{ end_of_time() }} as dbt_valid_to
101+
, True as is_current_record
102+
from snapshotted_data
103+
LEFT join source_data
104+
on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
105+
where source_data.dbt_unique_key IS NULL
106+
)
107+
select * from upserts
108+
union all
109+
select * from deletes;
120110
{% else %}
121-
SELECT * FROM upserts;
111+
select * from upserts;
122112
{% endif %}
123113

124114
{%- endmacro %}
@@ -151,43 +141,39 @@
151141
#}
152142

153143
{% macro athena__create_columns(relation, columns) -%}
144+
{% set query -%}
145+
alter table {{ relation }} add columns (
154146
{%- for column in columns -%}
155147
{% if column.data_type|lower == 'boolean' %}
156-
{% set query -%}
157-
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} BOOLEAN);
158-
{%- endset -%}
148+
{{ column.name }} boolean {%- if not loop.last -%},{%- endif -%}
159149
{% elif column.data_type|lower == 'character varying(256)' %}
160-
{% set query -%}
161-
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} VARCHAR);
162-
{%- endset -%}
150+
{{ column.name }} varchar(255) {%- if not loop.last -%},{%- endif -%}
163151
{% elif column.data_type|lower == 'integer' %}
164-
{% set query -%}
165-
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} INT);
166-
{%- endset -%}
152+
{{ column.name }} bigint {%- if not loop.last -%},{%- endif -%}
167153
{% elif column.data_type|lower == 'float' %}
168-
{% set query -%}
169-
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} FLOAT);
170-
{%- endset -%}
154+
{{ column.name }} float {%- if not loop.last -%},{%- endif -%}
171155
{% else %}
172-
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} {{ column.data_type }});
156+
{{ column.name }} {{ column.data_type }} {%- if not loop.last -%},{%- endif -%}
173157
{% endif %}
174-
{% do run_query(query) %}
175158
{%- endfor %}
159+
)
160+
{%- endset -%}
161+
{% do run_query(query) %}
176162
{% endmacro %}
177163

178164
{#
179165
Update the dbt_valid_to and is_current_record for
180166
snapshot rows being updated and create a new temporary table to hold them
181167
#}
182168

183-
{% macro athena__create_new_snapshot_table(target, source) %}
169+
{% macro athena__create_new_snapshot_table(strategy, strategy_name, target, source) %}
184170
{%- set tmp_identifier = target.identifier ~ '__dbt_tmp_1' -%}
185171

186172
{%- set tmp_relation = adapter.get_relation(database=target.database, schema=target.schema, identifier=tmp_identifier) -%}
187173

188174
{%- set target_relation = api.Relation.create(identifier=tmp_identifier,
189175
schema=target.schema,
190-
database=none,
176+
database=target.database,
191177
type='table') -%}
192178

193179
{%- set source_columns = adapter.get_columns_in_relation(source) -%}
@@ -197,39 +183,49 @@
197183
{% endif %}
198184

199185
{% set sql -%}
200-
SELECT
186+
select
201187
{% for column in source_columns %}
202188
{{ column.name }} {%- if not loop.last -%},{%- endif -%}
203189
{% endfor %}
204190
,dbt_snapshot_at
205191
from {{ target }}
206-
WHERE dbt_unique_key NOT IN ( SELECT dbt_unique_key FROM {{ source }} )
207-
UNION ALL
208-
SELECT
192+
where dbt_unique_key NOT IN ( select dbt_unique_key from {{ source }} )
193+
union all
194+
select
209195
{% for column in source_columns %}
210-
{% if column.name == 'dbt_valid_to' %}
211-
CASE
212-
WHEN dbt_valid_to=CAST('9999-01-01' as timestamp) AND is_current_record=True
213-
THEN {{ current_timestamp() }}
214-
ELSE dbt_valid_to
215-
END AS dbt_valid_to {%- if not loop.last -%},{%- endif -%}
216-
{% elif column.name == 'is_current_record' %}
217-
CASE WHEN is_current_record=True THEN False ELSE is_current_record END
218-
AS is_current_record {%- if not loop.last -%},{%- endif -%}
219-
{% else %}
220-
{{ column.name }} {%- if not loop.last -%},{%- endif -%}
221-
{% endif %}
196+
{% if column.name == 'dbt_valid_to' %}
197+
case
198+
when tgt.is_current_record
199+
THEN
200+
{% if strategy_name == 'timestamp' %}
201+
src.{{ strategy.updated_at }}
202+
{% else %}
203+
{{ strategy.updated_at }}
204+
{% endif %}
205+
else tgt.dbt_valid_to
206+
end as dbt_valid_to {%- if not loop.last -%},{%- endif -%}
207+
{% elif column.name == 'is_current_record' %}
208+
False as is_current_record {%- if not loop.last -%},{%- endif -%}
209+
{% else %}
210+
tgt.{{ column.name }} {%- if not loop.last -%},{%- endif -%}
211+
{% endif %}
222212
{% endfor %}
223-
,{{ current_timestamp() }} AS dbt_snapshot_at
224-
from {{ target }}
225-
WHERE dbt_unique_key IN ( SELECT dbt_unique_key FROM {{ source }} )
226-
UNION ALL
227-
SELECT
213+
,
214+
{% if strategy_name == 'timestamp' %}
215+
tgt.{{ strategy.updated_at }}
216+
{% else %}
217+
{{ strategy.updated_at }}
218+
{% endif %} as dbt_snapshot_at
219+
from {{ target }} tgt
220+
join {{ source }} src
221+
on tgt.dbt_unique_key = src.dbt_unique_key
222+
union all
223+
select
228224
{% for column in source_columns %}
229225
{{ column.name }} {%- if not loop.last -%},{%- endif -%}
230226
{% endfor %}
231-
,{{ current_timestamp() }} AS dbt_snapshot_at
232-
FROM {{ source }};
227+
,{{ strategy.updated_at }} as dbt_snapshot_at
228+
from {{ source }};
233229
{%- endset -%}
234230

235231
{% call statement('create_new_snapshot_table') %}
@@ -277,9 +273,11 @@
277273

278274
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) %}
279275

280-
{% do create_columns(target_relation, missing_columns) %}
276+
{% if missing_columns %}
277+
{% do create_columns(target_relation, missing_columns) %}
278+
{% endif %}
281279

282-
{% set new_snapshot_table = athena__create_new_snapshot_table(target = target_relation, source = staging_table) %}
280+
{% set new_snapshot_table = athena__create_new_snapshot_table(strategy, strategy_name, target_relation, staging_table) %}
283281

284282
{% set final_sql = athena__snapshot_merge_sql(
285283
target = target_relation,
Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
1+
{%
2+
pyathena converts time zoned timestamps to strings so lets avoid them now()
3+
%}
4+
15
{% macro athena__current_timestamp() -%}
2-
-- pyathena converts time zoned timestamps to strings so lets avoid them
3-
-- now()
46
cast(now() as timestamp)
57
{%- endmacro %}
8+
9+
{%
10+
Macro to get the end_of_time timestamp
11+
%}
12+
13+
{% macro end_of_time() -%}
14+
{{ return(adapter.dispatch('end_of_time')()) }}
15+
{%- endmacro %}
16+
17+
{% macro athena__end_of_time() -%}
18+
cast('9999-01-01' AS timestamp)
19+
{%- endmacro %}

0 commit comments

Comments
 (0)