Skip to content

Commit 0f2e196

Browse files
authored
Replace getElementType with getValueType for MAP in AvroGenericRecord… (apache#31653)
1 parent 18af8c8 commit 0f2e196

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -247,17 +247,15 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field)
247247
break;
248248
case MAP:
249249
Schema keyType = Schema.create(Schema.Type.STRING);
250-
Schema valueType = TypeWithNullability.create(schema.getElementType()).getType();
250+
Schema valueType = Schema.create(schema.getValueType().getType());
251251
if (valueType == null) {
252252
throw new RuntimeException("Unexpected null element type!");
253253
}
254254
TableFieldSchema keyFieldSchema =
255-
fieldDescriptorFromAvroField(
256-
new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE));
255+
fieldDescriptorFromAvroField(new Schema.Field("key", keyType, "key of the map entry"));
257256
TableFieldSchema valueFieldSchema =
258257
fieldDescriptorFromAvroField(
259-
new Schema.Field(
260-
"value", valueType, "value of the map entry", Schema.Field.NULL_VALUE));
258+
new Schema.Field("value", valueType, "value of the map entry"));
261259
builder =
262260
builder
263261
.setType(TableFieldSchema.Type.STRUCT)
@@ -346,7 +344,7 @@ private static Object toProtoValue(
346344
return toProtoValue(fieldDescriptor, type.getType(), value);
347345
case MAP:
348346
Map<CharSequence, Object> map = (Map<CharSequence, Object>) value;
349-
Schema valueType = TypeWithNullability.create(avroSchema.getElementType()).getType();
347+
Schema valueType = Schema.create(avroSchema.getValueType().getType());
350348
if (valueType == null) {
351349
throw new RuntimeException("Unexpected null element type!");
352350
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java

+62
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.security.MessageDigest;
3333
import java.security.NoSuchAlgorithmException;
3434
import java.util.EnumSet;
35+
import java.util.HashMap;
36+
import java.util.List;
3537
import java.util.Map;
3638
import java.util.UUID;
3739
import java.util.stream.Collectors;
@@ -266,6 +268,25 @@ enum TestEnum {
266268
.noDefault()
267269
.endRecord();
268270

271+
private static final Schema SCHEMA_WITH_MAP;
272+
273+
static {
274+
SCHEMA_WITH_MAP =
275+
SchemaBuilder.record("TestMap")
276+
.fields()
277+
.name("nested")
278+
.type()
279+
.optional()
280+
.type(BASE_SCHEMA)
281+
.name("aMap")
282+
.type()
283+
.map()
284+
.values()
285+
.stringType()
286+
.mapDefault(ImmutableMap.<String, Object>builder().put("key1", "value1").build())
287+
.endRecord();
288+
}
289+
269290
private static GenericRecord baseRecord;
270291
private static GenericRecord logicalTypesRecord;
271292
private static Map<String, Object> baseProtoExpectedFields;
@@ -505,4 +526,45 @@ public void testMessageFromGenericRecordLogicalTypes() throws Exception {
505526
assertEquals(7, msg.getAllFields().size());
506527
assertBaseRecord(msg, logicalTypesProtoExpectedFields);
507528
}
529+
530+
@Test
531+
public void testMessageFromGenericRecordWithMap() throws Exception {
532+
// Create a GenericRecord with a map field
533+
Map<String, String> mapData = new HashMap<>();
534+
mapData.put("key1", "value1");
535+
mapData.put("key2", "value2");
536+
GenericRecord recordWithMap =
537+
new GenericRecordBuilder(SCHEMA_WITH_MAP)
538+
.set("nested", baseRecord)
539+
.set("aMap", mapData)
540+
.build();
541+
542+
Descriptors.Descriptor descriptor =
543+
TableRowToStorageApiProto.getDescriptorFromTableSchema(
544+
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(SCHEMA_WITH_MAP),
545+
true,
546+
false);
547+
DynamicMessage msg =
548+
AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
549+
descriptor, recordWithMap, null, -1);
550+
551+
assertEquals(2, msg.getAllFields().size());
552+
553+
Map<String, Descriptors.FieldDescriptor> fieldDescriptors =
554+
descriptor.getFields().stream()
555+
.collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity()));
556+
DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested"));
557+
assertBaseRecord(nestedMsg, baseProtoExpectedFields);
558+
559+
// Assert the map field
560+
List<DynamicMessage> list = (List<DynamicMessage>) msg.getField(fieldDescriptors.get("amap"));
561+
// Convert the list of DynamicMessages back to a map
562+
Map<String, String> actualMap = new HashMap<>();
563+
for (DynamicMessage entry : list) {
564+
String key = (String) entry.getField(entry.getDescriptorForType().findFieldByName("key"));
565+
String value = (String) entry.getField(entry.getDescriptorForType().findFieldByName("value"));
566+
actualMap.put(key, value);
567+
}
568+
assertEquals(mapData, actualMap);
569+
}
508570
}

0 commit comments

Comments
 (0)