
Commit 7bdc10f

Add streaming for Indirect Write to BigQuery (#1445)
* Add integration test to assertThrows if streaming is used in write.
* Format
* Skip tests for v2 data source and let it run for v1 data source
* Remove unused file
* Format file
* Add files
* Implement Streaming for dsv2
* Format
* Implement streaming for spark 4
* Fix createWriter for streaming
* Fix gemini code assist suggestions
* Draft
* Add epochId to cleanTemporaryGcsPathIfNeeded
* Force the dependency for Spark 4 to be on scala 2.13 to fix the tests
* Increase the coverage on different datatypes of streaming.
* Format and add logging
* Fix the folder path
* Delete the parent folder of the epoch
* Properly clean up the entire folder for jobs using streaming.
* Revert changes
* Refactor according to comments.
* Refactor gcs temp file path to remove redundant uuid and keep application id.
* Enable debug option for Spark31 tests.
* Enable debug option for Spark32 tests.
* Remove debug option for Spark 31 and Spark32 tests.
* Fix the IntermediateDataCleaner.
* Fix the IntermediateDataCleaner.
* Format imports and add Changes log.
1 parent 6bd76a5 commit 7bdc10f
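
For orientation, a minimal sketch (not part of this commit) of how the new streaming indirect write is exercised from user code, modeled on the integration tests added below. It assumes an active `SparkSession spark`, the usual Spark SQL streaming imports, and placeholder input path, checkpoint path, bucket, and table names:

// Read a micro-batch stream of JSON records (hypothetical schema and input directory).
StructType schema = new StructType().add("name", DataTypes.StringType).add("age", DataTypes.LongType);
Dataset<Row> stream = spark.readStream().schema(schema).json("/tmp/streaming-input");

// Write each epoch to BigQuery via the indirect (GCS staging) path.
StreamingQuery query =
    stream.writeStream()
        .format("bigquery")
        .outputMode(OutputMode.Append())
        .option("writeMethod", "indirect")              // assumed here; the tests parameterize the write method
        .option("temporaryGcsBucket", "my-temp-bucket") // hypothetical bucket for the staged Avro files
        .option("checkpointLocation", "/tmp/streaming-checkpoint")
        .option("table", "my_dataset.my_table")         // hypothetical destination table
        .start();
query.processAllAvailable(); // block until all currently available input has been written
query.stop();                // stop() may throw TimeoutException; declare or handle it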

File tree

14 files changed (+345, -25 lines)


CHANGES.md

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
 ## Next
 * Added new connector, `spark-4.1-bigquery` aimed to be used in Spark 4.1. Like Spark 4.1, this connector requires at
   least Java 17 runtime. It is currently in preview mode.
+* PR #1445: Add streaming support for Spark DS v2 indirect write.
 * PR #1452: Improved the performance of the dynamic partition overwrite for RANGE_BUCKET partitioned tables.

 ## 0.43.1 - 2025-10-22

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/IntermediateDataCleaner.java

Lines changed: 26 additions & 1 deletion
@@ -46,7 +46,17 @@ public void run() {
   public void deletePath() {
     try {
       logger.info("Deleting path " + path + " if it exists");
-      FileSystem fs = path.getFileSystem(conf);
+      // Create a copy of the config to avoid polluting the global state
+      Configuration cleanConf = new Configuration(conf);
+
+      // Force a fresh instance that isn't already closed by Spark
+      String scheme = path.toUri().getScheme();
+      if (scheme != null) {
+        cleanConf.set("fs." + scheme + ".impl.disable.cache", "true");
+      }
+
+      // Use the specific URI to ensure GCS is targeted
+      FileSystem fs = FileSystem.get(path.toUri(), cleanConf);
       if (pathExists(fs, path)) {
         fs.delete(path, true);
       }
@@ -55,6 +65,21 @@ public void deletePath() {
       logger.error("Failed to delete path " + path, e);
     }
   }
+
+  public void deleteEpochPath(long epochId) {
+    Path epochPath = new Path(path + "/" + epochId);
+    try {
+      logger.info("Deleting epoch path " + epochPath + " if it exists");
+      FileSystem fs = epochPath.getFileSystem(conf);
+      if (pathExists(fs, epochPath)) {
+        fs.delete(epochPath, true);
+      }
+      logger.info("Path " + epochPath + " no longer exists");
+    } catch (Exception e) {
+      logger.error("Failed to delete path " + epochPath, e);
+    }
+  }
+
   // fs.exists can throw exception on missing path
   private boolean pathExists(FileSystem fs, Path path) {
     try {

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryIndirectDataSourceWriterContext.java

Lines changed: 43 additions & 21 deletions
@@ -107,8 +107,47 @@ public DataWriterContextFactory<InternalRow> createWriterContextFactory() {
         avroSchema.toString());
   }

+  @Override
+  public void onDataStreamingWriterCommit(long epochId, WriterCommitMessageContext[] messages) {
+    commitMessages(messages, epochId);
+  }
+
+  @Override
+  public void onDataStreamingWriterAbort(long epochId, WriterCommitMessageContext[] messages) {
+    try {
+      logger.warn(
+          "Aborting epoch {} from streaming write {} for table {}",
+          epochId,
+          writeUUID,
+          BigQueryUtil.friendlyTableName(config.getTableId()));
+    } finally {
+      cleanTemporaryGcsPathIfNeeded(epochId);
+    }
+  }
+
   @Override
   public void commit(WriterCommitMessageContext[] messages) {
+    commitMessages(messages, 0);
+  }
+
+  @Override
+  public void abort(WriterCommitMessageContext[] messages) {
+    try {
+      logger.warn(
+          "Aborting write {} for table {}",
+          writeUUID,
+          BigQueryUtil.friendlyTableName(config.getTableId()));
+    } finally {
+      cleanTemporaryGcsPathIfNeeded(0);
+    }
+  }
+
+  @Override
+  public void setTableInfo(TableInfo tableInfo) {
+    this.tableInfo = Optional.ofNullable(tableInfo);
+  }
+
+  private void commitMessages(WriterCommitMessageContext[] messages, long epochId) {
     logger.info(
         "Data has been successfully written to GCS. Going to load {} files to BigQuery",
         messages.length);

@@ -151,29 +190,12 @@ public void commit(WriterCommitMessageContext[] messages) {
       if (writeDisposition == JobInfo.WriteDisposition.WRITE_TRUNCATE) {
         updateMetadataIfNeeded();
       }
-      logger.info("Data has been successfully loaded to BigQuery");
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     } finally {
-      cleanTemporaryGcsPathIfNeeded();
+      cleanTemporaryGcsPathIfNeeded(epochId);
     }
-  }
-
-  @Override
-  public void abort(WriterCommitMessageContext[] messages) {
-    try {
-      logger.warn(
-          "Aborting write {} for table {}",
-          writeUUID,
-          BigQueryUtil.friendlyTableName(config.getTableId()));
-    } finally {
-      cleanTemporaryGcsPathIfNeeded();
-    }
-  }
-
-  @Override
-  public void setTableInfo(TableInfo tableInfo) {
-    this.tableInfo = Optional.ofNullable(tableInfo);
+    logger.info("Data has been successfully loaded to BigQuery");
   }

   void loadDataToBigQuery(List<String> sourceUris, Schema schema) throws IOException {

@@ -202,7 +224,7 @@ void updateMetadataIfNeeded() {
     BigQueryWriteHelper.updateTableMetadataIfNeeded(sparkSchema, config, bigQueryClient);
   }

-  void cleanTemporaryGcsPathIfNeeded() {
-    intermediateDataCleaner.ifPresent(cleaner -> cleaner.deletePath());
+  void cleanTemporaryGcsPathIfNeeded(long epochId) {
+    intermediateDataCleaner.ifPresent(cleaner -> cleaner.deleteEpochPath(epochId));
   }
 }

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryIndirectDataWriterContextFactory.java

Lines changed: 3 additions & 2 deletions
@@ -48,9 +48,10 @@ public DataWriterContext<InternalRow> createDataWriterContext(
       int partitionId, long taskId, long epochId) {
     try {
       Schema avroSchema = new Schema.Parser().parse(avroSchemaJson);
+
       UUID uuid = new UUID(taskId, epochId);
-      String uri = String.format("%s/part-%06d-%s.avro", gcsDirPath, partitionId, uuid);
-      Path path = new Path(uri);
+      String fileName = String.format("part-%06d-%s.avro", partitionId, uuid);
+      Path path = new Path(gcsDirPath + "/" + epochId, fileName);
       FileSystem fs = path.getFileSystem(conf.get());
       IntermediateRecordWriter intermediateRecordWriter =
           new AvroIntermediateRecordWriter(avroSchema, fs.create(path));
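
To illustrate the effect of this change (illustration only, with made-up values): each task's Avro file now lands in a per-epoch subdirectory of the temporary GCS path, which is exactly what `IntermediateDataCleaner.deleteEpochPath(epochId)` removes once the epoch is committed or aborted.

import java.util.UUID;
import org.apache.hadoop.fs.Path;

public class EpochPathLayoutExample {
  public static void main(String[] args) {
    // Hypothetical temporary GCS directory; the real one is derived from the connector config.
    Path gcsDirPath = new Path("gs://temp-bucket/.spark-bigquery-app-1234");
    int partitionId = 3;
    long taskId = 12345L;
    long epochId = 7L;

    // Same construction as the new factory code above.
    UUID uuid = new UUID(taskId, epochId);
    String fileName = String.format("part-%06d-%s.avro", partitionId, uuid);
    Path path = new Path(gcsDirPath + "/" + epochId, fileName);

    // Prints gs://temp-bucket/.spark-bigquery-app-1234/7/part-000003-<uuid>.avro,
    // so cleaning up epoch 7 is a single recursive delete of <gcsDirPath>/7.
    System.out.println(path);
  }
}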

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/DataSourceWriterContext.java

Lines changed: 4 additions & 0 deletions
@@ -41,6 +41,10 @@ default boolean useCommitCoordinator() {

   default void onDataWriterCommit(WriterCommitMessageContext message) {}

+  default void onDataStreamingWriterCommit(long epochId, WriterCommitMessageContext[] messages) {}
+
+  default void onDataStreamingWriterAbort(long epochId, WriterCommitMessageContext[] messages) {}
+
   void commit(WriterCommitMessageContext[] messages);

   void abort(WriterCommitMessageContext[] messages);
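
A minimal sketch (not code from this commit) of how a caller driving one micro-batch would use these two default hooks. The helper class and method names are hypothetical; the real invocations live in the Spark-version-specific streaming write adapters, which are not shown on this page, and the package of `WriterCommitMessageContext` is assumed to match the other `write.context` classes.

import com.google.cloud.spark.bigquery.write.context.DataSourceWriterContext;
import com.google.cloud.spark.bigquery.write.context.WriterCommitMessageContext;

final class EpochFinisher {
  private EpochFinisher() {}

  // messages are the commit messages collected from this epoch's data writers.
  static void finishEpoch(
      DataSourceWriterContext ctx,
      long epochId,
      WriterCommitMessageContext[] messages,
      boolean success) {
    if (success) {
      // Loads the epoch's staged Avro files into BigQuery, then removes <gcsDirPath>/<epochId>.
      ctx.onDataStreamingWriterCommit(epochId, messages);
    } else {
      // Cleans up only the failed epoch's temporary files; earlier epochs remain untouched.
      ctx.onDataStreamingWriterAbort(epochId, messages);
    }
  }
}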

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/SparkBigQueryIntegrationTestBase.java

Lines changed: 4 additions & 0 deletions
@@ -15,6 +15,7 @@
  */
 package com.google.cloud.spark.bigquery.integration;

+import java.util.UUID;
 import org.apache.spark.sql.SparkSession;
 import org.junit.Before;
 import org.junit.ClassRule;

@@ -42,9 +43,12 @@ protected static class SparkFactory extends ExternalResource {

     @Override
     protected void before() throws Throwable {
+      String appName = "integration-test-" + UUID.randomUUID();
       spark =
           SparkSession.builder()
               .master("local")
+              .appName(appName)
+              .config("spark.hadoop.google.cloud.appName.v2", appName)
               .config("spark.ui.enabled", "false")
               .config("spark.default.parallelism", 20)
               .getOrCreate();

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/TestConstants.java

Lines changed: 49 additions & 0 deletions
@@ -347,6 +347,55 @@ public class TestConstants {
           array(struct(lit(1))),
           struct(lit(1), lit("stringa"), struct(lit(true), lit("stringaa"), lit(11))));

+  public static Row ALL_TYPES_TABLE_ROW =
+      RowFactory.create(
+          42L,
+          null,
+          true,
+          "string",
+          Date.valueOf("2019-03-18"),
+          new Timestamp(1552872225000L), // 2019-03-18 01:23:45
+          "2019-03-18T01:23:45.678901",
+          5025678901L,
+          new byte[] {98, 121, 116, 101, 115},
+          4.2,
+          "{\"json\":\"true\"}",
+          RowFactory.create(
+              Decimal.apply(
+                  new BigDecimal(
+                      "-99999999999999999999999999999.999999999",
+                      new MathContext(BQ_NUMERIC_PRECISION)),
+                  BQ_NUMERIC_PRECISION,
+                  BQ_NUMERIC_SCALE),
+              Decimal.apply(
+                  new BigDecimal(
+                      "99999999999999999999999999999.999999999",
+                      new MathContext(BQ_NUMERIC_PRECISION)),
+                  BQ_NUMERIC_PRECISION,
+                  BQ_NUMERIC_SCALE),
+              Decimal.apply(
+                  new BigDecimal("3.14", new MathContext(BQ_NUMERIC_PRECISION)),
+                  BQ_NUMERIC_PRECISION,
+                  BQ_NUMERIC_SCALE),
+              Decimal.apply(
+                  new BigDecimal(
+                      "31415926535897932384626433832.795028841",
+                      new MathContext(BQ_NUMERIC_PRECISION)),
+                  BQ_NUMERIC_PRECISION,
+                  BQ_NUMERIC_SCALE)),
+          RowFactory.create(
+              Decimal.apply(
+                  new BigDecimal("-0.34992332820282019728792003956564819968"),
+                  BQ_NUMERIC_PRECISION,
+                  BQ_BIGNUMERIC_SCALE),
+              Decimal.apply(
+                  new BigDecimal("0.34992332820282019728792003956564819967"),
+                  BQ_NUMERIC_PRECISION,
+                  BQ_BIGNUMERIC_SCALE)),
+          new long[] {1L, 2L, 3L, 4L},
+          new Row[] {RowFactory.create(123L), RowFactory.create(1L)},
+          RowFactory.create(1L, "stringa", RowFactory.create(true, "stringaa", 11L)));
+
   private static <T> T[] copy(T... elements) {
     return elements;
   }

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java

Lines changed: 96 additions & 0 deletions
@@ -53,7 +53,11 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Streams;
 import com.google.inject.ProvisionException;
+import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;

@@ -66,6 +70,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.TimeZone;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;

@@ -77,10 +82,14 @@
 import org.apache.spark.ml.linalg.SQLDataTypes;
 import org.apache.spark.package$;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.execution.streaming.MemoryStream;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Decimal;

@@ -94,7 +103,9 @@
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import scala.Option;
 import scala.Some;
+import scala.collection.JavaConverters;

 abstract class WriteIntegrationTestBase extends SparkBigQueryIntegrationTestBase {

@@ -608,6 +619,91 @@ public void testInDirectWriteToBigQueryWithDiffInDescription() throws Exception
     assertThat(numOfRows).isEqualTo(1);
   }

+  @Test
+  public void testInDirectWriteToBigQueryWithStreaming() throws TimeoutException, IOException {
+    assumeThat(writeMethod, equalTo(WriteMethod.INDIRECT));
+
+    // Skipping test for spark 4: only works for spark 3 for now.
+    String sparkVersion = package$.MODULE$.SPARK_VERSION();
+    Assume.assumeThat(sparkVersion, CoreMatchers.startsWith("3."));
+
+    Path inputDir = Files.createTempDirectory("bq_integration_test_input");
+    Path jsonFile = inputDir.resolve("test_data_for_streaming.json");
+    Files.write(jsonFile, "{\"name\": \"spark\", \"age\": 100}".getBytes(StandardCharsets.UTF_8));
+
+    StructType schema =
+        new StructType().add("name", DataTypes.StringType).add("age", DataTypes.LongType);
+    Dataset<Row> df =
+        spark.readStream().option("multiline", "true").schema(schema).json(inputDir.toString());
+
+    String destTableName = testDataset + "." + "test_stream_json_" + System.nanoTime();
+    String checkPointLocation =
+        Files.createTempDirectory("bq_integration_test_checkpoint").toString();
+
+    StreamingQuery writeStream =
+        df.writeStream()
+            .format("bigquery")
+            .outputMode(OutputMode.Append())
+            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
+            .option("checkpointLocation", checkPointLocation)
+            .option("table", destTableName)
+            .start();
+    writeStream.processAllAvailable();
+    writeStream.stop();
+
+    List<Row> rows = spark.read().format("bigquery").load(destTableName).collectAsList();
+    assertThat(rows).hasSize(1);
+    Row row = rows.get(0);
+    assertThat(row.getString(0)).isEqualTo("spark");
+    assertThat(row.getLong(1)).isEqualTo(100L);
+  }
+
+  @Test
+  public void testInDirectWriteToBigQueryWithStreaming_AllTypes()
+      throws IOException, TimeoutException {
+    // Skipping test for spark 4: only works for spark 3.5 for now.
+    String sparkVersion = package$.MODULE$.SPARK_VERSION();
+    Assume.assumeThat(sparkVersion, CoreMatchers.startsWith("3.5"));
+
+    StructType schema = TestConstants.ALL_TYPES_TABLE_SCHEMA;
+    Row row = TestConstants.ALL_TYPES_TABLE_ROW;
+    List<Row> rawRows = Collections.nCopies(20, row);
+
+    Dataset<Row> normalizedDF = spark.createDataFrame(rawRows, schema);
+    List<Row> rows = normalizedDF.collectAsList();
+    Encoder<Row> encoder = normalizedDF.encoder();
+
+    MemoryStream<Row> memoryStream =
+        new MemoryStream<>(
+            1, // id
+            spark.sqlContext(), // sqlContext
+            Option.apply(null),
+            encoder // Implicit encoder passed as final arg
+            );
+    memoryStream.addData(JavaConverters.asScalaBuffer(rows).toSeq());
+
+    String destTableName = testDataset + "." + "test_streaming_allTypes" + System.nanoTime();
+    String checkPointLocation =
+        Files.createTempDirectory("bq_integration_test_streaming_checkpoint").toString();
+
+    StreamingQuery writeStream =
+        memoryStream
+            .toDF()
+            .writeStream()
+            .format("bigquery")
+            .outputMode(OutputMode.Append())
+            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
+            .option("checkpointLocation", checkPointLocation)
+            .option("table", destTableName)
+            .start();
+    writeStream.processAllAvailable();
+    writeStream.stop();
+
+    List<Row> readRows = spark.read().format("bigquery").load(destTableName).collectAsList();
+    assertThat(readRows).hasSize(20);
+    assertThat(readRows.get(0)).isEqualTo(rows.get(0));
+  }
+
   private void writeDFNullableToBigQueryNullable_Internal(String writeAtLeastOnce)
       throws Exception {
     String destTableName =
