Skip to content

[Bug] mysql cd同步达梦数据库中,想做到多表同步 #10199

@hope1234567hope

Description

@hope1234567hope

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

mysql cd同步达梦数据库中,想做到多表同步
[root@thirdmongodb tstmy]# cat dm07.conf
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000
}

source {
MySQL-CDC {
url = "jdbc:mysql://192.168.137.143:3307/MyDB_one?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8"
driver = "com.mysql.cj.jdbc.Driver"
username = "root"
password = "123456"
table-names = ["MyDB_one.t","MyDB_one.myrt2"]
#table-names = ["MyDB_one.t"]
startup.mode = "initial"
}
}

sink {
Jdbc {
url = "jdbc:dm://10.27.138.228:5236?schema=MYDB"
driver = "dm.jdbc.driver.DmDriver"
user = "SYSDBA"
password = "Passc0de003"

database="DMDB228"
table-names = ["MYDB.t","MYDB.myrt2"]
generate_sink_sql = true 
batch_size = 1000

}
}
其中database是达梦中select * from v$database中查name到的,
但它总是到报错,部分错位如下
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_292]
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:172) ~[seatunnel-transforms-v2.jar:2.3.12]
... 16 more
2025-12-17 10:04:41,748 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - [localhost]:5801 [seatunnel-725946] [5.1] taskDone, taskId = 1000200000000, taskGroup = TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}
2025-12-17 10:04:41,749 INFO [i.d.j.JdbcConnection ] [pool-40-thread-1] - Connection gracefully closed
2025-12-17 10:04:41,753 INFO [o.a.k.c.j.JsonConverterConfig ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true

2025-12-17 10:04:41,755 WARN [i.d.c.m.MySqlConnection ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - Database configuration option 'serverTimezone' is set but is obsolete, please use 'connectionTimeZone' instead
2025-12-17 10:04:41,755 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - PrepareStatement sql is:
MERGE INTO DMDB228."t" TARGET USING (SELECT ? "id", ? "name", ? "age") SOURCE ON (TARGET."id"=SOURCE."id") WHEN MATCHED THEN UPDATE SET TARGET."name"=SOURCE."name", TARGET."age"=SOURCE."age" WHEN NOT MATCHED THEN INSERT ("id", "name", "age") VALUES (SOURCE."id", SOURCE."name", SOURCE."age")

2025-12-17 10:04:41,756 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - [localhost]:5801 [seatunnel-725946] [5.1] taskGroup TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2} complete with FAILED
2025-12-17 10:04:41,757 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - [localhost]:5801 [seatunnel-725946] [5.1] task 1000200000000 error with exception: [java.lang.RuntimeException: java.lang.RuntimeException: table MyDB_one.myrt2 sink throw error], cancel other task in taskGroup TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}.
2025-12-17 10:04:41,757 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-8] - [localhost]:5801 [seatunnel-725946] [5.1] Task TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2} complete with state FAILED
2025-12-17 10:04:41,757 INFO [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - Closing Source Reader 0.
2025-12-17 10:04:41,759 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] - Shutting down split fetcher 0
2025-12-17 10:04:41,759 INFO [o.a.s.e.s.CoordinatorService ] [hz.main.seaTunnel.task.thread-8] - [localhost]:5801 [seatunnel-725946] [5.1] Received task end from execution TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}, state FAILED
2025-12-17 10:04:41,759 INFO [o.a.s.a.e.LoggingEventHandler ] [hz.main.generic-operation.thread-26] - log event: ReaderCloseEvent(createdTime=1765937081757, jobId=1053492822700720129, eventType=LIFECYCLE_READER_CLOSE)
2025-12-17 10:04:41,761 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-8] - Job (1053492822700720129), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MySQL-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] turned from state RUNNING to FAILED.
2025-12-17 10:04:41,761 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-8] - Job (1053492822700720129), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MySQL-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] state process is stopped
2025-12-17 10:04:41,761 ERROR [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-8] - Job (1053492822700720129), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MySQL-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1053492822700720129, pipelineId=1, taskGroupId=2}] end with state FAILED and Exception: java.lang.RuntimeException: java.lang.RuntimeException: table MyDB_one.myrt2 sink throw error
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:303)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:82)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:56)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: table MyDB_one.myrt2 sink throw error
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:184)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:47)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:269)
... 17 more
Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-08], ErrorDescription:[Sql operation failed, such as (execute,addBatch,close) etc...] - unable to open JDBC writer
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.createAndOpenStatementExecutor(JdbcOutputFormat.java:82)
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:74)
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.tryOpen(JdbcSinkWriter.java:128)
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.write(JdbcSinkWriter.java:139)
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.write(JdbcSinkWriter.java:44)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
... 6 more
Caused by: dm.jdbc.driver.DMException: 第1 行附近出现错误:
无效的模式名[DMDB228]
at dm.jdbc.driver.DBError.throwException(SourceFile:793)
at dm.jdbc.a.a.ac.s(SourceFile:647)
at dm.jdbc.a.a.f.s(SourceFile:143)
at dm.jdbc.a.a.ac.K(SourceFile:579)
at dm.jdbc.a.a.ac.V(SourceFile:560)
at dm.jdbc.a.a.a(SourceFile:278)
at dm.jdbc.a.a.a(SourceFile:824)
at dm.jdbc.driver.DmdbPreparedStatement.do_prepareSql(SourceFile:305)
at dm.jdbc.driver.DmdbPreparedStatement.prepareSql(SourceFile:301)
at dm.jdbc.driver.DmdbPreparedStatement.allocateHandle(SourceFile:219)
at dm.jdbc.driver.DmdbPreparedStatement.(SourceFile:152)
at dm.jdbc.driver.DmdbPreparedStatement.(SourceFile:182)
at dm.jdbc.driver.DmdbConnection.do_prepareStatement(SourceFile:950)
at dm.jdbc.driver.DmdbConnection.do_prepareStatement(SourceFile:941)
at dm.jdbc.driver.DmdbConnection.prepareStatement(SourceFile:1436)
at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.ProxyConnection.prepareStatement(ProxyConnection.java:337)
at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyConnection.prepareStatement(HikariProxyConnection.java)
at

