Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULLABLE_PRIMITIVE_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULLABLE_PRIMITIVE_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULLABLE_PRIMITIVE_SCHEMA;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
Expand Down Expand Up @@ -75,6 +78,7 @@
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NullablePrimitive;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
Expand Down Expand Up @@ -116,6 +120,26 @@ public void testPrimitiveRowToProto() {
assertEquals(PRIMITIVE_PROTO, fromRow.apply(PRIMITIVE_ROW));
}

@Test
public void testNullablePrimitiveSchema() {
Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(NullablePrimitive.class));
assertEquals(NULLABLE_PRIMITIVE_SCHEMA, schema);
}

@Test
public void testNullablePrimitiveProtoToRow() {
SerializableFunction<NullablePrimitive, Row> toRow =
new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(NullablePrimitive.class));
assertEquals(NULLABLE_PRIMITIVE_ROW, toRow.apply(NULLABLE_PRIMITIVE_PROTO));
}

@Test
public void testNullablePrimitiveRowToProto() {
SerializableFunction<Row, NullablePrimitive> fromRow =
new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(NullablePrimitive.class));
assertEquals(NULLABLE_PRIMITIVE_PROTO, fromRow.apply(NULLABLE_PRIMITIVE_ROW));
}

@Test
public void testOptionalPrimitiveSchema() {
Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(OptionalPrimitive.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NullablePrimitive;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
Expand Down Expand Up @@ -111,6 +112,47 @@ static Schema.Options withTypeName(String typeName) {
"proto3_schema_messages.Primitive"))
.build();

static final Schema NULLABLE_PRIMITIVE_SCHEMA =
Schema.builder()
.addField(withFieldNumber("primitive_double", FieldType.DOUBLE, 1).withNullable(true))
.addField(withFieldNumber("primitive_float", FieldType.FLOAT, 2).withNullable(true))
.addField(withFieldNumber("primitive_int32", FieldType.INT32, 3).withNullable(true))
.addField(withFieldNumber("primitive_int64", FieldType.INT64, 4).withNullable(true))
.addField(
withFieldNumber("primitive_uint32", FieldType.logicalType(new UInt32()), 5)
.withNullable(true))
.addField(
withFieldNumber("primitive_uint64", FieldType.logicalType(new UInt64()), 6)
.withNullable(true))
.addField(
withFieldNumber("primitive_sint32", FieldType.logicalType(new SInt32()), 7)
.withNullable(true))
.addField(
withFieldNumber("primitive_sint64", FieldType.logicalType(new SInt64()), 8)
.withNullable(true))
.addField(
withFieldNumber("primitive_fixed32", FieldType.logicalType(new Fixed32()), 9)
.withNullable(true))
.addField(
withFieldNumber("primitive_fixed64", FieldType.logicalType(new Fixed64()), 10)
.withNullable(true))
.addField(
withFieldNumber("primitive_sfixed32", FieldType.logicalType(new SFixed32()), 11)
.withNullable(true))
.addField(
withFieldNumber("primitive_sfixed64", FieldType.logicalType(new SFixed64()), 12)
.withNullable(true))
.addField(withFieldNumber("primitive_bool", FieldType.BOOLEAN, 13).withNullable(true))
.addField(withFieldNumber("primitive_string", FieldType.STRING, 14).withNullable(true))
.addField(withFieldNumber("primitive_bytes", FieldType.BYTES, 15).withNullable(true))
.setOptions(
Schema.Options.builder()
.setOption(
SCHEMA_OPTION_META_TYPE_NAME,
FieldType.STRING,
"proto3_schema_messages.NullablePrimitive"))
.build();

static final Schema OPTIONAL_PRIMITIVE_SCHEMA =
Schema.builder()
.addField(withFieldNumber("primitive_int32", FieldType.INT32, 1))
Expand Down Expand Up @@ -181,6 +223,10 @@ static Schema.Options withTypeName(String typeName) {
.setPrimitiveBytes(ByteString.copyFrom(BYTE_ARRAY))
.build();

static final NullablePrimitive NULLABLE_PRIMITIVE_PROTO = NullablePrimitive.newBuilder().build();

static final Row NULLABLE_PRIMITIVE_ROW = Row.nullRow(NULLABLE_PRIMITIVE_SCHEMA);

// A sample instance of the row.
static final Row OPTIONAL_PRIMITIVE_ROW =
Row.withSchema(OPTIONAL_PRIMITIVE_SCHEMA).addValues(32, true, "horsey", BYTE_ARRAY).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ message Primitive {
bytes primitive_bytes = 15;
}

message NullablePrimitive {
optional double primitive_double = 1;
optional float primitive_float = 2;
optional int32 primitive_int32 = 3;
optional int64 primitive_int64 = 4;
optional uint32 primitive_uint32 = 5;
optional uint64 primitive_uint64 = 6;
optional sint32 primitive_sint32 = 7;
optional sint64 primitive_sint64 = 8;
optional fixed32 primitive_fixed32 = 9;
optional fixed64 primitive_fixed64 = 10;
optional sfixed32 primitive_sfixed32 = 11;
optional sfixed64 primitive_sfixed64 = 12;
optional bool primitive_bool = 13;
optional string primitive_string = 14;
optional bytes primitive_bytes = 15;
}

message RepeatPrimitive {
repeated double repeated_double = 1;
repeated float repeated_float = 2;
Expand Down
Loading