Skip to content

[Bug] [CDC] Cannot instantiate user function. #4549

@J1031

Description

@J1031

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

I just tested dinky with the following docker images, and both are failed
dinkydocker/dinky-standalone-server:1.2.5-flink1.19
dinkydocker/dinky-standalone-server:1.2.5-flink1.20

I used with yarn session, and uploaded flink jars and cdc jars to the hdfs.The flink jars are from the docker containers /opt/flink. cdc jars are from maven, addresses are as follows

https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.19/26.0.0/flink-doris-connector-1.19-26.0.0.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.5.0/flink-cdc-pipeline-connector-mysql-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.5.0/flink-cdc-pipeline-connector-doris-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.5.0/flink-cdc-pipeline-connector-paimon-3.5.0.jar

flink errors are as follows

2026-04-02 10:19:11
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:417)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:713)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamFlatMap
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:494)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:478)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:473)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:428)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:401)
	... 16 more

What you expected to happen

flink job runs ok

How to reproduce

1.Run the docker conainer
docker run -p 8888:8888 -d --name dinky dinkydocker/dinky-standalone-server:1.2.5-flink1.19

  1. upload the /opt/flink/lib, /opt/flink/plugins and cdc jars to hdfs
  2. start the yarn-session
  3. config the dinky flink instance with the yarn-session's job manager address
  4. execute sql
EXECUTE CDCSOURCE demo_doris
WITH
(
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '12345',
    'checkpoint' = '10000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'test\.student,test\.score',
    'server-time-zone' = 'Asia/Shanghai',

    'sink.connector' = 'doris',
    'sink.fenodes' = '127.0.0.1:8030',
    'sink.username' = 'root',
    'sink.password' = '123456',

    'sink.table.prefix' = 'ODS_',
    'sink.table.upper' = 'true',
    'sink.table.identifier' = '#{schemaName}.#{tableName}'
);

Anything else

No response

Version

dev

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

BugSomething isn't workingWaiting for replyWaiting for reply

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions