Skip to content

Commit b7841fe

Browse files
xiaoyulibenchao
authored andcommitted
[FLINK-36549][formats] Fix using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss
1 parent 6362ad1 commit b7841fe

File tree

8 files changed

+231
-38
lines changed

8 files changed

+231
-38
lines changed

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.io.Serializable;
43+
import java.util.ArrayList;
4344
import java.util.List;
4445
import java.util.Objects;
4546
import java.util.regex.Pattern;
@@ -100,6 +101,9 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
100101
/** Pattern of the specific table. */
101102
private final Pattern tablePattern;
102103

104+
/** List of data to be processed. */
105+
private transient List<GenericRowData> genericRowDataList;
106+
103107
private CanalJsonDeserializationSchema(
104108
DataType physicalDataType,
105109
List<ReadableMetadata> requestedMetadata,
@@ -200,6 +204,7 @@ public CanalJsonDeserializationSchema build() {
200204

201205
@Override
202206
public void open(InitializationContext context) throws Exception {
207+
genericRowDataList = new ArrayList<>();
203208
jsonDeserializer.open(context);
204209
}
205210

@@ -214,6 +219,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
214219
if (message == null || message.length == 0) {
215220
return;
216221
}
222+
genericRowDataList.clear();
217223
try {
218224
final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
219225
if (database != null) {
@@ -238,7 +244,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
238244
for (int i = 0; i < data.size(); i++) {
239245
GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
240246
insert.setRowKind(RowKind.INSERT);
241-
emitRow(row, insert, out);
247+
genericRowDataList.add(handleRow(row, insert));
242248
}
243249
} else if (OP_UPDATE.equals(type)) {
244250
// "data" field is an array of row, contains new rows
@@ -260,16 +266,16 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
260266
}
261267
before.setRowKind(RowKind.UPDATE_BEFORE);
262268
after.setRowKind(RowKind.UPDATE_AFTER);
263-
emitRow(row, before, out);
264-
emitRow(row, after, out);
269+
genericRowDataList.add(handleRow(row, before));
270+
genericRowDataList.add(handleRow(row, after));
265271
}
266272
} else if (OP_DELETE.equals(type)) {
267273
// "data" field is an array of row, contains deleted rows
268274
ArrayData data = row.getArray(0);
269275
for (int i = 0; i < data.size(); i++) {
270276
GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
271277
insert.setRowKind(RowKind.DELETE);
272-
emitRow(row, insert, out);
278+
genericRowDataList.add(handleRow(row, insert));
273279
}
274280
} else if (OP_CREATE.equals(type)) {
275281
// "data" field is null and "type" is "CREATE" which means
@@ -290,14 +296,15 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
290296
format("Corrupt Canal JSON message '%s'.", new String(message)), t);
291297
}
292298
}
299+
for (GenericRowData genericRowData : genericRowDataList) {
300+
out.collect(genericRowData);
301+
}
293302
}
294303

295-
private void emitRow(
296-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
304+
private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
297305
// shortcut in case no output projection is required
298306
if (!hasMetadata) {
299-
out.collect(physicalRow);
300-
return;
307+
return physicalRow;
301308
}
302309
final int physicalArity = physicalRow.getArity();
303310
final int metadataArity = metadataConverters.length;
@@ -310,7 +317,7 @@ private void emitRow(
310317
producedRow.setField(
311318
physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
312319
}
313-
out.collect(producedRow);
320+
return producedRow;
314321
}
315322

316323
@Override

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.io.IOException;
3737
import java.io.Serializable;
38+
import java.util.ArrayList;
3839
import java.util.List;
3940
import java.util.Objects;
4041
import java.util.stream.Collectors;
@@ -88,6 +89,9 @@ public final class DebeziumJsonDeserializationSchema implements DeserializationS
8889
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
8990
private final boolean ignoreParseErrors;
9091

92+
/** List of data to be processed. */
93+
private transient List<GenericRowData> genericRowDataList;
94+
9195
public DebeziumJsonDeserializationSchema(
9296
DataType physicalDataType,
9397
List<ReadableMetadata> requestedMetadata,
@@ -117,6 +121,7 @@ public DebeziumJsonDeserializationSchema(
117121

118122
@Override
119123
public void open(InitializationContext context) throws Exception {
124+
genericRowDataList = new ArrayList<>();
120125
jsonDeserializer.open(context);
121126
}
122127

@@ -132,6 +137,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
132137
// skip tombstone messages
133138
return;
134139
}
140+
genericRowDataList.clear();
135141
try {
136142
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
137143
GenericRowData payload;
@@ -146,23 +152,23 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
146152
String op = payload.getField(2).toString();
147153
if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
148154
after.setRowKind(RowKind.INSERT);
149-
emitRow(row, after, out);
155+
genericRowDataList.add(handleRow(row, after));
150156
} else if (OP_UPDATE.equals(op)) {
151157
if (before == null) {
152158
throw new IllegalStateException(
153159
String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
154160
}
155161
before.setRowKind(RowKind.UPDATE_BEFORE);
156162
after.setRowKind(RowKind.UPDATE_AFTER);
157-
emitRow(row, before, out);
158-
emitRow(row, after, out);
163+
genericRowDataList.add(handleRow(row, before));
164+
genericRowDataList.add(handleRow(row, after));
159165
} else if (OP_DELETE.equals(op)) {
160166
if (before == null) {
161167
throw new IllegalStateException(
162168
String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
163169
}
164170
before.setRowKind(RowKind.DELETE);
165-
emitRow(row, before, out);
171+
genericRowDataList.add(handleRow(row, before));
166172
} else {
167173
if (!ignoreParseErrors) {
168174
throw new IOException(
@@ -178,14 +184,15 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
178184
format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
179185
}
180186
}
187+
for (GenericRowData genericRowData : genericRowDataList) {
188+
out.collect(genericRowData);
189+
}
181190
}
182191

183-
private void emitRow(
184-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
192+
private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
185193
// shortcut in case no output projection is required
186194
if (!hasMetadata) {
187-
out.collect(physicalRow);
188-
return;
195+
return physicalRow;
189196
}
190197

191198
final int physicalArity = physicalRow.getArity();
@@ -202,8 +209,7 @@ private void emitRow(
202209
producedRow.setField(
203210
physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
204211
}
205-
206-
out.collect(producedRow);
212+
return producedRow;
207213
}
208214

209215
@Override

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import java.io.IOException;
3838
import java.io.Serializable;
39+
import java.util.ArrayList;
3940
import java.util.List;
4041
import java.util.Objects;
4142
import java.util.stream.Collectors;
@@ -82,6 +83,9 @@ public class MaxwellJsonDeserializationSchema implements DeserializationSchema<R
8283
/** Number of physical fields. */
8384
private final int fieldCount;
8485

86+
/** List of data to be processed. */
87+
private transient List<GenericRowData> genericRowDataList;
88+
8589
public MaxwellJsonDeserializationSchema(
8690
DataType physicalDataType,
8791
List<ReadableMetadata> requestedMetadata,
@@ -111,6 +115,7 @@ public MaxwellJsonDeserializationSchema(
111115

112116
@Override
113117
public void open(InitializationContext context) throws Exception {
118+
genericRowDataList = new ArrayList<>();
114119
jsonDeserializer.open(context);
115120
}
116121

@@ -125,6 +130,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
125130
if (message == null || message.length == 0) {
126131
return;
127132
}
133+
genericRowDataList.clear();
128134
try {
129135
final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
130136
final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
@@ -133,7 +139,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
133139
// "data" field is a row, contains inserted rows
134140
GenericRowData insert = (GenericRowData) row.getRow(0, fieldCount);
135141
insert.setRowKind(RowKind.INSERT);
136-
emitRow(row, insert, out);
142+
genericRowDataList.add(handleRow(row, insert));
137143
} else if (OP_UPDATE.equals(type)) {
138144
// "data" field is a row, contains new rows
139145
// "old" field is a row, contains old values
@@ -151,13 +157,13 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
151157
}
152158
before.setRowKind(RowKind.UPDATE_BEFORE);
153159
after.setRowKind(RowKind.UPDATE_AFTER);
154-
emitRow(row, before, out);
155-
emitRow(row, after, out);
160+
genericRowDataList.add(handleRow(row, before));
161+
genericRowDataList.add(handleRow(row, after));
156162
} else if (OP_DELETE.equals(type)) {
157163
// "data" field is a row, contains deleted rows
158164
GenericRowData delete = (GenericRowData) row.getRow(0, fieldCount);
159165
delete.setRowKind(RowKind.DELETE);
160-
emitRow(row, delete, out);
166+
genericRowDataList.add(handleRow(row, delete));
161167
} else {
162168
if (!ignoreParseErrors) {
163169
throw new IOException(
@@ -173,14 +179,15 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
173179
format("Corrupt Maxwell JSON message '%s'.", new String(message)), t);
174180
}
175181
}
182+
for (GenericRowData genericRowData : genericRowDataList) {
183+
out.collect(genericRowData);
184+
}
176185
}
177186

178-
private void emitRow(
179-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
187+
private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
180188
// shortcut in case no output projection is required
181189
if (!hasMetadata) {
182-
out.collect(physicalRow);
183-
return;
190+
return physicalRow;
184191
}
185192
final int metadataArity = metadataConverters.length;
186193
final GenericRowData producedRow =
@@ -192,7 +199,7 @@ private void emitRow(
192199
producedRow.setField(
193200
fieldCount + metadataPos, metadataConverters[metadataPos].convert(rootRow));
194201
}
195-
out.collect(producedRow);
202+
return producedRow;
196203
}
197204

198205
@Override

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.io.IOException;
3737
import java.io.Serializable;
38+
import java.util.ArrayList;
3839
import java.util.List;
3940
import java.util.Objects;
4041
import java.util.stream.Collectors;
@@ -81,6 +82,9 @@ public final class OggJsonDeserializationSchema implements DeserializationSchema
8182
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
8283
private final boolean ignoreParseErrors;
8384

85+
/** List of data to be processed. */
86+
private transient List<GenericRowData> genericRowDataList;
87+
8488
public OggJsonDeserializationSchema(
8589
DataType physicalDataType,
8690
List<ReadableMetadata> requestedMetadata,
@@ -145,6 +149,7 @@ private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType)
145149

146150
@Override
147151
public void open(InitializationContext context) throws Exception {
152+
genericRowDataList = new ArrayList<>();
148153
jsonDeserializer.open(context);
149154
}
150155

@@ -160,6 +165,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
160165
// skip tombstone messages
161166
return;
162167
}
168+
genericRowDataList.clear();
163169
try {
164170
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
165171

@@ -168,23 +174,23 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
168174
String op = row.getField(2).toString();
169175
if (OP_CREATE.equals(op)) {
170176
after.setRowKind(RowKind.INSERT);
171-
emitRow(row, after, out);
177+
genericRowDataList.add(emitRow(row, after));
172178
} else if (OP_UPDATE.equals(op)) {
173179
if (before == null) {
174180
throw new IllegalStateException(
175181
String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
176182
}
177183
before.setRowKind(RowKind.UPDATE_BEFORE);
178184
after.setRowKind(RowKind.UPDATE_AFTER);
179-
emitRow(row, before, out);
180-
emitRow(row, after, out);
185+
genericRowDataList.add(emitRow(row, before));
186+
genericRowDataList.add(emitRow(row, after));
181187
} else if (OP_DELETE.equals(op)) {
182188
if (before == null) {
183189
throw new IllegalStateException(
184190
String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
185191
}
186192
before.setRowKind(RowKind.DELETE);
187-
emitRow(row, before, out);
193+
genericRowDataList.add(emitRow(row, before));
188194
} else {
189195
if (!ignoreParseErrors) {
190196
throw new IOException(
@@ -200,16 +206,17 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
200206
format("Corrupt Ogg JSON message '%s'.", new String(message)), t);
201207
}
202208
}
209+
for (GenericRowData genericRowData : genericRowDataList) {
210+
out.collect(genericRowData);
211+
}
203212
}
204213

205214
// --------------------------------------------------------------------------------------------
206215

207-
private void emitRow(
208-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
216+
private GenericRowData emitRow(GenericRowData rootRow, GenericRowData physicalRow) {
209217
// shortcut in case no output projection is required
210218
if (!hasMetadata) {
211-
out.collect(physicalRow);
212-
return;
219+
return physicalRow;
213220
}
214221

215222
final int physicalArity = physicalRow.getArity();
@@ -226,8 +233,7 @@ private void emitRow(
226233
producedRow.setField(
227234
physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
228235
}
229-
230-
out.collect(producedRow);
236+
return producedRow;
231237
}
232238

233239
@Override

0 commit comments

Comments
 (0)