diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py
index ba252d03..1355db86 100644
--- a/dbt/adapters/clickhouse/dbclient.py
+++ b/dbt/adapters/clickhouse/dbclient.py
@@ -107,6 +107,7 @@ def __init__(self, credentials: ClickHouseCredentials):
             "distributed_table": {},
             "distributed_incremental": {},
             "general": {},
+            "remote_table": {},
         }
         if (
             not credentials.allow_automatic_deduplication
diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py
index 0789413c..e9d0e306 100644
--- a/dbt/adapters/clickhouse/impl.py
+++ b/dbt/adapters/clickhouse/impl.py
@@ -27,6 +27,8 @@
 from dbt_common.events.functions import warn_or_error
 from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
 from dbt_common.utils import filter_null_values
+from dbt.contracts.graph.nodes import BaseNode
+from dbt.flags import get_flags
 
 from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
 from dbt.adapters.clickhouse.column import ClickHouseColumn, ClickHouseColumnChanges
@@ -117,8 +119,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
     @available.parse(lambda *a, **k: {})
     def get_clickhouse_cluster_name(self):
         conn = self.connections.get_if_exists()
-        if conn.credentials.cluster:
-            return f'"{conn.credentials.cluster}"'
+        return conn.credentials.cluster
 
     @available.parse(lambda *a, **k: {})
     def get_clickhouse_local_suffix(self):
@@ -558,6 +559,16 @@ class ClickHouseDatabase:
     comment: str
 
 
+
+def get_materialization(self):
+    flags = get_flags()
+    materialized = flags.vars.get('materialized')
+    return materialized or self.config.materialized
+
+# patches a BaseNode method to allow setting `materialized` config overrides via dbt flags
+BaseNode.get_materialization = get_materialization
+
+
 def _expect_row_value(key: str, row: "agate.Row"):
     if key not in row.keys():
         raise DbtInternalError(f'Got a row without \'{key}\' column, columns: {row.keys()}')
diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py
index 351275ba..ee6108f9 100644
--- a/dbt/adapters/clickhouse/relation.py
+++ b/dbt/adapters/clickhouse/relation.py
@@ -44,6 +44,7 @@ class ClickHouseRelation(BaseRelation):
     quote_character: str = '`'
     can_exchange: bool = False
     can_on_cluster: bool = False
+    remote_cluster: Optional[str] = None
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
diff --git a/dbt/include/clickhouse/macros/adapters.sql b/dbt/include/clickhouse/macros/adapters.sql
index a454e110..0bfc6c10 100644
--- a/dbt/include/clickhouse/macros/adapters.sql
+++ b/dbt/include/clickhouse/macros/adapters.sql
@@ -32,7 +32,7 @@
         db.engine as db_engine,
         {%- if adapter.get_clickhouse_cluster_name() -%}
         count(distinct _shard_num) > 1 as is_on_cluster
-        from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
+        from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t
         join system.databases as db on t.database = db.name
         where schema = '{{ schema_relation.schema }}'
         group by name, schema, type, db_engine
diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
index d62c6242..1ad2cf66 100644
--- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql
+++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
@@ -86,24 +86,30 @@
 {% endmaterialization %}
 
-{% macro create_distributed_table(relation, local_relation) %}
-    {%- set cluster = adapter.get_clickhouse_cluster_name() -%}
-    {% if cluster is none %}
-        {% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
-    {% endif %}
+{% macro create_distributed_table(relation, local_relation, sql=none) %}
+    {% if adapter.get_clickhouse_cluster_name() is none %}
+        {% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
+    {% endif %}
 
-    {%- set cluster = cluster[1:-1] -%}
-    {%- set sharding = config.get('sharding_key') -%}
-
-    create or replace table {{ relation }} {{ on_cluster_clause(relation) }} as {{ local_relation }}
-    ENGINE = Distributed('{{ cluster}}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
-    {%- if sharding is not none and sharding.strip() != '' -%}
-        , {{ sharding }}
-    {%- else %}
-        , rand()
-    {% endif -%}
-    )
-    {% endmacro %}
+    {%- set remote_cluster = local_relation.remote_cluster or adapter.get_clickhouse_cluster_name() -%}
+    {%- set sharding = config.get('sharding_key') -%}
+    {%- set reference = "as " ~ local_relation -%}
+    {%- if sql -%}
+        {%- set col_list = [] -%}
+        {%- for col in adapter.get_column_schema_from_query(sql) -%}
+            {%- do col_list.append(col.name + ' ' + col.data_type) -%}
+        {%- endfor -%}
+        {%- set reference = "(" ~ (col_list | join(', ')) ~ ")" -%}
+    {%- endif -%}
+    create or replace table {{ relation }} {{ on_cluster_clause(relation) }} {{ reference }}
+    engine = Distributed('{{ remote_cluster }}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
+    {%- if sharding is not none and sharding.strip() != '' -%}
+        , {{ sharding }}
+    {%- else %}
+        , rand()
+    {% endif -%}
+    )
+{% endmacro %}
 
 
 {% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%}
     {%- set sql_header = config.get('sql_header', none) -%}
diff --git a/dbt/include/clickhouse/macros/materializations/remote_table.sql b/dbt/include/clickhouse/macros/materializations/remote_table.sql
new file mode 100644
index 00000000..12ff8f9c
--- /dev/null
+++ b/dbt/include/clickhouse/macros/materializations/remote_table.sql
@@ -0,0 +1,39 @@
+{% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%}
+    {%- set remote_config = config.require('remote_config') -%}
+
+    {%- set remote_cluster = remote_config.get('cluster') -%}
+    {%- set remote_schema = remote_config.get('local_db_prefix') + this.schema -%}
+    {%- set remote_identifier = this.identifier + remote_config.get('local_suffix') -%}
+
+    {%- set target_relation = this.incorporate(type='table') -%}
+    {%- set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) -%}
+    {%- set existing_relation = load_cached_relation(this) -%}
+
+    {{ run_hooks(pre_hooks, inside_transaction=False) }}
+    {{ run_hooks(pre_hooks, inside_transaction=True) }}
+
+    {%- set column_changes = none -%}
+    {%- if existing_relation -%}
+        {%- set column_changes = adapter.check_incremental_schema_changes('ignore', existing_relation, sql) -%}
+    {%- endif -%}
+
+    {% call statement('main') %}
+        {%- if column_changes or existing_relation is none or should_full_refresh() -%}
+            {{ create_distributed_table(target_relation, remote_relation, sql) }}
+        {%- else -%}
+            {{ log("no-op run: distributed table exists with correct schema.", info=true) }}
+            select true;
+        {%- endif -%}
+    {% endcall %}
+
+    {% set should_revoke = should_revoke(target_relation, full_refresh_mode) %}
+    {% set grant_config = config.get('grants') %}
+    {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
+    {% do persist_docs(target_relation, model) %}
+
+    {{ run_hooks(post_hooks, inside_transaction=True) }}
+    {{ adapter.commit() }}
+    {{ run_hooks(post_hooks, inside_transaction=False) }}
+    {{ return({'relations': [target_relation]}) }}
+
+{% endmaterialization %}
\ No newline at end of file
diff --git a/tests/integration/adapter/remote_table/test_remote_table.py b/tests/integration/adapter/remote_table/test_remote_table.py
new file mode 100644
index 00000000..b04b34eb
--- /dev/null
+++ b/tests/integration/adapter/remote_table/test_remote_table.py
@@ -0,0 +1,92 @@
+import json
+import pytest
+from dbt.tests.util import run_dbt, run_dbt_and_capture, get_connection
+
+model = """
+{{
+    config(
+        remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'},
+        sharding_key='key1',
+    )
+}}
+select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
+"""
+
+
+class TestRemoteTableRemoteConfig:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "remote_table.sql": model,
+        }
+
+    @pytest.fixture(scope="class")
+    def init_local_table(self, project):
+        project.run_sql(f"create database if not exists _{project.test_schema} on cluster test_shard")
+        project.run_sql(f"""
+        create table _{project.test_schema}.remote_table_local on cluster test_shard
+        (key1 UInt64, key2 Int64)
+        engine=MergeTree order by key1
+        """)
+        return
+
+    def test_with_remote_configuration(self, project, init_local_table):
+        # the created distributed table should point to a local table as defined in the model's `remote_config`
+        materialized_var = {"materialized": "remote_table"}
+        run_dbt(["run", "--vars", json.dumps(materialized_var)])
+
+        project.run_sql(f"""
+        insert into {project.test_schema}.remote_table
+        select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
+        """)
+
+        self._assert_is_distributed_table(project)
+        self._assert_correct_engine(project)
+        self._assert_correct_data(project)
+
+        # rerun (should be no-op)
+        _, log_output = run_dbt_and_capture(["run", "--vars", json.dumps(materialized_var)])
+        assert "no-op run" in log_output
+
+    @staticmethod
+    def _assert_is_distributed_table(project):
+        # check correct table creation on current host
+        result = project.run_sql(
+            f"select engine from system.tables where name='remote_table'",
+            fetch="one"
+        )
+        assert result is not None
+        assert result[0] == "Distributed"
+
+    @staticmethod
+    def _assert_correct_engine(project):
+        # assert correct engine parameters
+        result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one")
+        assert f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)" in result[0]
+
+    @staticmethod
+    def _assert_correct_data(project):
+        # query remote data from distributed table
+        result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one")
+        assert result[0] == 10
+
+
+class TestRemoteTableRemoteConfigReplicatedDB(TestRemoteTableRemoteConfig):
+    @pytest.fixture(scope="class")
+    def test_config(self, test_config):
+        test_config["db_engine"] = "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')"
+        return test_config
+
+    @pytest.fixture(scope="class")
+    def init_local_table(self, project):
+        schema_name = f"_{project.test_schema}"
+        with get_connection(project.adapter):
+            relation = project.adapter.Relation.create(database=project.database, schema=schema_name)
+            project.adapter.create_schema(relation)
+            project.created_schemas.append(schema_name)
+
+        project.run_sql(f"""
+        create table _{project.test_schema}.remote_table_local
+        (key1 UInt64, key2 Int64)
+        engine=MergeTree order by key1
+        """)
diff --git a/tests/integration/test_config.xml b/tests/integration/test_config.xml
index 9f2aec4f..814fcc8b 100644
--- a/tests/integration/test_config.xml
+++ b/tests/integration/test_config.xml
@@ -7,6 +7,8 @@
 
+    <default_replica_path>/clickhouse/tables/{uuid}/{shard}</default_replica_path>
+    <default_replica_name>{replica}</default_replica_name>
 
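
Usage sketch (not part of the patch): the model below mirrors the integration test above, so the `remote_config` keys, the `test_shard` cluster, the sharding key, and the query body come straight from this diff; only the file name in the comment is illustrative. The local table `_<target_schema>/<model>_local` is not created by the materialization and must already exist on the remote cluster, as the test fixture does before running dbt; the materialization only creates the Distributed wrapper table that points at it.

```sql
-- models/remote_table.sql (illustrative file name)
{{
    config(
        remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'},
        sharding_key='key1',
    )
}}
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
```

Selecting the materialization at run time then works as in the test, `dbt run --vars '{"materialized": "remote_table"}'`, which takes effect through the patched `BaseNode.get_materialization`; setting `materialized='remote_table'` directly in the model config should also resolve to the new materialization, since it is registered under that name.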