
Added uploadTypedRows to BigQuery client #5218

Open · wants to merge 7 commits into main

Conversation

@shnapz (Contributor) commented Jan 30, 2024

writeTypedRows is not always suitable: it uses GCP's insertAll, and some GCP APIs do not reflect data inserted that way, e.g. table.getNumRows() returns 0 (see this old post). Data loaded as an upload from a file does not cause the same problem.
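
To illustrate the difference, a usage sketch (the case class, table spec, and GCS bucket are placeholders; the String-tableSpec overload of writeTypedRows and default values for the remaining uploadTypedRows parameters are assumptions, not the PR's exact API):

  import com.spotify.scio.bigquery._
  import com.spotify.scio.bigquery.client.BigQuery

  @BigQueryType.toTable
  case class Result(user: String, score: Long)

  val bq = BigQuery.defaultInstance()
  val rows = List(Result("alice", 1L), Result("bob", 2L))

  // Existing path: streaming insert via insertAll; rows may not be visible
  // to metadata APIs such as table.getNumRows() for a while.
  bq.writeTypedRows("project:dataset.table", rows)

  // Path added in this PR: stage rows in GCS as an Avro file, then run a
  // load job, which those APIs reflect immediately.
  bq.load.uploadTypedRows("project:dataset.table", rows, "gs://my-bucket/tmp")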


codecov bot commented Jan 30, 2024

Codecov Report

Attention: 20 lines in your changes are missing coverage. Please review.

Comparison is base (5c112ff) 62.63% compared to head (cb0cb05) 62.53%.
Report is 9 commits behind head on main.

Files                                                   Patch %   Lines
...ala/com/spotify/scio/bigquery/client/LoadOps.scala   0.00%     20 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5218      +/-   ##
==========================================
- Coverage   62.63%   62.53%   -0.10%     
==========================================
  Files         301      301              
  Lines       10845    10867      +22     
  Branches      768      744      -24     
==========================================
+ Hits         6793     6796       +3     
- Misses       4052     4071      +19     

☔ View full report in Codecov by Sentry.

build.sbt Outdated
@@ -83,7 +83,7 @@ val googleCloudDatastoreVersion = "0.108.6"
 val googleCloudMonitoringVersion = "3.32.0"
 val googleCloudPubSubVersion = "1.107.13"
 val googleCloudSpannerVersion = "6.55.0"
-val googleCloudStorageVersion = "2.30.1"
+val googleCloudStorageVersion = "2.26.0"
Contributor Author

in sync with Beam

Contributor

Your link points to GCP libraries-bom 26.22.0, but Beam 2.53 uses 26.28.0 here.

Contributor Author

Right, reverting this

  def uploadTypedRows[T <: HasAnnotation: TypeTag](
    tableSpec: String,
    rows: List[T],
    tempLocation: String,
Contributor

Since this is temp, shouldn't we clean it up afterward?

Contributor Author

I was thinking about the bucket retention policy, but yeah, deleting it would be better.

Contributor Author

On the other hand, if I do:

  avro(
    List(blobId.toGsUtilUri),
    tableSpec,
    schema = Some(bqt.schema),
    createDisposition = createDisposition,
    writeDisposition = writeDisposition
  )

  storage.delete(blobId)

I am not confident that avro is fully synchronous and that BQ doesn't keep reading that file in the background.

Contributor

execute contains jobService.waitForJobs(loadJob), so I think this is fine.

I'm also wondering if we should create a SaveOps to allow saving to some file formats (avro/json).
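
If that holds, wrapping the load in try/finally would guarantee cleanup even when the load job fails. A sketch under that assumption, not the PR's code:

  try {
    // By the time avro(...) returns, execute has called
    // jobService.waitForJobs(loadJob), so BigQuery is done with the file.
    avro(
      List(blobId.toGsUtilUri),
      tableSpec,
      schema = Some(bqt.schema),
      createDisposition = createDisposition,
      writeDisposition = writeDisposition
    )
  } finally {
    // Nothing reads the staged file in the background anymore,
    // so it can be deleted unconditionally.
    storage.delete(blobId)
  }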

/**
 * Upload a List of rows to Cloud Storage as an Avro file and load it into a BigQuery table.
 * Note that element type `T` must be annotated with [[BigQueryType]].
 */
def uploadTypedRows[T <: HasAnnotation: TypeTag](
Contributor

IMHO this naming can be simplified; other APIs here do not have the upload prefix.
Usage will be from BigQuery as bq.load.uploadTypedRows.

I think this should be named

Suggested change:
-  def uploadTypedRows[T <: HasAnnotation: TypeTag](
+  def typedRows[T <: HasAnnotation: TypeTag](
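
For comparison, the call site under the suggested name would read as follows (table spec, rows, and temp bucket are placeholders):

  val bq = BigQuery.defaultInstance()
  bq.load.typedRows("project:dataset.table", rows, "gs://my-bucket/tmp")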
