Skip to content

Commit 6028f2e

Browse files
authored
incremental support - strategy: append, insert_overwrite; onschema_change: fail, ignore (#1)
not supported strategy: merge, delete_overwrite; onschema_change: append_new_columns, sync_new_columns not supported: unique_key Ticket (internal JIRA): https://jira.cloudera.com/browse/DBT-38 https://jira.cloudera.com/browse/DBT-39 https://jira.cloudera.com/browse/DBT-48 https://jira.cloudera.com/browse/DBT-49 Testplan: 1. Basic dependencies need to be installed (dbt-core). 2. Build and install the dbt-impala adapter using: python3 setup.py install 3. Create a template dbt project using following: dbt init 4. Edit $HOME/.dbt/profiles.yml so that it looks similar to: demo_dbt: outputs: dev_impala: type: impala host: localhost port: 21050 dbname: s3test schema: s3test target: dev_impala 5. In the dbt project generated in step (2), run the following, which should succeed if local instance of Impala is up: dbt debug (check connection) 6. Create an incremental model with entry similar to, name it say, incremental_model.sql: {{ config( materialized='incremental', unique_key='id', incremental_strategy='insert_overwrite', ) }} select * from {{ ref('seed_sample') }} {% if is_incremental() %} where updated_at > (select max(updated_at) from {{ this }}) {% endif %} 7. Run this model using: dbt run [--full-refresh] --select incremental_model This should produce output similar to: 18:01:35 1 of 1 OK created incremental model s3test.my_third_dbt_model................... [OK in 63.16s] 18:01:35 18:01:35 Finished running 1 incremental model in 63.24s. 18:01:35 18:01:35 Completed successfully 18:01:35 18:01:35 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1 Co-authored-by: V. Ganesh <[email protected]>
1 parent 4e5cf5b commit 6028f2e

File tree

2 files changed

+152
-1
lines changed

2 files changed

+152
-1
lines changed

dbt/include/impala/macros/adapters.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@
184184

185185
{% macro impala__drop_schema(relation) -%}
186186
{%- call statement('drop_schema') -%}
187-
drop schema if exists {{ relation }}
187+
drop schema if exists {{ relation }} cascade
188188
{%- endcall -%}
189189
{% endmacro %}
190190

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
{#
2+
# Copyright 2022 Cloudera Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#}
16+
17+
{% macro validate_get_incremental_strategy(raw_strategy) %}
18+
{% set invalid_strategy_msg -%}
19+
Invalid incremental strategy provided: {{ raw_strategy }}
20+
Expected one of: 'append', 'insert_overwrite'
21+
{%- endset %}
22+
23+
{% if raw_strategy not in ['append', 'insert_overwrite'] %}
24+
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
25+
{% endif %}
26+
27+
{% do return(raw_strategy) %}
28+
{% endmacro %}
29+
30+
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}
31+
{% if on_schema_change not in ['fail', 'ignore'] %}
32+
{% set log_message = 'Invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %}
33+
{% do log(log_message) %}
34+
35+
{% do exceptions.raise_compiler_error(log_message) %}
36+
37+
{{ return(default) }}
38+
{% else %}
39+
{{ return(on_schema_change) }}
40+
{% endif %}
41+
{% endmacro %}
42+
43+
{% materialization incremental, adapter='impala' -%}
44+
45+
{% set unique_key = config.get('unique_key') %}
46+
{% set overwrite_msg -%}
47+
impala adapter does not support 'unique_key'
48+
{%- endset %}
49+
{% if unique_key is not none %}
50+
{% do exceptions.raise_compiler_error(overwrite_msg) %}
51+
{% endif %}
52+
53+
{% set raw_strategy = config.get('incremental_strategy', default='append') %}
54+
{% set strategy = validate_get_incremental_strategy(raw_strategy) %}
55+
56+
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
57+
58+
{%- set time_stamp = modules.datetime.datetime.now().isoformat().replace("-","").replace(":","").replace(".","") -%}
59+
60+
{% set target_relation = this.incorporate(type='table') %}
61+
{% set existing_relation = load_relation(this) %}
62+
{% set tmp_relation = make_temp_relation(target_relation, '__' + time_stamp + '__dbt_tmp') %}
63+
{%- set full_refresh_mode = (should_full_refresh()) -%}
64+
65+
{% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %}
66+
{% set backup_identifier = model['name'] + '__' + time_stamp + "__dbt_backup" %}
67+
68+
-- the intermediate_ and backup_ relations should not already exist in the database; get_relation
69+
-- will return None in that case. Otherwise, we get a relation that we can drop
70+
-- later, before we try to use this name for the current operation. This has to happen before
71+
-- BEGIN, in a separate transaction
72+
{% set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier,
73+
schema=schema,
74+
database=database) %}
75+
{% set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier,
76+
schema=schema,
77+
database=database) %}
78+
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
79+
{{ drop_relation_if_exists(preexisting_backup_relation) }}
80+
81+
{{ run_hooks(pre_hooks, inside_transaction=False) }}
82+
83+
-- `BEGIN` happens here:
84+
{{ run_hooks(pre_hooks, inside_transaction=True) }}
85+
86+
{% set to_drop = [] %}
87+
88+
{% do to_drop.append(tmp_relation) %}
89+
90+
{# -- first check whether we want to full refresh for source view or config reasons #}
91+
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}
92+
93+
{% if existing_relation is none %}
94+
{% set build_sql = create_table_as(False, target_relation, sql) %}
95+
{% elif trigger_full_refresh %}
96+
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
97+
{% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %}
98+
{% set backup_identifier = model['name'] + '__' + time_stamp + '__dbt_backup' %}
99+
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
100+
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
101+
102+
{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
103+
{% set need_swap = true %}
104+
{% do to_drop.append(backup_relation) %}
105+
{% do to_drop.append(intermediate_relation) %}
106+
{% else %}
107+
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
108+
{% do adapter.expand_target_column_types(
109+
from_relation=tmp_relation,
110+
to_relation=target_relation) %}
111+
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
112+
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
113+
{% if not dest_columns %}
114+
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
115+
{% endif %}
116+
117+
{#-- since unique key is not supported, the follow macro (default impl), will only return insert stm, and hence is directly used here --#}
118+
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
119+
120+
{% endif %}
121+
122+
{% call statement("main") %}
123+
{{ build_sql }}
124+
{% endcall %}
125+
126+
{% if need_swap %}
127+
{% do adapter.rename_relation(target_relation, backup_relation) %}
128+
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
129+
{% endif %}
130+
131+
{% do persist_docs(target_relation, model) %}
132+
133+
{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
134+
{% do create_indexes(target_relation) %}
135+
{% endif %}
136+
137+
{{ run_hooks(post_hooks, inside_transaction=True) }}
138+
139+
-- `COMMIT` happens here
140+
{% do adapter.commit() %}
141+
142+
{% for rel in to_drop %}
143+
{% do adapter.drop_relation(rel) %}
144+
{% endfor %}
145+
146+
{{ run_hooks(post_hooks, inside_transaction=False) }}
147+
148+
{{ return({'relations': [target_relation]}) }}
149+
150+
{%- endmaterialization %}
151+

0 commit comments

Comments
 (0)