From 1ec872fd83553c84949147f1666e1ea1db31f24f Mon Sep 17 00:00:00 2001 From: Andrey Starostin Date: Fri, 7 Nov 2025 13:23:01 +0100 Subject: [PATCH 1/2] [FLINK-38054] flink-connector-elasticsearch8 use correct ClassLoader in OperationSerializer --- .../flink/connector/elasticsearch/sink/OperationSerializer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java index 1c2cb810..633d0cde 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java @@ -37,6 +37,7 @@ public class OperationSerializer { public OperationSerializer() { kryo.setRegistrationRequired(false); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } public void serialize(Operation request, DataOutputStream out) { From 209b7af0ec510f383a6c9ff62b76df80ffc26a0e Mon Sep 17 00:00:00 2001 From: Andrey Starostin Date: Fri, 14 Nov 2025 18:14:26 +0100 Subject: [PATCH 2/2] [FLINK-38054] flink-connector-elasticsearch8 use custom serializer for JsonNode --- .../sink/OperationSerializer.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java index 633d0cde..d935d473 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java @@ -22,8 +22,11 @@ package org.apache.flink.connector.elasticsearch.sink; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.objenesis.strategy.StdInstantiatorStrategy; import java.io.ByteArrayOutputStream; @@ -33,10 +36,12 @@ /** OperationSerializer is responsible for serialization and deserialization of an Operation. */ public class OperationSerializer { private final Kryo kryo = new Kryo(); + private static final ObjectMapper MAPPER = new ObjectMapper(); public OperationSerializer() { kryo.setRegistrationRequired(false); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.addDefaultSerializer(JsonNode.class, new JsonNodeSerializer()); kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } @@ -62,4 +67,28 @@ public int size(Operation operation) { return (int) output.total(); } } + + private static class JsonNodeSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, JsonNode object) { + try { + byte[] bytes = MAPPER.writeValueAsBytes(object); + output.writeInt(bytes.length, true); + output.writeBytes(bytes); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize JsonNode", e); + } + } + + @Override + public JsonNode read(Kryo kryo, Input input, Class type) { + try { + int length = input.readInt(true); + byte[] bytes = input.readBytes(length); + return MAPPER.readTree(bytes); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize JsonNode", e); + } + } + } }