Skip to content
This repository was archived by the owner on Jan 16, 2023. It is now read-only.

Diffdatabase #1

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions macros/plugins/snowflake/create_external_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro snowflake__create_external_table(source_node) %}
{% macro snowflake__create_external_table(relation, source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}
Expand All @@ -8,7 +8,7 @@

{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #}
{# This assumes you have already created an external stage #}
create or replace external table {{source(source_node.source_name, source_node.name)}}
create or replace external table {{ relation.include(database=(not temporary), schema=(not temporary)) }}
{%- if columns or partitions -%}
(
{%- if partitions -%}{%- for partition in partitions %}
Expand Down
22 changes: 14 additions & 8 deletions macros/plugins/snowflake/get_external_build_plan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@
database = source_node.database,
schema = source_node.schema,
identifier = source_node.identifier
) %}

) %}

{%- set target_relation = api.Relation.create(
database = source_node.database,
schema = source_node.schema,
identifier = source_node.name
) -%}

{% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}

{% if source_node.external.get('snowpipe', none) is not none %}

{% if create_or_replace %}
{% set build_plan = build_plan + [
dbt_external_tables.snowflake_create_empty_table(source_node),
dbt_external_tables.snowflake_get_copy_sql(source_node, explicit_transaction=true),
dbt_external_tables.snowflake_create_snowpipe(source_node)
dbt_external_tables.snowflake_create_empty_table(target_relation, source_node),
dbt_external_tables.snowflake_get_copy_sql(target_relation, source_node, explicit_transaction=true),
dbt_external_tables.snowflake_create_snowpipe(target_relation, source_node)
] %}
{% else %}
{% set build_plan = build_plan + dbt_external_tables.snowflake_refresh_snowpipe(source_node) %}
{% set build_plan = build_plan + dbt_external_tables.snowflake_refresh_snowpipe(target_relation, source_node) %}
{% endif %}

{% else %}

{% if create_or_replace %}
{% set build_plan = build_plan + [dbt_external_tables.create_external_table(source_node)] %}
{% set build_plan = build_plan + [dbt_external_tables.create_external_table(target_relation, source_node)] %}
{% else %}
{% set build_plan = build_plan + dbt_external_tables.refresh_external_table(source_node) %}
{% set build_plan = build_plan + dbt_external_tables.refresh_external_table(target_relation, source_node) %}
{% endif %}

{% endif %}
Expand Down
6 changes: 3 additions & 3 deletions macros/plugins/snowflake/helpers/is_csv.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro is_csv(file_format) %}
{% macro is_csv(file_format, database = target.database) %}

{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html:

Expand Down Expand Up @@ -35,9 +35,9 @@ you should only specify one or the other when creating an external table.
{% if fqn | length == 3 %}
{% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %}
{% elif fqn | length == 2 %}
{% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %}
{% set ff_database, ff_schema, ff_identifier = database, fqn[0], fqn[1] %}
{% else %}
{% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %}
{% set ff_database, ff_schema, ff_identifier = database, target.schema, fqn[0] %}
{% endif %}

{% call statement('get_file_format', fetch_result = True) %}
Expand Down
4 changes: 2 additions & 2 deletions macros/plugins/snowflake/refresh_external_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro snowflake__refresh_external_table(source_node) %}
{% macro snowflake__refresh_external_table(relation, source_node) %}

{% set external = source_node.external %}
{% set snowpipe = source_node.external.get('snowpipe', none) %}
Expand All @@ -12,7 +12,7 @@

{% set ddl %}
begin;
alter external table {{source(source_node.source_name, source_node.name)}} refresh;
alter external table {{ relation.include(database=(not temporary), schema=(not temporary)) }} refresh;
commit;
{% endset %}

Expand Down
4 changes: 2 additions & 2 deletions macros/plugins/snowflake/snowpipe/create_empty_table.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{% macro snowflake_create_empty_table(source_node) %}
{% macro snowflake_create_empty_table(relation, source_node) %}

{%- set columns = source_node.columns.values() %}

create or replace table {{source(source_node.source_name, source_node.name)}} (
create or replace table {{ relation.include(database=(not temporary), schema=(not temporary)) }} (
{% if columns|length == 0 %}
value variant,
{% else -%}
Expand Down
6 changes: 3 additions & 3 deletions macros/plugins/snowflake/snowpipe/create_snowpipe.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{% macro snowflake_create_snowpipe(source_node) %}
{% macro snowflake_create_snowpipe(relation, source_node) %}

{%- set external = source_node.external -%}
{%- set snowpipe = external.snowpipe -%}

{# https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html #}
create or replace pipe {{source(source_node.source_name, source_node.name)}}
create or replace pipe {{ relation.include(database=(not temporary), schema=(not temporary)) }}
{% if snowpipe.auto_ingest -%} auto_ingest = {{snowpipe.auto_ingest}} {%- endif %}
{% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %}
{% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %}
{% if snowpipe.error_integration -%} error_integration = '{{snowpipe.error_integration}}' {%- endif %}
as {{ dbt_external_tables.snowflake_get_copy_sql(source_node) }}
as {{ dbt_external_tables.snowflake__get_copy_sql(relation, source_node) }}

{% endmacro %}
10 changes: 5 additions & 5 deletions macros/plugins/snowflake/snowpipe/get_copy_sql.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{% macro snowflake_get_copy_sql(source_node, explicit_transaction=false) %}
{% macro snowflake__get_copy_sql(relation, source_node, explicit_transaction=false) %}
{# This assumes you have already created an external stage #}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}
{%- set is_csv = dbt_external_tables.is_csv(external.file_format) %}
{%- set is_csv = dbt_external_tables.is_csv(external.file_format, relation.database) %}
{%- set copy_options = external.snowpipe.get('copy_options', none) -%}

{%- if explicit_transaction -%} begin; {%- endif %}

copy into {{source(source_node.source_name, source_node.name)}}
copy into {{ relation.include(database=(not temporary), schema=(not temporary)) }}
from (
select
{% if columns|length == 0 %}
Expand All @@ -26,9 +26,9 @@
metadata$filename::varchar as metadata_filename,
metadata$file_row_number::bigint as metadata_file_row_number,
current_timestamp::timestamp as _dbt_copied_at
from {{external.location}} {# stage #}
from {{external.location | replace("@", "@" ~ relation.database ~ ".")}} {# stage #}
)
file_format = {{external.file_format}}
file_format = {{relation.database ~ "." ~ external.file_format}}
{% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %}
{% if copy_options %} {{copy_options}} {% endif %};

Expand Down
4 changes: 2 additions & 2 deletions macros/plugins/snowflake/snowpipe/refresh_snowpipe.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro snowflake_refresh_snowpipe(source_node) %}
{% macro snowflake_refresh_snowpipe(relation, source_node) %}

{% set snowpipe = source_node.external.snowpipe %}
{% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping %}
Expand All @@ -10,7 +10,7 @@
{% else %}

{% set ddl %}
alter pipe {{source(source_node.source_name, source_node.name)}} refresh
alter pipe {{ relation.include(database=(not temporary), schema=(not temporary)) }} refresh
{% endset %}

{{ return([ddl]) }}
Expand Down