From bee944833f24e4ac7236ac68cc489a3eec45be75 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Mon, 6 Jan 2025 09:26:29 +0100 Subject: [PATCH 1/6] feat: add remote table materialization --- dbt/adapters/clickhouse/dbclient.py | 1 + dbt/adapters/clickhouse/impl.py | 3 +- dbt/adapters/clickhouse/relation.py | 1 + dbt/include/clickhouse/macros/adapters.sql | 2 +- .../materializations/distributed_table.sql | 40 ++++---- .../macros/materializations/remote_table.sql | 27 ++++++ .../adapter/remote_table/test_remote_table.py | 97 +++++++++++++++++++ tests/integration/test_config.xml | 10 ++ 8 files changed, 161 insertions(+), 20 deletions(-) create mode 100644 dbt/include/clickhouse/macros/materializations/remote_table.sql create mode 100644 tests/integration/adapter/remote_table/test_remote_table.py diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index ba252d03..1355db86 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -107,6 +107,7 @@ def __init__(self, credentials: ClickHouseCredentials): "distributed_table": {}, "distributed_incremental": {}, "general": {}, + "remote_table": {}, } if ( not credentials.allow_automatic_deduplication diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 48c25054..8e848052 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -117,8 +117,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str: @available.parse(lambda *a, **k: {}) def get_clickhouse_cluster_name(self): conn = self.connections.get_if_exists() - if conn.credentials.cluster: - return f'"{conn.credentials.cluster}"' + return conn.credentials.cluster @available.parse(lambda *a, **k: {}) def get_clickhouse_local_suffix(self): diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 8ad3b39a..1a2e23f9 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -44,6 +44,7 @@ class ClickHouseRelation(BaseRelation): quote_character: str = '`' can_exchange: bool = False can_on_cluster: bool = False + remote_cluster: Optional[str] = None def __post_init__(self): if self.database != self.schema and self.database: diff --git a/dbt/include/clickhouse/macros/adapters.sql b/dbt/include/clickhouse/macros/adapters.sql index 2c2c4a0f..ae39e384 100644 --- a/dbt/include/clickhouse/macros/adapters.sql +++ b/dbt/include/clickhouse/macros/adapters.sql @@ -32,7 +32,7 @@ db.engine as db_engine, {%- if adapter.get_clickhouse_cluster_name() -%} count(distinct _shard_num) > 1 as is_on_cluster - from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t + from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t join system.databases as db on t.database = db.name where schema = '{{ schema_relation.schema }}' group by name, schema, type, db_engine diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql index d62c6242..1ad2cf66 100644 --- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -86,24 +86,30 @@ {% endmaterialization %} -{% macro create_distributed_table(relation, local_relation) %} - {%- set cluster = adapter.get_clickhouse_cluster_name() -%} - {% if cluster is none %} - {% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %} - {% endif %} +{% macro create_distributed_table(relation, local_relation, sql=none) %} + {% if adapter.get_clickhouse_cluster_name() is none %} + {% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %} + {% endif %} - {%- set cluster = cluster[1:-1] -%} - {%- set sharding = config.get('sharding_key') -%} - - create or replace table {{ relation }} {{ on_cluster_clause(relation) }} as {{ local_relation }} - ENGINE = Distributed('{{ cluster}}', '{{ local_relation.schema }}', '{{ local_relation.name }}' - {%- if sharding is not none and sharding.strip() != '' -%} - , {{ sharding }} - {%- else %} - , rand() - {% endif -%} - ) - {% endmacro %} + {%- set remote_cluster = local_relation.remote_cluster or adapter.get_clickhouse_cluster_name() -%} + {%- set sharding = config.get('sharding_key') -%} + {%- set reference = "as " ~ local_relation -%} + {%- if sql -%} + {%- set col_list = [] -%} + {%- for col in adapter.get_column_schema_from_query(sql) -%} + {%- do col_list.append(col.name + ' ' + col.data_type) -%} + {%- endfor -%} + {%- set reference = "(" ~ (col_list | join(', ')) ~ ")" -%} + {%- endif -%} + create or replace table {{ relation }} {{ on_cluster_clause(relation) }} {{ reference }} + engine = Distributed('{{ remote_cluster }}', '{{ local_relation.schema }}', '{{ local_relation.name }}' + {%- if sharding is not none and sharding.strip() != '' -%} + , {{ sharding }} + {%- else %} + , rand() + {% endif -%} + ) +{% endmacro %} {% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%} {%- set sql_header = config.get('sql_header', none) -%} diff --git a/dbt/include/clickhouse/macros/materializations/remote_table.sql b/dbt/include/clickhouse/macros/materializations/remote_table.sql new file mode 100644 index 00000000..3425dc60 --- /dev/null +++ b/dbt/include/clickhouse/macros/materializations/remote_table.sql @@ -0,0 +1,27 @@ +{% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%} + {%- set remote_config = config.get('remote_config', {}) -%} + {%- set remote_cluster = remote_config.get('cluster') or adapter.get_clickhouse_cluster_name() -%} + {%- set remote_schema = remote_config.get('schema') or adapter.get_clickhouse_local_db_prefix() + this.schema -%} + {%- set remote_identifier = remote_config.get('identifier') or this.identifier + adapter.get_clickhouse_local_suffix() -%} + + {% set target_relation = this.incorporate(type='table') %} + {% set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) %} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% call statement('main') %} + {{ create_distributed_table(target_relation, remote_relation, sql) }} + {% endcall %} + + {% set should_revoke = should_revoke(target_relation, full_refresh_mode) %} + {% set grant_config = config.get('grants') %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + {% do persist_docs(target_relation, model) %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + {{ adapter.commit() }} + {{ run_hooks(post_hooks, inside_transaction=False) }} + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} \ No newline at end of file diff --git a/tests/integration/adapter/remote_table/test_remote_table.py b/tests/integration/adapter/remote_table/test_remote_table.py new file mode 100644 index 00000000..2afbcabc --- /dev/null +++ b/tests/integration/adapter/remote_table/test_remote_table.py @@ -0,0 +1,97 @@ +import pytest +from dbt.tests.util import run_dbt + + +model_with_defaults = """ +{{ + config( + materialized='remote_table', + ) +}} +select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) +""" + + +class TestRemoteTable: + @pytest.fixture(scope="class") + def models(self): + return { + "remote_table.sql": model_with_defaults, + } + + def test_with_defaults(self, project): + # initialize a local table on current cluster + project.run_sql(f""" + create table {project.test_schema}.remote_table_local on cluster {project.test_config["cluster"]} + (key1 UInt64, key2 Int64) + engine=ReplicatedMergeTree order by key1 + """) + + # this `remote_table` run with default configuration will point to previously created local table, taking + # `local_db_prefix` and `local_suffix` settings into account. + run_dbt() + + # insert into distributed table + project.run_sql(f""" + insert into {project.test_schema}.remote_table + select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) + settings insert_quorum=1 + """) + + _assert_is_distributed_table(project) + _assert_correct_distributed_data(project) + + +model_with_remote_configuration = """ +{{ + config( + materialized='remote_table', + remote_config={'cluster': 'test_remote', 'schema': this.schema, 'identifier': 'remote_target_table'}, + sharding_key='key1', + ) +}} +select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) +""" + + +class TestRemoteTableRemoteConfig: + @pytest.fixture(scope="class") + def models(self): + return { + "remote_table.sql": model_with_remote_configuration, + } + + def test_with_remote_configuration(self, project): + # initialize a local table on remote cluster 'test_remote_cluster' + project.run_sql(f"create database if not exists {project.test_schema} on cluster `test_remote`") + project.run_sql(f""" + create table {project.test_schema}.remote_target_table on cluster `test_remote` + engine=MergeTree order by key1 + as select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) + """) + + # the created distributed table should point to a local table as defined in the model's `remote_config` + run_dbt() + + _assert_is_distributed_table(project) + _assert_correct_distributed_data(project) + + # assert correct engine parameters + result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one") + assert f"Distributed('test_remote', '{project.test_schema}', 'remote_target_table', key1)" in result[0] + + +def _assert_is_distributed_table(project): + # check correct table creation on current host + result = project.run_sql( + f"select engine from system.tables where name='remote_table'", + fetch="one" + ) + assert result is not None + assert result[0] == "Distributed" + + +def _assert_correct_distributed_data(project): + # query remote data from distributed table + result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one") + assert result[0] == 10 diff --git a/tests/integration/test_config.xml b/tests/integration/test_config.xml index 9f2aec4f..7970fe35 100644 --- a/tests/integration/test_config.xml +++ b/tests/integration/test_config.xml @@ -7,6 +7,8 @@ + /clickhouse/tables/{uuid}/{shard} + {replica} @@ -44,6 +46,14 @@ + + + + ch2 + 9000 + + + 9181 From 05bf55dca97f9ceb1c9576c524f38fc04e129ed1 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Wed, 22 Jan 2025 12:40:03 +0100 Subject: [PATCH 2/6] refactoring parameters and test cases --- .../macros/materializations/remote_table.sql | 29 +++++-- .../adapter/remote_table/test_remote_table.py | 78 ++++++------------- tests/integration/test_config.xml | 8 -- 3 files changed, 47 insertions(+), 68 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/remote_table.sql b/dbt/include/clickhouse/macros/materializations/remote_table.sql index 3425dc60..a7e56e77 100644 --- a/dbt/include/clickhouse/macros/materializations/remote_table.sql +++ b/dbt/include/clickhouse/macros/materializations/remote_table.sql @@ -1,17 +1,32 @@ {% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%} - {%- set remote_config = config.get('remote_config', {}) -%} - {%- set remote_cluster = remote_config.get('cluster') or adapter.get_clickhouse_cluster_name() -%} - {%- set remote_schema = remote_config.get('schema') or adapter.get_clickhouse_local_db_prefix() + this.schema -%} - {%- set remote_identifier = remote_config.get('identifier') or this.identifier + adapter.get_clickhouse_local_suffix() -%} + {%- set remote_config = config.get('remote_config', none) -%} + {%- if remote_config is none -%} + {% do exceptions.raise_compiler_error('`remote_config` model configuration needs to be provided to run `remote_table` materialization.') %} + {%- endif -%} - {% set target_relation = this.incorporate(type='table') %} - {% set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) %} + {%- set remote_cluster = remote_config.get('cluster') -%} + {%- set remote_schema = remote_config.get('local_db_prefix') + this.schema -%} + {%- set remote_identifier = this.identifier + remote_config.get('local_suffix') -%} + + {%- set target_relation = this.incorporate(type='table') -%} + {%- set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) -%} + {%- set existing_relation = load_cached_relation(this) -%} {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ run_hooks(pre_hooks, inside_transaction=True) }} + {%- set column_changes = none -%} + {%- if existing_relation -%} + {%- set column_changes = adapter.check_incremental_schema_changes('ignore', existing_relation, sql) -%} + {%- endif -%} + {% call statement('main') %} - {{ create_distributed_table(target_relation, remote_relation, sql) }} + {%- if column_changes or existing_relation is none or should_full_refresh() -%} + {{ create_distributed_table(target_relation, remote_relation, sql) }} + {%- else -%} + {{ log("no-op run: distributed table exists with correct schema.", info=true) }} + select true; + {%- endif -%} {% endcall %} {% set should_revoke = should_revoke(target_relation, full_refresh_mode) %} diff --git a/tests/integration/adapter/remote_table/test_remote_table.py b/tests/integration/adapter/remote_table/test_remote_table.py index 2afbcabc..957be70c 100644 --- a/tests/integration/adapter/remote_table/test_remote_table.py +++ b/tests/integration/adapter/remote_table/test_remote_table.py @@ -1,84 +1,50 @@ import pytest -from dbt.tests.util import run_dbt +from dbt.tests.util import run_dbt, run_dbt_and_capture - -model_with_defaults = """ +model = """ {{ config( materialized='remote_table', + remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'}, + sharding_key='key1', ) }} select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) """ -class TestRemoteTable: +class TestRemoteTableRemoteConfig: @pytest.fixture(scope="class") def models(self): return { - "remote_table.sql": model_with_defaults, + "remote_table.sql": model, } - def test_with_defaults(self, project): - # initialize a local table on current cluster + def test_with_remote_configuration(self, project): + # initialize a local table + project.run_sql(f"create database if not exists _{project.test_schema} on cluster test_shard") project.run_sql(f""" - create table {project.test_schema}.remote_table_local on cluster {project.test_config["cluster"]} + create table _{project.test_schema}.remote_table_local on cluster test_shard (key1 UInt64, key2 Int64) - engine=ReplicatedMergeTree order by key1 + engine=MergeTree order by key1 """) - # this `remote_table` run with default configuration will point to previously created local table, taking - # `local_db_prefix` and `local_suffix` settings into account. + # the created distributed table should point to a local table as defined in the model's `remote_config` run_dbt() - # insert into distributed table + # insert data via distributed table project.run_sql(f""" insert into {project.test_schema}.remote_table select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) - settings insert_quorum=1 """) _assert_is_distributed_table(project) - _assert_correct_distributed_data(project) - + _assert_correct_engine(project) + _assert_correct_data(project) -model_with_remote_configuration = """ -{{ - config( - materialized='remote_table', - remote_config={'cluster': 'test_remote', 'schema': this.schema, 'identifier': 'remote_target_table'}, - sharding_key='key1', - ) -}} -select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) -""" - - -class TestRemoteTableRemoteConfig: - @pytest.fixture(scope="class") - def models(self): - return { - "remote_table.sql": model_with_remote_configuration, - } - - def test_with_remote_configuration(self, project): - # initialize a local table on remote cluster 'test_remote_cluster' - project.run_sql(f"create database if not exists {project.test_schema} on cluster `test_remote`") - project.run_sql(f""" - create table {project.test_schema}.remote_target_table on cluster `test_remote` - engine=MergeTree order by key1 - as select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) - """) - - # the created distributed table should point to a local table as defined in the model's `remote_config` - run_dbt() - - _assert_is_distributed_table(project) - _assert_correct_distributed_data(project) - - # assert correct engine parameters - result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one") - assert f"Distributed('test_remote', '{project.test_schema}', 'remote_target_table', key1)" in result[0] + # rerun (should be no-op) + _, log_output = run_dbt_and_capture() + assert "no-op run" in log_output def _assert_is_distributed_table(project): @@ -91,7 +57,13 @@ def _assert_is_distributed_table(project): assert result[0] == "Distributed" -def _assert_correct_distributed_data(project): +def _assert_correct_engine(project): + # assert correct engine parameters + result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one") + assert f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)" in result[0] + + +def _assert_correct_data(project): # query remote data from distributed table result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one") assert result[0] == 10 diff --git a/tests/integration/test_config.xml b/tests/integration/test_config.xml index 7970fe35..814fcc8b 100644 --- a/tests/integration/test_config.xml +++ b/tests/integration/test_config.xml @@ -46,14 +46,6 @@ - - - - ch2 - 9000 - - - 9181 From c1abd796bf5f4cdf39cacc822a0a9fa31541eac5 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Wed, 22 Jan 2025 12:44:08 +0100 Subject: [PATCH 3/6] require remote_config --- .../clickhouse/macros/materializations/remote_table.sql | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/remote_table.sql b/dbt/include/clickhouse/macros/materializations/remote_table.sql index a7e56e77..12ff8f9c 100644 --- a/dbt/include/clickhouse/macros/materializations/remote_table.sql +++ b/dbt/include/clickhouse/macros/materializations/remote_table.sql @@ -1,8 +1,5 @@ {% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%} - {%- set remote_config = config.get('remote_config', none) -%} - {%- if remote_config is none -%} - {% do exceptions.raise_compiler_error('`remote_config` model configuration needs to be provided to run `remote_table` materialization.') %} - {%- endif -%} + {%- set remote_config = config.require('remote_config') -%} {%- set remote_cluster = remote_config.get('cluster') -%} {%- set remote_schema = remote_config.get('local_db_prefix') + this.schema -%} From 97958c205f337348e4830dc4aed975e0a975d507 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Wed, 22 Jan 2025 17:16:01 +0100 Subject: [PATCH 4/6] add test cases for replicated db --- .../adapter/remote_table/test_remote_table.py | 68 ++++++++++++------- 1 file changed, 45 insertions(+), 23 deletions(-) diff --git a/tests/integration/adapter/remote_table/test_remote_table.py b/tests/integration/adapter/remote_table/test_remote_table.py index 957be70c..937c4427 100644 --- a/tests/integration/adapter/remote_table/test_remote_table.py +++ b/tests/integration/adapter/remote_table/test_remote_table.py @@ -1,5 +1,5 @@ import pytest -from dbt.tests.util import run_dbt, run_dbt_and_capture +from dbt.tests.util import run_dbt, run_dbt_and_capture, get_connection model = """ {{ @@ -20,50 +20,72 @@ def models(self): "remote_table.sql": model, } - def test_with_remote_configuration(self, project): - # initialize a local table + @pytest.fixture(scope="class") + def init_local_table(self, project): project.run_sql(f"create database if not exists _{project.test_schema} on cluster test_shard") project.run_sql(f""" create table _{project.test_schema}.remote_table_local on cluster test_shard (key1 UInt64, key2 Int64) engine=MergeTree order by key1 """) + return + def test_with_remote_configuration(self, project, init_local_table): # the created distributed table should point to a local table as defined in the model's `remote_config` run_dbt() - # insert data via distributed table project.run_sql(f""" insert into {project.test_schema}.remote_table select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10) """) - _assert_is_distributed_table(project) - _assert_correct_engine(project) - _assert_correct_data(project) + self._assert_is_distributed_table(project) + self._assert_correct_engine(project) + self._assert_correct_data(project) # rerun (should be no-op) _, log_output = run_dbt_and_capture() assert "no-op run" in log_output + @staticmethod + def _assert_is_distributed_table(project): + # check correct table creation on current host + result = project.run_sql( + f"select engine from system.tables where name='remote_table'", + fetch="one" + ) + assert result is not None + assert result[0] == "Distributed" -def _assert_is_distributed_table(project): - # check correct table creation on current host - result = project.run_sql( - f"select engine from system.tables where name='remote_table'", - fetch="one" - ) - assert result is not None - assert result[0] == "Distributed" + @staticmethod + def _assert_correct_engine(project): + # assert correct engine parameters + result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one") + assert f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)" in result[0] + @staticmethod + def _assert_correct_data(project): + # query remote data from distributed table + result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one") + assert result[0] == 10 -def _assert_correct_engine(project): - # assert correct engine parameters - result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one") - assert f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)" in result[0] +class TestRemoteTableRemoteConfigReplicatedDB(TestRemoteTableRemoteConfig): + @pytest.fixture(scope="class") + def init_local_table(self, project): + schema_name = f"_{project.test_schema}" + with get_connection(project.adapter): + relation = project.adapter.Relation.create(database=project.database, schema=schema_name) + project.adapter.create_schema(relation) + project.created_schemas.append(schema_name) -def _assert_correct_data(project): - # query remote data from distributed table - result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one") - assert result[0] == 10 + project.run_sql(f""" + create table _{project.test_schema}.remote_table_local + (key1 UInt64, key2 Int64) + engine=MergeTree order by key1 + """) + + @pytest.fixture(scope="class") + def test_config(self, test_config): + test_config["db_engine"] = "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')" + return test_config From 171a4de81c5b222d1f7b00a1de9d45abd2c10cd8 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Wed, 22 Jan 2025 17:16:46 +0100 Subject: [PATCH 5/6] cleanup --- .../adapter/remote_table/test_remote_table.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integration/adapter/remote_table/test_remote_table.py b/tests/integration/adapter/remote_table/test_remote_table.py index 937c4427..590a511a 100644 --- a/tests/integration/adapter/remote_table/test_remote_table.py +++ b/tests/integration/adapter/remote_table/test_remote_table.py @@ -71,6 +71,11 @@ def _assert_correct_data(project): class TestRemoteTableRemoteConfigReplicatedDB(TestRemoteTableRemoteConfig): + @pytest.fixture(scope="class") + def test_config(self, test_config): + test_config["db_engine"] = "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')" + return test_config + @pytest.fixture(scope="class") def init_local_table(self, project): schema_name = f"_{project.test_schema}" @@ -84,8 +89,3 @@ def init_local_table(self, project): (key1 UInt64, key2 Int64) engine=MergeTree order by key1 """) - - @pytest.fixture(scope="class") - def test_config(self, test_config): - test_config["db_engine"] = "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')" - return test_config From 006f3c63afd55eb3be4b117c22924f4ff90c4a84 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Mon, 3 Feb 2025 16:01:41 +0100 Subject: [PATCH 6/6] add materialized var via cli flag --- dbt/adapters/clickhouse/impl.py | 12 ++++++++++++ .../adapter/remote_table/test_remote_table.py | 7 ++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index d4a878a7..e9d0e306 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -27,6 +27,8 @@ from dbt_common.events.functions import warn_or_error from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError from dbt_common.utils import filter_null_values +from dbt.contracts.graph.nodes import BaseNode +from dbt.flags import get_flags from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache from dbt.adapters.clickhouse.column import ClickHouseColumn, ClickHouseColumnChanges @@ -557,6 +559,16 @@ class ClickHouseDatabase: comment: str + +def get_materialization(self): + flags = get_flags() + materialized = flags.vars.get('materialized') + return materialized or self.config.materialized + +# patches a BaseNode method to allow setting `materialized` config overrides via dbt flags +BaseNode.get_materialization = get_materialization + + def _expect_row_value(key: str, row: "agate.Row"): if key not in row.keys(): raise DbtInternalError(f'Got a row without \'{key}\' column, columns: {row.keys()}') diff --git a/tests/integration/adapter/remote_table/test_remote_table.py b/tests/integration/adapter/remote_table/test_remote_table.py index 590a511a..b04b34eb 100644 --- a/tests/integration/adapter/remote_table/test_remote_table.py +++ b/tests/integration/adapter/remote_table/test_remote_table.py @@ -1,10 +1,10 @@ +import json import pytest from dbt.tests.util import run_dbt, run_dbt_and_capture, get_connection model = """ {{ config( - materialized='remote_table', remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'}, sharding_key='key1', ) @@ -32,7 +32,8 @@ def init_local_table(self, project): def test_with_remote_configuration(self, project, init_local_table): # the created distributed table should point to a local table as defined in the model's `remote_config` - run_dbt() + materialized_var = {"materialized": "remote_table"} + run_dbt(["run", "--vars", json.dumps(materialized_var)]) project.run_sql(f""" insert into {project.test_schema}.remote_table @@ -44,7 +45,7 @@ def test_with_remote_configuration(self, project, init_local_table): self._assert_correct_data(project) # rerun (should be no-op) - _, log_output = run_dbt_and_capture() + _, log_output = run_dbt_and_capture(["run", "--vars", json.dumps(materialized_var)]) assert "no-op run" in log_output @staticmethod