
Feat/2200 add clickhouse distributed support #2573


Open · zstipanicev wants to merge 7 commits into devel

Conversation

zstipanicev

Description

When ClickHouse is set up with replicas and distributed tables, DDL & DML statements need to be modified:

  • ON CLUSTER needs to be added after the table name
  • A pair of tables needs to be created: a base table and a distributed table
  • ALTER and DROP need to be executed for both tables, base and distributed
  • Deletes need to be done on the base table
  • GLOBAL needs to be added to all joins, as we can't know which ones don't need it

This was achieved by adding additional configuration parameters for ClickHouse and modifying queries in the execute_query function in the sql_client file.
This way we didn't change any core dlt functionality; all changes are restricted to the ClickHouse destination, and to a single function.
For ClickHouse, the changes are applied only if the configuration is set for a distributed ClickHouse setup.

To test the changes, a ClickHouse cluster with a few replicated nodes is needed.
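
For illustration, here is a rough sketch of the kind of DDL pair this implies. The database, table, cluster, and engine names below are made up, and the exact statements generated by the destination may differ:

# Illustrative only: names and engines are hypothetical, not the exact DDL from this PR.
cluster = "my_cluster"
base_db, dist_db = "_analytics", "analytics"

# Base (local) table, created on every node via ON CLUSTER.
create_base = f"""
CREATE TABLE {base_db}.events_base ON CLUSTER {cluster} (
    event_id String,
    created_at DateTime
)
ENGINE = ReplicatedMergeTree
ORDER BY event_id
"""

# Distributed table that routes reads and writes to the base table on each shard.
create_distributed = f"""
CREATE TABLE {dist_db}.events ON CLUSTER {cluster}
AS {base_db}.events_base
ENGINE = Distributed({cluster}, {base_db}, events_base, rand())
"""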


netlify bot commented Apr 29, 2025

Deploy Preview for dlt-hub-docs canceled.

Latest commit: 5593683
Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/6841c5ac9f976d00082ff682

@rudolfix rudolfix (Collaborator) left a comment

thanks for this @zstipanicev !

what I'd ideally do here:

  1. try to implement all DDL statements by changing ClickHouse-specific code without rewriting the queries:
  • CREATE/ALTER TABLE: in def _get_table_update_sql() you can call SQL generation twice with different table names; you can also temporarily change the database name for the distributed table (see the sketch after this list)
  • I do not think you need to create a distributed pair for the SENTINEL table (we mark the existence of a dataset with it)
  • DROP: in drop_tables. We also have duplicate code in drop_dataset; you can just call drop_tables from there.
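
For concreteness, a minimal sketch of what "call SQL generation twice" could look like. The stub base class and the exact _get_table_update_sql signature are assumptions made to keep the example self-contained, not dlt's actual API:

from typing import List, Sequence


class _StubSqlJobClient:
    """Stand-in for dlt's SQL job client, only here to keep the sketch runnable."""

    def _get_table_update_sql(
        self, table_name: str, new_columns: Sequence[dict], generate_alter: bool
    ) -> List[str]:
        verb = "ALTER" if generate_alter else "CREATE"
        return [f"{verb} TABLE {table_name} (...)"]  # placeholder DDL


class DistributedClickHouseClient(_StubSqlJobClient):
    base_table_name_postfix = "_base"  # assumed config value

    def _get_table_update_sql(
        self, table_name: str, new_columns: Sequence[dict], generate_alter: bool
    ) -> List[str]:
        # Generate DDL once for the base (local) table under its postfixed name ...
        base_sql = super()._get_table_update_sql(
            table_name + self.base_table_name_postfix, new_columns, generate_alter
        )
        # ... and once more for the distributed table, then emit both statements.
        dist_sql = super()._get_table_update_sql(table_name, new_columns, generate_alter)
        return base_sql + dist_sql


print(DistributedClickHouseClient()._get_table_update_sql("events", [], False))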

Other queries are indeed best rewritten. Are you familiar with SQLGlot? We have it as a required dependency. This is, for example, what you need to do to add a GLOBAL join:

import sqlglot
from sqlglot import exp


def add_global_joins(sql: str) -> str:
    # 1.  Parse using the ClickHouse dialect
    tree = sqlglot.parse_one(sql, read="clickhouse")

    # 2.  Walk the AST once and flip the `global` flag
    def _add_global(node: exp.Expression) -> exp.Expression:
        if isinstance(node, exp.Join) and not node.args.get("global"):
            node.set("global", True)  # <- the magic line
        return node

    tree = tree.transform(_add_global)

    # 3.  Re-emit SQL for ClickHouse
    return tree.sql(dialect="clickhouse")

Renaming tables is also easy; we are trying to port all our SQL generation to sqlglot (slowly).
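
For example, a small sqlglot sketch that repoints table references to a base database and postfixed table name; the helper name and the example values are made up, this is only an illustration and not the PR's code:

import sqlglot
from sqlglot import exp


def point_to_base_tables(sql: str, base_db: str, postfix: str) -> str:
    # Repoint every table reference to <base_db>.<table><postfix>.
    tree = sqlglot.parse_one(sql, read="clickhouse")
    for table in tree.find_all(exp.Table):
        table.set("this", exp.to_identifier(table.name + postfix))
        table.set("db", exp.to_identifier(base_db))
    return tree.sql(dialect="clickhouse")


print(point_to_base_tables("DELETE FROM analytics.events WHERE id = 1", "_analytics", "_base"))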

This of course depends on how much time you have. If you cannot do those changes, we'll take over (this fix makes a lot of sense), but that will take ~2 releases to complete.

"""Set to True if ClickHouse tables are distributed/shareded across multiple nodes, this will enable creating base and distributed tables."""
cluster: Optional[str] = None
"""Cluster name for sharded tables. This is used in ON CLUSTER clause for sharded distributed tables"""
base_table_database_prefix: Optional[str] = None
rudolfix (Collaborator):

Why do we need to have two databases/catalogs?

zstipanicev (Author):

At my current company, this is our internal rule/best practice: we store base tables in "_db" with an added "_base" postfix, and distributed tables in "db". Only distributed tables are accessed directly by tools (dbt, reporting, ...).
If the database prefix is left blank, the same database is used for both types of tables.
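
To make the convention concrete, a tiny illustrative example (the values are hypothetical) of how the names combine:

# Hypothetical values, only to illustrate the naming convention described above.
database = "analytics"
table = "events"
base_table_database_prefix = "_"
base_table_name_postfix = "_base"

base_db = base_table_database_prefix + database  # "_analytics" holds the base tables
base_table = table + base_table_name_postfix     # "events_base"
distributed = f"{database}.{table}"              # "analytics.events", what tools query
print(f"base: {base_db}.{base_table}  distributed: {distributed}")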

rudolfix (Collaborator):

OK, this makes sense! We'll need to document this workflow in our ClickHouse docs (clickhouse.md AFAIK), and also the default behavior.

"""Cluster name for sharded tables. This is used in ON CLUSTER clause for sharded distributed tables"""
base_table_database_prefix: Optional[str] = None
"""Prefix for the database name of the base table. This is used for sharded distributed tables."""
base_table_name_postfix: Optional[str] = None
rudolfix (Collaborator):

If we have a separate database, why must the table names differ?

zstipanicev (Author):

I'm just trying to comply with my current company's rules; it can be kept blank to keep the same name.
Having both options adds flexibility to support various setups at different companies.

# db name for the base table
base_db = self.config.base_table_database_prefix + self.credentials.database

if (self.contains_string(qry, "CREATE") and self.contains_string(qry, "ENGINE = Memory")):
rudolfix (Collaborator):

we can just change it in _to_temp_table for all types of tables if that performs better

zstipanicev (Author):

That is also a solution.
The change here is not about performance: Memory-engine tables exist on a single replica only, and that doesn't work with a distributed setup. If the next query reading from that table is connected to a different replica than the one used for inserting data, it would see an empty table.
The change here is to ensure we always see the data.
Sorry for not documenting that in the comment as well.

zstipanicev (Author):

After looking at the _to_temp_table function, I don't think it's possible to make the change there, as it doesn't have access to the configuration, and without it it's not possible to determine whether it's a distributed setup or not.
Is it possible to change the class or the function to have access to the config?

rudolfix (Collaborator):

Yes, we can do that. In your PR you can change it to ReplicatedMergeTree, and then we can do the final tweak (passing the config will need a code refactor with which you should IMO not bother).
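
For reference, a purely illustrative before/after of the temp-table DDL under that suggestion; the real _to_temp_table signature and the final engine choice may well differ:

def to_temp_table_memory(select_sql: str, temp_name: str) -> str:
    # current behaviour: Memory engine, rows visible only on the replica that ran the insert
    return f"CREATE TABLE {temp_name} ENGINE = Memory AS {select_sql}"


def to_temp_table_replicated(select_sql: str, temp_name: str, cluster: str) -> str:
    # suggested behaviour: a replicated table created on the whole cluster, so a follow-up
    # query hitting any replica sees the same rows
    return (
        f"CREATE TABLE {temp_name} ON CLUSTER {cluster} "
        f"ENGINE = ReplicatedMergeTree ORDER BY tuple() "
        f"AS {select_sql}"
    )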

zstipanicev (Author):

Thx for all the suggestions @rudolfix!
I'll have a look at those.


rudolfix commented May 2, 2025

Thx for all the suggestions @rudolfix! I'll have a look at those.

please keep me posted, this PR is pretty cool so if you get stuck or don't have time lmk.


zstipanicev commented May 15, 2025

Hey @rudolfix

I adjusted the PR.

  • Changed CREATE, ALTER, and DROP statements in functions where they are defined
  • Added new config options to clickhouse.md with an additional explanation.

I decided not to change the code within _to_temp_table because it's not only about the engine. The ON CLUSTER clause would also need to be added, but without access to the configured cluster name, it cannot be made generic.
I'm also unsure whether ReplicatedMergeTree would work for setups with a single node, as I don't have an instance to test it.
This will still be handled in execute_query.

I used SQLGlot where possible. Unfortunately, SQLGlot doesn't fully support every ClickHouse statement, and I clearly stated this in the comments.
The solution looks a bit like Frankenstein's monster, combining sqlglot and RegExp.
I tested the code with GSheet, Stripe, and Salesforce, and it runs without issue.
Let me know if you have any suggestions or questions.

P.S. SQLGlot changes ` to "; I couldn't find a way to change that behaviour.
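
If it only matters for the final emitted string, one blunt post-processing workaround is a plain text swap after sqlglot renders the query; this assumes string literals only use single quotes (the ClickHouse default), so every double quote in the output is an identifier quote:

def requote_identifiers(sql: str) -> str:
    # Crude workaround; breaks if a string literal contains a double quote.
    return sql.replace('"', "`")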

Edit:
The latest commit fixes a bug with TRUNCATE statements: they need to run on the base tables, not the distributed tables, or the data is not removed (also stated in the comment). The good thing about testing in production is that all bugs are caught... eventually...

Edit 2:
I had initially missed that it's not allowed to run updates on the distributed table; those need to be executed on the base table. This is now adjusted. Sorry for missing this initially. I am now testing merge with scd2 and this came up.
Also, I could not find the function which generates update statements. I found gen_update_table_prefix, and that one currently doesn't have access to the config, so I can't make it work for both a single-node and a distributed setup. Again, I'm handling this in execute_query.
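
As a rough illustration of that kind of rewrite (the real logic in execute_query is more involved, and the database and postfix values below are made up):

import re

BASE_DB = "_analytics"  # hypothetical base database
POSTFIX = "_base"       # hypothetical base table postfix


def route_to_base_table(sql: str) -> str:
    # Redirect statements that must run on the base table to <BASE_DB>.<table><POSTFIX>.
    # A real implementation would distinguish ALTER ... UPDATE from schema ALTERs,
    # which have to run on both tables.
    pattern = r"^(TRUNCATE TABLE|ALTER TABLE)\s+(\w+)\.(\w+)"

    def _repl(m: re.Match) -> str:
        return f"{m.group(1)} {BASE_DB}.{m.group(3)}{POSTFIX}"

    return re.sub(pattern, _repl, sql, count=1, flags=re.IGNORECASE)


print(route_to_base_table("ALTER TABLE analytics.events UPDATE x = 1 WHERE id = 2"))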

rudolfix added the "ci from fork" label (run ci workflows on a pr even if they are from a fork) on May 26, 2025

Successfully merging this pull request may close these issues:

  • Cluster support for clickhouse
2 participants