Skip to content

Commit 8fc1eda

Browse files
authored
Merge pull request #21 from epoch8/development
Development
2 parents 3c301b6 + a10815f commit 8fc1eda

File tree

3 files changed

+247
-0
lines changed

3 files changed

+247
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
{{-
2+
config(
3+
enabled = env_var('DBT_PACKAGE_GA4__ENABLE__ANCHOR', 'true') == 'true',
4+
tags = ['dbt_package_ga4', 'anchor'],
5+
materialized = 'incremental',
6+
incremental_strategy = 'merge',
7+
unique_key = ['user_id', 'ga4_user_id'],
8+
partition_by = {
9+
"field": "ga4_date_partition",
10+
"data_type": "date",
11+
"granularity": "day"
12+
},
13+
cluster_by = ['user_id', 'ga4_user_id']
14+
)
15+
-}}
16+
17+
18+
WITH t1 AS (
19+
SELECT
20+
PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS ga4_date_partition,
21+
events.user_pseudo_id AS ga4_user_id,
22+
TIMESTAMP_MICROS(events.event_timestamp) AS ga4_user_timestamp_updated,
23+
user_id
24+
FROM
25+
{{ source('dbt_package_ga4', 'events') }} AS events
26+
WHERE
27+
_TABLE_SUFFIX NOT LIKE '%intraday%'
28+
AND PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) > DATE_SUB(DATE(CURRENT_DATE()), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL') }} DAY)
29+
AND events.stream_id IN UNNEST({{ env_var('DBT_PACKAGE_GA4__STREAM_ID') }})
30+
31+
{% if is_incremental() %}
32+
{% set max_partition_date = macro__get_max_partition_date(this.schema, this.table) %}
33+
AND PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) > DATE_SUB(DATE('{{ max_partition_date }}'), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL_INCREMENTAL') }} DAY)
34+
{% endif %}
35+
),
36+
37+
t2 AS (
38+
SELECT
39+
t1.ga4_date_partition,
40+
t1.ga4_user_id,
41+
MAX(t1.ga4_user_timestamp_updated) AS ga4_user_timestamp_updated,
42+
t1.user_id
43+
FROM
44+
t1
45+
WHERE
46+
t1.ga4_date_partition IS NOT NULL
47+
AND t1.ga4_user_id IS NOT NULL
48+
AND t1.ga4_user_timestamp_updated IS NOT NULL
49+
AND t1.user_id IS NOT NULL
50+
GROUP BY
51+
t1.ga4_date_partition,
52+
t1.ga4_user_id,
53+
t1.user_id
54+
),
55+
56+
final AS (
57+
SELECT
58+
t2.ga4_date_partition,
59+
t2.ga4_user_id,
60+
t2.ga4_user_timestamp_updated,
61+
t2.user_id
62+
FROM
63+
t2
64+
)
65+
66+
SELECT * FROM final
67+
68+
{% if is_incremental() %}
69+
WHERE
70+
final.ga4_user_timestamp_updated > COALESCE((
71+
SELECT
72+
this.ga4_user_timestamp_updated
73+
FROM
74+
{{ this }} AS this
75+
WHERE
76+
this.ga4_user_id = final.ga4_user_id
77+
AND this.user_id = final.user_id
78+
), TIMESTAMP('1900-01-01'))
79+
{% endif %}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
{{-
2+
config(
3+
enabled = env_var('DBT_PACKAGE_GA4__ENABLE__ANCHOR', 'true') == 'true',
4+
tags = ['dbt_package_ga4', 'anchor'],
5+
materialized = 'incremental',
6+
incremental_strategy = 'insert_overwrite',
7+
partition_by = {
8+
"field": "ga4_date_partition",
9+
"data_type": "date",
10+
"granularity": "day"
11+
},
12+
cluster_by = ['user_id', 'ga4_event_id']
13+
)
14+
-}}
15+
16+
17+
WITH t1 AS (
18+
SELECT DISTINCT
19+
PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS ga4_date_partition,
20+
events.user_id,
21+
TO_HEX(
22+
MD5(
23+
CONCAT(
24+
SAFE_CAST(events.event_timestamp AS STRING),
25+
SAFE_CAST(events.event_name AS STRING),
26+
SAFE_CAST(events.user_pseudo_id AS STRING)
27+
)
28+
)
29+
) AS ga4_event_id
30+
FROM
31+
{{ source('dbt_package_ga4', 'events') }} AS events
32+
WHERE
33+
_TABLE_SUFFIX NOT LIKE '%intraday%'
34+
AND PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) > DATE_SUB(DATE(CURRENT_DATE()), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL') }} DAY)
35+
AND events.stream_id IN UNNEST({{ env_var('DBT_PACKAGE_GA4__STREAM_ID') }})
36+
37+
{%- if is_incremental() %}
38+
{%- set max_partition_date = macro__get_max_partition_date(this.schema, this.table) %}
39+
AND PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) > DATE_SUB(DATE('{{ max_partition_date }}'), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL_INCREMENTAL') }} DAY)
40+
{%- endif %}
41+
),
42+
43+
t2 AS (
44+
SELECT
45+
t1.ga4_date_partition,
46+
t1.user_id,
47+
t1.ga4_event_id
48+
FROM
49+
t1
50+
WHERE
51+
t1.ga4_date_partition IS NOT NULL
52+
AND t1.user_id IS NOT NULL
53+
AND t1.ga4_event_id IS NOT NULL
54+
),
55+
56+
final AS (
57+
SELECT
58+
t2.ga4_date_partition,
59+
t2.user_id,
60+
t2.ga4_event_id
61+
FROM
62+
t2
63+
)
64+
65+
SELECT * FROM final
66+
67+
{%- if is_incremental() %}
68+
WHERE
69+
final.ga4_date_partition > DATE_SUB(DATE('{{ max_partition_date }}'), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL_INCREMENTAL') }} DAY)
70+
{%- endif %}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
{{-
2+
config(
3+
enabled = env_var('DBT_PACKAGE_GA4__ENABLE__ANCHOR', 'true') == 'true',
4+
tags = ['dbt_package_ga4', 'anchor'],
5+
materialized = 'incremental',
6+
incremental_strategy = 'merge',
7+
unique_key = ['user_id', 'ga4_session_id'],
8+
partition_by = {
9+
"field": "ga4_date_partition",
10+
"data_type": "date",
11+
"granularity": "day"
12+
},
13+
cluster_by = ['user_id', 'ga4_session_id']
14+
)
15+
-}}
16+
17+
18+
WITH t1 AS (
19+
SELECT DISTINCT
20+
PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS ga4_date_partition,
21+
events.user_id,
22+
TO_HEX(
23+
MD5(
24+
CONCAT(
25+
SAFE_CAST(events.user_pseudo_id AS STRING),
26+
COALESCE(
27+
SAFE_CAST((SELECT value.int_value FROM UNNEST(events.event_params) WHERE key = 'ga_session_id') AS STRING),
28+
SAFE_CAST((SELECT value.int_value FROM UNNEST(events.user_properties) WHERE key = 'ga_session_id') AS STRING)
29+
)
30+
)
31+
)
32+
) AS ga4_session_id,
33+
TIMESTAMP_MICROS(events.event_timestamp) AS user__made__ga4_session__timestamp
34+
FROM
35+
{{ source('dbt_package_ga4', 'events') }} AS events
36+
WHERE
37+
_TABLE_SUFFIX NOT LIKE '%intraday%'
38+
AND PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) > DATE_SUB(DATE(CURRENT_DATE()), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL') }} DAY)
39+
AND events.stream_id IN UNNEST({{ env_var('DBT_PACKAGE_GA4__STREAM_ID') }})
40+
41+
{%- if is_incremental() %}
42+
{%- set max_partition_date = macro__get_max_partition_date(this.schema, this.table) %}
43+
AND PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) > DATE_SUB(DATE('{{ max_partition_date }}'), INTERVAL {{ env_var('DBT_PACKAGE_GA4__INTERVAL_INCREMENTAL') }} DAY)
44+
{%- endif %}
45+
),
46+
47+
t2 AS (
48+
SELECT
49+
t1.ga4_date_partition,
50+
t1.user_id,
51+
t1.ga4_session_id,
52+
t1.user__made__ga4_session__timestamp,
53+
ROW_NUMBER() OVER(PARTITION BY t1.user_id, t1.ga4_session_id ORDER BY t1.user__made__ga4_session__timestamp ASC) AS rn
54+
FROM
55+
t1
56+
WHERE
57+
t1.ga4_date_partition IS NOT NULL
58+
AND t1.user_id IS NOT NULL
59+
AND t1.ga4_session_id IS NOT NULL
60+
AND t1.user__made__ga4_session__timestamp IS NOT NULL
61+
),
62+
63+
t3 AS (
64+
SELECT
65+
t2.ga4_date_partition,
66+
t2.user_id,
67+
t2.ga4_session_id,
68+
t2.user__made__ga4_session__timestamp
69+
FROM
70+
t2
71+
WHERE
72+
t2.rn = 1
73+
),
74+
75+
final AS (
76+
SELECT
77+
t3.ga4_date_partition,
78+
t3.user_id,
79+
t3.ga4_session_id,
80+
t3.user__made__ga4_session__timestamp
81+
FROM
82+
t3
83+
)
84+
85+
SELECT * FROM final
86+
87+
{%- if is_incremental() %}
88+
WHERE
89+
final.user__made__ga4_session__timestamp < COALESCE((
90+
SELECT
91+
this.user__made__ga4_session__timestamp
92+
FROM
93+
{{ this }} AS this
94+
WHERE
95+
this.user_id = final.user_id
96+
AND this.ga4_session_id = final.ga4_session_id
97+
), TIMESTAMP(CURRENT_DATE()))
98+
{%- endif %}

0 commit comments

Comments
 (0)