Skip to content

Commit df993b0

Browse files
author
段晓雄
committed
fix(sink): force emitDownstream=true for Flink 2.2 two-phase commit
Flink 2.2 SinkWriterOperator sets emitDownstream based on whether the sink implements SupportsCommitter. Paimon uses the older TwoPhaseCommittingSink interface, causing emitDownstream=false and committables to be silently discarded. This results in data files written to storage but no snapshot/manifest created, making data unqueryable. Fix: after wrapping SinkWriterOperator, force emitDownstream=true and fill committableSerializer via reflection when the sink supports two-phase commit but does not implement SupportsCommitter. Made-with: Cursor
1 parent 2ea3fe0 commit df993b0

1 file changed

Lines changed: 48 additions & 0 deletions

File tree

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,54 @@ public void setup(
124124
public void open() throws Exception {
125125
invokeFlinkWriterOperatorMethod("open", new Class<?>[0]);
126126
copySinkWriter = getFieldValue("sinkWriter");
127+
ensureEmitDownstream();
128+
}
129+
130+
/**
131+
* Flink 2.2 {@code SinkWriterOperator} sets {@code emitDownstream = sink instanceof
132+
* SupportsCommitter}. Sinks like Paimon implement the older {@code TwoPhaseCommittingSink}
133+
* rather than Flink 2.x {@code SupportsCommitter}, so committables are silently discarded. This
134+
* method forces the flag to {@code true} and fills in the committable serializer so that the
135+
* wrapped operator properly emits committables to the downstream committer.
136+
*/
137+
private void ensureEmitDownstream() {
138+
try {
139+
Field emitField = findField(flinkWriterOperator.getClass(), "emitDownstream");
140+
if (emitField == null) {
141+
return;
142+
}
143+
emitField.setAccessible(true);
144+
if (!emitField.getBoolean(flinkWriterOperator)) {
145+
emitField.setBoolean(flinkWriterOperator, true);
146+
147+
Field serField = findField(flinkWriterOperator.getClass(), "committableSerializer");
148+
if (serField != null) {
149+
serField.setAccessible(true);
150+
if (serField.get(flinkWriterOperator) == null) {
151+
try {
152+
Method getSerializer =
153+
sink.getClass().getMethod("getCommittableSerializer");
154+
getSerializer.setAccessible(true);
155+
serField.set(flinkWriterOperator, getSerializer.invoke(sink));
156+
} catch (NoSuchMethodException ignored) {
157+
}
158+
}
159+
}
160+
}
161+
} catch (Exception e) {
162+
LOG.warn("Could not force emitDownstream on wrapped SinkWriterOperator", e);
163+
}
164+
}
165+
166+
private static Field findField(Class<?> clazz, String name) {
167+
while (clazz != null) {
168+
try {
169+
return clazz.getDeclaredField(name);
170+
} catch (NoSuchFieldException e) {
171+
clazz = clazz.getSuperclass();
172+
}
173+
}
174+
return null;
127175
}
128176

129177
@Override

0 commit comments

Comments
 (0)