Skip to content

Commit 5977686

Browse files
claudevdmClaude
andauthored
Cherry pick #37257 and #37294 into release-2.71 (#37307)
* Map TIMSETAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory (#37257) * Map TIMSETAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory * Use default schemafactory in test. --------- Co-authored-by: Claude <[email protected]> * Support picosecond tiemstamps when writing GenericRecord and Beam Rows. (#37294) Co-authored-by: Claude <[email protected]> --------- Co-authored-by: Claude <[email protected]>
1 parent 0c4d81b commit 5977686

File tree

6 files changed

+84
-8
lines changed

6 files changed

+84
-8
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,9 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch
348348
fieldDescriptorFromAvroField(
349349
new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
350350
builder = builder.setType(elementFieldSchema.getType());
351+
if (elementFieldSchema.hasTimestampPrecision()) {
352+
builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
353+
}
351354
builder.addAllFields(elementFieldSchema.getFieldsList());
352355
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
353356
break;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
237237
TableFieldSchema elementFieldSchema =
238238
fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
239239
builder = builder.setType(elementFieldSchema.getType());
240+
if (elementFieldSchema.hasTimestampPrecision()) {
241+
builder = builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
242+
}
240243
builder.addAllFields(elementFieldSchema.getFieldsList());
241244
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
242245
break;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
102102
// boolean
103103
return SchemaBuilder.builder().booleanType();
104104
case "TIMESTAMP":
105-
// in Extract Jobs, it always uses the Avro logical type
106-
// we may have to change this if we move to EXPORT DATA
107-
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
105+
if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
106+
// in Extract Jobs, it always uses the Avro logical type
107+
// we may have to change this if we move to EXPORT DATA
108+
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
109+
}
110+
return SchemaBuilder.builder()
111+
.longBuilder()
112+
.prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
113+
.endLong();
108114
case "DATE":
109115
if (useAvroLogicalTypes) {
110116
return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,23 @@ private static Schema createTimestampNanosSchema() {
348348
.endRecord();
349349
}
350350

351+
private static Schema createRepeatedTimestampNanosSchema() {
352+
Schema longSchema = Schema.create(Schema.Type.LONG);
353+
longSchema.addProp("logicalType", "timestamp-nanos");
354+
355+
Schema arraySchema = Schema.createArray(longSchema);
356+
357+
return SchemaBuilder.record("RepeatedTimestampNanosRecord")
358+
.fields()
359+
.name("timestampNanosArray")
360+
.type(arraySchema)
361+
.noDefault()
362+
.endRecord();
363+
}
364+
351365
private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema();
366+
private static final Schema REPEATED_TIMESTAMP_NANOS_SCHEMA =
367+
createRepeatedTimestampNanosSchema();
352368

353369
private static GenericRecord baseRecord;
354370
private static GenericRecord rawLogicalTypesRecord;
@@ -885,4 +901,22 @@ public void testProtoTableSchemaFromAvroSchemaTimestampNanos() {
885901
assertTrue(field.hasTimestampPrecision());
886902
assertEquals(12L, field.getTimestampPrecision().getValue());
887903
}
904+
905+
@Test
906+
public void testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos() {
907+
com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
908+
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
909+
REPEATED_TIMESTAMP_NANOS_SCHEMA);
910+
911+
assertEquals(1, protoSchema.getFieldsCount());
912+
com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0);
913+
914+
assertEquals("timestampnanosarray", field.getName());
915+
assertEquals(
916+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType());
917+
assertEquals(
918+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
919+
920+
assertEquals(12L, field.getTimestampPrecision().getValue());
921+
}
888922
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ public class BeamRowToStorageApiProtoTest {
6969
Schema.builder()
7070
.addField("timestampNanos", FieldType.logicalType(Timestamp.NANOS).withNullable(true))
7171
.build();
72+
private static final Schema TIMESTAMP_NANOS_ARRAY_SCHEMA =
73+
Schema.builder()
74+
.addField("timestampNanosArray", FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
75+
.build();
7276
private static final EnumerationType TEST_ENUM =
7377
EnumerationType.create("ONE", "TWO", "RED", "BLUE");
7478
private static final Schema BASE_SCHEMA =
@@ -614,6 +618,19 @@ public void testTimestampNanosSchema() {
614618
assertEquals(12L, field.getTimestampPrecision().getValue());
615619
}
616620

621+
@Test
622+
public void testTimestampNanosArraySchema() {
623+
com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
624+
BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_ARRAY_SCHEMA);
625+
626+
assertEquals(1, protoSchema.getFieldsCount());
627+
TableFieldSchema field = protoSchema.getFields(0);
628+
assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType());
629+
assertEquals(
630+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
631+
assertEquals(12L, field.getTimestampPrecision().getValue());
632+
}
633+
617634
@Test
618635
public void testTimestampNanosDescriptor() throws Exception {
619636
DescriptorProto descriptor =

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,10 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
408408
.name("ts_nanos")
409409
.type(longSchema)
410410
.noDefault()
411+
.name("ts_picos")
412+
.type()
413+
.stringType()
414+
.noDefault()
411415
.endRecord();
412416
}
413417

@@ -421,12 +425,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
421425
public void testWriteGenericRecordTimestampNanos() throws Exception {
422426
String tableSpec =
423427
String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test");
424-
425428
// Create GenericRecord with timestamp-nanos value
426429
GenericRecord record =
427430
new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
428431
.set(
429432
"ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano())
433+
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
430434
.build();
431435

432436
// Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
437441
"WriteGenericRecords",
438442
BigQueryIO.writeGenericRecords()
439443
.to(tableSpec)
440-
.withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
441444
.withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
442445
.useAvroLogicalTypes()
443446
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
@@ -457,12 +460,18 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
457460
.from(tableSpec));
458461

459462
PAssert.that(result)
460-
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
463+
.containsInAnyOrder(
464+
new TableRow()
465+
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
466+
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
461467
readPipeline.run().waitUntilFinish();
462468
}
463469

464470
private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
465-
Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build();
471+
Schema.builder()
472+
.addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
473+
.addField("ts_picos", Schema.FieldType.STRING)
474+
.build();
466475

467476
@Test
468477
public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
472481
Row row =
473482
Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
474483
.withFieldValue("ts_nanos", TEST_INSTANT)
484+
.withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
475485
.build();
476486

477487
// Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
500510
.from(tableSpec));
501511

502512
PAssert.that(result)
503-
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
513+
.containsInAnyOrder(
514+
new TableRow()
515+
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
516+
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
504517
readPipeline.run().waitUntilFinish();
505518
}
506519

0 commit comments

Comments
 (0)