SeaTunnel Version

apache-seatunnel-2.3.12

SeaTunnel Config

[root@thirdmongodb tstmy]# cat  dm07.conf
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
 
source {
  MySQL-CDC {
    url = "jdbc:mysql://192.168.137.143:3307/MyDB_one?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "root"
    password = "123456"
    table-names = ["MyDB_one.t","MyDB_one.myrt2"]
    #table-names = ["MyDB_one.t"]
    startup.mode = "initial"
  }
}
 
sink {
  Jdbc {
    url = "jdbc:dm://10.27.138.228:5236?schema=MYDB"
    driver = "dm.jdbc.driver.DmDriver"
    user = "SYSDBA"
    password = "Passc0de003"

    database="DMDB228"
    table-names = ["MYDB.t","MYDB.myrt2"]

    generate_sink_sql = true 
 
    batch_size = 1000
  }
}

Running Command

/opt/apache-seatunnel-2.3.12/bin/seatunnel.sh  --config /opt/apache-seatunnel-2.3.12/tstmy/dm07.conf -m local

Error Exception

2025-12-17 10:04:41,921 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: table MyDB_one.myrt2 sink throw error
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:303)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:82)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:56)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: table MyDB_one.myrt2 sink throw error
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:184)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:47)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:269)
        ... 17 more
Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-08], ErrorDescription:[Sql operation failed, such as (execute,addBatch,close) etc...] - unable to open JDBC writer
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.createAndOpenStatementExecutor(JdbcOutputFormat.java:82)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:74)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.tryOpen(JdbcSinkWriter.java:128)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.write(JdbcSinkWriter.java:139)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.write(JdbcSinkWriter.java:44)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
        ... 6 more
Caused by: dm.jdbc.driver.DMException: 第1 行附近出现错误:
无效的模式名[DMDB228]
        at dm.jdbc.driver.DBError.throwException(SourceFile:793)
        at dm.jdbc.a.a.ac.s(SourceFile:647)
        at dm.jdbc.a.a.f.s(SourceFile:143)
        at dm.jdbc.a.a.ac.K(SourceFile:579)
        at dm.jdbc.a.a.ac.V(SourceFile:560)
        at dm.jdbc.a.a.a(SourceFile:278)
        at dm.jdbc.a.a.a(SourceFile:824)
        at dm.jdbc.driver.DmdbPreparedStatement.do_prepareSql(SourceFile:305)
        at dm.jdbc.driver.DmdbPreparedStatement.prepareSql(SourceFile:301)
        at dm.jdbc.driver.DmdbPreparedStatement.allocateHandle(SourceFile:219)
        at dm.jdbc.driver.DmdbPreparedStatement.<init>(SourceFile:152)
        at dm.jdbc.driver.DmdbPreparedStatement.<init>(SourceFile:182)
        at dm.jdbc.driver.DmdbConnection.do_prepareStatement(SourceFile:950)
        at dm.jdbc.driver.DmdbConnection.do_prepareStatement(SourceFile:941)
        at dm.jdbc.driver.DmdbConnection.prepareStatement(SourceFile:1436)
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.ProxyConnection.prepareStatement(ProxyConnection.java:337)
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyConnection.prepareStatement(HikariProxyConnection.java)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.prepareStatement(FieldNamedPreparedStatement.java:673)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder.lambda$createSimpleExecutor$6(JdbcOutputFormatBuilder.java:342)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.prepareStatements(SimpleBatchStatementExecutor.java:43)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor.prepareStatements(BufferReducedBatchStatementExecutor.java:50)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.createAndOpenStatementExecutor(JdbcOutputFormat.java:80)
        ... 11 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
 
2025-12-17 10:04:41,921 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions