Skip to content

[FLINK-38054] flink-connector-elasticsearch8 use correct ClassLoader in OperationSerializer#136

Open
mayorandrew wants to merge 2 commits into
apache:mainfrom
chocoapp:FLINK-38054-fix-elasticsearch8-deserialization
Open

[FLINK-38054] flink-connector-elasticsearch8 use correct ClassLoader in OperationSerializer#136
mayorandrew wants to merge 2 commits into
apache:mainfrom
chocoapp:FLINK-38054-fix-elasticsearch8-deserialization

Conversation

@mayorandrew
Copy link
Copy Markdown

@mayorandrew mayorandrew commented Nov 7, 2025

Fixes FLINK-38054

I am not sure this is the best solution in this case, but it mitigates the original problem of Operation classes being invisible during deserialization.

The PR branch is opened from last main branch commit for version 3.1.0, so that it is compatible with Flink version 1.20.

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented Nov 7, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@mayorandrew
Copy link
Copy Markdown
Author

mayorandrew commented Nov 14, 2025

Unfortunately this PR only fixes the class discovery error. However, the Operation itself still fails to deserialize due a NullPointerException from Jackson:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ArrayNode)
_children (com.fasterxml.jackson.databind.node.ObjectNode)
doc (co.elastic.clients.elasticsearch.core.bulk.UpdateAction)
action (co.elastic.clients.elasticsearch.core.bulk.UpdateOperation)
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:52)
	at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
	at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
	at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at java.base/java.util.ArrayList.ensureCapacity(ArrayList.java:214)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:96)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 41 more

I'm trying to look deeper, but I'm new to this serialization stuff, so any hints or guidance is appreciated.

@mayorandrew
Copy link
Copy Markdown
Author

I added a custom serializer for JsonNode class which seems to have resolved the second problem. I'll need a few more weeks to verify if Flink is able to restart from a snapshot now.

@mayorandrew
Copy link
Copy Markdown
Author

I have now verified that with this fix our Flink job was able to successfully restart from snapshot after an error.

@denizgursoy
Copy link
Copy Markdown

denizgursoy commented Jan 15, 2026

I also have same problem. Is this going to be merged?
I use the following version

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch8</artifactId>
            <version>4.0.0-2.0</version>
  </dependency>

@snuyanzin
Copy link
Copy Markdown
Contributor

would be great if we have a test for this first in order to be sure it will not be broken again

@denizgursoy
Copy link
Copy Markdown

@mayorandrew Did you find a work around because this is not merged?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants