Skip to content

Commit cb9a314

Browse files
authored
reconcile 1.4.1 work with main (#151)
* Initial work on lw delete example * LW delete example checkpoint * Expand high volume lw delete example, fix source schema issue
1 parent 6e96295 commit cb9a314

File tree

18 files changed

+236
-6
lines changed

18 files changed

+236
-6
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version = '1.4.0'
1+
version = '1.4.1'

dbt/adapters/clickhouse/impl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def s3source_clause(
150150
if path:
151151
if bucket and path and not bucket.endswith('/') and not bucket.startswith('/'):
152152
path = f'/{path}'
153-
url = f'{url}{path}'
153+
url = f'{url}{path}'.replace('//', '/')
154154
if not url.startswith('http'):
155155
url = f'https://{url}'
156156
access = ''

dbt/adapters/clickhouse/relation.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from dataclasses import dataclass, field
2-
from typing import Optional
2+
from typing import Any, Optional, Type
33

4-
from dbt.adapters.base.relation import BaseRelation, Policy
4+
from dbt.adapters.base.relation import BaseRelation, Policy, Self
5+
from dbt.contracts.graph.nodes import SourceDefinition
56
from dbt.exceptions import DbtRuntimeError
7+
from dbt.utils import deep_merge
68

79

810
@dataclass
@@ -47,3 +49,27 @@ def matches(
4749
if schema:
4850
raise DbtRuntimeError(f'Passed unexpected schema value {schema} to Relation.matches')
4951
return self.database == database and self.identifier == identifier
52+
53+
@classmethod
54+
def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self:
55+
source_quoting = source.quoting.to_dict(omit_none=True)
56+
source_quoting.pop("column", None)
57+
quote_policy = deep_merge(
58+
cls.get_default_quote_policy().to_dict(omit_none=True),
59+
source_quoting,
60+
kwargs.get("quote_policy", {}),
61+
)
62+
63+
# If the database is set, and the source schema is "defaulted" to the source.name, override the
64+
# schema with the database instead, since that's presumably what's intended for clickhouse
65+
schema = source.schema
66+
if schema == source.source_name and source.database:
67+
schema = source.database
68+
69+
return cls.create(
70+
database=source.database,
71+
schema=schema,
72+
identifier=source.identifier,
73+
quote_policy=quote_policy,
74+
**kwargs,
75+
)

dbt/include/clickhouse/macros/adapters.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
{% macro exchange_tables_atomic(old_relation, target_relation, obj_types='TABLES') %}
108108

109109
{%- if adapter.get_clickhouse_cluster_name() is not none and obj_types == 'TABLES' %}
110-
{% do run_query("SYSTEM SYNC REPLICA "+target_relation.identifier) %}
110+
{% do run_query("SYSTEM SYNC REPLICA "+ target_relation.identifier + on_cluster_clause()) %}
111111
{%- endif %}
112112

113113
{%- call statement('exchange_tables_atomic') -%}

examples/taxis/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
target/
3+
dbt_packages/
4+
logs/

examples/taxis/README.md

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Taxis Large Incremental Model Example/Test
2+
3+
This is an example designed to test large incremental materializations. It builds a `taxis_inc` model in the
4+
`taxis_dbt` database that uses randomized keys to increase on each subsequent run.
5+
6+
## Create the source data
7+
8+
Use this SQL to create and populate the "source" data from the ClickHouse taxis example dataset.
9+
10+
```sql
11+
12+
CREATE DATABASE taxis;
13+
14+
CREATE TABLE taxis.trips (
15+
trip_id UInt32,
16+
pickup_datetime DateTime,
17+
dropoff_datetime DateTime,
18+
pickup_longitude Nullable(Float64),
19+
pickup_latitude Nullable(Float64),
20+
dropoff_longitude Nullable(Float64),
21+
dropoff_latitude Nullable(Float64),
22+
passenger_count UInt8,
23+
trip_distance Float32,
24+
fare_amount Float32,
25+
extra Float32,
26+
tip_amount Float32,
27+
tolls_amount Float32,
28+
total_amount Float32,
29+
payment_type LowCardinality(String),
30+
pickup_ntaname LowCardinality(String),
31+
dropoff_ntaname LowCardinality(String)
32+
)
33+
ENGINE = MergeTree
34+
ORDER BY trip_id;
35+
36+
SET input_format_skip_unknown_fields = 1;
37+
38+
INSERT INTO taxis.trips
39+
SELECT
40+
trip_id,
41+
pickup_datetime,
42+
dropoff_datetime,
43+
pickup_longitude,
44+
pickup_latitude,
45+
dropoff_longitude,
46+
dropoff_latitude,
47+
passenger_count,
48+
trip_distance,
49+
fare_amount,
50+
extra,
51+
tip_amount,
52+
tolls_amount,
53+
total_amount,
54+
payment_type,
55+
pickup_ntaname,
56+
dropoff_ntaname
57+
FROM s3(
58+
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..10}.gz',
59+
'TabSeparatedWithNames'
60+
);
61+
```
62+
63+
## Create a dbt profile entry
64+
65+
Use the following profile to create the associated dbt profile in the dbt_profiles.yml in ~/.dbt
66+
```yml
67+
taxis:
68+
outputs:
69+
70+
dev:
71+
type: clickhouse
72+
threads: 4
73+
host: localhost
74+
port: 8123
75+
user: dbt_test
76+
password: dbt_password
77+
use_lw_deletes: true
78+
schema: taxis_dbt
79+
80+
target: dev
81+
82+
```
83+
84+
## Run the model
85+
86+
`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding
87+
additional random trip_ids).

examples/taxis/analyses/.gitkeep

Whitespace-only changes.

examples/taxis/dbt_project.yml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
2+
# Name your project! Project names should contain only lowercase characters
3+
# and underscores. A good package name should reflect your organization's
4+
# name or the intended use of these models
5+
name: 'taxis'
6+
version: '1.0.0'
7+
config-version: 2
8+
9+
# This setting configures which "profile" dbt uses for this project.
10+
profile: 'taxis'
11+
12+
# These configurations specify where dbt should look for different types of files.
13+
# The `model-paths` config, for example, states that models in this project can be
14+
# found in the "models/" directory. You probably won't need to change these!
15+
model-paths: ["models"]
16+
analysis-paths: ["analyses"]
17+
test-paths: ["tests"]
18+
seed-paths: ["seeds"]
19+
macro-paths: ["macros"]
20+
snapshot-paths: ["snapshots"]
21+
22+
target-path: "target" # directory which will store compiled SQL files
23+
clean-targets: # directories to be removed by `dbt clean`
24+
- "target"
25+
- "dbt_packages"
26+
27+
vars:
28+
taxi_s3:
29+
bucket: 'datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi'
30+
fmt: 'TabSeparatedWithNames'
31+
structure:
32+
- trip_id UInt32
33+
- pickup_datetime DateTime
34+
- dropoff_datetime DateTime
35+
- pickup_longitude Nullable(Float64)
36+
- pickup_latitude Nullable(Float64)
37+
- dropoff_longitude Nullable(Float64)
38+
- dropoff_latitude Nullable(Float64)
39+
- passenger_count UInt8
40+
- trip_distance Float32
41+
- fare_amount Float32
42+
- extra Float32
43+
- tip_amount Float32
44+
- tolls_amount Float32
45+
- total_amount Float32
46+
- payment_type LowCardinality(String)
47+
- pickup_ntaname LowCardinality(String)
48+
- dropoff_ntaname LowCardinality(String)

examples/taxis/macros/.gitkeep

Whitespace-only changes.

examples/taxis/models/schema.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
version: 2
2+
3+
models:
4+
- name: trips_inc
5+
description: NY Taxi dataset from S3
6+
config:
7+
materialized: incremental
8+
order_by: rand_trip_id
9+
unique_key: rand_trip_id
10+
11+
- name: trips_rand
12+
description: Random indexes to apply to incremental materialization
13+
config:
14+
materialized: incremental
15+
order_by: date_time
16+
uniq_id: date_time
17+
incremental_strategy: append

0 commit comments

Comments
 (0)