From 304b7627e92dfb7304a6f1faa56517340914f383 Mon Sep 17 00:00:00 2001 From: jonhopper-dataengineers Date: Thu, 3 Nov 2022 10:35:02 +1300 Subject: [PATCH 1/5] updated snowflake to use target when creating objects instead of using source and leaving out the target database --- .../snowflake/create_external_table.sql | 4 ++-- .../snowflake/get_external_build_plan.sql | 22 ++++++++++++------- .../snowflake/refresh_external_table.sql | 4 ++-- .../snowflake/snowpipe/create_empty_table.sql | 4 ++-- .../snowflake/snowpipe/create_snowpipe.sql | 4 ++-- .../snowflake/snowpipe/get_copy_sql.sql | 4 ++-- .../snowflake/snowpipe/refresh_snowpipe.sql | 4 ++-- 7 files changed, 26 insertions(+), 20 deletions(-) diff --git a/macros/plugins/snowflake/create_external_table.sql b/macros/plugins/snowflake/create_external_table.sql index 7bb46291..355b7945 100644 --- a/macros/plugins/snowflake/create_external_table.sql +++ b/macros/plugins/snowflake/create_external_table.sql @@ -1,4 +1,4 @@ -{% macro snowflake__create_external_table(source_node) %} +{% macro snowflake__create_external_table(relation, source_node) %} {%- set columns = source_node.columns.values() -%} {%- set external = source_node.external -%} @@ -8,7 +8,7 @@ {# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} {# This assumes you have already created an external stage #} - create or replace external table {{source(source_node.source_name, source_node.name)}} + create or replace external table {{ relation.include(database=(not temporary), schema=(not temporary)) }} {%- if columns or partitions -%} ( {%- if partitions -%}{%- for partition in partitions %} diff --git a/macros/plugins/snowflake/get_external_build_plan.sql b/macros/plugins/snowflake/get_external_build_plan.sql index 0f73a0b2..e4ee5296 100644 --- a/macros/plugins/snowflake/get_external_build_plan.sql +++ b/macros/plugins/snowflake/get_external_build_plan.sql @@ -6,28 +6,34 @@ database = source_node.database, schema = source_node.schema, identifier = source_node.identifier - ) %} - + ) %} + + {%- set target_relation = api.Relation.create( + database = source_node.database, + schema = source_node.schema, + identifier = source_node.name + ) -%} + {% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %} {% if source_node.external.get('snowpipe', none) is not none %} {% if create_or_replace %} {% set build_plan = build_plan + [ - dbt_external_tables.snowflake_create_empty_table(source_node), - dbt_external_tables.snowflake_get_copy_sql(source_node, explicit_transaction=true), - dbt_external_tables.snowflake_create_snowpipe(source_node) + dbt_external_tables.snowflake_create_empty_table(target_relation, source_node), + dbt_external_tables.snowflake_get_copy_sql(target_relation, source_node, explicit_transaction=true), + dbt_external_tables.snowflake_create_snowpipe(target_relation, source_node) ] %} {% else %} - {% set build_plan = build_plan + dbt_external_tables.snowflake_refresh_snowpipe(source_node) %} + {% set build_plan = build_plan + dbt_external_tables.snowflake_refresh_snowpipe(target_relation, source_node) %} {% endif %} {% else %} {% if create_or_replace %} - {% set build_plan = build_plan + [dbt_external_tables.create_external_table(source_node)] %} + {% set build_plan = build_plan + [dbt_external_tables.create_external_table(target_relation, source_node)] %} {% else %} - {% set build_plan = build_plan + dbt_external_tables.refresh_external_table(source_node) %} + {% set build_plan = build_plan + dbt_external_tables.refresh_external_table(target_relation, source_node) %} {% endif %} {% endif %} diff --git a/macros/plugins/snowflake/refresh_external_table.sql b/macros/plugins/snowflake/refresh_external_table.sql index 8f28ae57..07ced732 100644 --- a/macros/plugins/snowflake/refresh_external_table.sql +++ b/macros/plugins/snowflake/refresh_external_table.sql @@ -1,4 +1,4 @@ -{% macro snowflake__refresh_external_table(source_node) %} +{% macro snowflake__refresh_external_table(relation, source_node) %} {% set external = source_node.external %} {% set snowpipe = source_node.external.get('snowpipe', none) %} @@ -12,7 +12,7 @@ {% set ddl %} begin; - alter external table {{source(source_node.source_name, source_node.name)}} refresh; + alter external table {{ relation.include(database=(not temporary), schema=(not temporary)) }} refresh; commit; {% endset %} diff --git a/macros/plugins/snowflake/snowpipe/create_empty_table.sql b/macros/plugins/snowflake/snowpipe/create_empty_table.sql index 45e97b60..fb5a6e28 100644 --- a/macros/plugins/snowflake/snowpipe/create_empty_table.sql +++ b/macros/plugins/snowflake/snowpipe/create_empty_table.sql @@ -1,8 +1,8 @@ -{% macro snowflake_create_empty_table(source_node) %} +{% macro snowflake_create_empty_table(relation, source_node) %} {%- set columns = source_node.columns.values() %} - create or replace table {{source(source_node.source_name, source_node.name)}} ( + create or replace table {{ relation.include(database=(not temporary), schema=(not temporary)) }} ( {% if columns|length == 0 %} value variant, {% else -%} diff --git a/macros/plugins/snowflake/snowpipe/create_snowpipe.sql b/macros/plugins/snowflake/snowpipe/create_snowpipe.sql index a9ba44eb..2d3886fa 100644 --- a/macros/plugins/snowflake/snowpipe/create_snowpipe.sql +++ b/macros/plugins/snowflake/snowpipe/create_snowpipe.sql @@ -1,10 +1,10 @@ -{% macro snowflake_create_snowpipe(source_node) %} +{% macro snowflake_create_snowpipe(relation, source_node) %} {%- set external = source_node.external -%} {%- set snowpipe = external.snowpipe -%} {# https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html #} - create or replace pipe {{source(source_node.source_name, source_node.name)}} + create or replace pipe {{ relation.include(database=(not temporary), schema=(not temporary)) }} {% if snowpipe.auto_ingest -%} auto_ingest = {{snowpipe.auto_ingest}} {%- endif %} {% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %} {% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %} diff --git a/macros/plugins/snowflake/snowpipe/get_copy_sql.sql b/macros/plugins/snowflake/snowpipe/get_copy_sql.sql index ef7e54a7..1d4fa8bd 100644 --- a/macros/plugins/snowflake/snowpipe/get_copy_sql.sql +++ b/macros/plugins/snowflake/snowpipe/get_copy_sql.sql @@ -1,4 +1,4 @@ -{% macro snowflake_get_copy_sql(source_node, explicit_transaction=false) %} +{% macro snowflake_get_copy_sql(relation, source_node, explicit_transaction=false) %} {# This assumes you have already created an external stage #} {%- set columns = source_node.columns.values() -%} @@ -8,7 +8,7 @@ {%- if explicit_transaction -%} begin; {%- endif %} - copy into {{source(source_node.source_name, source_node.name)}} + copy into {{ relation.include(database=(not temporary), schema=(not temporary)) }} from ( select {% if columns|length == 0 %} diff --git a/macros/plugins/snowflake/snowpipe/refresh_snowpipe.sql b/macros/plugins/snowflake/snowpipe/refresh_snowpipe.sql index cf800554..c929b613 100644 --- a/macros/plugins/snowflake/snowpipe/refresh_snowpipe.sql +++ b/macros/plugins/snowflake/snowpipe/refresh_snowpipe.sql @@ -1,4 +1,4 @@ -{% macro snowflake_refresh_snowpipe(source_node) %} +{% macro snowflake_refresh_snowpipe(relation, source_node) %} {% set snowpipe = source_node.external.snowpipe %} {% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping %} @@ -10,7 +10,7 @@ {% else %} {% set ddl %} - alter pipe {{source(source_node.source_name, source_node.name)}} refresh + alter pipe {{ relation.include(database=(not temporary), schema=(not temporary)) }} refresh {% endset %} {{ return([ddl]) }} From 4113bcdf641e7b14d9d8629a93af3dc4b516cf3d Mon Sep 17 00:00:00 2001 From: jonhopper-dataengineers Date: Thu, 3 Nov 2022 10:59:34 +1300 Subject: [PATCH 2/5] added relation to the source node --- macros/plugins/snowflake/snowpipe/create_snowpipe.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros/plugins/snowflake/snowpipe/create_snowpipe.sql b/macros/plugins/snowflake/snowpipe/create_snowpipe.sql index 2d3886fa..17c7ab29 100644 --- a/macros/plugins/snowflake/snowpipe/create_snowpipe.sql +++ b/macros/plugins/snowflake/snowpipe/create_snowpipe.sql @@ -9,6 +9,6 @@ {% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %} {% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %} {% if snowpipe.error_integration -%} error_integration = '{{snowpipe.error_integration}}' {%- endif %} - as {{ dbt_external_tables.snowflake_get_copy_sql(source_node) }} + as {{ dbt_external_tables.snowflake_get_copy_sql(relation, source_node) }} {% endmacro %} From 4efc1761db599b1c3ce95c116100402ddeac44cd Mon Sep 17 00:00:00 2001 From: jonhopper-dataengineers Date: Thu, 3 Nov 2022 15:58:41 +1300 Subject: [PATCH 3/5] allowed for custom database --- macros/plugins/snowflake/helpers/is_csv.sql | 6 +++--- macros/plugins/snowflake/snowpipe/get_copy_sql.sql | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/macros/plugins/snowflake/helpers/is_csv.sql b/macros/plugins/snowflake/helpers/is_csv.sql index 996e535b..eb2150bf 100644 --- a/macros/plugins/snowflake/helpers/is_csv.sql +++ b/macros/plugins/snowflake/helpers/is_csv.sql @@ -1,4 +1,4 @@ -{% macro is_csv(file_format) %} +{% macro is_csv(file_format, database = target.database) %} {# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html: @@ -35,9 +35,9 @@ you should only specify one or the other when creating an external table. {% if fqn | length == 3 %} {% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %} {% elif fqn | length == 2 %} - {% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %} + {% set ff_database, ff_schema, ff_identifier = database, fqn[0], fqn[1] %} {% else %} - {% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %} + {% set ff_database, ff_schema, ff_identifier = database, target.schema, fqn[0] %} {% endif %} {% call statement('get_file_format', fetch_result = True) %} diff --git a/macros/plugins/snowflake/snowpipe/get_copy_sql.sql b/macros/plugins/snowflake/snowpipe/get_copy_sql.sql index 1d4fa8bd..aaec9f55 100644 --- a/macros/plugins/snowflake/snowpipe/get_copy_sql.sql +++ b/macros/plugins/snowflake/snowpipe/get_copy_sql.sql @@ -3,7 +3,7 @@ {%- set columns = source_node.columns.values() -%} {%- set external = source_node.external -%} - {%- set is_csv = dbt_external_tables.is_csv(external.file_format) %} + {%- set is_csv = dbt_external_tables.is_csv(external.file_format, relation.database) %} {%- set copy_options = external.snowpipe.get('copy_options', none) -%} {%- if explicit_transaction -%} begin; {%- endif %} From 892330ce422f19fb495ad8bd45b877ce541cc471 Mon Sep 17 00:00:00 2001 From: jonhopper-dataengineers Date: Thu, 3 Nov 2022 20:15:51 +1300 Subject: [PATCH 4/5] updated copy to include the database name --- macros/plugins/snowflake/snowpipe/get_copy_sql.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/macros/plugins/snowflake/snowpipe/get_copy_sql.sql b/macros/plugins/snowflake/snowpipe/get_copy_sql.sql index aaec9f55..ac60a1a9 100644 --- a/macros/plugins/snowflake/snowpipe/get_copy_sql.sql +++ b/macros/plugins/snowflake/snowpipe/get_copy_sql.sql @@ -1,4 +1,4 @@ -{% macro snowflake_get_copy_sql(relation, source_node, explicit_transaction=false) %} +{% macro snowflake__get_copy_sql(relation, source_node, explicit_transaction=false) %} {# This assumes you have already created an external stage #} {%- set columns = source_node.columns.values() -%} @@ -26,9 +26,9 @@ metadata$filename::varchar as metadata_filename, metadata$file_row_number::bigint as metadata_file_row_number, current_timestamp::timestamp as _dbt_copied_at - from {{external.location}} {# stage #} + from {{external.location | replace("@", "@" ~ relation.database ~ ".")}} {# stage #} ) - file_format = {{external.file_format}} + file_format = {{relation.database ~ "." ~ external.file_format}} {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %} {% if copy_options %} {{copy_options}} {% endif %}; From 70c5ad2c2a661603941703e1571bec604d4cd3d2 Mon Sep 17 00:00:00 2001 From: jonhopper-dataengineers Date: Mon, 7 Nov 2022 13:47:26 +1300 Subject: [PATCH 5/5] fixed up underscores --- macros/plugins/snowflake/snowpipe/create_snowpipe.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros/plugins/snowflake/snowpipe/create_snowpipe.sql b/macros/plugins/snowflake/snowpipe/create_snowpipe.sql index 17c7ab29..85cd8777 100644 --- a/macros/plugins/snowflake/snowpipe/create_snowpipe.sql +++ b/macros/plugins/snowflake/snowpipe/create_snowpipe.sql @@ -9,6 +9,6 @@ {% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %} {% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %} {% if snowpipe.error_integration -%} error_integration = '{{snowpipe.error_integration}}' {%- endif %} - as {{ dbt_external_tables.snowflake_get_copy_sql(relation, source_node) }} + as {{ dbt_external_tables.snowflake__get_copy_sql(relation, source_node) }} {% endmacro %}