[Bug][Connector-v2] Schema evolution with CDC fails silently on Flink 1.13 with multiple parallelism #10787

@CloverDew

Description

Search before asking

  • I have searched the issues and found no similar issues.

What happened

When running a MySQL CDC job with schema evolution (schema-changes.enabled = true) on Flink 1.13 with parallelism ≥ 2, the job silently produces incorrect results:

  • DDL (ALTER TABLE) is never applied to the sink — schema changes are buffered in SchemaOperator waiting for a checkpoint to complete, but no checkpoint ever completes.

  • DML data is lost — JdbcSinkWriter buffers records in a batch and only flushes them during prepareCommit(), which is triggered by checkpoints. Since checkpoints stop, the buffer is never flushed.

  • The root cause is a Flink 1.13 framework limitation (pre-FLIP-147): once any source subtask enters FINISHED state, CheckpointCoordinator permanently rejects all subsequent checkpoint attempts with:

Failed to trigger checkpoint since some tasks of job has been finished, abort the checkpoint.
Failure reason: Not all required tasks are currently running.

In a multi-parallelism CDC job, the number of source splits (e.g., 1 table = 1 split) is often less than the parallelism. The subtask that receives no split finishes immediately, permanently killing all future checkpoints for the entire job.
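The failure chain described above (an idle source subtask finishes, checkpoints stop, buffered rows are never flushed) can be pictured with a small self-contained model. This is only a sketch under stated assumptions: `StalledCheckpointSketch`, `BufferingWriter`, and every method in it are hypothetical names invented for illustration, not Flink or SeaTunnel classes, and the "one split per subtask" assignment is a simplification of what the report describes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the reported failure chain. All names are hypothetical. */
final class StalledCheckpointSketch {

    /** Assumption from the report: a subtask keeps running only if it received a split. */
    static boolean[] runningAfterAssignment(int parallelism, int splitCount) {
        boolean[] running = new boolean[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            running[subtask] = subtask < splitCount;
        }
        return running;
    }

    /** Pre-FLIP-147 rule as described in the report: every task must still be running. */
    static boolean canTriggerCheckpoint(boolean[] running) {
        for (boolean isRunning : running) {
            if (!isRunning) {
                return false;
            }
        }
        return true;
    }

    /** Writer that buffers rows and hands them to the sink only in prepareCommit(). */
    static final class BufferingWriter {
        final List<String> buffer = new ArrayList<>();
        final List<String> flushed = new ArrayList<>();

        void write(String row) {
            buffer.add(row);
        }

        void prepareCommit() {
            flushed.addAll(buffer);
            buffer.clear();
        }
    }

    /** Writes all rows, attempts one checkpoint, and returns how many rows reached the sink. */
    static int rowsDelivered(int parallelism, int splitCount, List<String> rows) {
        boolean[] running = runningAfterAssignment(parallelism, splitCount);
        BufferingWriter writer = new BufferingWriter();
        for (String row : rows) {
            writer.write(row);
        }
        if (canTriggerCheckpoint(running)) {
            writer.prepareCommit(); // only reached when no task has finished
        }
        return writer.flushed.size();
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("id=1");
        rows.add("id=2");
        System.out.println("parallelism=1, splits=1: " + rowsDelivered(1, 1, rows));
        System.out.println("parallelism=2, splits=1: " + rowsDelivered(2, 1, rows));
    }
}
```

In this model, one table with parallelism 1 delivers both rows, while the same table with parallelism 2 delivers none, which mirrors the silent data loss reported here.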

Flink itself addressed this limitation through FLIP-147 (support for checkpoints after tasks have finished), first released in Flink 1.14.
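To make the difference concrete, the sketch below contrasts the strict Flink 1.13 rule described in this report with a simplified FLIP-147-style rule that tolerates finished tasks. It is an illustration only: `FinishedTaskCheckpointSketch` and its methods are hypothetical names, and the relaxed rule ("proceed while at least one task is still running") is an assumed simplification, not Flink's actual implementation.

```java
/** Contrast of two checkpoint trigger rules. All names are hypothetical. */
final class FinishedTaskCheckpointSketch {

    enum TaskState { RUNNING, FINISHED }

    /** Flink 1.13 behaviour as reported: any FINISHED task aborts the checkpoint. */
    static boolean canTriggerStrict(TaskState[] tasks) {
        for (TaskState state : tasks) {
            if (state != TaskState.RUNNING) {
                return false;
            }
        }
        return true;
    }

    /** Simplified FLIP-147-style rule (assumption): finished tasks are skipped. */
    static boolean canTriggerAllowingFinished(TaskState[] tasks) {
        for (TaskState state : tasks) {
            if (state == TaskState.RUNNING) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Source (1/2) holds the only split; source (2/2) finished immediately.
        TaskState[] tasks = { TaskState.RUNNING, TaskState.FINISHED };
        System.out.println("strict rule:  " + canTriggerStrict(tasks));
        System.out.println("relaxed rule: " + canTriggerAllowingFinished(tasks));
    }
}
```

With one running and one finished subtask, the strict rule refuses the checkpoint and the relaxed rule allows it, which is the situation this job is in.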

SeaTunnel Version

--

SeaTunnel Config

env {
  # You can set engine configuration here
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["shop.products"]
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"

    schema-changes.enabled = true
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user_sink"
    password = "mysqlpw"
    generate_sink_sql = true
    database = shop
    table = mysql_cdc_e2e_sink_table_with_schema_change
    primary_keys = ["id"]
  }
}

Running Command

--

Error Exception

[] 2026-04-18 21:59:40,465 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,464 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task schema-evolution (2/2)#0 0c0b60b1468465b935b1ebbdb10a67db.
[] 2026-04-18 21:59:40,468 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,467 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager [] - start close connection poolHikariPool-1
[] 2026-04-18 21:59:40,468 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,467 INFO  org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource [] - HikariPool-1 - Shutdown initiated...
[] 2026-04-18 21:59:40,469 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:40,468 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: MySQL-CDC-Source (2/2) (7da7905d6b2d054957ed512038c68fe3) switched from RUNNING to FINISHED.
[] 2026-04-18 21:59:40,472 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,471 INFO  org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource [] - HikariPool-1 - Shutdown completed.
[] 2026-04-18 21:59:40,472 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:40,471 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - schema-evolution (2/2) (0c0b60b1468465b935b1ebbdb10a67db) switched from RUNNING to FINISHED.
[] 2026-04-18 21:59:40,472 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,471 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: WriterCloseEvent(createdTime=1776520780471, jobId=null, eventType=LIFECYCLE_WRITER_CLOSE)
[] 2026-04-18 21:59:40,473 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,472 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - BroadcastSchemaHandler -> Sink Writer: MultiTableSink-Sink (2/2)#0 (9681f59cef0e75c65e73f7ef50ffd2b3) switched from RUNNING to FINISHED.
[] 2026-04-18 21:59:40,473 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,472 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for BroadcastSchemaHandler -> Sink Writer: MultiTableSink-Sink (2/2)#0 (9681f59cef0e75c65e73f7ef50ffd2b3).
[] 2026-04-18 21:59:40,474 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,473 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task BroadcastSchemaHandler -> Sink Writer: MultiTableSink-Sink (2/2)#0 9681f59cef0e75c65e73f7ef50ffd2b3.
[] 2026-04-18 21:59:40,477 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:40,476 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - BroadcastSchemaHandler -> Sink Writer: MultiTableSink-Sink (2/2) (9681f59cef0e75c65e73f7ef50ffd2b3) switched from RUNNING to FINISHED.
[] 2026-04-18 21:59:40,479 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:40,478 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 13d21bcc5952844f8163abbfb049b60c: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
[] 2026-04-18 21:59:40,480 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,479 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: MessageDelayedEvent(createdTime=1776520780479, jobId=13d21bcc5952844f8163abbfb049b60c, eventType=READER_MESSAGE_DELAYED, delayTime=1973, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1776520778, file=mysql-bin.000003, pos=15590, gtids=b73f798c-3b2e-11f1-967c-56c873bbfd8a:1-62, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=shop}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1776520778506,db=shop,table=products,server_id=223344,gtid=b73f798c-3b2e-11f1-967c-56c873bbfd8a:63,file=mysql-bin.000003,pos=15379,row=0},databaseName=shop,ddl=alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1,tableChanges=[Stru
[] 2026-04-18 21:59:40,480 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: ct{type=ALTER,id="shop"."products",table=Struct{defaultCharsetName=utf8mb4,primaryKeyColumnNames=[id],columns=[Struct{name=id,jdbcType=4,typeName=INT,typeExpression=INT,position=1,optional=false,autoIncremented=true,generated=true}, Struct{name=name,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=255,position=2,optional=false,autoIncremented=false,generated=false}, Struct{name=description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=512,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=weight,jdbcType=6,typeName=FLOAT,typeExpression=FLOAT,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=add_column1,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=64,position=5,optional=false,autoIncremented=false,generated=false}, Struct{name=add_column2,jdbcType=4,typeName=INT,typeExpression=INT,position=6,optional=false,autoIncremented=false,generated=false}]}}]}, valueSchema=
[] 2026-04-18 21:59:40,481 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
[] 2026-04-18 21:59:40,482 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,481 INFO  org.apache.kafka.connect.json.JsonConverterConfig            [] - JsonConverterConfig values: 
[] 2026-04-18 21:59:40,482 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 	converter.type = value
[] 2026-04-18 21:59:40,482 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 	decimal.format = BASE64
[] 2026-04-18 21:59:40,482 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 	schemas.cache.size = 1000
[] 2026-04-18 21:59:40,482 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 	schemas.enable = true
[] 2026-04-18 21:59:40,482 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 
[] 2026-04-18 21:59:40,496 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,495 WARN  io.debezium.connector.mysql.MySqlValueConverters             [] - Column is missing a character set: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
[] 2026-04-18 21:59:40,496 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,495 WARN  io.debezium.connector.mysql.MySqlValueConverters             [] - Using UTF-8 charset by default for column without charset: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
[] 2026-04-18 21:59:40,498 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,497 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520780497, tableIdentifier=.shop.products, jobId=13d21bcc5952844f8163abbfb049b60c, statement=alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520780495, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableI
[] 2026-04-18 21:59:40,498 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: d=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column1, dataType=STRING, columnLength=256, scale=null, nullable=false, defaultValue=yy, comment=null, sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, afterColumn=null), AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520780496, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table
[] 2026-04-18 21:59:40,498 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: -name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column2, dataType=INT, columnLength=null, scale=null, nullable=false, defaultValue=1, comment=null, sourceType=INT, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
[] 2026-04-18 21:59:40,511 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,510 INFO  org.apache.seatunnel.translation.flink.schema.SchemaOperator [] - Starting schema change processing for table: .shop.products, job: 13d21bcc5952844f8163abbfb049b60c, event time: 1776520780497
[] 2026-04-18 21:59:40,511 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,510 INFO  org.apache.seatunnel.translation.flink.schema.SchemaOperator [] - Broadcasting SchemaChangeEvent to all downstream sink subtasks for table: .shop.products
[] 2026-04-18 21:59:40,512 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,511 INFO  org.apache.seatunnel.translation.flink.schema.SchemaOperator [] - SchemaChangeEvent broadcast sent for table: .shop.products
[] 2026-04-18 21:59:40,512 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,511 INFO  org.apache.seatunnel.translation.flink.schema.SchemaOperator [] - Synchronously processing schema change for table .shop.products (epoch 1776520780497). Business data buffered.
[] 2026-04-18 21:59:40,512 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,511 INFO  org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator [] - Requesting schema change for table .shop.products (epoch 1776520780497). Waiting for all 2 sink subtasks to apply after checkpoint completion.
[] 2026-04-18 21:59:40,613 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,611 INFO  org.apache.seatunnel.translation.flink.schema.BroadcastSchemaSinkOperator [] - Subtask 0 applying schema change immediately for table .shop.products (epoch 1776520780497, change: AlterTableColumnsEvent). This prevents deadlock by allowing checkpoint barriers to propagate.
[] 2026-04-18 21:59:40,617 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,615 INFO  org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter  [] - FlinkSinkWriter applying SchemaChangeEvent for table: .shop.products
[] 2026-04-18 21:59:40,617 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,615 INFO  org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter [] - Start apply schema change for table shop.products sub-writer 0
[] 2026-04-18 21:59:40,663 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,662 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect [] - Executing add column SQL: ALTER TABLE `shop`.`mysql_cdc_e2e_sink_table_with_schema_change` ADD COLUMN `add_column1` VARCHAR(64) NOT NULL DEFAULT 'yy'
[] 2026-04-18 21:59:40,719 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,718 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect [] - Executing add column SQL: ALTER TABLE `shop`.`mysql_cdc_e2e_sink_table_with_schema_change` ADD COLUMN `add_column2` INT NOT NULL DEFAULT 1
[] 2026-04-18 21:59:40,766 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,765 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement [] - PrepareStatement sql is:
[] 2026-04-18 21:59:40,766 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: INSERT INTO `shop`.`mysql_cdc_e2e_sink_table_with_schema_change` (`id`, `name`, `description`, `weight`, `add_column1`, `add_column2`) VALUES (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `description`=VALUES(`description`), `weight`=VALUES(`weight`), `add_column1`=VALUES(`add_column1`), `add_column2`=VALUES(`add_column2`)
[] 2026-04-18 21:59:40,766 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,766 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement [] - PrepareStatement sql is:
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: DELETE FROM `shop`.`mysql_cdc_e2e_sink_table_with_schema_change` WHERE `id` = ?
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,767 INFO  org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter [] - Finish apply schema change for table shop.products sub-writer 0
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,767 INFO  org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter  [] - FlinkSinkWriter successfully applied SchemaChangeEvent for table: .shop.products
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,767 INFO  org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator [] - Subtask 0 applied schema change for table .shop.products (epoch 1776520780497), success: true. 1/2 subtasks applied.
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,767 INFO  org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter  [] - FlinkSinkWriter sent schema change ack to coordinator for table .shop.products (epoch 1776520780497), subtask 0, success: true
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,767 WARN  org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator [] - Subtask 0 already applied schema change for table .shop.products (epoch 1776520780497). Ignoring duplicate notification.
[] 2026-04-18 21:59:40,768 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:40,767 INFO  org.apache.seatunnel.translation.flink.schema.BroadcastSchemaSinkOperator [] - Subtask 0 processed schema change for table .shop.products (epoch 1776520780497) and sent ACK to coordinator.
[] 2026-04-18 21:59:41,020 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:41,018 WARN  org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver [] - Ignoring statement for non-captured table ALTER TABLE `shop`.`mysql_cdc_e2e_sink_table_with_schema_change` ADD COLUMN `add_column1` VARCHAR(64) NOT NULL DEFAULT 'yy'
[] 2026-04-18 21:59:41,020 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:41,018 WARN  org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver [] - Ignoring statement for non-captured table ALTER TABLE `shop`.`mysql_cdc_e2e_sink_table_with_schema_change` ADD COLUMN `add_column2` INT NOT NULL DEFAULT 1
[] 2026-04-18 21:59:41,507 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:41,506 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520781506, tableIdentifier=.shop.products, jobId=13d21bcc5952844f8163abbfb049b60c, statement=alter table products ADD COLUMN add_column3 float not null default 1.1, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520781506, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primar
[] 2026-04-18 21:59:41,507 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: yKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column3, dataType=FLOAT, columnLength=null, scale=null, nullable=false, defaultValue=1.1, comment=null, sourceType=FLOAT, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
[] 2026-04-18 21:59:41,511 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:41,510 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520781509, tableIdentifier=.shop.products, jobId=13d21bcc5952844f8163abbfb049b60c, statement=alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(), sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520781509, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchem
[] 2026-04-18 21:59:41,512 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: a=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column4, dataType=TIMESTAMP, columnLength=null, scale=0, nullable=false, defaultValue=current_timestamp(), comment=null, sourceType=TIMESTAMP, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
[] 2026-04-18 21:59:42,513 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:42,511 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: MessageDelayedEvent(createdTime=1776520782511, jobId=13d21bcc5952844f8163abbfb049b60c, eventType=READER_MESSAGE_DELAYED, delayTime=4511, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1776520778, file=mysql-bin.000003, pos=17689, gtids=b73f798c-3b2e-11f1-967c-56c873bbfd8a:1-68, row=1, server_id=223344, event=2}} ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, key=Struct{id=113}, keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, value=Struct{before=Struct{id=113,name=hammer,description=12oz carpenter's hammer,weight=0.75,add_column1=yy,add_column2=1,add_column3=1.100000023841858,add_column4=2026-04-18T13:59:38Z},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1776520778000,db=shop,table=products,server_id=223344,gtid=b73f798c-3b2e-11f1-967c-56c873bbfd8a:69,file=mysql-bi
[] 2026-04-18 21:59:42,513 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: n.000003,pos=17839,row=0,thread=287},op=d,ts_ms=1776520778701}, valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
[] 2026-04-18 21:59:42,520 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:42,519 WARN  io.debezium.connector.mysql.MySqlValueConverters             [] - Column is missing a character set: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
[] 2026-04-18 21:59:42,520 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:42,519 WARN  io.debezium.connector.mysql.MySqlValueConverters             [] - Using UTF-8 charset by default for column without charset: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
[] 2026-04-18 21:59:42,520 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: 2026-04-18 13:59:42,519 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520782519, tableIdentifier=.shop.products, jobId=13d21bcc5952844f8163abbfb049b60c, statement=alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' after id, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1776520782519, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=Ta
[] 2026-04-18 21:59:42,520 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:taskmanager] - STDOUT: bleSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://mysql_cdc_e2e:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column6, dataType=STRING, columnLength=256, scale=null, nullable=false, defaultValue=ff, comment=null, sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, afterColumn=id)])
[] 2026-04-18 21:59:42,908 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:42,906 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job 13d21bcc5952844f8163abbfb049b60c since some tasks of job 13d21bcc5952844f8163abbfb049b60c has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
[] 2026-04-18 21:59:47,906 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:47,904 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job 13d21bcc5952844f8163abbfb049b60c since some tasks of job 13d21bcc5952844f8163abbfb049b60c has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
[] 2026-04-18 21:59:52,905 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:52,904 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job 13d21bcc5952844f8163abbfb049b60c since some tasks of job 13d21bcc5952844f8163abbfb049b60c has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
[] 2026-04-18 21:59:57,907 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 13:59:57,905 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job 13d21bcc5952844f8163abbfb049b60c since some tasks of job 13d21bcc5952844f8163abbfb049b60c has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
[] 2026-04-18 22:00:02,905 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 14:00:02,903 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job 13d21bcc5952844f8163abbfb049b60c since some tasks of job 13d21bcc5952844f8163abbfb049b60c has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
[] 2026-04-18 22:00:07,915 INFO  🐳 [tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27:jobmanager] - STDOUT: 2026-04-18 14:00:07,904 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job 13d21bcc5952844f8163abbfb049b60c since some tasks of job 13d21bcc5952844f8163abbfb049b60c has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
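
The signature in the log above is a task switching from RUNNING to FINISHED, followed by repeated "Failed to trigger checkpoint" messages. A small scanner like the following could help confirm the same pattern in other runs. It is a sketch only: `CheckpointStallLogScan` is a hypothetical helper, it matches on the two message fragments seen in this log, and the sample lines in `main` are shortened copies of the lines above rather than complete log records.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that looks for the stall signature in job logs. */
final class CheckpointStallLogScan {

    static final String FINISHED_MARKER = "switched from RUNNING to FINISHED";
    static final String ABORT_MARKER = "Failed to trigger checkpoint";

    /** Counts aborted checkpoint triggers that appear after the first FINISHED transition. */
    static int abortedAfterFinish(List<String> lines) {
        boolean sawFinished = false;
        int aborted = 0;
        for (String line : lines) {
            if (line.contains(FINISHED_MARKER)) {
                sawFinished = true;
            } else if (sawFinished && line.contains(ABORT_MARKER)) {
                aborted++;
            }
        }
        return aborted;
    }

    public static void main(String[] args) {
        // Shortened copies of lines from the log in this report.
        List<String> sample = Arrays.asList(
            "Source: MySQL-CDC-Source (2/2) switched from RUNNING to FINISHED.",
            "SchemaChangeEvent broadcast sent for table: .shop.products",
            "Failed to trigger checkpoint for job since some tasks of job has been finished",
            "Failed to trigger checkpoint for job since some tasks of job has been finished");
        System.out.println("aborted checkpoints after a task finished: "
            + abortedAfterFinish(sample));
    }
}
```

A non-zero count on a streaming job that is still expected to be running suggests the job has hit the limitation described in this issue.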

Zeta or Flink or Spark Version

Flink 1.13

Java or Scala Version

--

Screenshots

--

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
