feat: microbatch strategy #404

Open · wants to merge 15 commits into base: main
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### Release [x.x.x]
#### Improvements
* Add support for "microbatch" incremental strategy.

### Release [1.8.9], 2025-02-16

#### Improvements
27 changes: 26 additions & 1 deletion README.md
@@ -31,6 +31,7 @@ pip install dbt-clickhouse
- [x] Table materialization
- [x] View materialization
- [x] Incremental materialization
- [x] Microbatch incremental materialization
- [x] Materialized View materializations (uses the `TO` form of MATERIALIZED VIEW, experimental)
- [x] Seeds
- [x] Sources
@@ -114,13 +115,23 @@ your_profile_name:
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | |
| inserts_only | If set to True for an incremental model, incremental updates are inserted directly into the target table without creating an intermediate table. Deprecated in favor of the `append` incremental `strategy`, which operates in the same way. If `inserts_only` is set, `incremental_strategy` is ignored. | |
| incremental_strategy | Incremental model update strategy: `delete+insert`, `append`, or `insert_overwrite`. See the following Incremental Model Strategies | `default` |
| incremental_strategy | Incremental model update strategy: `delete+insert`, `append`, `insert_overwrite`, or `microbatch`. See the following Incremental Model Strategies | `default` |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (applied only to the `delete+insert` strategy) | |
| settings | A map/dictionary of ClickHouse table settings to be used in DDL statements such as `CREATE TABLE` for this model | |
| query_settings | A map/dictionary of ClickHouse user level settings to be used with `INSERT` or `DELETE` statements in conjunction with this model | |
| ttl | A TTL expression to be used with the table. The TTL expression is a string that can be used to specify the TTL for the table. | |
| indexes | A list of indexes to create, available only for `table` materialization. For examples look at ([#397](https://github.com/ClickHouse/dbt-clickhouse/pull/397)) | |

## Microbatch Configuration

| Option | Description | Default if any |
|--------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
| event_time | The column indicating "at what time did the row occur." Required for your microbatch model and any direct parents that should be filtered. | |
| begin | The "beginning of time" for the microbatch model: the starting point for any initial or full-refresh builds. For example, a daily-grain microbatch model run on 2024-10-01 with `begin = '2023-10-01'` will process 366 batches (2024 is a leap year!) plus the batch for "today." | |
| batch_size | The granularity of your batches. Supported values are `hour`, `day`, `month`, and `year` | |
| lookback | The number of batches prior to the latest bookmark to reprocess, to capture late-arriving records. | 1 |
| concurrent_batches | Overrides dbt's auto-detection of whether batches can run concurrently. Read more about [configuring concurrent batches](https://docs.getdbt.com/docs/build/incremental-microbatch#configure-concurrent_batches). Setting `true` runs batches concurrently (in parallel); `false` runs them sequentially (one after the other). | |

## Column Configuration

| Option | Description | Default if any |
@@ -221,6 +232,20 @@ caveats to using this strategy:
incremental predicates should only include sub-queries on data that will not be modified during the incremental
materialization.

### The Microbatch Strategy (Requires dbt-core >= 1.9)

The `microbatch` incremental strategy, available in dbt-core since version 1.9, is designed to handle large
time-series data transformations efficiently. In dbt-clickhouse, it builds on top of the existing `delete+insert`
incremental strategy by splitting the increment into predefined time-series batches based on the `event_time` and
`batch_size` model configurations.

Beyond handling large transformations, microbatch provides the ability to:
- [Reprocess failed batches](https://docs.getdbt.com/docs/build/incremental-microbatch#retry).
- Auto-detect [parallel batch execution](https://docs.getdbt.com/docs/build/parallel-batch-execution).
- Eliminate the need for complex conditional logic in [backfilling](https://docs.getdbt.com/docs/build/incremental-microbatch#backfills).

For detailed microbatch usage, refer to the [official documentation](https://docs.getdbt.com/docs/build/incremental-microbatch).
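A minimal microbatch model might look like the following sketch (the model, column, and ref names are illustrative, not taken from this PR; note that this adapter's validation also requires a `unique_key` and a profile with `use_lw_deletes` enabled for this strategy):

```sql
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='event_ts',
        begin='2023-10-01',
        batch_size='day',
        lookback=1,
        unique_key='id',
    )
}}

select id, event_ts, payload
from {{ ref('raw_events') }}
```

dbt splits each run into day-sized batches over `event_ts`, and the adapter applies each batch via the `delete+insert` path described above.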

### The Append Strategy

This strategy replaces the `inserts_only` setting in previous versions of dbt-clickhouse. This approach simply appends
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
@@ -1 +1 @@
version = '1.8.9'
version = '1.9.0'
35 changes: 28 additions & 7 deletions dbt/adapters/clickhouse/impl.py
@@ -185,16 +185,37 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
if not strategy or strategy == 'default':
strategy = 'delete_insert' if conn.handle.use_lw_deletes else 'legacy'
strategy = strategy.replace('+', '_')
if strategy not in ['legacy', 'append', 'delete_insert', 'insert_overwrite']:
return strategy

@available.parse_none
def validate_incremental_strategy(
self,
strategy: str,
predicates: list,
unique_key: str,
partition_by: str,
) -> None:
conn = self.connections.get_if_exists()
if strategy not in ('legacy', 'append', 'delete_insert', 'insert_overwrite', 'microbatch'):
raise DbtRuntimeError(
f"The incremental strategy '{strategy}' is not valid for ClickHouse"
f"The incremental strategy '{strategy}' is not valid for ClickHouse."
)
if not conn.handle.has_lw_deletes and strategy == 'delete_insert':
logger.warning(
'Lightweight deletes are not available, using legacy ClickHouse strategy'
if strategy in ('delete_insert', 'microbatch') and not conn.handle.has_lw_deletes:
raise DbtRuntimeError(
f"'{strategy}' strategy requires setting the profile config 'use_lw_deletes' to true."
)
strategy = 'legacy'
return strategy
if strategy in ('delete_insert', 'microbatch') and not unique_key:
raise DbtRuntimeError(f"'{strategy}' strategy requires a non-empty 'unique_key'.")
if strategy not in ('delete_insert', 'microbatch') and predicates:
raise DbtRuntimeError(
f"Cannot apply incremental predicates with '{strategy}' strategy."
)
if strategy == 'insert_overwrite' and not partition_by:
raise DbtRuntimeError(
f"'{strategy}' strategy requires non-empty 'partition_by'. Current partition_by is {partition_by}."
)
if strategy == 'insert_overwrite' and unique_key:
raise DbtRuntimeError(f"'{strategy}' strategy does not support unique_key.")
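The strategy/option compatibility rules above can be summarized in a standalone sketch (simplified: no connection object, `use_lw_deletes` passed as a flag, plain `ValueError` instead of `DbtRuntimeError`; the names here are illustrative, not the adapter's actual API):

```python
def validate(strategy, has_lw_deletes=True, unique_key=None,
             predicates=None, partition_by=None):
    """Simplified mirror of validate_incremental_strategy's checks."""
    valid = ('legacy', 'append', 'delete_insert', 'insert_overwrite', 'microbatch')
    if strategy not in valid:
        raise ValueError(f"The incremental strategy '{strategy}' is not valid for ClickHouse.")
    if strategy in ('delete_insert', 'microbatch'):
        # Both strategies delete matched rows, so they need lightweight
        # deletes and a key to match on.
        if not has_lw_deletes:
            raise ValueError(f"'{strategy}' strategy requires 'use_lw_deletes'.")
        if not unique_key:
            raise ValueError(f"'{strategy}' strategy requires a non-empty 'unique_key'.")
    elif predicates:
        # Predicates only make sense where a targeted delete happens.
        raise ValueError(f"Cannot apply incremental predicates with '{strategy}' strategy.")
    if strategy == 'insert_overwrite':
        if not partition_by:
            raise ValueError("'insert_overwrite' strategy requires a non-empty 'partition_by'.")
        if unique_key:
            raise ValueError("'insert_overwrite' strategy does not support unique_key.")
```

For example, `validate('microbatch', unique_key='id')` passes, while `validate('microbatch')` raises because the key is missing.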

@available.parse_none
def check_incremental_schema_changes(
23 changes: 20 additions & 3 deletions dbt/adapters/clickhouse/relation.py
@@ -1,7 +1,7 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Type
from typing import Any, Optional, Type

from dbt.adapters.base.relation import BaseRelation, Path, Policy, Self
from dbt.adapters.base.relation import BaseRelation, EventTimeFilter, Path, Policy, Self
from dbt.adapters.contracts.relation import HasQuoting, RelationConfig
from dbt_common.dataclass_schema import StrEnum
from dbt_common.exceptions import DbtRuntimeError
@@ -53,6 +53,20 @@ def __post_init__(self):
def render(self) -> str:
return ".".join(quote_identifier(part) for _, part in self._render_iterator() if part)

def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
"""
Returns "" if start and end are both None
"""
filter = ""
if event_time_filter.start and event_time_filter.end:
filter = f"{event_time_filter.field_name} >= '{event_time_filter.start.strftime('%Y-%m-%d %H:%M:%S')}' and {event_time_filter.field_name} < '{event_time_filter.end.strftime('%Y-%m-%d %H:%M:%S')}'"
elif event_time_filter.start:
filter = f"{event_time_filter.field_name} >= '{event_time_filter.start.strftime('%Y-%m-%d %H:%M:%S')}'"
elif event_time_filter.end:
filter = f"{event_time_filter.field_name} < '{event_time_filter.end.strftime('%Y-%m-%d %H:%M:%S')}'"

return filter
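The rendering above produces a half-open interval (`>=` start, `<` end), so adjacent batches never double-count a row that falls exactly on a boundary. The same formatting logic as a standalone sketch (the helper name and free-function form are illustrative, not the adapter's actual method signature):

```python
from datetime import datetime

def render_event_time_filter(field_name, start=None, end=None):
    """Build a ClickHouse WHERE fragment for a half-open [start, end) window.

    Returns "" when both bounds are None.
    """
    fmt = '%Y-%m-%d %H:%M:%S'
    parts = []
    if start:
        parts.append(f"{field_name} >= '{start.strftime(fmt)}'")
    if end:
        parts.append(f"{field_name} < '{end.strftime(fmt)}'")
    return " and ".join(parts)
```

For instance, a one-day batch for 2024-10-01 renders as `event_ts >= '2024-10-01 00:00:00' and event_ts < '2024-10-02 00:00:00'`.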

def derivative(self, suffix: str, relation_type: Optional[str] = None) -> BaseRelation:
path = Path(schema=self.path.schema, database='', identifier=self.path.identifier + suffix)
derivative_type = ClickHouseRelationType(relation_type) if relation_type else self.type
@@ -113,12 +127,15 @@ def create_from(
# schema with the database instead, since that's presumably what's intended for clickhouse
schema = relation_config.schema

cluster = quoting.credentials.cluster or ''
can_on_cluster = None
cluster = ""
# We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages
if relation_config.resource_type == NODE_TYPE_SOURCE:
if schema == relation_config.source_name and relation_config.database:
schema = relation_config.database
else:
# quoting is only available for non-source nodes
cluster = quoting.credentials.cluster or ""

if cluster and str(relation_config.config.get("force_on_cluster")).lower() == "true":
can_on_cluster = True
@@ -80,30 +80,22 @@
{% endif %}

{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set incremental_predicates = config.get('predicates', []) or config.get('incremental_predicates', []) %}
{% set partition_by = config.get('partition_by') %}
{% do adapter.validate_incremental_strategy(incremental_strategy, incremental_predicates, unique_key, partition_by) %}
{%- if on_schema_change != 'ignore' %}
{%- set local_column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation_local, sql) -%}
{% if local_column_changes and incremental_strategy != 'legacy' %}
{% do clickhouse__apply_column_changes(local_column_changes, existing_relation, True) %}
{% set existing_relation = load_cached_relation(this) %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
{% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
{% endif %}
{% if incremental_strategy == 'legacy' %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, local_column_changes, unique_key, True) %}
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
{% elif incremental_strategy == 'insert_overwrite' %}
{%- set partition_by = config.get('partition_by') -%}
{% if partition_by is none or partition_by|length == 0 %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
{% endif %}
{% if inserts_only or unique_key is not none or incremental_predicates is not none %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
{% endif %}
{% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, True) %}
{% elif incremental_strategy == 'append' %}
{% call statement('main') %}
@@ -56,34 +56,34 @@
{% else %}
{% set column_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set incremental_predicates = config.get('predicates', []) or config.get('incremental_predicates', []) %}
{% set partition_by = config.get('partition_by') %}
{% do adapter.validate_incremental_strategy(incremental_strategy, incremental_predicates, unique_key, partition_by) %}
{%- if on_schema_change != 'ignore' %}
{%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
{% if column_changes and incremental_strategy != 'legacy' %}
{% do clickhouse__apply_column_changes(column_changes, existing_relation) %}
{% set existing_relation = load_cached_relation(this) %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
{% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
{% endif %}
{% if incremental_strategy == 'legacy' %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key) %}
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
{% elif incremental_strategy == 'microbatch' %}
{%- if config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append(config.get("event_time") ~ " >= toDateTime('" ~ config.get("__dbt_internal_microbatch_event_time_start").strftime("%Y-%m-%d %H:%M:%S") ~ "')") %}
{%- endif -%}
{%- if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append(config.get("event_time") ~ " < toDateTime('" ~ config.get("__dbt_internal_microbatch_event_time_end").strftime("%Y-%m-%d %H:%M:%S") ~ "')") %}
{%- endif -%}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
{% elif incremental_strategy == 'append' %}
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{% endcall %}
{% elif incremental_strategy == 'insert_overwrite' %}
{%- set partition_by = config.get('partition_by') -%}
{% if partition_by is none or partition_by|length == 0 %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
{% endif %}
{% if inserts_only or unique_key is not none or incremental_predicates is not none %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
{% endif %}
{% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, False) %}
{% endif %}
{% endif %}
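The `microbatch` branch above appends the batch window to `incremental_predicates` as `toDateTime(...)` comparisons before delegating to the ordinary `delete+insert` flow. The predicate construction, mirrored in a standalone Python sketch (the function name and plain-argument form are illustrative, not part of the adapter):

```python
from datetime import datetime

def microbatch_predicates(event_time, start=None, end=None):
    """Return the extra predicates the microbatch branch adds for one batch.

    dbt supplies the batch bounds; only the bounds that are set become
    predicates, giving a half-open [start, end) window over event_time.
    """
    fmt = '%Y-%m-%d %H:%M:%S'
    preds = []
    if start:
        preds.append(f"{event_time} >= toDateTime('{start.strftime(fmt)}')")
    if end:
        preds.append(f"{event_time} < toDateTime('{end.strftime(fmt)}')")
    return preds
```

These strings are then passed along with any user-supplied `incremental_predicates` into the `delete+insert` macro, so each batch deletes and reinserts only its own window.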
6 changes: 3 additions & 3 deletions dev_requirements.txt
@@ -1,6 +1,6 @@
dbt-core~=1.8.0
dbt-adapters~=1.1.1
dbt-tests-adapter==1.8.0
dbt-core>=1.9.0
dbt-adapters>=1.10,<2.0
dbt-tests-adapter>=1.10,<2.0
clickhouse-connect>=0.7.6
clickhouse-driver>=0.2.7
pytest>=7.2.0
9 changes: 4 additions & 5 deletions setup.py
@@ -25,13 +25,12 @@ def _dbt_clickhouse_version():
package_version = _dbt_clickhouse_version()
description = '''The Clickhouse plugin for dbt (data build tool)'''

dbt_version = '1.8.0'
dbt_minor = '.'.join(dbt_version.split('.')[0:2])
dbt_minor_version = '1.9'

if not package_version.startswith(dbt_minor):
if not package_version.startswith(dbt_minor_version):
raise ValueError(
f'Invalid setup.py: package_version={package_version} must start with '
f'dbt_version={dbt_minor}'
f'dbt_version={dbt_minor_version}'
)


@@ -54,7 +53,7 @@ def _dbt_clickhouse_version():
]
},
install_requires=[
f'dbt-core~={dbt_version}',
f'dbt-core>={dbt_minor_version}',
'clickhouse-connect>=0.6.22',
'clickhouse-driver>=0.2.6',
'setuptools>=0.69',