@@ -51,9 +51,6 @@ class ClickhouseCredentials(Credentials):
51
51
sync_request_timeout : int = 5
52
52
compress_block_size : int = 1048576
53
53
compression : str = ''
54
- use_default_schema : bool = (
55
- False # This is used in tests to make sure we connect always to the default database.
56
- )
57
54
custom_settings : Optional [Dict [str , Any ]] = None
58
55
59
56
@property
@@ -89,7 +86,6 @@ def _connection_keys(self):
89
86
'sync_request_timeout' ,
90
87
'compress_block_size' ,
91
88
'compression' ,
92
- 'use_default_schema' ,
93
89
'custom_settings' ,
94
90
)
95
91
@@ -132,63 +128,22 @@ def open(cls, connection):
132
128
driver = 'http'
133
129
elif clickhouse_driver and port in (9000 , 9440 ):
134
130
driver = 'native'
135
- else :
136
- driver = 'unspecified'
137
- custom_settings = credentials .custom_settings or {}
138
- connection .state = 'fail'
139
- db_err = None
131
+ client = None
140
132
if clickhouse_connect and driver == 'http' :
141
- try :
142
- connection .handle = clickhouse_connect .get_client (
143
- host = credentials .host ,
144
- port = credentials .port ,
145
- database = 'default' if credentials .use_default_schema else credentials .schema ,
146
- username = credentials .user ,
147
- password = credentials .password ,
148
- interface = 'https' if credentials .secure else 'http' ,
149
- compress = False
150
- if credentials .compression == ''
151
- else bool (credentials .compression ),
152
- connect_timeout = credentials .connect_timeout ,
153
- send_receive_timeout = credentials .send_receive_timeout ,
154
- http_user_agent = f'cc-dbt-{ dbt_version } ' ,
155
- verify = credentials .verify ,
156
- query_limit = 0 ,
157
- session_id = 'dbt::' + str (uuid .uuid4 ()),
158
- ** custom_settings ,
159
- )
160
- except clickhouse_connect .driver .exceptions .DatabaseError as exp :
161
- db_err = exp
133
+ client , db_err = _connect_http (credentials )
162
134
elif clickhouse_driver and driver == 'native' :
163
- try :
164
- client = clickhouse_driver .Client (
165
- host = credentials .host ,
166
- port = credentials .port ,
167
- database = 'default' ,
168
- user = credentials .user ,
169
- password = credentials .password ,
170
- client_name = f'dbt-{ dbt_version } ' ,
171
- secure = credentials .secure ,
172
- verify = credentials .verify ,
173
- connect_timeout = credentials .connect_timeout ,
174
- send_receive_timeout = credentials .send_receive_timeout ,
175
- sync_request_timeout = credentials .sync_request_timeout ,
176
- compress_block_size = credentials .compress_block_size ,
177
- compression = False if credentials .compression == '' else credentials .compression ,
178
- ** custom_settings ,
179
- )
180
- connection .handle = ChNativeAdapter (client )
181
- except clickhouse_driver .errors as exp :
182
- db_err = exp
135
+ client , db_err = _connect_native (credentials )
183
136
else :
184
- raise dbt .exceptions .FailedToConnectException (
185
- f'Library for ClickHouse driver type { driver } not found'
186
- )
137
+ db_err = f'Library for ClickHouse driver type { driver } not found'
187
138
if db_err :
139
+ connection .state = 'fail'
188
140
logger .debug (
189
141
'Got an error when attempting to open a clickhouse connection: \' {}\' ' , str (db_err )
190
142
)
143
+ if client :
144
+ client .close ()
191
145
raise dbt .exceptions .FailedToConnectException (str (db_err ))
146
+ connection .handle = client
192
147
connection .state = 'open'
193
148
return connection
194
149
@@ -286,3 +241,61 @@ def begin(self):
286
241
287
242
def commit (self ):
288
243
pass
244
+
245
+
246
+ def _connect_http (credentials ):
247
+ try :
248
+ client = clickhouse_connect .get_client (
249
+ host = credentials .host ,
250
+ port = credentials .port ,
251
+ username = credentials .user ,
252
+ password = credentials .password ,
253
+ interface = 'https' if credentials .secure else 'http' ,
254
+ compress = False if credentials .compression == '' else bool (credentials .compression ),
255
+ connect_timeout = credentials .connect_timeout ,
256
+ send_receive_timeout = credentials .send_receive_timeout ,
257
+ client_name = f'cc-dbt-{ dbt_version } ' ,
258
+ verify = credentials .verify ,
259
+ query_limit = 0 ,
260
+ session_id = 'dbt::' + str (uuid .uuid4 ()),
261
+ ** (credentials .custom_settings or {}),
262
+ )
263
+ return client , _ensure_database (client , credentials .schema )
264
+ except clickhouse_connect .driver .exceptions .DatabaseError as exp :
265
+ return None , exp
266
+
267
+
268
+ def _connect_native (credentials ):
269
+ try :
270
+ client = clickhouse_driver .Client (
271
+ host = credentials .host ,
272
+ port = credentials .port ,
273
+ user = credentials .user ,
274
+ password = credentials .password ,
275
+ client_name = f'dbt-{ dbt_version } ' ,
276
+ secure = credentials .secure ,
277
+ verify = credentials .verify ,
278
+ connect_timeout = credentials .connect_timeout ,
279
+ send_receive_timeout = credentials .send_receive_timeout ,
280
+ sync_request_timeout = credentials .sync_request_timeout ,
281
+ compress_block_size = credentials .compress_block_size ,
282
+ compression = False if credentials .compression == '' else credentials .compression ,
283
+ ** (credentials .custom_settings or {}),
284
+ )
285
+ client = ChNativeAdapter (client )
286
+ return client , _ensure_database (client , credentials .schema )
287
+ except clickhouse_driver .errors .Error as exp :
288
+ return None , exp
289
+
290
+
291
+ def _ensure_database (client , database ):
292
+ if database :
293
+ check_db = f'EXISTS DATABASE { database } '
294
+ db_exists = client .command (check_db )
295
+ if not db_exists :
296
+ client .command (f'CREATE DATABASE { database } ' )
297
+ db_exists = client .command (check_db )
298
+ if not db_exists :
299
+ return f'Unable to create DBT database { database } '
300
+ client .database = database
301
+ return None
0 commit comments