forked from ClickHouse/dbt-clickhouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path relation.py
136 lines (114 loc) · 4.83 KB
/
relation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Type
from dbt.adapters.base.relation import BaseRelation, Path, Policy, Self
from dbt.adapters.contracts.relation import HasQuoting, RelationConfig
from dbt_common.dataclass_schema import StrEnum
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import deep_merge
from dbt.adapters.clickhouse.query import quote_identifier
# Value of ``relation_config.resource_type`` that marks a dbt source node.
# Kept as a literal here (rather than imported from dbt-core) to decouple the packages.
NODE_TYPE_SOURCE: str = "source"
@dataclass
class ClickHouseQuotePolicy(Policy):
    """Default quote policy: every relation component is quoted."""

    database: bool = field(default=True)
    schema: bool = field(default=True)
    identifier: bool = field(default=True)
@dataclass
class ClickHouseIncludePolicy(Policy):
    """Default include policy: the database component is left out; schema and identifier are rendered."""

    database: bool = field(default=False)
    schema: bool = field(default=True)
    identifier: bool = field(default=True)
class ClickHouseRelationType(StrEnum):
    """Relation kinds understood by the ClickHouse adapter.

    Member order and string values are significant (values are compared
    against config strings), so they are kept exactly as declared.
    """

    Table = 'table'
    View = 'view'
    CTE = 'cte'
    MaterializedView = 'materialized_view'
    External = 'external'
    Ephemeral = 'ephemeral'
    Dictionary = 'dictionary'
