From 9f91e0df0171c13880cd12e3852652ff4b424962 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Jan 2024 16:54:07 -0500 Subject: [PATCH 1/7] Added uploadTypedRows to BigQuery client --- .../scio/bigquery/client/BigQuery.scala | 8 +++ .../scio/bigquery/client/LoadOps.scala | 60 ++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala index bebbadfba6..e53a117a9a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala @@ -33,11 +33,16 @@ import com.google.auth.http.HttpCredentialsAdapter import com.google.auth.oauth2.{GoogleCredentials, ImpersonatedCredentials} import com.google.cloud.bigquery.storage.v1beta1.{BigQueryStorageClient, BigQueryStorageSettings} import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer +import com.google.cloud.storage.{BlobId, BlobInfo, Storage, StorageOptions} import com.spotify.scio.bigquery.{Table => STable} import com.spotify.scio.bigquery.client.BigQuery.Client import com.spotify.scio.bigquery.client.BigQueryConfig.ImpersonationInfo +import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation import com.spotify.scio.bigquery.{BigQuerySysProps, BigQueryType, CREATE_IF_NEEDED, WRITE_EMPTY} +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} +import org.apache.avro.io.DatumWriter import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} @@ -331,5 +336,8 @@ object BigQuery { .build() BigQueryStorageClient.create(settings) } + + lazy val blobStorage: Storage = + StorageOptions.newBuilder.setProjectId(project).setCredentials(_credentials).build.getService } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala index 3ef534a5a8..228a5a041c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala @@ -18,15 +18,23 @@ package com.spotify.scio.bigquery.client import com.google.api.services.bigquery.model._ +import com.google.cloud.storage.{BlobId, BlobInfo, Storage} import com.spotify.scio.bigquery.client.BigQuery.Client -import com.spotify.scio.bigquery.{BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND} +import com.spotify.scio.bigquery.types.BigQueryType +import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation +import com.spotify.scio.bigquery.{BigQueryType, BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND} +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} import org.apache.beam.sdk.io.gcp.{bigquery => bq} +import org.apache.commons.lang3.RandomStringUtils import org.slf4j.LoggerFactory +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.util.Try +import scala.reflect.runtime.universe.TypeTag private[client] object LoadOps { private val Logger = LoggerFactory.getLogger(this.getClass) @@ -119,6 +127,56 @@ final private[client] class LoadOps(client: Client, jobService: JobOps) { location = location ) + /** + * Upload List of rows to Cloud Storage as Avro file and load to BigQuery table. Note that element + * type `T` must be annotated with [[BigQueryType]]. + */ + def uploadTypedRows[T <: HasAnnotation: TypeTag]( + tableSpec: String, + rows: List[T], + tempLocation: String, + writeDisposition: WriteDisposition = WriteDisposition.WRITE_APPEND, + createDisposition: CreateDisposition = CreateDisposition.CREATE_IF_NEEDED + ): Try[TableReference] = { + val bqt = BigQueryType[T] + + Try { + val out = new ByteArrayOutputStream() + val datumWriter = new GenericDatumWriter[GenericRecord]() + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + try { + dataFileWriter.create(bqt.avroSchema, out) + rows.foreach { row => + dataFileWriter.append(bqt.toAvro(row)) + } + } finally { + dataFileWriter.close() + } + + val blobId = + BlobId.fromGsUtilUri( + s"${tempLocation.stripSuffix("/")}/upload_${RandomStringUtils.randomAlphanumeric(10)}.avro" + ) + val blobInfo = BlobInfo.newBuilder(blobId).build + client.blobStorage.createFrom( + blobInfo, + new ByteArrayInputStream(out.toByteArray), + Storage.BlobWriteOption.doesNotExist(), + Storage.BlobWriteOption.crc32cMatch() + ) + + blobId + }.flatMap { blobId => + avro( + List(blobId.toGsUtilUri), + tableSpec, + schema = Some(bqt.schema), + createDisposition = createDisposition, + writeDisposition = writeDisposition + ) + } + } + @nowarn("msg=private default argument in class LoadOps is never used") private def execute( sources: List[String], From 99608868b8c8f0057077c0e69695b58653e6852a Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Jan 2024 17:02:06 -0500 Subject: [PATCH 2/7] remove unused imports --- .../scala/com/spotify/scio/bigquery/client/BigQuery.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala index e53a117a9a..a08f083e02 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala @@ -33,16 +33,13 @@ import com.google.auth.http.HttpCredentialsAdapter import com.google.auth.oauth2.{GoogleCredentials, ImpersonatedCredentials} import com.google.cloud.bigquery.storage.v1beta1.{BigQueryStorageClient, BigQueryStorageSettings} import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer -import com.google.cloud.storage.{BlobId, BlobInfo, Storage, StorageOptions} +import com.google.cloud.storage.{Storage, StorageOptions} import com.spotify.scio.bigquery.{Table => STable} import com.spotify.scio.bigquery.client.BigQuery.Client import com.spotify.scio.bigquery.client.BigQueryConfig.ImpersonationInfo import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation import com.spotify.scio.bigquery.{BigQuerySysProps, BigQueryType, CREATE_IF_NEEDED, WRITE_EMPTY} -import org.apache.avro.file.DataFileWriter -import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} -import org.apache.avro.io.DatumWriter import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} From 17dcd89ae2f85b0c569f7360add665bf1df3e604 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Jan 2024 17:03:53 -0500 Subject: [PATCH 3/7] fix 2 --- .../main/scala/com/spotify/scio/bigquery/client/BigQuery.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala index a08f083e02..60599bdf74 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala @@ -37,7 +37,6 @@ import com.google.cloud.storage.{Storage, StorageOptions} import com.spotify.scio.bigquery.{Table => STable} import com.spotify.scio.bigquery.client.BigQuery.Client import com.spotify.scio.bigquery.client.BigQueryConfig.ImpersonationInfo -import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation import com.spotify.scio.bigquery.{BigQuerySysProps, BigQueryType, CREATE_IF_NEEDED, WRITE_EMPTY} import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory From 9d4e9b6980f0f51f18e022f1b002c118c570dc44 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Jan 2024 17:05:25 -0500 Subject: [PATCH 4/7] fix 3 --- .../main/scala/com/spotify/scio/bigquery/client/LoadOps.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala index 228a5a041c..8246f62588 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala @@ -20,7 +20,6 @@ package com.spotify.scio.bigquery.client import com.google.api.services.bigquery.model._ import com.google.cloud.storage.{BlobId, BlobInfo, Storage} import com.spotify.scio.bigquery.client.BigQuery.Client -import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation import com.spotify.scio.bigquery.{BigQueryType, BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND} import org.apache.avro.file.DataFileWriter From a2dbae027b6dac5337cf1c6c9a0d1bef8e28e0cf Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Jan 2024 17:42:13 -0500 Subject: [PATCH 5/7] Fix sbt --- build.sbt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 0e22f1fd7c..d891a4c188 100644 --- a/build.sbt +++ b/build.sbt @@ -83,7 +83,7 @@ val googleCloudDatastoreVersion = "0.108.6" val googleCloudMonitoringVersion = "3.32.0" val googleCloudPubSubVersion = "1.107.13" val googleCloudSpannerVersion = "6.55.0" -val googleCloudStorageVersion = "2.30.1" +val googleCloudStorageVersion = "2.26.0" val googleCommonsProtoVersion = "2.29.0" val googleHttpClientsVersion = "1.43.3" val googleIAMVersion = "1.24.0" @@ -814,9 +814,10 @@ lazy val `scio-google-cloud-platform` = project "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion, "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion, + "org.apache.commons" % "commons-lang3" % commonsLang3Version, "org.slf4j" % "slf4j-api" % slf4jVersion, + "com.google.cloud" % "google-cloud-storage" % googleCloudStorageVersion, // test - "com.google.cloud" % "google-cloud-storage" % googleCloudStorageVersion % Test, "com.spotify" %% "magnolify-cats" % magnolifyVersion % Test, "com.spotify" %% "magnolify-scalacheck" % magnolifyVersion % Test, "org.hamcrest" % "hamcrest" % hamcrestVersion % Test, From 08e180c7825c2d8e633e62999e01719f42fc199f Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Wed, 31 Jan 2024 13:30:17 -0500 Subject: [PATCH 6/7] Revert build.sbt --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index d891a4c188..81fe695816 100644 --- a/build.sbt +++ b/build.sbt @@ -83,7 +83,7 @@ val googleCloudDatastoreVersion = "0.108.6" val googleCloudMonitoringVersion = "3.32.0" val googleCloudPubSubVersion = "1.107.13" val googleCloudSpannerVersion = "6.55.0" -val googleCloudStorageVersion = "2.26.0" +val googleCloudStorageVersion = "2.30.1" val googleCommonsProtoVersion = "2.29.0" val googleHttpClientsVersion = "1.43.3" val googleIAMVersion = "1.24.0" From cb0cb057c31938bc21fc8543b92353a129f15b30 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 1 Feb 2024 14:59:14 -0500 Subject: [PATCH 7/7] Delete temporary file from GCS --- .../spotify/scio/bigquery/client/LoadOps.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala index 8246f62588..646e123810 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala @@ -166,13 +166,17 @@ final private[client] class LoadOps(client: Client, jobService: JobOps) { blobId }.flatMap { blobId => - avro( - List(blobId.toGsUtilUri), - tableSpec, - schema = Some(bqt.schema), - createDisposition = createDisposition, - writeDisposition = writeDisposition - ) + try { + avro( + List(blobId.toGsUtilUri), + tableSpec, + schema = Some(bqt.schema), + createDisposition = createDisposition, + writeDisposition = writeDisposition + ) + } finally { + client.blobStorage.delete(blobId) + } } }