| page_title | clickhouse_clickpipe Resource - clickhouse |
|---|---|
| subcategory | |
| description | You can use the clickhouse_clickpipe resource to create and manage ClickPipes data ingestion pipelines in ClickHouse Cloud. Supported source types: Kafka (Confluent, MSK, Azure Event Hubs, Redpanda, WarpStream), Object Storage (S3, GCS, Azure Blob), Kinesis, Postgres CDC, MySQL CDC, BigQuery, and MongoDB CDC. Known limitations: ClickPipe does not support table updates for managed tables. If you need to update the table schema, you will have to do that externally. Changing the source type of an existing ClickPipe will force replacement (destroy and recreate). |
You can use the clickhouse_clickpipe resource to create and manage ClickPipes data ingestion pipelines in ClickHouse Cloud.
Supported source types: Kafka (Confluent, MSK, Azure Event Hubs, Redpanda, WarpStream), Object Storage (S3, GCS, Azure Blob), Kinesis, Postgres CDC, MySQL CDC, BigQuery, and MongoDB CDC.
Known limitations:
- ClickPipe does not support table updates for managed tables. If you need to update the table schema, you will have to do that externally.
- Changing the source type of an existing ClickPipe will force replacement (destroy and recreate).

```terraform
resource "clickhouse_clickpipe" "kafka_clickpipe" {
  name       = "My Kafka ClickPipe"
  service_id = "e9465b4b-f7e5-4937-8e21-8d508b02843d"

  # The schema declares these as nested attributes, so they are assigned
  # with "=" rather than written as blocks.
  scaling = {
    replicas               = 2
    replica_cpu_millicores = 250
    replica_memory_gb      = 1.0
  }

  source = {
    kafka = {
      type           = "confluent"
      format         = "JSONEachRow"
      brokers        = "my-kafka-broker:9092"
      topics         = "my_topic"
      consumer_group = "clickpipe-test"

      credentials = {
        username = "user"
        password = "***"
      }
    }
  }

  destination = {
    table         = "my_table"
    managed_table = true

    table_definition = {
      engine = {
        type = "MergeTree"
      }
    }

    # columns is a list of objects, one entry per destination column.
    columns = [
      {
        name = "my_field1"
        type = "String"
      },
      {
        name = "my_field2"
        type = "UInt64"
      }
    ]
  }

  field_mappings = [
    {
      source_field      = "my_field"
      destination_field = "my_field1"
    }
  ]
}
```

Required:
- `destination` (Attributes) The destination for the ClickPipe. (see below for nested schema)
- `name` (String) The name of the ClickPipe.
- `service_id` (String) The ID of the service to which the ClickPipe belongs.
- `source` (Attributes) The data source for the ClickPipe. At least one source configuration must be provided. (see below for nested schema)

Optional:
- `field_mappings` (Attributes List) Field mapping between source and destination table. (see below for nested schema)
- `scaling` (Attributes) (see below for nested schema)
- `settings` (Dynamic) Advanced configuration options for the ClickPipe. These settings are specific to each pipe. For the complete list of available options, see the OpenAPI documentation at https://clickhouse.com/docs/cloud/manage/api/swagger (search for the ClickPipes settings endpoint).
- `stopped` (Boolean) Whether the ClickPipe should be stopped. Default is `false` (ClickPipe will be running). Cannot be set to `true` on creation: the ClickPipe must be created in a running state and then stopped via a subsequent apply.
- `trigger_resync` (Boolean) Set to `true` to trigger a resync operation. Only applicable for Postgres pipes. Automatically resets to `false` after the resync is triggered. Note: this will always show a diff in `terraform plan` after setting to `true`, since it resets to `false` in state.

Read-Only:
- `id` (String) The ID of the ClickPipe. Generated by ClickHouse Cloud.
- `state` (String) The current state of the ClickPipe. This is a read-only field that reports the actual state from ClickHouse Cloud. Possible values include `Running`, `Stopped`, `Paused`, `Provisioning`, `Failed`, `InternalError`, etc.

Nested schema for `destination`:

Optional:
- `columns` (Attributes List) The list of columns for the ClickHouse table. Required for all sources except Postgres CDC (where columns are determined from source tables). (see below for nested schema)
- `database` (String) The name of the ClickHouse database. Default is `default`.
- `managed_table` (Boolean) Whether the table is managed by ClickHouse Cloud. If `false`, the table must exist in the database. Default is `true`.
- `roles` (List of String) ClickPipe will create a ClickHouse user with these roles. Add your custom roles here if required.
- `table` (String) The name of the ClickHouse table. Required for all sources except Postgres CDC (where tables are created from `table_mappings`).
- `table_definition` (Attributes) Definition of the destination table. Required for ClickPipes managed tables. (see below for nested schema)

Nested schema for `destination.columns`:

Required:
- `name` (String) The name of the column.
- `type` (String) The type of the column.

Nested schema for `destination.table_definition`:

Required:
- `engine` (Attributes) The engine of the ClickHouse table. (see below for nested schema)

Optional:
- `partition_by` (String) The column to partition the table by.
- `primary_key` (String) The primary key of the table.
- `sorting_key` (List of String) The list of columns for the sorting key.

Nested schema for `destination.table_definition.engine`:

Required:
- `type` (String) The type of the engine. Supported engines: `MergeTree`, `ReplacingMergeTree`, `SummingMergeTree`, `Null`.

Optional:
- `column_ids` (List of String) Column IDs to sum for SummingMergeTree engine. Required when engine type is `SummingMergeTree`.
- `version_column_id` (String) Column ID to use as version for ReplacingMergeTree engine. Required when engine type is `ReplacingMergeTree`.
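
As a sketch of how the destination and engine attributes above fit together, the configuration below declares a managed `ReplacingMergeTree` table. The resource name, variables, broker, topic, table, and column names are all placeholders, and it assumes that `version_column_id` and `sorting_key` refer to columns by the names declared under `columns`.

```terraform
variable "service_id" {
  type        = string
  description = "ID of the ClickHouse Cloud service (placeholder)."
}

variable "kafka_password" {
  type      = string
  sensitive = true
}

resource "clickhouse_clickpipe" "orders" {
  name       = "orders"
  service_id = var.service_id

  source = {
    kafka = {
      format  = "JSONEachRow"
      brokers = "broker-1:9092"
      topics  = "orders"

      credentials = {
        username = "user"
        password = var.kafka_password
      }
    }
  }

  destination = {
    table         = "orders"
    managed_table = true

    columns = [
      { name = "id", type = "UInt64" },
      { name = "status", type = "String" },
      { name = "updated_at", type = "DateTime" },
    ]

    table_definition = {
      engine = {
        type              = "ReplacingMergeTree"
        version_column_id = "updated_at" # assumption: a column name from the list above
      }
      sorting_key = ["id"]
    }
  }
}
```

Because ClickPipes does not update managed tables after creation, later schema changes to a table defined this way would have to be made outside Terraform.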

Nested schema for `source`:

Optional:
- `bigquery` (Attributes) The BigQuery source configuration for the ClickPipe. (see below for nested schema)
- `kafka` (Attributes) The Kafka source configuration for the ClickPipe. (see below for nested schema)
- `kinesis` (Attributes) The Kinesis source configuration for the ClickPipe. Only `authentication`, `iam_role` and `access_key` can be updated in place; changing any other field forces resource replacement (destroy and recreate). (see below for nested schema)
- `mongodb` (Attributes) The MongoDB CDC source configuration for the ClickPipe. (see below for nested schema)
- `mysql` (Attributes) The MySQL CDC source configuration for the ClickPipe. (see below for nested schema)
- `object_storage` (Attributes) The compatible object storage source configuration for the ClickPipe. (see below for nested schema)
- `postgres` (Attributes) The Postgres CDC source configuration for the ClickPipe. (see below for nested schema)
- `pubsub` (Attributes) The GCP Pub/Sub source configuration for the ClickPipe. (see below for nested schema)

Nested schema for `source.bigquery`:

Required:
- `credentials` (Attributes, Sensitive) The credentials for BigQuery access. (see below for nested schema)
- `settings` (Attributes) Settings for the BigQuery pipe. (see below for nested schema)
- `snapshot_staging_path` (String) GCS bucket path for staging snapshot data (e.g., `gs://my-bucket/staging/`). Data will be automatically cleaned up after initial load.
- `table_mappings` (Attributes List) Table mappings from BigQuery source to ClickHouse destination. (see below for nested schema)

Nested schema for `source.bigquery.credentials`:

Required:
- `service_account_file` (String, Sensitive) Google Cloud service account JSON key file content, base64 encoded.

Nested schema for `source.bigquery.settings`:

Required:
- `replication_mode` (String) Replication mode for the BigQuery pipe. (`snapshot`)

Optional:
- `allow_nullable_columns` (Boolean) Allow nullable columns in the destination table.
- `initial_load_parallelism` (Number) Number of parallel workers during initial load.
- `snapshot_num_rows_per_partition` (Number) Number of rows to snapshot per partition.
- `snapshot_number_of_parallel_tables` (Number) Number of parallel tables to snapshot.

Nested schema for `source.bigquery.table_mappings`:

Required:
- `source_dataset_name` (String) Source BigQuery dataset name.
- `source_table` (String) Source table name in BigQuery.
- `target_table` (String) Target table name in ClickHouse.

Optional:
- `excluded_columns` (List of String) Columns to exclude from replication.
- `sorting_keys` (List of String) Ordered list of columns to use as sorting key for the target table. Required when `use_custom_sorting_key` is true.
- `table_engine` (String) Table engine to use for the target table. (`MergeTree`, `ReplacingMergeTree`, `Null`)
- `use_custom_sorting_key` (Boolean) Whether to use a custom sorting key for the target table.

Nested schema for `source.kafka`:

Required:
- `brokers` (String) The list of Kafka bootstrap brokers. (comma separated)
- `format` (String) The format of the Kafka source. (`JSONEachRow`, `Avro`, `AvroConfluent`, `Protobuf`)
- `topics` (String) The list of Kafka topics. (comma separated)

Optional:
- `authentication` (String) The authentication method for the Kafka source. (`PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `IAM_ROLE`, `IAM_USER`, `MUTUAL_TLS`). Default is `PLAIN`.
- `ca_certificate` (String) PEM encoded CA certificates to validate the broker's certificate.
- `consumer_group` (String) Consumer group of the Kafka source. If not provided, `clickpipes-<ID>` will be used.
- `credentials` (Attributes) The credentials for the Kafka source. (see below for nested schema)
- `iam_role` (String) The IAM role for the Kafka source. Use with `IAM_ROLE` authentication. It can be used with AWS ClickHouse service only. Read more at https://clickhouse.com/docs/en/integrations/clickpipes/kafka#iam
- `offset` (Attributes) The Kafka offset. (see below for nested schema)
- `reverse_private_endpoint_ids` (List of String) The list of reverse private endpoint IDs for the Kafka source. (comma separated)
- `schema_registry` (Attributes) The schema registry for the Kafka source. (see below for nested schema)
- `type` (String) The type of the Kafka source. (`kafka`, `redpanda`, `confluent`, `msk`, `warpstream`, `azureeventhub`, `gcmk`). Default is `kafka`.

Nested schema for `source.kafka.credentials`:

Optional:
- `access_key_id` (String, Sensitive) The access key ID for the Kafka source. Use with `IAM_USER` authentication.
- `certificate` (String, Sensitive) PEM encoded client certificate for mTLS authentication. Use with `MUTUAL_TLS` authentication.
- `connection_string` (String, Sensitive) The connection string for the Kafka source. Use with the `azureeventhub` Kafka source type and `PLAIN` authentication.
- `password` (String, Sensitive) The password for the Kafka source. Use `password_wo` instead to keep the value out of state.
- `password_wo` (String, Sensitive, Write-only) Write-only password for the Kafka source. Not persisted to state. Pair with `password_wo_version` to trigger updates.
- `password_wo_version` (Number) Version trigger for `password_wo`. Increment to push a new password to the API.
- `private_key` (String, Sensitive) PEM encoded client private key for mTLS authentication. Use with `MUTUAL_TLS` authentication.
- `secret_key` (String, Sensitive) The secret key for the Kafka source. Use with `IAM_USER` authentication.
- `username` (String, Sensitive) The username for the Kafka source.

Nested schema for `source.kafka.offset`:

Required:
- `strategy` (String) The offset strategy for the Kafka source. (`from_beginning`, `from_latest`, `from_timestamp`)

Optional:
- `timestamp` (String) The timestamp for the Kafka offset. Use with the `from_timestamp` offset strategy. (format `2021-01-01T00:00`)

Nested schema for `source.kafka.schema_registry`:

Required:
- `authentication` (String) The authentication method for the Schema Registry. Only `PLAIN` is supported.
- `credentials` (Attributes, Sensitive) The credentials for the Schema Registry. (see below for nested schema)
- `url` (String) The URL of the schema registry.

Nested schema for `source.kafka.schema_registry.credentials`:

Required:
- `username` (String, Sensitive) The username for the Schema Registry.

Optional:
- `password` (String, Sensitive) The password for the Schema Registry. Either `password` or `password_wo` must be provided.
- `password_wo` (String, Sensitive, Write-only) Write-only password for the Schema Registry. Not persisted to state. Pair with `password_wo_version` to trigger updates.
- `password_wo_version` (Number) Version trigger for `password_wo`. Increment to push a new password to the API.
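
The Kafka options above combine in several ways; one possible arrangement is sketched below, with a timestamp offset, a schema registry, and a write-only broker password. All names, hosts, and variables are placeholders. Write-only arguments need a Terraform version that supports them (1.11 or later).

```terraform
variable "service_id" {
  type = string
}

variable "kafka_password" {
  type      = string
  sensitive = true
}

variable "registry_password" {
  type      = string
  sensitive = true
}

resource "clickhouse_clickpipe" "events" {
  name       = "events"
  service_id = var.service_id

  source = {
    kafka = {
      type           = "confluent"
      format         = "AvroConfluent"
      brokers        = "broker-1:9092,broker-2:9092" # comma separated
      topics         = "events"
      authentication = "PLAIN"

      credentials = {
        username            = "user"
        password_wo         = var.kafka_password # not persisted to state
        password_wo_version = 1                  # increment to push a new password
      }

      offset = {
        strategy  = "from_timestamp"
        timestamp = "2021-01-01T00:00"
      }

      schema_registry = {
        url            = "https://registry.example.com"
        authentication = "PLAIN"

        credentials = {
          username = "registry-user"
          password = var.registry_password
        }
      }
    }
  }

  destination = {
    table         = "events"
    managed_table = true

    columns = [
      { name = "event_id", type = "String" },
      { name = "payload", type = "String" },
    ]

    table_definition = {
      engine = { type = "MergeTree" }
    }
  }
}
```

Since `password_wo` is never stored, Terraform cannot detect a changed value on its own; bumping `password_wo_version` is what signals the rotation.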

Nested schema for `source.kinesis`:

Required:
- `authentication` (String) The authentication method for the Kinesis source. (`IAM_ROLE`, `IAM_USER`).
- `format` (String) The format of the Kinesis source. (`JSONEachRow`, `Avro`, `AvroConfluent`)
- `iterator_type` (String) The iterator type for the Kinesis source. (`TRIM_HORIZON`, `LATEST`, `AT_TIMESTAMP`)
- `region` (String) The AWS region of the Kinesis stream.
- `stream_name` (String) The name of the Kinesis stream.

Optional:
- `access_key` (Attributes) The access key for the Kinesis source. Use with `IAM_USER` authentication. Can be rotated in place via an update. (see below for nested schema)
- `iam_role` (String) The IAM role for the Kinesis source. Use with `IAM_ROLE` authentication. It can be used with AWS ClickHouse service only. Read more at https://clickhouse.com/docs/en/integrations/clickpipes/kinesis.
- `timestamp` (String) The timestamp for the Kinesis source. Use with the `AT_TIMESTAMP` iterator type. (format `2021-01-01T00:00`)
- `use_enhanced_fan_out` (Boolean) Whether to use enhanced fan-out consumer.

Nested schema for `source.kinesis.access_key`:

Required:
- `access_key_id` (String, Sensitive) The access key ID for the Kinesis source.
- `secret_key` (String, Sensitive) The secret key for the Kinesis source.
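
A minimal Kinesis pipe using role-based authentication might look like the following sketch. The stream name, region, role ARN, and destination table are placeholders chosen for illustration.

```terraform
variable "service_id" {
  type = string
}

resource "clickhouse_clickpipe" "clicks" {
  name       = "clicks"
  service_id = var.service_id

  source = {
    kinesis = {
      authentication = "IAM_ROLE"
      iam_role       = "arn:aws:iam::123456789012:role/clickpipes-kinesis" # placeholder ARN
      format         = "JSONEachRow"
      stream_name    = "clicks"
      region         = "us-east-1"
      iterator_type  = "TRIM_HORIZON"
    }
  }

  destination = {
    table         = "clicks"
    managed_table = true

    columns = [
      { name = "user_id", type = "UInt64" },
      { name = "url", type = "String" },
    ]

    table_definition = {
      engine = { type = "MergeTree" }
    }
  }
}
```

Per the `kinesis` attribute description, only `authentication`, `iam_role`, and `access_key` can change in place, so editing fields such as `stream_name` or `region` here would replace the pipe.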

Nested schema for `source.mongodb`:

Required:
- `read_preference` (String) MongoDB read preference for replica set reads. (`primary`, `primaryPreferred`, `secondary`, `secondaryPreferred`, `nearest`)
- `settings` (Attributes) Settings for the MongoDB CDC pipe. (see below for nested schema)
- `table_mappings` (Attributes Set) Collection mappings from MongoDB source to ClickHouse destination. (see below for nested schema)
- `uri` (String) MongoDB connection URI. Supports both standard URIs (`mongodb://...`) and SRV URIs (`mongodb+srv://...`).

Optional:
- `ca_certificate` (String) PEM encoded CA certificate to validate the MongoDB server certificate.
- `credentials` (Attributes, Sensitive) The credentials for the MongoDB instance (username and password). Optional if credentials are embedded in the URI. (see below for nested schema)
- `disable_tls` (Boolean) Disable TLS for the MongoDB connection. Defaults to false (TLS enabled).
- `tls_host` (String) TLS/SSL host for secure connections.

Nested schema for `source.mongodb.settings`:

Required:
- `replication_mode` (String) Replication mode for the MongoDB pipe. (`cdc`, `snapshot`, `cdc_only`)

Optional:
- `delete_on_merge` (Boolean) Enable hard delete behavior in ReplacingMergeTree for MongoDB DELETE operations.
- `pull_batch_size` (Number) Number of rows to pull in each batch during CDC replication.
- `snapshot_num_rows_per_partition` (Number) Number of rows per partition during the snapshot phase.
- `snapshot_number_of_parallel_tables` (Number) Number of collections to snapshot in parallel during the initial load phase.
- `sync_interval_seconds` (Number) Interval in seconds to sync data from MongoDB during CDC replication.
- `use_json_native_format` (Boolean) Store JSON values in native ClickHouse JSON format. When disabled, JSON data is stored as String.

Nested schema for `source.mongodb.table_mappings`:

Required:
- `source_collection` (String) MongoDB source collection name.
- `source_database_name` (String) MongoDB source database name.
- `target_table` (String) ClickHouse target table name. The table will be created automatically if it does not exist.

Optional:
- `table_engine` (String) Table engine to use for the target table. (`MergeTree`, `ReplacingMergeTree`, `Null`)

Nested schema for `source.mongodb.credentials`:

Required:
- `username` (String) The username for the MongoDB instance.

Optional:
- `password` (String, Sensitive) The password for the MongoDB instance. Use `password_wo` instead to keep the value out of state.
- `password_wo` (String, Sensitive, Write-only) Write-only password for the MongoDB instance. Not persisted to state. Pair with `password_wo_version` to trigger updates.
- `password_wo_version` (Number) Version trigger for `password_wo`. Increment to push a new password to the API.

Nested schema for `source.mysql`:

Required:
- `credentials` (Attributes, Sensitive) The credentials for the MySQL instance. Username is always required. For `basic` authentication, supply either `password` or `password_wo`. For `IAM_ROLE` authentication, password is optional. (see below for nested schema)
- `host` (String) The hostname of the MySQL instance.
- `settings` (Attributes) Settings for the MySQL CDC pipe. (see below for nested schema)
- `table_mappings` (Attributes Set) Table mappings from MySQL source to ClickHouse destination. (see below for nested schema)

Optional:
- `authentication` (String) Authentication method for MySQL connection. Supported values: `basic`, `IAM_ROLE`. Default is `basic`.
- `ca_certificate` (String) PEM encoded CA certificate to validate the MySQL server certificate.
- `disable_tls` (Boolean) Disable TLS for the MySQL connection.
- `iam_role` (String) IAM role ARN for IAM authentication. Required when authentication is set to `IAM_ROLE`.
- `port` (Number) The port of the MySQL instance. Default is 3306.
- `skip_cert_verification` (Boolean) Skip certificate verification for the MySQL connection.
- `tls_host` (String) TLS/SSL host for secure connections. Used to verify the server certificate.
- `type` (String) The type of MySQL-compatible source. (`mysql`, `rdsmysql`, `auroramysql`, `planetscalevitess`, `mariadb`, `rdsmariadb`). Default is `mysql`.

Nested schema for `source.mysql.credentials`:

Required:
- `username` (String, Sensitive) The username for the MySQL instance.

Optional:
- `password` (String, Sensitive) The password for the MySQL instance. Use `password_wo` instead to keep the value out of state.
- `password_wo` (String, Sensitive, Write-only) Write-only password for the MySQL instance. Not persisted to state. Pair with `password_wo_version` to trigger updates.
- `password_wo_version` (Number) Version trigger for `password_wo`. Increment to push a new password to the API.

Nested schema for `source.mysql.settings`:

Required:
- `replication_mode` (String) Replication mode for the MySQL pipe. (`cdc`, `snapshot`, `cdc_only`)

Optional:
- `allow_nullable_columns` (Boolean) Allow nullable columns in the destination table.
- `delete_on_merge` (Boolean) Enable hard delete behavior in ReplacingMergeTree for MySQL DELETE operations.
- `initial_load_parallelism` (Number) Number of parallel connections to use during initial load.
- `pull_batch_size` (Number) Number of rows to pull in each batch.
- `replication_mechanism` (String) Replication mechanism for the MySQL pipe. (`GTID`, `FILE_POS`). Default is `GTID`.
- `snapshot_num_rows_per_partition` (Number) Number of rows to snapshot per partition.
- `snapshot_number_of_parallel_tables` (Number) Number of parallel tables to snapshot.
- `sync_interval_seconds` (Number) Interval in seconds to sync data from MySQL.
- `use_compression` (Boolean) Enable compression for the MySQL replication connection.

Nested schema for `source.mysql.table_mappings`:

Required:
- `source_schema_name` (String) Source schema (database) name in MySQL.
- `source_table` (String) Source table name in MySQL.
- `target_table` (String) Target table name in ClickHouse.

Optional:
- `excluded_columns` (List of String) Columns to exclude from replication.
- `partition_key` (String) Custom partitioning column used for parallel snapshotting. Must be an indexed column of integer, date, datetime, or timestamp type.
- `sorting_keys` (List of String) Ordered list of columns to use as sorting key for the target table. Required when `use_custom_sorting_key` is true.
- `table_engine` (String) Table engine to use for the target table. (`MergeTree`, `ReplacingMergeTree`, `Null`)
- `use_custom_sorting_key` (Boolean) Whether to use a custom sorting key for the target table.

Nested schema for `source.object_storage`:

Required:
- `format` (String) The format of the S3 objects. (`JSONEachRow`, `CSV`, `CSVWithNames`, `Parquet`)

Optional:
- `access_key` (Attributes) Access key. (see below for nested schema)
- `authentication` (String) `CONNECTION_STRING` is for Azure Blob Storage. `IAM_ROLE` and `IAM_USER` are for AWS S3. `IAM_USER` and `SERVICE_ACCOUNT` are for GCS. If not provided, no authentication is used.
- `azure_container_name` (String) Container name for Azure Blob Storage. Required when type is `azureblobstorage`. Example: `mycontainer`
- `compression` (String) Compression algorithm used for the files. (`none`, `auto`, `gzip`, `brotli`, `br`, `xz`, `LZMA`, `zstd`)
- `connection_string` (String, Sensitive) Connection string for Azure Blob Storage authentication. Required when authentication is `CONNECTION_STRING`. Example: `DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net`
- `delimiter` (String) The delimiter for the S3 source. Default is `,`.
- `iam_role` (String) The IAM role for the S3 source. Use with `IAM_ROLE` authentication. It can be used with AWS ClickHouse service only. Read more at https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#authentication
- `is_continuous` (Boolean) If set to true, the pipe will continuously read new files from the source. If set to false, the pipe will read the files only once. New files must be uploaded in lexical order.
- `path` (String) Path to the file(s) within the Azure container. Used for Azure Blob Storage sources. You can specify multiple files using bash-like wildcards. For more information, see the documentation on using wildcards in path: https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#limitations. Example: `data/logs/*.json`
- `queue_url` (String) Queue URL for event-based continuous ingestion. When provided, files are ingested based on event notifications rather than lexicographical order. Only applicable when `is_continuous` is `true` and authentication is provided. For S3: SQS URL in the format `https://sqs.{region}.amazonaws.com/{account-id}/{queue-name}`. For GCS: Pub/Sub subscription in the format `projects/{project}/subscriptions/{subscription}`.
- `service_account_key` (String, Sensitive) Base64-encoded GCP service account JSON key for GCS authentication. Required when authentication is `SERVICE_ACCOUNT`.
- `type` (String) The type of the S3-compatible source (`s3`, `gcs`, `azureblobstorage`). Default is `s3`.
- `url` (String) The URL of the S3/GCS bucket. Required for S3 and GCS types. Not used for Azure Blob Storage (use `path` and `azure_container_name` instead). You can specify multiple files using bash-like wildcards. For more information, see the documentation on using wildcards in path: https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#limitations

Nested schema for `source.object_storage.access_key`:

Optional:
- `access_key_id` (String, Sensitive) The access key ID for the S3 source. Use with `IAM_USER` authentication.
- `secret_key` (String, Sensitive) The secret key for the S3 source. Use with `IAM_USER` authentication.
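
To illustrate event-based continuous ingestion as described for `queue_url`, here is a sketch of an S3 pipe that reads Parquet files and listens on an SQS queue. The bucket URL, role ARN, queue URL, and table layout are hypothetical.

```terraform
variable "service_id" {
  type = string
}

resource "clickhouse_clickpipe" "s3_logs" {
  name       = "s3-logs"
  service_id = var.service_id

  source = {
    object_storage = {
      type   = "s3"
      format = "Parquet"
      url    = "https://my-bucket.s3.us-east-1.amazonaws.com/logs/*.parquet" # placeholder bucket

      # queue_url only applies when authentication is provided
      # and is_continuous is true.
      authentication = "IAM_ROLE"
      iam_role       = "arn:aws:iam::123456789012:role/clickpipes-s3" # placeholder ARN
      is_continuous  = true
      queue_url      = "https://sqs.us-east-1.amazonaws.com/123456789012/s3-logs-events"
    }
  }

  destination = {
    table         = "logs"
    managed_table = true

    columns = [
      { name = "ts", type = "DateTime" },
      { name = "message", type = "String" },
    ]

    table_definition = {
      engine = { type = "MergeTree" }
    }
  }
}
```

Without `queue_url`, a continuous pipe depends on new files arriving in lexical order, so the queue-based form may suit buckets where object names are not ordered.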

Nested schema for `source.postgres`:

Required:
- `credentials` (Attributes, Sensitive) The credentials for the Postgres instance. Username is always required. For `basic` authentication, supply either `password` or `password_wo`. For `iam_role` authentication, password is optional. (see below for nested schema)
- `database` (String) The database name of the Postgres instance.
- `host` (String) The hostname of the Postgres instance.
- `settings` (Attributes) Settings for the Postgres CDC pipe. (see below for nested schema)
- `table_mappings` (Attributes Set) Table mappings from Postgres source to ClickHouse destination. (see below for nested schema)

Optional:
- `authentication` (String) Authentication method for Postgres connection. Supported values: `basic`, `iam_role`. Default is `basic`.
- `ca_certificate` (String) PEM encoded CA certificate to validate the Postgres server certificate.
- `iam_role` (String) IAM role ARN for IAM authentication. Required when authentication is set to `iam_role`.
- `port` (Number) The port of the Postgres instance. Default is 5432.
- `tls_host` (String) TLS/SSL host for secure connections. Used to verify the server certificate.
- `type` (String) The type of the Postgres source. (`postgres`, `supabase`, `neon`, `alloydb`, `planetscale`, `rdspostgres`, `aurorapostgres`, `cloudsqlpostgres`, `azurepostgres`, `crunchybridge`, `tigerdata`). Default is `postgres`.

Nested schema for `source.postgres.credentials`:

Required:
- `username` (String, Sensitive) The username for the Postgres instance.

Optional:
- `password` (String, Sensitive) The password for the Postgres instance. Use `password_wo` instead to keep the value out of state.
- `password_wo` (String, Sensitive, Write-only) Write-only password for the Postgres instance. Not persisted to state. Pair with `password_wo_version` to trigger updates.
- `password_wo_version` (Number) Version trigger for `password_wo`. Increment to push a new password to the API.

Nested schema for `source.postgres.settings`:

Required:
- `replication_mode` (String) Replication mode for the Postgres pipe. (`cdc`, `snapshot`, `cdc_only`)

Optional:
- `allow_nullable_columns` (Boolean) Allow nullable columns in the destination table.
- `delete_on_merge` (Boolean) Enable hard delete behavior in ReplacingMergeTree for PostgreSQL DELETE operations.
- `enable_failover_slots` (Boolean) Enable failover for created replication slot. Requires a replication slot to NOT be set.
- `initial_load_parallelism` (Number) Number of parallel connections to use during initial load.
- `publication_name` (String) Publication name to use for replication. If not provided, ClickPipes will create one.
- `pull_batch_size` (Number) Number of rows to pull in each batch.
- `replication_slot_name` (String) Replication slot name to use for replication. Only applicable when `replication_mode` is `cdc_only`.
- `snapshot_num_rows_per_partition` (Number) Number of rows to snapshot per partition.
- `snapshot_number_of_parallel_tables` (Number) Number of parallel tables to snapshot.
- `sync_interval_seconds` (Number) Interval in seconds to sync data from Postgres.

Nested schema for `source.postgres.table_mappings`:

Required:
- `source_schema_name` (String) Source schema name in Postgres.
- `source_table` (String) Source table name in Postgres.
- `target_table` (String) Target table name in ClickHouse.

Optional:
- `excluded_columns` (List of String) Columns to exclude from replication.
- `partition_key` (String) Custom partitioning column used for parallel snapshotting. Only beneficial for PostgreSQL 13 (no benefit for PG14+, which supports indexed ctid scans). Must be an indexed column of type `smallint`, `integer`, `bigint`, `timestamp without time zone`, or `timestamp with time zone`. Unrelated to ClickHouse partitioning.
- `sorting_keys` (List of String) Ordered list of columns to use as sorting key for the target table. Required when `use_custom_sorting_key` is true.
- `table_engine` (String) Table engine to use for the target table. (`MergeTree`, `ReplacingMergeTree`, `Null`)
- `use_custom_sorting_key` (Boolean) Whether to use a custom sorting key for the target table.
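
For Postgres CDC the destination carries no `table` or `columns`, because target tables come from `table_mappings`. The sketch below shows one way this could be written; the host, database, user, and table names are placeholders.

```terraform
variable "service_id" {
  type = string
}

variable "pg_password" {
  type      = string
  sensitive = true
}

resource "clickhouse_clickpipe" "pg_cdc" {
  name       = "pg-cdc"
  service_id = var.service_id

  source = {
    postgres = {
      type     = "rdspostgres"
      host     = "db.example.com" # placeholder host
      port     = 5432
      database = "app"

      credentials = {
        username = "clickpipes_user"
        password = var.pg_password
      }

      settings = {
        replication_mode = "cdc"
      }

      # Each mapping creates one target table in ClickHouse.
      table_mappings = [
        {
          source_schema_name = "public"
          source_table       = "users"
          target_table       = "users"
        },
        {
          source_schema_name = "public"
          source_table       = "orders"
          target_table       = "orders"
          excluded_columns   = ["internal_notes"]
        },
      ]
    }
  }

  destination = {
    database = "default"
  }
}
```

Setting the top-level `trigger_resync = true` on a pipe like this requests a resync; because the value resets to `false` in state, the plan keeps showing a diff until the attribute is removed or set back.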

Nested schema for `source.pubsub`:

Required:
- `authentication` (String) The authentication method for the Pub/Sub source. Currently only `SERVICE_ACCOUNT` is supported.
- `format` (String) The message format of the Pub/Sub topic. (`JSONEachRow`, `Avro`, `Protobuf`)
- `project_id` (String) The GCP project ID that owns the Pub/Sub topic.
- `seek_type` (String) The starting position for consuming the subscription. (`latest`, `earliest`, `timestamp`)
- `service_account_key` (Attributes) GCP service account credentials. Required on create; provide a new value on update to rotate the key. (see below for nested schema)
- `topic` (String) The Pub/Sub topic name (not the fully-qualified path).

Optional:
- `ack_deadline` (Number) Acknowledgement deadline in seconds (10–600).
- `enable_ordering` (Boolean) Whether to enable ordered message delivery. Immutable: changing it requires destroy and create, because ordered delivery is a property of the subscription at creation time.
- `filter` (String) Optional Pub/Sub subscription filter expression (CEL). Max 256 characters. Immutable: changing it requires destroy and create, because the underlying subscription filter cannot be edited in place.
- `seek_timestamp` (String) RFC 3339 timestamp (e.g. `2026-04-10T12:00:00Z`). Required when `seek_type = "timestamp"`; must be omitted otherwise.

Nested schema for `source.pubsub.service_account_key`:

Required:
- `service_account_file` (String, Sensitive) Base64-encoded GCP service account JSON key file contents.
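
A Pub/Sub source could be declared roughly as follows. The project, topic, key file path, and table are hypothetical; the key is read and base64-encoded with Terraform's built-in `filebase64` function, which is one convenient way to satisfy the base64 requirement.

```terraform
variable "service_id" {
  type = string
}

resource "clickhouse_clickpipe" "pubsub_events" {
  name       = "pubsub-events"
  service_id = var.service_id

  source = {
    pubsub = {
      authentication = "SERVICE_ACCOUNT"
      project_id     = "my-gcp-project" # placeholder project
      topic          = "events"         # topic name, not the fully-qualified path
      format         = "JSONEachRow"
      seek_type      = "latest"
      ack_deadline   = 60

      service_account_key = {
        # Placeholder path; the file content is base64-encoded on read.
        service_account_file = filebase64("${path.module}/service-account.json")
      }
    }
  }

  destination = {
    table         = "events"
    managed_table = true

    columns = [
      { name = "event_id", type = "String" },
      { name = "payload", type = "String" },
    ]

    table_definition = {
      engine = { type = "MergeTree" }
    }
  }
}
```

`enable_ordering` and `filter` are left out here; since both are immutable, it is worth deciding on them before the first apply.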

Nested schema for `field_mappings`:

Required:
- `destination_field` (String) The name of the column in destination table.
- `source_field` (String) The name of the source field.

Nested schema for `scaling`:

Optional:
- `replica_cpu_millicores` (Number) The CPU allocation per replica in millicores. Must be between 125 and 2000.
- `replica_memory_gb` (Number) The memory allocation per replica in GB. Must be between 0.5 and 8.0.
- `replicas` (Number) The number of desired replicas for the ClickPipe. Default is 1. The maximum value is 10.

Import is supported using the following syntax. The `terraform import` command can be used, for example:

```shell
# ClickPipes can be imported by specifying both service ID and clickpipe ID.
terraform import clickhouse_clickpipe.example xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
```
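
On Terraform 1.5 or later, the same import can also be expressed declaratively with an `import` block. This is a sketch that reuses the `service ID:clickpipe ID` format from the command above; the resource address `clickhouse_clickpipe.example` is a placeholder.

```terraform
import {
  # Address of the resource to import into (placeholder name).
  to = clickhouse_clickpipe.example

  # "<service ID>:<clickpipe ID>", same format as the CLI import.
  id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
```

A matching `resource "clickhouse_clickpipe" "example"` block has to exist as well. It can be written by hand, or Terraform can draft one with `terraform plan -generate-config-out=generated.tf`; generated configuration should be reviewed before applying.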