
Commit e117a8c

Merge branch 'master' into streaming
2 parents: fe03b35 + bb9f7c1

File tree

4 files changed: 37 additions & 47 deletions


CHANGES.md

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@
 * Added new connector, `spark-4.1-bigquery` aimed to be used in Spark 4.1. Like Spark 4.1, this connector requires at
   least Java 17 runtime. It is currently in preview mode.
 * PR #1445: Add streaming support for Spark DS v2 indirect write.
+* PR #1452: Improved the performance of the dynamic partition overwrite for RANGE_BUCKET partitioned tables.
 
 ## 0.43.1 - 2025-10-22
 * Issue #1417: Fixed ClassCastException in AWS federated identity

README-template.md

Lines changed: 2 additions & 2 deletions
@@ -1197,8 +1197,8 @@ val df = spark.read.format("bigquery")
 
 ### Configuring Partitioning
 
-By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API.
-This can be configured explicitly with the <code>[maxParallelism](#properties)</code> property. BigQuery may limit the number of partitions based on server constraints.
+By default, the connector calculates the requested `maxParallelism` as the larger of `preferredMinParallelism` (which defaults to 3 times the application's default parallelism) and 20,000. BigQuery may limit the number of partitions based on server constraints.
+Both <code>[maxParallelism](#properties)</code> and <code>[preferredMinParallelism](#properties)</code> can be configured explicitly to control the number of partitions.
 
 ## Tagging BigQuery Resources

README.md

Lines changed: 2 additions & 2 deletions
@@ -1191,8 +1191,8 @@ val df = spark.read.format("bigquery")
 
 ### Configuring Partitioning
 
-By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API.
-This can be configured explicitly with the <code>[maxParallelism](#properties)</code> property. BigQuery may limit the number of partitions based on server constraints.
+By default, the connector calculates the requested `maxParallelism` as the larger of `preferredMinParallelism` (which defaults to 3 times the application's default parallelism) and 20,000. BigQuery may limit the number of partitions based on server constraints.
+Both <code>[maxParallelism](#properties)</code> and <code>[preferredMinParallelism](#properties)</code> can be configured explicitly to control the number of partitions.
 
 ## Tagging BigQuery Resources

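To make the two partitioning properties concrete, here is a minimal read that sets both explicitly. This is an illustrative sketch only: the option names are the properties documented above, while the session setup and the public bigquery-public-data.samples.shakespeare table are placeholder choices.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitioningOptionsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("partitioning-example").getOrCreate();

    // Request at most 10,000 read streams, and ask for at least 60;
    // BigQuery may still grant fewer based on server constraints.
    Dataset<Row> df =
        spark
            .read()
            .format("bigquery")
            .option("maxParallelism", "10000")
            .option("preferredMinParallelism", "60")
            .load("bigquery-public-data.samples.shakespeare");

    // The actual partition count reflects what the Storage API granted.
    System.out.println(df.rdd().getNumPartitions());
  }
}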
bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryUtil.java

Lines changed: 32 additions & 43 deletions
@@ -769,6 +769,23 @@ static String getQueryForTimePartitionedTable(
         String.format(
             "%s(`target`.`%s`, %s)", truncFuntion, partitionField, partitionType.toString());
 
+    return createOptimizedMergeQuery(
+        destinationDefinition,
+        destinationTableName,
+        temporaryTableName,
+        extractedPartitionedSource,
+        extractedPartitionedTarget,
+        /* partitionMatchAdditionalCondition */ "TRUE");
+  }
+
+  private static String createOptimizedMergeQuery(
+      StandardTableDefinition destinationDefinition,
+      String destinationTableName,
+      String temporaryTableName,
+      String extractedPartitionedSource,
+      String extractedPartitionedTarget,
+      String partitionMatchAdditionalCondition) {
+    FieldList allFields = destinationDefinition.getSchema().getFields();
     String commaSeparatedFields =
         allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`"));
 
@@ -778,7 +795,7 @@ static String getQueryForTimePartitionedTable(
             + "MERGE `%s` AS target\n"
             + "USING `%s` AS source\n"
             + "ON FALSE\n"
-            + "WHEN NOT MATCHED BY SOURCE AND %s IN UNNEST(partitions_to_delete) THEN DELETE\n"
+            + "WHEN NOT MATCHED BY SOURCE AND (%s) AND %s IN UNNEST(partitions_to_delete) THEN DELETE\n"
             + "WHEN NOT MATCHED BY TARGET THEN\n"
             + "INSERT(%s) VALUES(%s)";
     return String.format(
@@ -787,6 +804,7 @@ static String getQueryForTimePartitionedTable(
         temporaryTableName,
         destinationTableName,
         temporaryTableName,
+        partitionMatchAdditionalCondition,
         extractedPartitionedTarget,
         commaSeparatedFields,
         commaSeparatedFields);
@@ -803,57 +821,28 @@ static String getQueryForRangePartitionedTable(
     long interval = rangePartitioning.getRange().getInterval();
 
     String partitionField = rangePartitioning.getField();
-    String extractedPartitioned =
-        "IFNULL(IF(%s.%s >= %s, 0, RANGE_BUCKET(%s.%s, GENERATE_ARRAY(%s, %s, %s))), -1)";
+
     String extractedPartitionedSource =
         String.format(
-            extractedPartitioned,
-            "source",
-            partitionField,
-            end,
-            "source",
-            partitionField,
-            start,
-            end,
-            interval);
+            "IFNULL(IF(%s >= %s, 0, RANGE_BUCKET(%s, GENERATE_ARRAY(%s, %s, %s))), -1)",
+            partitionField, end, partitionField, start, end, interval);
     String extractedPartitionedTarget =
         String.format(
-            extractedPartitioned,
-            "target",
-            partitionField,
-            end,
-            "target",
-            partitionField,
-            start,
-            end,
-            interval);
-
-    FieldList allFields = destinationDefinition.getSchema().getFields();
-    String commaSeparatedFields =
-        allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`"));
-    String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L);
+            "IFNULL(IF(target.%s >= %s, 0, RANGE_BUCKET(target.%s, GENERATE_ARRAY(%s, %s, %s))), -1)",
+            partitionField, end, partitionField, start, end, interval);
+    // needed for tables that require the partition field to be in the where clause. It must be
+    // true.
+    String partitionMatchAdditionalCondition =
+        String.format(
+            "target.%s is NULL OR target.%s >= %d", partitionField, partitionField, Long.MIN_VALUE);
 
-    String queryFormat =
-        "MERGE `%s` AS target\n"
-            + "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n"
-            + "ON %s = %s AND %s AND (target.%s >= %d OR target.%s IS NULL )\n"
-            + "WHEN MATCHED THEN DELETE\n"
-            + "WHEN NOT MATCHED AND NOT %s THEN\n"
-            + "INSERT(%s) VALUES(%s)";
-    return String.format(
-        queryFormat,
+    return createOptimizedMergeQuery(
+        destinationDefinition,
         destinationTableName,
         temporaryTableName,
-        booleanInjectedColumn,
         extractedPartitionedSource,
         extractedPartitionedTarget,
-        booleanInjectedColumn,
-        partitionField,
-        BIGQUERY_INTEGER_MIN_VALUE,
-        partitionField,
-        booleanInjectedColumn,
-        commaSeparatedFields,
-        commaSeparatedFields);
+        partitionMatchAdditionalCondition);
   }
 
   // based on https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration, it
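The refactor is easiest to follow from the query the shared helper emits. The sketch below reproduces the string assembly for a hypothetical table range-partitioned on an INTEGER column customer_id (RANGE start 0, end 100, interval 10). The table names dataset.target and dataset.tmp, the two-column insert list, and the omission of the DECLARE of partitions_to_delete that precedes the MERGE are illustrative assumptions, not the connector's exact output.

public class OptimizedMergeQuerySketch {
  public static void main(String[] args) {
    String partitionField = "customer_id";
    long start = 0, end = 100, interval = 10;

    // Partition-id expression for target rows, mirroring getQueryForRangePartitionedTable:
    // NULL maps to -1, values at or beyond `end` map to 0, everything else to its bucket.
    String extractedPartitionedTarget =
        String.format(
            "IFNULL(IF(target.%s >= %s, 0, RANGE_BUCKET(target.%s, GENERATE_ARRAY(%s, %s, %s))), -1)",
            partitionField, end, partitionField, start, end, interval);

    // Always-true guard, kept so the partition column shows up in the DELETE predicate
    // for tables that require the partition field in the WHERE clause.
    String partitionMatchAdditionalCondition =
        String.format(
            "target.%s is NULL OR target.%s >= %d", partitionField, partitionField, Long.MIN_VALUE);

    // Shape of the MERGE assembled by createOptimizedMergeQuery; table names and the
    // two-column insert list are hypothetical.
    String merge =
        String.format(
            "MERGE `%s` AS target\n"
                + "USING `%s` AS source\n"
                + "ON FALSE\n"
                + "WHEN NOT MATCHED BY SOURCE AND (%s) AND %s IN UNNEST(partitions_to_delete) THEN DELETE\n"
                + "WHEN NOT MATCHED BY TARGET THEN\n"
                + "INSERT(`customer_id`,`amount`) VALUES(`customer_id`,`amount`)",
            "dataset.target", "dataset.tmp", partitionMatchAdditionalCondition,
            extractedPartitionedTarget);
    System.out.println(merge);
  }
}

Because the join condition is ON FALSE, every source row takes the NOT MATCHED BY TARGET branch and is inserted, while every target row takes the NOT MATCHED BY SOURCE branch and is deleted only if its computed partition id is listed in partitions_to_delete; the always-true guard exists solely so the partition column appears in the delete predicate.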
