Skip to content

Commit 9475f4e

Browse files
authored
Merge pull request #8 from DeepLcom/feat/remote-table-materialization
feat: add remote table materialization
2 parents cb631a0 + 006f3c6 commit 9475f4e

File tree

8 files changed

+172
-20
lines changed

8 files changed

+172
-20
lines changed

dbt/adapters/clickhouse/dbclient.py

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def __init__(self, credentials: ClickHouseCredentials):
107107
"distributed_table": {},
108108
"distributed_incremental": {},
109109
"general": {},
110+
"remote_table": {},
110111
}
111112
if (
112113
not credentials.allow_automatic_deduplication

dbt/adapters/clickhouse/impl.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from dbt_common.events.functions import warn_or_error
2828
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
2929
from dbt_common.utils import filter_null_values
30+
from dbt.contracts.graph.nodes import BaseNode
31+
from dbt.flags import get_flags
3032

3133
from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
3234
from dbt.adapters.clickhouse.column import ClickHouseColumn, ClickHouseColumnChanges
@@ -117,8 +119,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
117119
@available.parse(lambda *a, **k: {})
def get_clickhouse_cluster_name(self):
    """Return the cluster name from the active connection's credentials.

    The name is returned unquoted. An unset or empty cluster name is
    reported as ``None`` so that callers can test either truthiness or
    ``is none``.
    """
    conn = self.connections.get_if_exists()
    # Normalize '' to None: an empty name means "no cluster configured".
    return conn.credentials.cluster or None
122123

123124
@available.parse(lambda *a, **k: {})
124125
def get_clickhouse_local_suffix(self):
@@ -558,6 +559,16 @@ class ClickHouseDatabase:
558559
comment: str
559560

560561

562+
563+
def get_materialization(self):
    """Return the materialization to use for this node.

    A ``materialized`` entry in the dbt vars (e.g. passed with ``--vars``)
    takes precedence; otherwise the node's own ``config.materialized`` is
    returned.
    """
    # Tolerate a flags object that has no `vars` attribute or a null `vars`.
    cli_vars = getattr(get_flags(), 'vars', None) or {}
    return cli_vars.get('materialized') or self.config.materialized


# patches a BaseNode method to allow setting `materialized` config overrides via dbt flags
BaseNode.get_materialization = get_materialization
570+
571+
561572
def _expect_row_value(key: str, row: "agate.Row"):
562573
if key not in row.keys():
563574
raise DbtInternalError(f'Got a row without \'{key}\' column, columns: {row.keys()}')

dbt/adapters/clickhouse/relation.py

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class ClickHouseRelation(BaseRelation):
4444
quote_character: str = '`'
4545
can_exchange: bool = False
4646
can_on_cluster: bool = False
47+
remote_cluster: Optional[str] = None
4748

4849
def __post_init__(self):
4950
if self.database != self.schema and self.database:

dbt/include/clickhouse/macros/adapters.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
db.engine as db_engine,
3333
{%- if adapter.get_clickhouse_cluster_name() -%}
3434
count(distinct _shard_num) > 1 as is_on_cluster
35-
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
35+
from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t
3636
join system.databases as db on t.database = db.name
3737
where schema = '{{ schema_relation.schema }}'
3838
group by name, schema, type, db_engine

dbt/include/clickhouse/macros/materializations/distributed_table.sql

+23-17
Original file line numberDiff line numberDiff line change
@@ -86,24 +86,30 @@
8686

8787
{% endmaterialization %}
8888

89-
{% macro create_distributed_table(relation, local_relation, sql=none) %}
  {#-
    Create or replace the Distributed table `relation` on top of `local_relation`.

    relation       -- the distributed table to create
    local_relation -- the underlying table: its schema and name become the engine's database
                      and table arguments; its `remote_cluster`, when set, is used instead of
                      the adapter's cluster name
    sql            -- optional query; when given, the column list is derived from the query
                      instead of copying the structure with `as <local_relation>`

    The sharding expression comes from the `sharding_key` config and falls back to `rand()`.
  -#}
  {%- set adapter_cluster = adapter.get_clickhouse_cluster_name() -%}
  {#- Treat an empty cluster name the same as a missing one. -#}
  {% if not adapter_cluster %}
    {% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
  {% endif %}

  {%- set remote_cluster = local_relation.remote_cluster or adapter_cluster -%}
  {%- set sharding = config.get('sharding_key') -%}
  {%- set reference = "as " ~ local_relation -%}
  {%- if sql -%}
    {%- set col_list = [] -%}
    {%- for col in adapter.get_column_schema_from_query(sql) -%}
      {#- Quote the column name so reserved words and special characters stay valid in the DDL. -#}
      {%- do col_list.append(adapter.quote(col.name) ~ ' ' ~ col.data_type) -%}
    {%- endfor -%}
    {%- set reference = "(" ~ (col_list | join(', ')) ~ ")" -%}
  {%- endif -%}
  create or replace table {{ relation }} {{ on_cluster_clause(relation) }} {{ reference }}
  engine = Distributed('{{ remote_cluster }}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
  {%- if sharding is not none and sharding.strip() != '' -%}
    , {{ sharding }}
  {%- else %}
    , rand()
  {% endif -%}
  )
{% endmacro %}
107113

108114
{% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%}
109115
{%- set sql_header = config.get('sql_header', none) -%}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%}
  {#-
    Materialize the model as a Distributed table that points at an already existing table.

    Required config `remote_config` keys:
      cluster         -- cluster passed to the Distributed engine
      local_db_prefix -- optional prefix prepended to this model's schema
      local_suffix    -- optional suffix appended to this model's identifier

    The table is (re)created when it does not exist yet, when its columns differ from the
    model query, or on a full refresh; otherwise the run is a no-op.
  -#}
  {%- set remote_config = config.require('remote_config') -%}

  {%- set remote_cluster = remote_config.get('cluster') -%}
  {#- Prefix and suffix are optional: fall back to '' so a missing or null key does not break the concatenation. -#}
  {%- set remote_schema = (remote_config.get('local_db_prefix') or '') ~ this.schema -%}
  {%- set remote_identifier = this.identifier ~ (remote_config.get('local_suffix') or '') -%}

  {%- set target_relation = this.incorporate(type='table') -%}
  {%- set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) -%}
  {%- set existing_relation = load_cached_relation(this) -%}
  {#- Defined explicitly: it is used both for the rebuild decision and for should_revoke below. -#}
  {%- set full_refresh_mode = should_full_refresh() -%}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {%- set column_changes = none -%}
  {%- if existing_relation -%}
    {%- set column_changes = adapter.check_incremental_schema_changes('ignore', existing_relation, sql) -%}
  {%- endif -%}

  {% call statement('main') %}
    {%- if column_changes or existing_relation is none or full_refresh_mode -%}
      {{ create_distributed_table(target_relation, remote_relation, sql) }}
    {%- else -%}
      {{ log("no-op run: distributed table exists with correct schema.", info=true) }}
      select true;
    {%- endif -%}
  {% endcall %}

  {% set should_revoke = should_revoke(target_relation, full_refresh_mode) %}
  {% set grant_config = config.get('grants') %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
  {% do persist_docs(target_relation, model) %}

  {{ run_hooks(post_hooks, inside_transaction=True) }}
  {{ adapter.commit() }}
  {{ run_hooks(post_hooks, inside_transaction=False) }}
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import json
2+
import pytest
3+
from dbt.tests.util import run_dbt, run_dbt_and_capture, get_connection
4+
5+
# dbt model under test: the distributed table must route to `_<schema>.remote_table_local`
# on cluster `test_shard`, sharded by `key1`.
model = """
{{
    config(
        remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'},
        sharding_key='key1',
    )
}}
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
"""


class TestRemoteTableRemoteConfig:
    """Integration test for the `remote_table` materialization.

    The local table is created up front by the `init_local_table` fixture;
    the dbt run only has to create the Distributed table on top of it.
    """

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "remote_table.sql": model,
        }

    @pytest.fixture(scope="class")
    def init_local_table(self, project):
        """Create the prefixed database and the local table on the cluster."""
        project.run_sql(
            f"create database if not exists _{project.test_schema} on cluster test_shard"
        )
        project.run_sql(f"""
            create table _{project.test_schema}.remote_table_local on cluster test_shard
            (key1 UInt64, key2 Int64)
            engine=MergeTree order by key1
        """)

    def test_with_remote_configuration(self, project, init_local_table):
        # the created distributed table should point to a local table as defined in the model's `remote_config`
        run_args = ["run", "--vars", json.dumps({"materialized": "remote_table"})]
        run_dbt(run_args)

        project.run_sql(f"""
            insert into {project.test_schema}.remote_table
            select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
        """)

        self._assert_is_distributed_table(project)
        self._assert_correct_engine(project)
        self._assert_correct_data(project)

        # rerun (should be no-op)
        _, log_output = run_dbt_and_capture(run_args)
        assert "no-op run" in log_output

    @staticmethod
    def _assert_is_distributed_table(project):
        # check correct table creation on current host; filter by database so a
        # same-named table in another schema cannot satisfy the check
        result = project.run_sql(
            "select engine from system.tables "
            f"where database='{project.test_schema}' and name='remote_table'",
            fetch="one",
        )
        assert result is not None
        assert result[0] == "Distributed"

    @staticmethod
    def _assert_correct_engine(project):
        # assert correct engine parameters
        result = project.run_sql(
            "select create_table_query from system.tables "
            f"where database='{project.test_schema}' and name='remote_table'",
            fetch="one",
        )
        assert result is not None
        expected = f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)"
        assert expected in result[0]

    @staticmethod
    def _assert_correct_data(project):
        # query remote data from distributed table, using the same qualified
        # name as the insert so the connection's default database is irrelevant
        result = project.run_sql(
            f"select count(*) as num_rows from {project.test_schema}.remote_table",
            fetch="one",
        )
        assert result[0] == 10
73+
74+
class TestRemoteTableRemoteConfigReplicatedDB(TestRemoteTableRemoteConfig):
    """Same scenario as the parent class, with `db_engine` set to a Replicated database engine.

    The prefixed schema is created through the adapter and the local table is
    created without an `on cluster` clause.
    """

    @pytest.fixture(scope="class")
    def test_config(self, test_config):
        test_config["db_engine"] = "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')"
        return test_config

    @pytest.fixture(scope="class")
    def init_local_table(self, project):
        """Create the prefixed schema via the adapter, then the local table inside it."""
        schema_name = f"_{project.test_schema}"
        with get_connection(project.adapter):
            relation = project.adapter.Relation.create(database=project.database, schema=schema_name)
            project.adapter.create_schema(relation)
            # appended to the project's list of created schemas
            # (presumably so it is dropped at teardown; confirm against the project fixture)
            project.created_schemas.append(schema_name)

        # Reuse schema_name so the table always lands in the schema created above.
        project.run_sql(f"""
            create table {schema_name}.remote_table_local
            (key1 UInt64, key2 Int64)
            engine=MergeTree order by key1
        """)

tests/integration/test_config.xml

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
<replica from_env="REPLICA_NUM" />
88
<server_index from_env="SERVER_INDEX" />
99
</macros>
10+
<default_replica_path>/clickhouse/tables/{uuid}/{shard}</default_replica_path>
11+
<default_replica_name>{replica}</default_replica_name>
1012
<remote_servers>
1113
<test_shard>
1214
<shard>

0 commit comments

Comments
 (0)