Skip to content

Commit e748b61

Browse files
authored
Add support for Connect Schema Map type (blueapron#49)
* Add support for converting fields with Protobuf Map type into Connect Schema Map type * Document the new protoMapConversionType configuration * Log config params
1 parent b166dc8 commit e748b61

File tree

6 files changed

+1182
-8
lines changed

6 files changed

+1182
-8
lines changed

README.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ key.converter=com.blueapron.connect.protobuf.ProtobufConverter
8787
|string |STRING |
8888
|bytes |BYTES |
8989
|repeated |ARRAY |
90-
|map |ARRAY of STRUCT |
90+
|map |ARRAY of STRUCT* |
9191

9292

9393
|Protobuf Type|Connect Logical Type|
@@ -96,6 +96,12 @@ key.converter=com.blueapron.connect.protobuf.ProtobufConverter
9696
|Date |Date |
9797
|uint64 |Decimal |
9898

99+
\* Protobuf map type fields are converted by default to an Array of Struct,
100+
but the converter supports also converting these fields into Connect Schema Map type.
101+
To enable this conversion mode, set the configuration field `protoMapConversionType` to `map`:
102+
```
103+
value.converter.protoMapConversionType=map
104+
```
99105

100106
## Handling field renames and deletes
101107
Renaming and removing fields is supported by the proto IDL, but certain output formats (for example, BigQuery) do not

src/main/java/com/blueapron/connect/protobuf/ProtobufConverter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class ProtobufConverter implements Converter {
1717
private static final Logger log = LoggerFactory.getLogger(ProtobufConverter.class);
1818
private static final String PROTO_CLASS_NAME_CONFIG = "protoClassName";
1919
private static final String LEGACY_NAME_CONFIG = "legacyName";
20+
private static final String PROTO_MAP_CONVERSION_TYPE = "protoMapConversionType";
2021
private ProtobufData protobufData;
2122

2223
private boolean isInvalidConfiguration(Object proto, boolean isKey) {
@@ -27,6 +28,7 @@ private boolean isInvalidConfiguration(Object proto, boolean isKey) {
2728
public void configure(Map<String, ?> configs, boolean isKey) {
2829
Object legacyName = configs.get(LEGACY_NAME_CONFIG);
2930
String legacyNameString = legacyName == null ? "legacy_name" : legacyName.toString();
31+
boolean useConnectSchemaMap = "map".equals(configs.get(PROTO_MAP_CONVERSION_TYPE));
3032

3133
Object protoClassName = configs.get(PROTO_CLASS_NAME_CONFIG);
3234
if (isInvalidConfiguration(protoClassName, isKey)) {
@@ -40,7 +42,8 @@ public void configure(Map<String, ?> configs, boolean isKey) {
4042

4143
String protoClassNameString = protoClassName.toString();
4244
try {
43-
protobufData = new ProtobufData(Class.forName(protoClassNameString).asSubclass(com.google.protobuf.GeneratedMessageV3.class), legacyNameString);
45+
log.info("Initializing ProtobufData with args: [protoClassName={}, legacyName={}, useConnectSchemaMap={}]", protoClassNameString, legacyNameString, useConnectSchemaMap);
46+
protobufData = new ProtobufData(Class.forName(protoClassNameString).asSubclass(com.google.protobuf.GeneratedMessageV3.class), legacyNameString, useConnectSchemaMap);
4447
} catch (ClassNotFoundException e) {
4548
throw new ConnectException("Proto class " + protoClassNameString + " not found in the classpath");
4649
} catch (ClassCastException e) {

src/main/java/com/blueapron/connect/protobuf/ProtobufData.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.protobuf.Descriptors.OneofDescriptor;
88
import com.google.protobuf.GeneratedMessageV3;
99
import com.google.protobuf.InvalidProtocolBufferException;
10+
import com.google.protobuf.MapEntry;
1011
import com.google.protobuf.Message;
1112
import org.apache.kafka.connect.data.Date;
1213
import org.apache.kafka.connect.data.Decimal;
@@ -28,6 +29,7 @@
2829
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.stream.Collectors;
3133
import com.google.protobuf.util.Timestamps;
3234

3335
import static com.google.protobuf.Descriptors.FieldDescriptor.Type.BOOL;
@@ -47,6 +49,7 @@ class ProtobufData {
4749
private final Method newBuilder;
4850
private final Schema schema;
4951
private final String legacyName;
52+
private final boolean useConnectSchemaMap;
5053
public static final Descriptors.FieldDescriptor.Type[] PROTO_TYPES_WITH_DEFAULTS = new Descriptors.FieldDescriptor.Type[] { INT32, INT64, SINT32, SINT64, FLOAT, DOUBLE, BOOL, STRING, BYTES, ENUM };
5154
private HashMap<String, String> connectProtoNameMap = new HashMap<String, String>();
5255

@@ -87,7 +90,12 @@ private String getProtoFieldName(String descriptorForTypeName, String connectFie
8790
}
8891

8992
ProtobufData(Class<? extends com.google.protobuf.GeneratedMessageV3> clazz, String legacyName) {
93+
this(clazz, legacyName, false);
94+
}
95+
96+
ProtobufData(Class<? extends com.google.protobuf.GeneratedMessageV3> clazz, String legacyName, boolean useConnectSchemaMap ) {
9097
this.legacyName = legacyName;
98+
this.useConnectSchemaMap = useConnectSchemaMap;
9199

92100
try {
93101
this.newBuilder = clazz.getDeclaredMethod("newBuilder");
@@ -194,6 +202,14 @@ private Schema toConnectSchema(Descriptors.FieldDescriptor descriptor) {
194202
builder = Date.builder();
195203
break;
196204
}
205+
206+
if (shouldConvertToConnectSchemaMap(descriptor)) {
207+
FieldDescriptor keyFieldDescriptor = descriptor.getMessageType().findFieldByName("key");
208+
FieldDescriptor valueFieldDescriptor = descriptor.getMessageType().findFieldByName("value");
209+
builder = SchemaBuilder.map(toConnectSchema(keyFieldDescriptor), toConnectSchema(valueFieldDescriptor));
210+
break;
211+
}
212+
197213
String jsonName = descriptor.getJsonName();
198214
builder = SchemaBuilder.struct().name(jsonName.substring(0, 1).toUpperCase() + jsonName.substring(1));
199215
for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getMessageType().getFields()) {
@@ -210,7 +226,7 @@ private Schema toConnectSchema(Descriptors.FieldDescriptor descriptor) {
210226
builder.optional();
211227
Schema schema = builder.build();
212228

213-
if (descriptor.isRepeated()) {
229+
if (descriptor.isRepeated() && !shouldConvertToConnectSchemaMap(descriptor)) {
214230
final SchemaBuilder arrayBuilder = SchemaBuilder.array(schema);
215231
arrayBuilder.optional();
216232
schema = arrayBuilder.build();
@@ -219,6 +235,10 @@ private Schema toConnectSchema(Descriptors.FieldDescriptor descriptor) {
219235
return schema;
220236
}
221237

238+
private boolean shouldConvertToConnectSchemaMap(FieldDescriptor descriptor) {
239+
return useConnectSchemaMap && descriptor.isMapField();
240+
}
241+
222242
private boolean isProtobufTimestamp(Schema schema) {
223243
return Timestamp.SCHEMA.name().equals(schema.name());
224244
}
@@ -340,6 +360,15 @@ Object toConnectData(Schema schema, Object value) {
340360
break;
341361
}
342362

363+
case MAP: {
364+
Collection<MapEntry> original = (Collection<MapEntry>) value;
365+
converted = original.stream().collect(Collectors.toMap(
366+
entry -> toConnectData(schema.keySchema(), entry.getKey()),
367+
entry -> toConnectData(schema.valueSchema(), entry.getValue())
368+
));
369+
break;
370+
}
371+
343372
case STRUCT: {
344373
final Message message = (Message) value; // Validate type
345374

0 commit comments

Comments
 (0)