Skip to content

Commit 0f27325

Browse files
rajuGT and rajuGT authored
feat and bug-fix - The previous implementation of StructMessageHandler did not correctly handle the google.protobuf.XYZ types (#67)
* feat and bug-fix - The previous implementation of StructMessageHandler did not correctly handle the fields. This MR provides the implementation for the following "message" types: google.protobuf.Struct, google.protobuf.Value, google.protobuf.ListValue, and google.protobuf.NullValue. Adds GoogleProtobufComplexMessageHandler to handle some of the complex Google Protobuf message types that are dynamic and recursive in nature. This implementation converts these message types to Protobuf's byte-array representation. While outputting the data, the byte array is converted back to the original structure using the associated field descriptor. * version bump up * fix tests * Add GoogleProtobufComplexMessageHandlerTest test cases * fix-tests: added 3 new fields to the TestBookingLogMessage proto, which caused the tests to fail; these are fixed in this commit * comments refactor --------- Co-authored-by: rajuGT <raju.gt@gojek.com>
1 parent b1a90fb commit 0f27325

13 files changed

Lines changed: 312 additions & 26 deletions

File tree

dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public interface TypeHandler {
1717
*/
1818
boolean canHandle();
1919

20+
// ---------- Flink -> Proto ----------
2021
/**
2122
* Transform to protobuf message builder.
2223
*
@@ -34,6 +35,8 @@ public interface TypeHandler {
3435
*/
3536
Object transformFromPostProcessor(Object field);
3637

38+
39+
// ---------- Proto -> Flink ----------
3740
/**
3841
* Transform from protobuf message.
3942
*

dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package com.gotocompany.dagger.common.serde.typehandler;
22

33
import com.google.protobuf.Descriptors;
4-
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
5-
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
6-
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
7-
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;
84
import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler;
5+
import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler;
96
import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler;
107
import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler;
118
import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler;
129
import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler;
10+
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
11+
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
12+
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
13+
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;
1314
import org.apache.commons.lang3.tuple.ImmutablePair;
1415
import org.apache.commons.lang3.tuple.Pair;
1516

@@ -64,6 +65,7 @@ private static List<TypeHandler> getSpecificHandlers(Descriptors.FieldDescriptor
6465
new MapHandler(fieldDescriptor),
6566
new TimestampHandler(fieldDescriptor),
6667
new EnumHandler(fieldDescriptor),
68+
new GoogleProtobufComplexMessageHandler(fieldDescriptor),
6769
new StructMessageHandler(fieldDescriptor),
6870
new RepeatedStructMessageHandler(fieldDescriptor),
6971
new RepeatedPrimitiveHandler(fieldDescriptor),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package com.gotocompany.dagger.common.serde.typehandler.complex;
2+
3+
import com.google.protobuf.Descriptors;
4+
import com.google.protobuf.DynamicMessage;
5+
import com.gotocompany.dagger.common.core.FieldDescriptorCache;
6+
import com.gotocompany.dagger.common.serde.typehandler.TypeHandler;
7+
import org.apache.flink.api.common.typeinfo.TypeInformation;
8+
import org.apache.flink.api.common.typeinfo.Types;
9+
import org.apache.parquet.example.data.simple.SimpleGroup;
10+
11+
import java.util.Arrays;
12+
import java.util.Collections;
13+
import java.util.HashSet;
14+
import java.util.Set;
15+
16+
/**
17+
* A TypeHandler to handle some of the complex Google Protobuf message types
18+
* that are dynamic and recursive in nature.
19+
* <p>
20+
* <a href="https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/struct.proto">github-link</a>
21+
* <p>
22+
* Struct is primarily used to represent JSON object types.
23+
* Value can represent any primitive, a Struct, or an array type.
24+
* ListValue represents a JSON array type.
25+
* NullValue is an enum used to represent null.
26+
* <p>
27+
* This implementation converts these message types to Protobuf's byte-array
28+
* representation. While outputting the data, the byte array is converted back
29+
* to the original structure using the associated field descriptor.
30+
*/
31+
public class GoogleProtobufComplexMessageHandler implements TypeHandler {
32+
33+
private static final Set<String> RECOGNIZED_COMPLEX_TYPES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
34+
"google.protobuf.Struct",
35+
"google.protobuf.Value",
36+
"google.protobuf.ListValue",
37+
"google.protobuf.NullValue"
38+
)));
39+
40+
private final Descriptors.FieldDescriptor fieldDescriptor;
41+
42+
public GoogleProtobufComplexMessageHandler(Descriptors.FieldDescriptor fieldDescriptor) {
43+
this.fieldDescriptor = fieldDescriptor;
44+
}
45+
46+
@Override
47+
public boolean canHandle() {
48+
return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE
49+
&& RECOGNIZED_COMPLEX_TYPES.contains(fieldDescriptor.getMessageType().getFullName());
50+
}
51+
52+
@Override
53+
public Object transformFromProto(Object field) {
54+
if (field == null) {
55+
return null;
56+
}
57+
58+
if (field instanceof DynamicMessage) {
59+
DynamicMessage msg = (DynamicMessage) field;
60+
if (msg.getAllFields().isEmpty()) {
61+
return null;
62+
}
63+
return msg.toByteArray();
64+
}
65+
return null;
66+
}
67+
68+
@Override
69+
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
70+
return transformFromProto(field);
71+
}
72+
73+
@Override
74+
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
75+
if (!canHandle() || field == null) {
76+
return builder;
77+
}
78+
79+
try {
80+
DynamicMessage parsed = DynamicMessage.parseFrom(fieldDescriptor.getMessageType(), (byte[]) field);
81+
builder.setField(fieldDescriptor, parsed);
82+
return builder;
83+
} catch (Exception e) {
84+
throw new RuntimeException("Failed to parse protobuf bytes for field: " + fieldDescriptor.getFullName(), e);
85+
}
86+
}
87+
88+
@Override
89+
public Object transformFromPostProcessor(Object field) {
90+
return field;
91+
}
92+
93+
@Override
94+
public Object transformFromParquet(SimpleGroup simpleGroup) {
95+
return null;
96+
}
97+
98+
@Override
99+
public Object transformToJson(Object field) {
100+
return null;
101+
}
102+
103+
@Override
104+
public TypeInformation getTypeInformation() {
105+
return Types.PRIMITIVE_ARRAY(Types.BYTE);
106+
}
107+
}

dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void shouldReturnOriginalFieldIndex() {
3737
@Test
3838
public void shouldReturnOriginalFieldCount() {
3939
FieldDescriptorCache fieldDescriptorCache = new FieldDescriptorCache(TestBookingLogMessage.getDescriptor());
40-
assertEquals(49, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor()));
40+
assertEquals(52, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor()));
4141
}
4242

4343
@Test

dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void shouldAddExtraFieldsToRow() {
8888
Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes));
8989

9090
int size = row.getArity();
91-
assertEquals(51, size);
91+
assertEquals(54, size);
9292
assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2));
9393
assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime());
9494
}
@@ -290,7 +290,7 @@ public void shouldAddExtraFieldsToRowWhenStencilAutoRefreshEnabled() {
290290
Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes));
291291

292292
int size = row.getArity();
293-
assertEquals(51, size);
293+
assertEquals(54, size);
294294
assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2));
295295
assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime());
296296
}

dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.gotocompany.dagger.consumer.TestBookingLogMessage;
77
import com.gotocompany.dagger.consumer.TestNestedRepeatedMessage;
88
import org.apache.flink.api.common.typeinfo.TypeInformation;
9+
import org.apache.flink.api.common.typeinfo.Types;
910
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
1011
import org.apache.flink.api.java.typeutils.RowTypeInfo;
1112
import org.apache.flink.types.Row;
@@ -130,16 +131,16 @@ public void shouldProcessArrayForStringData() {
130131
@Test
131132
public void shouldGiveAllNamesAndTypesIncludingStructFields() {
132133
ProtoType clevertapMessageProtoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator);
133-
assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length);
134-
assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length);
134+
assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length);
135+
assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length);
135136
}
136137

137138
@Test
138139
public void shouldReturnRowTypeForStructFields() {
139140
ProtoType protoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator);
140-
assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]);
141-
assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]);
142-
assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]);
141+
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]);
142+
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]);
143+
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]);
143144
assertEquals("profile_data", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[35]);
144145
assertEquals("event_properties", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[36]);
145146
assertEquals("key_values", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[37]);
@@ -155,7 +156,7 @@ public void shouldGiveAllNamesAndTypesIncludingPrimitiveArrayFields() {
155156
public void shouldGiveNameAndTypeForRepeatingStructType() {
156157
ProtoType testNestedRepeatedMessage = new ProtoType(TestNestedRepeatedMessage.class.getName(), "rowtime", stencilClientOrchestrator);
157158
assertEquals("metadata", ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldNames()[4]);
158-
assertEquals(OBJECT_ARRAY(ROW()), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]);
159+
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]);
159160
}
160161

161162
private int bookingLogFieldIndex(String propertyName) {

dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,18 @@ public void shouldCreateRowForInputMap() {
4040
inputMap.put("created_at", "2016-01-18T08:55:26.16Z");
4141
Row row = RowFactory.createRow(inputMap, descriptor);
4242

43-
Row expectedRow = new Row(49);
43+
Row expectedRow = new Row(52);
4444
expectedRow.setField(5, "144614");
4545
expectedRow.setField(6, "https://www.abcd.com/1234");
4646
assertEquals(expectedRow, row);
47-
4847
}
4948

5049
@Test
5150
public void shouldReturnAEmptyRowOfSizeEqualToNoOfFieldsInDescriptorForInputMap() {
5251
Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor();
5352
Map<String, Object> inputMap = new HashMap<>();
5453
Row row = RowFactory.createRow(inputMap, descriptor);
55-
assertEquals(new Row(49), row);
54+
assertEquals(new Row(52), row);
5655
}
5756

5857
@Test
@@ -74,7 +73,7 @@ public void shouldCreateRowWithPassedFieldsForInputMap() {
7473
public void shouldReturnEmptyRowIfNullPassedAsMapForInputMap() {
7574
Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor();
7675
Row row = RowFactory.createRow(null, descriptor);
77-
assertEquals(new Row(49), row);
76+
assertEquals(new Row(52), row);
7877
}
7978

8079
@Test
@@ -83,7 +82,7 @@ public void shouldCreateRowForDynamicMessage() throws InvalidProtocolBufferExcep
8382
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), customerLogMessage.toByteArray());
8483
Row row = RowFactory.createRow(dynamicMessage);
8584
assertNotNull(row);
86-
assertEquals(49, row.getArity());
85+
assertEquals(52, row.getArity());
8786
}
8887

8988
@Test
@@ -135,7 +134,7 @@ public void shouldCreateRowUsingCacheForDynamicMessage() throws InvalidProtocolB
135134

136135
Row row = RowFactory.createRow(dynamicMessage, fieldDescriptorCache);
137136
assertNotNull(row);
138-
assertEquals(49, row.getArity());
137+
assertEquals(52, row.getArity());
139138
}
140139

141140
@Test

dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
import com.google.protobuf.Descriptors;
55
import com.google.protobuf.InvalidProtocolBufferException;
66
import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler;
7+
import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler;
78
import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler;
89
import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler;
9-
import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler;
1010
import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler;
1111
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
1212
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
1313
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
14-
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;
1514
import com.gotocompany.dagger.consumer.TestBookingLogMessage;
1615
import com.gotocompany.dagger.consumer.TestFeedbackLogMessage;
1716
import com.gotocompany.dagger.consumer.TestGrpcResponse;
@@ -107,14 +106,14 @@ public void shouldReturnRepeatedEnumHandlerIfRepeatedEnumFieldDescriptorPassed()
107106
public void shouldReturnRepeatedStructHandlerIfRepeatedStructFieldDescriptorPassed() {
108107
Descriptors.FieldDescriptor repeatedStructFieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("metadata");
109108
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(repeatedStructFieldDescriptor);
110-
assertEquals(RepeatedStructMessageHandler.class, typeHandler.getClass());
109+
assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass());
111110
}
112111

113112
@Test
114113
public void shouldReturnStructHandlerIfStructFieldDescriptorPassed() {
115114
Descriptors.FieldDescriptor structFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data");
116115
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(structFieldDescriptor);
117-
assertEquals(StructMessageHandler.class, typeHandler.getClass());
116+
assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass());
118117
}
119118

120119
@Test

0 commit comments

Comments
 (0)