|
5 | 5 | import com.google.protobuf.Descriptors;
|
6 | 6 | import com.google.protobuf.GeneratedMessageV3;
|
7 | 7 | import com.google.protobuf.InvalidProtocolBufferException;
|
8 |
| -import com.google.protobuf.MapEntry; |
9 | 8 | import com.google.protobuf.Message;
|
10 | 9 | import org.apache.kafka.connect.data.Date;
|
11 | 10 | import org.apache.kafka.connect.data.Field;
|
|
26 | 25 | import java.util.Map;
|
27 | 26 | import com.google.protobuf.util.Timestamps;
|
28 | 27 |
|
| 28 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.BOOL; |
| 29 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.BYTES; |
| 30 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.DOUBLE; |
| 31 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.ENUM; |
| 32 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.FLOAT; |
| 33 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.INT32; |
| 34 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.INT64; |
| 35 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.MESSAGE; |
| 36 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.SINT32; |
| 37 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.SINT64; |
| 38 | +import static com.google.protobuf.Descriptors.FieldDescriptor.Type.STRING; |
| 39 | + |
| 40 | + |
29 | 41 | class ProtobufData {
|
30 |
| - private final Class<? extends com.google.protobuf.GeneratedMessageV3> clazz; |
31 | 42 | private final Method newBuilder;
|
32 | 43 | private final Schema schema;
|
33 | 44 | private final String legacyName;
|
| 45 | + public static final Descriptors.FieldDescriptor.Type[] PROTO_TYPES_WITH_DEFAULTS = new Descriptors.FieldDescriptor.Type[] { INT32, INT64, SINT32, SINT64, FLOAT, DOUBLE, BOOL, STRING, BYTES, ENUM }; |
34 | 46 | private HashMap<String, String> connectProtoNameMap = new HashMap<String, String>();
|
35 | 47 |
|
36 | 48 | private GeneratedMessageV3.Builder getBuilder() {
|
@@ -69,8 +81,13 @@ private String getProtoFieldName(String descriptorForTypeName, String connectFie
|
69 | 81 | return connectProtoNameMap.get(getProtoMapKey(descriptorForTypeName, connectFieldName));
|
70 | 82 | }
|
71 | 83 |
|
| 84 | + private boolean isUnsetOneof(Descriptors.FieldDescriptor fieldDescriptor, Object value) { |
| 85 | + return fieldDescriptor.getContainingOneof() != null && |
| 86 | + fieldDescriptor.getType() != MESSAGE && |
| 87 | + fieldDescriptor.getDefaultValue().equals(value); |
| 88 | + } |
| 89 | + |
72 | 90 | ProtobufData(Class<? extends com.google.protobuf.GeneratedMessageV3> clazz, String legacyName) {
|
73 |
| - this.clazz = clazz; |
74 | 91 | this.legacyName = legacyName;
|
75 | 92 |
|
76 | 93 | try {
|
@@ -209,10 +226,6 @@ private boolean isProtobufDate(Schema schema) {
|
209 | 226 | }
|
210 | 227 |
|
211 | 228 | Object toConnectData(Schema schema, Object value) {
|
212 |
| - if (value == null) { |
213 |
| - return null; |
214 |
| - } |
215 |
| - |
216 | 229 | try {
|
217 | 230 | if (isProtobufTimestamp(schema)) {
|
218 | 231 | com.google.protobuf.Timestamp timestamp = (com.google.protobuf.Timestamp) value;
|
@@ -311,14 +324,20 @@ Object toConnectData(Schema schema, Object value) {
|
311 | 324 |
|
312 | 325 | case STRUCT: {
|
313 | 326 | final Message message = (Message) value; // Validate type
|
| 327 | + if (message == message.getDefaultInstanceForType()) { |
| 328 | + return null; |
| 329 | + } |
| 330 | + |
314 | 331 | final Struct result = new Struct(schema.schema());
|
315 |
| - final Map<Descriptors.FieldDescriptor, Object> fieldsMap = message.getAllFields(); |
316 |
| - for (Map.Entry<Descriptors.FieldDescriptor, Object> pair : fieldsMap.entrySet()) { |
317 |
| - final Descriptors.FieldDescriptor fieldDescriptor = pair.getKey(); |
| 332 | + |
| 333 | + for (Descriptors.FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) { |
318 | 334 | final String fieldName = getConnectFieldName(fieldDescriptor);
|
319 | 335 | final Field field = schema.field(fieldName);
|
320 |
| - final Object obj = pair.getValue(); |
321 |
| - result.put(fieldName, toConnectData(field.schema(), obj)); |
| 336 | + |
| 337 | + Object obj = message.getField(fieldDescriptor); |
| 338 | + if (!isUnsetOneof(fieldDescriptor, obj)) { |
| 339 | + result.put(fieldName, toConnectData(field.schema(), obj)); |
| 340 | + } |
322 | 341 | }
|
323 | 342 |
|
324 | 343 | converted = result;
|
|
0 commit comments