@dataclass(frozen=True, eq=False, repr=False)
class ClickHouseRelation(BaseRelation):
    """dbt relation for ClickHouse.

    Only ``schema`` and ``identifier`` are meaningful: a database component
    that differs from the schema is rejected, and the database path part is
    always cleared after construction.
    """

    type: Optional[ClickHouseRelationType] = None
    quote_policy: Policy = field(default_factory=lambda: ClickHouseQuotePolicy())
    include_policy: Policy = field(default_factory=lambda: ClickHouseIncludePolicy())
    quote_character: str = '`'
    # NOTE(review): not read by any method in this class; presumably consulted by macros/callers.
    can_exchange: bool = False
    # Whether DDL for this relation may use ON CLUSTER; computed by create_from via get_on_cluster.
    can_on_cluster: bool = False
    # NOTE(review): not read by any method in this class; presumably used by callers.
    remote_cluster: Optional[str] = None

    def __post_init__(self):
        """Validate the database component and then blank it.

        Raises:
            DbtRuntimeError: if a non-empty database different from the schema was supplied.
        """
        if self.database != self.schema and self.database:
            raise DbtRuntimeError(f'Cannot set database {self.database} in clickhouse!')
        # The relation dataclass is frozen, so the database is cleared on the
        # nested ``path`` object rather than assigned on ``self``.
        self.path.database = ''

    def render(self) -> str:
        """Join the included, non-empty path parts with dots, each passed through ``quote_identifier``."""
        return ".".join(quote_identifier(part) for _, part in self._render_iterator() if part)

    def derivative(self, suffix: str, relation_type: Optional[str] = None) -> BaseRelation:
        """Return a relation in the same schema whose identifier is this one's plus ``suffix``.

        Args:
            suffix: Text appended to the current identifier.
            relation_type: Optional ``ClickHouseRelationType`` value for the new
                relation; when falsy, this relation's type is reused.

        Note:
            Only ``type`` and ``path`` are set on the new relation; quote/include
            policies and cluster flags take the class defaults.
        """
        path = Path(schema=self.path.schema, database='', identifier=self.path.identifier + suffix)
        derivative_type = ClickHouseRelationType(relation_type) if relation_type else self.type
        return ClickHouseRelation(type=derivative_type, path=path)

    def matches(
        self,
        database: Optional[str] = '',
        schema: Optional[str] = None,
        identifier: Optional[str] = None,
    ) -> bool:
        """Return True when both ``database`` and ``identifier`` equal this relation's.

        Raises:
            DbtRuntimeError: if a truthy ``schema`` is passed; it is not supported here.
        """
        if schema:
            raise DbtRuntimeError(f'Passed unexpected schema value {schema} to Relation.matches')
        return self.database == database and self.identifier == identifier

    @property
    def should_on_cluster(self) -> bool:
        """Whether DDL for this relation should be issued ON CLUSTER.

        Relations whose include policy renders an identifier defer to
        ``can_on_cluster``; database/schema-only relations always return True.
        """
        if self.include_policy.identifier:
            return self.can_on_cluster
        # create database/schema on cluster by default
        return True

    @classmethod
    def get_on_cluster(
        cls: Type[Self], cluster: str = '', materialized: str = '', engine: str = '', database_engine: str = ''
    ) -> bool:
        """Decide whether a model's DDL should use ON CLUSTER.

        Args:
            cluster: Configured cluster name; empty/whitespace means no cluster.
            materialized: The model's ``materialized`` config value.
            engine: The model's table engine string.
            database_engine: The target database's engine.

        Returns:
            False when ``database_engine`` contains "replicated" (case-insensitive)
            or no cluster is configured. Otherwise True for ``view``/``dictionary``
            materializations, any materialization containing "distributed", or an
            engine containing "Replicated".
        """
        # Treat None (e.g. an explicitly null config value) like an empty string
        # instead of crashing on ``.lower()`` / ``.strip()`` / ``in``.
        cluster = cluster or ''
        materialized = materialized or ''
        engine = engine or ''
        database_engine = database_engine or ''
        # NOTE(review): presumably a Replicated database engine propagates DDL itself,
        # which is why ON CLUSTER is skipped here.
        if 'replicated' in database_engine.lower():
            return False
        if not cluster.strip():
            return False
        return (
            materialized in ('view', 'dictionary')
            or 'distributed' in materialized
            or 'Replicated' in engine
        )

    @classmethod
    def create_from(
        cls: Type[Self],
        quoting: HasQuoting,
        relation_config: RelationConfig,
        **kwargs: Any,
    ) -> Self:
        """Build a relation from a dbt node or source config.

        Quoting precedence (highest first): the ``quote_policy`` kwarg, the node's
        own quoting config (without ``column``), ``quoting.quoting``, then the class
        default quote policy. For sources whose schema equals the source name and
        that have a database set, the database is used as the schema. For all other
        nodes ``can_on_cluster`` is computed with ``get_on_cluster`` from
        ``quoting.credentials`` and the node config.
        """
        quote_policy = kwargs.pop("quote_policy", {})
        # Copy before dropping "column" so the caller's mapping is never mutated.
        config_quoting = dict(relation_config.quoting_dict)
        config_quoting.pop("column", None)
        # precedence: kwargs quoting > relation config quoting > base quoting > default quoting
        quote_policy = deep_merge(
            cls.get_default_quote_policy().to_dict(omit_none=True),
            quoting.quoting,
            config_quoting,
            quote_policy,
        )
        schema = relation_config.schema
        # Sources skip the cluster computation below; use the field's declared
        # bool default rather than None so should_on_cluster always yields a bool.
        can_on_cluster = False
        # We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages
        if relation_config.resource_type == NODE_TYPE_SOURCE:
            # If the database is set, and the source schema is "defaulted" to the source.name, override the
            # schema with the database instead, since that's presumably what's intended for clickhouse
            if schema == relation_config.source_name and relation_config.database:
                schema = relation_config.database
        else:
            # NOTE(review): ``quoting`` is expected to expose ``credentials`` (adapter config),
            # which the HasQuoting protocol itself does not declare.
            cluster = quoting.credentials.cluster or ''
            materialized = relation_config.config.get('materialized', '')
            engine = relation_config.config.get('engine', '')
            database_engine = quoting.credentials.database_engine or ''
            can_on_cluster = cls.get_on_cluster(cluster, materialized, engine, database_engine)
        return cls.create(
            database='',
            schema=schema,
            identifier=relation_config.identifier,
            quote_policy=quote_policy,
            can_on_cluster=can_on_cluster,
            **kwargs,
        )