Add timestamp precision option to bigquery storage read for TIMESTAMP(12) columns. #37079
base: master
Conversation
Summary of Changes

Hello @claudevdm, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances the BigQueryIO connector by introducing explicit control over timestamp precision when reading data using the BigQuery Storage Read API. This allows users to specify whether TIMESTAMP(12) columns should be read with nanosecond or picosecond precision, ensuring data fidelity for high-precision timestamp values. The changes involve adding a new configuration option and integrating it into the BigQuery Storage API request generation, along with thorough testing to cover various data formats and precision levels.
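For orientation, a hypothetical usage sketch follows. The builder method name `withDirectReadPicosTimestampPrecision` is inferred from the getter `getDirectReadPicosTimestampPrecision` quoted later in this review and may not match the final API:

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

PCollection<TableRow> rows =
    pipeline.apply(
        BigQueryIO.readTableRows()
            .from("my-project:my_dataset.my_table") // hypothetical table
            .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
            // Hypothetical setter: request nanosecond precision for
            // TIMESTAMP(12) columns instead of the default microseconds.
            .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS));
```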
/gemini review
Code Review
This pull request introduces a new option to specify timestamp precision when reading from BigQuery using the Storage Read API. The changes are well-implemented across the relevant classes, and the addition of comprehensive tests is excellent. I have a couple of suggestions: one to correct the default value mentioned in a Javadoc, and another to refactor a new method to improve its structure and reduce code duplication.
R: @Abacn
R: @ahmedabu98
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
1 similar comment
/gemini review
damccorm left a comment
Mostly LGTM, just had a naming question
```java
    SerializableFunction<SchemaAndRecord, T> parseFn,
    Coder<T> outputCoder,
    BigQueryServices bqServices,
    @Nullable TimestampPrecision picosTimestampPrecision) {
```
Is there a reason we're naming this picosTimestampPrecision? Couldn't it take in non-picos precisions?
Code Review
This pull request introduces a valuable feature for controlling timestamp precision during BigQuery storage reads for TIMESTAMP(12) columns. The implementation is well-structured, propagating the new option from the user-facing API down to the storage read session creation. The accompanying tests are comprehensive and cover a wide range of scenarios. I've identified one critical issue regarding serialization that could break portability, along with a few medium-severity suggestions to improve code robustness and maintainability. Overall, this is a solid contribution.
```java
fieldValues.put(
    "timestamp_precision", toByteArray(transform.getDirectReadPicosTimestampPrecision()));
```
There's a mismatch between the field name used here for serialization (timestamp_precision) and the name defined in the schema and used for deserialization (direct_read_picos_timestamp_precision). This will cause portability issues as the serialized value won't be correctly read back. Please use the correct field name to ensure the transform can be correctly serialized and deserialized across different contexts.
Suggested change:

```java
fieldValues.put(
    "direct_read_picos_timestamp_precision",
    toByteArray(transform.getDirectReadPicosTimestampPrecision()));
```
```java
switch (checkNotNull(picosTimestampPrecision)) {
  case MICROS:
    precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
    break;
  case NANOS:
    precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
    break;
  case PICOS:
    precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
    break;
  default:
    return;
}
```
This method can be slightly improved for clarity and robustness:

- The `checkNotNull(picosTimestampPrecision)` is redundant because `picosTimestampPrecision` is checked for null in the calling method `setPicosTimestampPrecision`.
- The `switch` statement is not exhaustive for the `TimestampPrecision` enum (it's missing `MILLIS`). The `default` case currently returns silently, which could hide bugs if new enum values are added in the future. It would be safer to explicitly handle `MILLIS` (if it's intentionally unsupported) and have a `default` case that throws an exception for any other unhandled values.
```java
switch (picosTimestampPrecision) {
  case MICROS:
    precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
    break;
  case NANOS:
    precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
    break;
  case PICOS:
    precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
    break;
  case MILLIS:
    // The Storage Read API does not support millisecond precision for TIMESTAMP(12) columns.
    return;
  default:
    throw new IllegalArgumentException(
        "Unsupported timestamp precision for Storage Read API: " + picosTimestampPrecision);
}
```

```java
switch (checkNotNull(picosTimestampPrecision)) {
  case MICROS:
    precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
    break;
  case NANOS:
    precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
    break;
  case PICOS:
    precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
    break;
  default:
    return;
}
```
Similar to setArrowTimestampPrecision, this method can be improved by removing the redundant checkNotNull and making the switch statement more robust against future changes to the TimestampPrecision enum by handling all cases explicitly or throwing an exception for unhandled ones.
```java
switch (picosTimestampPrecision) {
  case MICROS:
    precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
    break;
  case NANOS:
    precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
    break;
  case PICOS:
    precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
    break;
  case MILLIS:
    // The Storage Read API does not support millisecond precision for TIMESTAMP(12) columns.
    return;
  default:
    throw new IllegalArgumentException(
        "Unsupported timestamp precision for Storage Read API: " + picosTimestampPrecision);
}
```

```java
switch (precision) {
  case NANOS:
    return ARROW_SCHEMA_TS_NANOS;
  case PICOS:
    return ARROW_SCHEMA_TS_PICOS;
  case MICROS:
  default:
    return ARROW_SCHEMA_TS_MICROS;
}
```
The default case here will treat any unhandled TimestampPrecision enum value as MICROS. This could lead to subtle bugs if new precisions are added. It would be safer to make the switch statement exhaustive or throw an exception in the default case to ensure all enum values are explicitly handled. This applies to other similar helper methods in this test file as well (e.g., getAvroSchemaTs, createArrowTsResponse).
```java
switch (precision) {
  case NANOS:
    return ARROW_SCHEMA_TS_NANOS;
  case PICOS:
    return ARROW_SCHEMA_TS_PICOS;
  case MICROS:
    return ARROW_SCHEMA_TS_MICROS;
  default:
    throw new IllegalArgumentException("Unsupported timestamp precision: " + precision);
}
```
Add read timestamp precision setting for Storage API reads.
The Storage API allows reading TIMESTAMP(12) columns with MICROS (default), NANOS, or PICOS precision for both AVRO and ARROW formats.
This propagates the read precision setting to the Storage API and adds relevant tests.
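Concretely, the propagation amounts to setting serialization options on the read session request. A minimal sketch, assuming the generated proto builder exposes a `setPicosTimestampPrecision` setter matching the enum constants quoted in the review comments above:

```java
import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions;
import com.google.cloud.bigquery.storage.v1.ReadSession;

// Sketch: map the Beam-side read precision onto the Storage Read API's
// Arrow serialization options when building the read session.
ReadSession.TableReadOptions readOptions =
    ReadSession.TableReadOptions.newBuilder()
        .setArrowSerializationOptions(
            ArrowSerializationOptions.newBuilder()
                // Assumed setter name; the enum constant appears in the
                // review snippets above.
                .setPicosTimestampPrecision(
                    ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS)
                .build())
        .build();
```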
Known Issue:
Arrow readTableRows and readTableRowsWithSchema convert Arrow records to Beam rows via ArrowConversion.java.
ArrowConversion is a generic Arrow-to-Beam schema utility; it does not take the BigQuery schema into account.
Even before this PR, the Arrow format with readTableRows truncates timestamps to millisecond precision, because millisecond and microsecond timestamps were historically mapped to FieldType.DATETIME:
beam/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
Line 210 in 15b50e2
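A minimal illustration of that truncation, using only the fact that Beam's FieldType.DATETIME is backed by Joda time, which stores millisecond precision:

```java
import org.joda.time.Instant;

public class MillisTruncationDemo {
  public static void main(String[] args) {
    long epochMicros = 1_700_000_000_123_456L; // microseconds since epoch
    // Joda Instant (and therefore FieldType.DATETIME) only holds millis,
    // so the trailing 456 microseconds are lost in the conversion.
    Instant truncated = new Instant(epochMicros / 1000);
    System.out.println(truncated); // precision beyond millis is gone
  }
}
```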
For Avro, readTableRowsWithSchema does not have this issue, because we can map timestamp-micros to the timestamp logical type when the BigQuery schema is TIMESTAMP(12) with read precision MICROS:
beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
Line 415 in 15b50e2
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.