Skip to content

[Bug] [Connector-v2] Kafka sink with EXACTLY_ONCE semantics does not work after resuming the task. #10005

@davidzollo

Description

@davidzollo

Search before asking

  • I have searched the issues and found no similar issues.

What happened

When the task is resumed with ./seatunnel.sh -r 1036289141794406401 -c ../config/mysqlcdc_to_kafka_exactly_once.conf, it cannot perform checkpoints and cannot read new CDC data from MySQL.

SeaTunnel Version

2.3.12

SeaTunnel Config

env {
  "job.name"="mysqlcdc_to_kafka_exactly_once"
  "job.mode"=STREAMING
  checkpoint.interval = 60000
  checkpoint.timeout = 300000
}
source {
  MySQL-CDC {
    "split.enable-hash-split-for-string-column"="false"
    "format"="COMPATIBLE_DEBEZIUM_JSON"
    "schema-changes.enabled"="true"
    "startup.mode"=INITIAL
    "snapshot.split.size"=100000
    "snapshot.fetch.size"=1024
    debezium {
      "value.converter.schemas.enable"=true
      "key.converter.schemas.enable"=true
    }
    "connect.timeout.ms"=30000
    "connect.max-retries"=3
    "connection.pool.size"=6
    "chunk-key.even-distribution.factor.lower-bound"=0.05
    "chunk-key.even-distribution.factor.upper-bound"=100
    "sample-sharding.threshold"=1000
    "inverse-sampling.rate"=1000
    "exactly_once"="true"
    "stop.mode"=NEVER
    parallelism=1
    updateFieldType=null
    sceneMode="MULTIPLE_TABLE"
    "result_table_name"=Table18966143936576
    database-names=[
      "lyc_test"
    ]
    table-names=[
      "lyc_test.kfkdata"
    ]
    base-url="jdbc:mysql://xxx:3306/WHRTest?useSSL=false&serverTimezone=UTC&characterEncoding=utf8"
    password="QWerxxx"
    displayName=MySQL-CDC
    username="root"
    server-time-zone=UTC
  }
}
transform {

}
sink {
  Kafka {
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "topic_partitions_num"=1
    "topic_replication_num"=1
    "data_save_mode"="APPEND_DATA"
    semantics="EXACTLY_ONCE"
    updateFieldType=null
    "source_table_name"=Table18966143936576
    topic="dailidong_exactly_once_kafka"
    format="COMPATIBLE_DEBEZIUM_JSON"
    "bootstrap.servers"="datasource01:9092"
    kafka.config = {
      "transaction.timeout.ms"="900000"
      "request.timeout.ms"="930000"
      "delivery.timeout.ms"="960000"
      "acks"="all"
      "retries"="3"
      "retry.backoff.ms"="200"
    }
  }
}

Running Command

./bin/seatunnel.sh --config config/mysqlcdc_to_kafka_exactly_once.conf

Error Exception

There is no exception info.

seatunnel-engine-server.log

Zeta or Flink or Spark Version

No response

Java or Scala Version

JDK 8

Screenshots

Image Image

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions