@@ -348,6 +348,9 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch
fieldDescriptorFromAvroField(
new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
builder = builder.setType(elementFieldSchema.getType());
if (elementFieldSchema.hasTimestampPrecision()) {
builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
}
builder.addAllFields(elementFieldSchema.getFieldsList());
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
break;
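For context, a minimal sketch of the array case this hunk addresses, using only schema shapes that appear in the tests further down (record and field names here are illustrative). The element field schema is computed recursively, so without the new hasTimestampPrecision() copy, the REPEATED wrapper field would silently drop the precision carried by the element:

    // An Avro array whose element is a long tagged with the
    // "timestamp-nanos" logical-type prop, as in the tests below.
    Schema element = Schema.create(Schema.Type.LONG);
    element.addProp("logicalType", "timestamp-nanos");
    Schema record =
        SchemaBuilder.record("IllustrativeRecord")
            .fields()
            .name("tsArray")
            .type(Schema.createArray(element))
            .noDefault()
            .endRecord();
    // protoTableSchemaFromAvroSchema(record) is then expected to produce a
    // field with type TIMESTAMP, mode REPEATED, and timestampPrecision 12.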
@@ -237,6 +237,9 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
TableFieldSchema elementFieldSchema =
fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
builder = builder.setType(elementFieldSchema.getType());
if (elementFieldSchema.hasTimestampPrecision()) {
builder = builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
}
builder.addAllFields(elementFieldSchema.getFieldsList());
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
break;
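The Beam-row path gets the same fix. A minimal sketch of an input it now handles, mirroring TIMESTAMP_NANOS_ARRAY_SCHEMA from the test file below (names illustrative):

    // A Beam schema containing an array of NANOS-precision timestamps.
    Schema schema =
        Schema.builder()
            .addField("tsArray", FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
            .build();
    com.google.cloud.bigquery.storage.v1.TableSchema proto =
        BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema);
    // proto.getFields(0) should report type TIMESTAMP, mode REPEATED, and
    // timestampPrecision 12, per the new testTimestampNanosArraySchema below.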
@@ -102,9 +102,15 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
// boolean
return SchemaBuilder.builder().booleanType();
case "TIMESTAMP":
// in Extract Jobs, it always uses the Avro logical type
// we may have to change this if we move to EXPORT DATA
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
// in Extract Jobs, it always uses the Avro logical type
// we may have to change this if we move to EXPORT DATA
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
}
return SchemaBuilder.builder()
.longBuilder()
.prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
.endLong();
case "DATE":
if (useAvroLogicalTypes) {
return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
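A sketch of the two Avro schemas this branch now distinguishes, assuming TIMESTAMP_NANOS_LOGICAL_TYPE resolves to the string "timestamp-nanos" used elsewhere in this change:

    // Precision unset or 6: the standard Avro logical type (unchanged behavior),
    // which serializes as {"type": "long", "logicalType": "timestamp-micros"}.
    LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());

    // Precision 12: a raw "logicalType" prop on a plain long, serializing as
    // {"type": "long", "logicalType": "timestamp-nanos"}.
    SchemaBuilder.builder().longBuilder().prop("logicalType", "timestamp-nanos").endLong();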
@@ -348,7 +348,23 @@ private static Schema createTimestampNanosSchema() {
.endRecord();
}

private static Schema createRepeatedTimestampNanosSchema() {
Schema longSchema = Schema.create(Schema.Type.LONG);
longSchema.addProp("logicalType", "timestamp-nanos");

Schema arraySchema = Schema.createArray(longSchema);

return SchemaBuilder.record("RepeatedTimestampNanosRecord")
.fields()
.name("timestampNanosArray")
.type(arraySchema)
.noDefault()
.endRecord();
}

private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema();
private static final Schema REPEATED_TIMESTAMP_NANOS_SCHEMA =
createRepeatedTimestampNanosSchema();

private static GenericRecord baseRecord;
private static GenericRecord rawLogicalTypesRecord;
@@ -885,4 +901,22 @@ public void testProtoTableSchemaFromAvroSchemaTimestampNanos() {
assertTrue(field.hasTimestampPrecision());
assertEquals(12L, field.getTimestampPrecision().getValue());
}

@Test
public void testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos() {
com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
REPEATED_TIMESTAMP_NANOS_SCHEMA);

assertEquals(1, protoSchema.getFieldsCount());
com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0);

assertEquals("timestampnanosarray", field.getName());
assertEquals(
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType());
assertEquals(
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());

assertEquals(12L, field.getTimestampPrecision().getValue());
}
}
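One detail these assertions rely on: timestampPrecision appears to be a message-typed wrapper field on the proto TableFieldSchema (hence the getValue() call), so presence must be checked before reading it, which is exactly what the converter hunks above do:

    // Presence-checked read of the precision; yields 12 for timestamp-nanos input.
    if (field.hasTimestampPrecision()) {
      long precision = field.getTimestampPrecision().getValue();
    }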
@@ -69,6 +69,10 @@ public class BeamRowToStorageApiProtoTest {
Schema.builder()
.addField("timestampNanos", FieldType.logicalType(Timestamp.NANOS).withNullable(true))
.build();
private static final Schema TIMESTAMP_NANOS_ARRAY_SCHEMA =
Schema.builder()
.addField("timestampNanosArray", FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
.build();
private static final EnumerationType TEST_ENUM =
EnumerationType.create("ONE", "TWO", "RED", "BLUE");
private static final Schema BASE_SCHEMA =
@@ -614,6 +618,19 @@ public void testTimestampNanosSchema() {
assertEquals(12L, field.getTimestampPrecision().getValue());
}

@Test
public void testTimestampNanosArraySchema() {
com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_ARRAY_SCHEMA);

assertEquals(1, protoSchema.getFieldsCount());
TableFieldSchema field = protoSchema.getFields(0);
assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType());
assertEquals(
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
assertEquals(12L, field.getTimestampPrecision().getValue());
}

@Test
public void testTimestampNanosDescriptor() throws Exception {
DescriptorProto descriptor =
@@ -408,6 +408,10 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
.name("ts_nanos")
.type(longSchema)
.noDefault()
.name("ts_picos")
.type()
.stringType()
.noDefault()
.endRecord();
}

@@ -421,12 +425,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
public void testWriteGenericRecordTimestampNanos() throws Exception {
String tableSpec =
String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test");

// Create GenericRecord with timestamp-nanos value
GenericRecord record =
new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
.set(
"ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano())
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
.build();

// Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
"WriteGenericRecords",
BigQueryIO.writeGenericRecords()
.to(tableSpec)
.withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
.withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
.useAvroLogicalTypes()
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
@@ -457,12 +460,18 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
.from(tableSpec));

PAssert.that(result)
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
.containsInAnyOrder(
new TableRow()
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
readPipeline.run().waitUntilFinish();
}
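The epoch arithmetic in the record above lines up with the asserted read-back string as follows; a small java.time sketch, assuming TEST_INSTANT is 2024-01-15T10:30:45.123456789Z (which matches the expected TableRow value):

    java.time.Instant instant = java.time.Instant.parse("2024-01-15T10:30:45.123456789Z");
    // Nanoseconds since the epoch, as written into the ts_nanos field.
    long epochNanos = instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
    // BigQuery renders the value back with twelve fractional digits, so the
    // nanosecond input gains three trailing zeros: "...45.123456789000Z".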

private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build();
Schema.builder()
.addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
.addField("ts_picos", Schema.FieldType.STRING)
.build();

@Test
public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
Row row =
Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
.withFieldValue("ts_nanos", TEST_INSTANT)
.withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
.build();

// Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
.from(tableSpec));

PAssert.that(result)
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
.containsInAnyOrder(
new TableRow()
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
readPipeline.run().waitUntilFinish();
}
