forked from ClickHouse/dbt-clickhouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path test_remote_table.py
92 lines (77 loc) · 3.4 KB
/
test_remote_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture, get_connection
# dbt model under test. `remote_config` names the cluster (`test_shard`) and how the
# shard-local target is derived from this model: the schema gets `local_db_prefix`
# ("_") prepended and the table name gets `local_suffix` ("_local") appended. This is
# why the tests expect a Distributed engine over `_<test_schema>.remote_table_local`.
# `sharding_key` shows up as the Distributed engine's sharding expression (key1).
# The select yields 10 rows (number 0..9).
model = """
{{
config(
remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'},
sharding_key='key1',
)
}}
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
"""
class TestRemoteTableRemoteConfig:
    """Integration tests for the ``remote_table`` materialization driven by ``remote_config``.

    The model is run with the ``materialized`` dbt var set to ``remote_table``. The result
    is expected to be a ``Distributed`` table ``<test_schema>.remote_table`` on cluster
    ``test_shard`` that targets ``_<test_schema>.remote_table_local``, sharded by ``key1``.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Register the single ``remote_table`` model for this test class."""
        return {
            "remote_table.sql": model,
        }

    @pytest.fixture(scope="class")
    def init_local_table(self, project):
        """Create the local database and MergeTree table that the distributed table targets.

        Both are created ``on cluster test_shard``. The database is not registered in
        ``project.created_schemas``, so it is dropped explicitly (again on the whole
        cluster) when the class finishes; otherwise every run would leave it behind.
        """
        local_schema = f"_{project.test_schema}"
        project.run_sql(f"create database if not exists {local_schema} on cluster test_shard")
        project.run_sql(f"""
            create table {local_schema}.remote_table_local on cluster test_shard
            (key1 UInt64, key2 Int64)
            engine=MergeTree order by key1
        """)
        yield
        project.run_sql(f"drop database if exists {local_schema} on cluster test_shard")

    def test_with_remote_configuration(self, project, init_local_table):
        """Build the model as a remote table, load rows through it, and check a rerun is a no-op."""
        # The created distributed table should point to a local table as defined in the
        # model's `remote_config`.
        dbt_vars = json.dumps({"materialized": "remote_table"})
        run_dbt(["run", "--vars", dbt_vars])
        # NOTE(review): depending on server settings (e.g. `insert_distributed_sync`),
        # ClickHouse may forward rows to remote shards asynchronously; confirm the test
        # cluster makes this insert synchronous, or the row count below could be flaky.
        project.run_sql(f"""
            insert into {project.test_schema}.remote_table
            select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
        """)

        self._assert_is_distributed_table(project)
        self._assert_correct_engine(project)
        self._assert_correct_data(project)

        # Rerunning with an unchanged configuration must not rebuild anything.
        _, log_output = run_dbt_and_capture(["run", "--vars", dbt_vars])
        assert "no-op run" in log_output

    @staticmethod
    def _assert_is_distributed_table(project):
        """Assert ``remote_table`` exists in the test schema on this host with the Distributed engine."""
        # Filter by database as well as name so a same-named table in another schema
        # (e.g. left over from another test class) cannot be picked up by fetch="one".
        result = project.run_sql(
            "select engine from system.tables "
            f"where database = '{project.test_schema}' and name = 'remote_table'",
            fetch="one",
        )
        assert result is not None
        assert result[0] == "Distributed"

    @staticmethod
    def _assert_correct_engine(project):
        """Assert the Distributed engine parameters match the model's ``remote_config``."""
        result = project.run_sql(
            "select create_table_query from system.tables "
            f"where database = '{project.test_schema}' and name = 'remote_table'",
            fetch="one",
        )
        assert result is not None
        expected_engine = f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)"
        assert expected_engine in result[0]

    @staticmethod
    def _assert_correct_data(project):
        """Assert that all 10 inserted rows are readable through the distributed table."""
        result = project.run_sql(
            f"select count(*) as num_rows from {project.test_schema}.remote_table",
            fetch="one",
        )
        assert result[0] == 10
class TestRemoteTableRemoteConfigReplicatedDB(TestRemoteTableRemoteConfig):
    """Re-run the remote-table tests with databases created on a ``Replicated`` engine."""

    @pytest.fixture(scope="class")
    def test_config(self, test_config):
        """Return the inherited test config with ``db_engine`` set to ``Replicated``.

        A new dict is built instead of assigning into ``test_config``: the overridden
        fixture may have a wider scope (e.g. session). An in-place change would then leak
        the Replicated engine into every test class that runs afterwards.
        """
        return {
            **test_config,
            "db_engine": "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')",
        }

    @pytest.fixture(scope="class")
    def init_local_table(self, project):
        """Create the ``_<test_schema>`` database via the adapter, then its local table.

        The database is created through ``adapter.create_schema``, presumably so it uses the
        ``db_engine`` from ``test_config``. It is appended to ``project.created_schemas``,
        presumably so it is dropped with the project's other schemas at teardown.
        """
        schema_name = f"_{project.test_schema}"
        with get_connection(project.adapter):
            relation = project.adapter.Relation.create(database=project.database, schema=schema_name)
            project.adapter.create_schema(relation)
            project.created_schemas.append(schema_name)
        # NOTE: no `ON CLUSTER` here, presumably because DDL inside a Replicated
        # database is propagated by the database engine itself.
        project.run_sql(f"""
            create table {schema_name}.remote_table_local
            (key1 UInt64, key2 Int64)
            engine=MergeTree order by key1
        """)