Description
Describe the bug
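Both ClickHouse Kafka Connect and ClickHouse run in the same Kubernetes cluster, in the same namespace. After running for a long time, events from Kafka are dropped abruptly and the connector logs the error below.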
Steps to reproduce
- Deploy ClickHouse Kafka Connect and ClickHouse in the same namespace
- Keep both running for a long time
- Observe events from Kafka being dropped abruptly
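Failures only appear after a long run, so it may help to record exactly when the connection to ClickHouse breaks. The sketch below is a hypothetical diagnostic probe and is not part of the connector. The class name `ClickHouseProbe`, its arguments, the `CLICKHOUSE_USER`/`CLICKHOUSE_PASSWORD` environment variables and the 5-second default interval are all assumptions.

The probe repeatedly sends `SHOW TABLES` to ClickHouse's HTTP interface. This is the kind of query the log suggests the connector was running when it failed. Each failure is printed with a timestamp, and the probe uses HTTP/1.1 like the connector's Apache-based client.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;

// Hypothetical diagnostic probe (not part of the connector): periodically sends
// SHOW TABLES to the ClickHouse HTTP interface and prints every failure with a
// timestamp, for comparison with the connector's "Failed in show tables" errors.
class ClickHouseProbe {

    // Builds http://<host>:<port>/?query=<SQL>, encoding spaces as %20.
    static URI buildQueryUri(String host, int port, String sql) {
        String encoded = URLEncoder.encode(sql, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create("http://" + host + ":" + port + "/?query=" + encoded);
    }

    // Adds ClickHouse HTTP credential headers only when a value is supplied.
    static HttpRequest buildRequest(URI uri, String user, String password) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri)
                .timeout(Duration.ofSeconds(10))
                .GET();
        if (user != null) {
            builder.header("X-ClickHouse-User", user);
        }
        if (password != null) {
            builder.header("X-ClickHouse-Key", password);
        }
        return builder.build();
    }

    // Returns null when the query succeeds, otherwise a short failure description.
    static String probeOnce(HttpClient client, HttpRequest request) {
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 200) {
                return "HTTP " + response.statusCode() + ": " + response.body().trim();
            }
            return null;
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Usage: java ClickHouseProbe.java <host> <port> [intervalSeconds]
    // Credentials come from the hypothetical CLICKHOUSE_USER and
    // CLICKHOUSE_PASSWORD environment variables, if they are set.
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 2) {
            System.err.println("usage: ClickHouseProbe <host> <port> [intervalSeconds]");
            return;
        }
        URI uri = buildQueryUri(args[0], Integer.parseInt(args[1]), "SHOW TABLES");
        long intervalMillis = (args.length > 2 ? Long.parseLong(args[2]) : 5) * 1000;
        HttpRequest request = buildRequest(
                uri, System.getenv("CLICKHOUSE_USER"), System.getenv("CLICKHOUSE_PASSWORD"));
        // Force HTTP/1.1 to avoid an h2c upgrade attempt on a cleartext connection.
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        while (true) {
            String failure = probeOnce(client, request);
            if (failure != null) {
                System.out.println(Instant.now() + " FAILED " + failure);
            }
            Thread.sleep(intervalMillis);
        }
    }
}
```

For example, run `java ClickHouseProbe.java <clickhouse-host> 8123 5` (Java 11+) from a pod in the same namespace as the connector. If the probe's failure timestamps line up with the connector's errors, that would point at the connection between the pods rather than at the connector itself.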
Expected behaviour
ClickHouse Kafka Connect should write all data from Kafka to ClickHouse.
Error log
[2025-02-07 08:55:41,021] ERROR Failed in show tables (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient)
com.clickhouse.client.ClickHouseException: upstream connect error or disconnect/reset before headers. reset reason: connection termination
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:151)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: upstream connect error or disconnect/reset before headers. reset reason: connection termination
    at com.clickhouse.client.http.ApacheHttpConnectionImpl.checkResponse(ApacheHttpConnectionImpl.java:241)
    at com.clickhouse.client.http.ApacheHttpConnectionImpl.post(ApacheHttpConnectionImpl.java:304)
    at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:195)
    at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)
    ... 4 more
Configuration
Environment
- Kafka-Connect version: v1.2.5
- Kafka Connect configuration:
Environment variables:
- name: CONNECT_BOOTSTRAP_SERVERS
  value: "kafka:9092"
- name: CONNECT_GROUP_ID
  value: "clickhouse_kafka_connect"
- name: CONNECT_CONFIG_STORAGE_TOPIC
  value: "clickhouse_kafka_connect_config"
- name: CONNECT_OFFSET_STORAGE_TOPIC
  value: "clickhouse_kafka_connect_offset"
- name: CONNECT_STATUS_STORAGE_TOPIC
  value: "clickhouse_kafka_connect_status"
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
  value: "3"
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
  value: "3"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
  value: "3"
- name: CONNECT_KEY_CONVERTER
  value: "org.apache.kafka.connect.json.JsonConverter"
- name: CONNECT_VALUE_CONVERTER
  value: "org.apache.kafka.connect.json.JsonConverter"
- name: CONNECT_INTERNAL_KEY_CONVERTER
  value: "org.apache.kafka.connect.json.JsonConverter"
- name: CONNECT_INTERNAL_VALUE_CONVERTER
  value: "org.apache.kafka.connect.json.JsonConverter"
- name: CONNECT_REST_ADVERTISED_HOST_NAME
  value: "clickhouse-kafka-connect.ns.svc.cluster.local"
Connector config:
{
  "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
  "tasks.max": 1,
  "topics.regex": "{{ .TopicsRegex }}",
  "ssl": false,
  "security.protocol": "SSL",
  "hostname": "{{ .Hostname }}",
  "database": "{{ .DatabaseName }}",
  "password": "{{ .Password }}",
  "ssl.truststore.location": "/tmp/kafka.client.truststore.jks",
  "port": {{ .Port }},
  "schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "exactlyOnce": "true",
  "username": "{{ .UserName }}",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "ch_deadletterqueue",
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.deadletterqueue.topic.replication.factor": {{ .DeadletterQueueReplicationFactor }},
  "tableRefreshInterval": 5,
  "keeperOnCluster": "{{ .ClusterName }}"
}
- Kafka version: 3.8.0
- Kafka environment: K8s pods with PVC
- OS: Linux
ClickHouse server
- ClickHouse Server version: v24.6
- ClickHouse Server non-default settings, if any:
<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
<disable_internal_dns_cache>1</disable_internal_dns_cache>
<keeper_map_path_prefix>/keeper_map_tables</keeper_map_path_prefix>
debug
1
1000M
3
<display_name from_env="HOSTNAME"></display_name>
<listen_host>0.0.0.0</listen_host>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<user_directories>
    <users_xml>
        <path>/etc/clickhouse-server/users.d/users.xml</path>
    </users_xml>
    <local_directory>
        <path>/var/lib/clickhouse/access/</path>
    </local_directory>
</user_directories>
<distributed_ddl>
    <path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
<remote_servers>
<cluster_1S_2R>
mysecretphrase
<internal_replication>true</internal_replication>
clickhouse-0
9000
<internal_replication>true</internal_replication>
clickhouse-1
9000
</cluster_1S_2R>
</remote_servers>
clickhouse-keeper-0
9181
clickhouse-keeper-1
9181
clickhouse-keeper-2
9181
1
cluster_1S_2R
CREATE TABLE statements for tables involved:
CREATE TABLE IF NOT EXISTS event_table ON CLUSTER '{cluster}'
(
    id String,
    type Int16,
    timestamp DateTime('UTC'),
    user_id String,
    username String,
    field Boolean,
    cl String,
    ag String,
    id1 LowCardinality(String),
    id2 LowCardinality(String),
    id3 String,
    id4 String,
    id5 String,
    id6 String,
    id6 String,
) ENGINE = ReplicatedReplacingMergeTree
PRIMARY KEY (id1, id2, id3, id4, timestamp)
ORDER BY (id1, id2, id3, id4, timestamp, id);
- Sample data for all these tables, use clickhouse-obfuscator if necessary