Skip to content

Commit 496945b

Browse files
committed
Merge remote-tracking branch 'upstream/master' into bq-read-schema
2 parents 68d0c72 + 35732ce commit 496945b

File tree

18 files changed

+337
-27
lines changed

18 files changed

+337
-27
lines changed

Diff for: .asf.yaml

+3-5
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,9 @@ github:
3636

3737
# Give some users issue triage permissions
3838
collaborators:
39-
- pcoet
40-
- olehborysevych
41-
- rshamunov
42-
- andreydevyatkin
43-
- liferoad
39+
- Amar3tto
40+
- mrshakirov
41+
- akashorabek
4442

4543
enabled_merge_buttons:
4644
squash: true
+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"comment": "Modify this file in a trivial way to cause this test suite to run",
2+
"comment": "Modify this file in a trivial way to cause this test suite to run.",
33
"modification": 1
44
}

Diff for: .test-infra/tools/stale_k8s_workload_cleaner.sh

+4-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ function should_teardown() {
4343
gcloud container clusters get-credentials io-datastores --zone us-central1-a --project apache-beam-testing
4444

4545
while read NAME STATUS AGE; do
46-
if [[ $NAME =~ ^beam-.+(test|-it) ]] && should_teardown $AGE; then
46+
# Regex has temporary workaround to avoid trying to delete beam-performancetests-singlestoreio-* to avoid getting stuck in a terminal state
47+
# See https://github.com/apache/beam/pull/33545 for context.
48+
# This may be safe to remove if https://cloud.google.com/knowledge/kb/deleted-namespace-remains-in-terminating-status-000004867 has been resolved, just try it before checking in :)
49+
if [[ $NAME =~ ^beam-.+(test|-it)(?!s-singlestoreio) ]] && should_teardown $AGE; then
4750
kubectl delete namespace $NAME
4851
fi
4952
done < <( kubectl get namespaces --context=gke_${PROJECT}_${LOCATION}_${CLUSTER} )

Diff for: CHANGES.md

+11
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080

8181
## Bugfixes
8282

83+
* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
8384
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
8485
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
8586

@@ -115,6 +116,7 @@
115116
## Bugfixes
116117

117118
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
119+
* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)).
118120

119121
## Security Fixes
120122

@@ -157,6 +159,11 @@
157159
* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))
158160
* (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)).
159161

162+
## Known Issues
163+
164+
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
165+
* Fixed in 2.62.0
166+
160167
# [2.60.0] - 2024-10-17
161168

162169
## Highlights
@@ -211,6 +218,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
211218
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
212219
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
213220
* Fixed in 2.61.0.
221+
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
222+
* Fixed in 2.62.0
214223

215224
# [2.59.0] - 2024-09-11
216225

@@ -259,6 +268,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
259268
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
260269
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
261270
* Fixed in 2.61.0.
271+
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
272+
* Fixed in 2.62.0
262273

263274
# [2.58.1] - 2024-08-15
264275

Diff for: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

+5
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ class BeamModulePlugin implements Plugin<Project> {
618618
def influxdb_version = "2.19"
619619
def httpclient_version = "4.5.13"
620620
def httpcore_version = "4.4.14"
621+
def iceberg_bqms_catalog_version = "1.5.2-0.1.0"
621622
def jackson_version = "2.15.4"
622623
def jaxb_api_version = "2.3.3"
623624
def jsr305_version = "3.0.2"
@@ -650,6 +651,10 @@ class BeamModulePlugin implements Plugin<Project> {
650651

651652
// Export Spark versions, so they are defined in a single place only
652653
project.ext.spark3_version = spark3_version
654+
// version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms)
655+
// TODO: remove this and download the jar normally when the catalog gets
656+
// open-sourced (https://github.com/apache/iceberg/pull/11039)
657+
project.ext.iceberg_bqms_catalog_version = iceberg_bqms_catalog_version
653658

654659
// A map of maps containing common libraries used per language. To use:
655660
// dependencies {

Diff for: sdks/java/container/boot.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -134,21 +134,31 @@ func main() {
134134
}
135135

136136
const jarsDir = "/opt/apache/beam/jars"
137+
const javaHarnessJar = "beam-sdks-java-harness.jar"
137138
cp := []string{
138139
filepath.Join(jarsDir, "slf4j-api.jar"),
139140
filepath.Join(jarsDir, "slf4j-jdk14.jar"),
140141
filepath.Join(jarsDir, "jcl-over-slf4j.jar"),
141142
filepath.Join(jarsDir, "log4j-over-slf4j.jar"),
142143
filepath.Join(jarsDir, "log4j-to-slf4j.jar"),
143-
filepath.Join(jarsDir, "beam-sdks-java-harness.jar"),
144+
filepath.Join(jarsDir, javaHarnessJar),
144145
}
145146

146147
var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar")
147148
for _, a := range artifacts {
148149
name, _ := artifact.MustExtractFilePayload(a)
149150
if hasWorkerExperiment {
150-
if strings.HasPrefix(name, "beam-runners-google-cloud-dataflow-java-fn-api-worker") {
151-
continue
151+
if strings.HasPrefix(name, "beam-sdks-java-harness") {
152+
// Remove system "beam-sdks-java-harness.jar". User-provided jar will be
153+
// added to classpath as a normal user jar further below.
154+
for i, cl := range cp {
155+
if !strings.HasSuffix(cl, javaHarnessJar) {
156+
continue
157+
}
158+
logger.Printf(ctx, "Using staged java harness: %v", name)
159+
cp = append(cp[:i], cp[i+1:]...)
160+
break
161+
}
152162
}
153163
if name == "dataflow-worker.jar" {
154164
continue

Diff for: sdks/java/io/expansion-service/build.gradle

+4-2
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ dependencies {
5959
// **** IcebergIO runtime dependencies ****
6060
runtimeOnly library.java.hadoop_auth
6161
runtimeOnly library.java.hadoop_client
62-
// Needed when using GCS as the warehouse location.
62+
// For writing to GCS
6363
runtimeOnly library.java.bigdataoss_gcs_connector
64-
// Needed for HiveCatalog
64+
// HiveCatalog
6565
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
6666
runtimeOnly project(path: ":sdks:java:io:iceberg:hive")
67+
// BigQueryMetastoreCatalog (Java 11+)
68+
runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
6769

6870
runtimeOnly library.java.kafka_clients
6971
runtimeOnly library.java.slf4j_jdk14

Diff for: sdks/java/io/iceberg/bqms/build.gradle

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
plugins {
19+
id 'org.apache.beam.module'
20+
}
21+
22+
applyJavaNature(
23+
automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
24+
shadowClosure: {},
25+
exportJavadoc: false,
26+
publish: false, // it's an intermediate jar for io-expansion-service
27+
validateShadowJar: false
28+
)
29+
30+
def libDir = "$buildDir/libs"
31+
def bqmsFileName = "iceberg-bqms-catalog-${iceberg_bqms_catalog_version}.jar"
32+
task downloadBqmsJar(type: Copy) {
33+
// TODO: remove this workaround and downlooad normally when the catalog gets open-sourced:
34+
// (https://github.com/apache/iceberg/pull/11039)
35+
def jarUrl = "https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-${iceberg_bqms_catalog_version}.jar"
36+
def outputDir = file("$libDir")
37+
outputDir.mkdirs()
38+
def destFile = new File(outputDir, bqmsFileName)
39+
40+
if (!destFile.exists()) {
41+
try {
42+
ant.get(src: jarUrl, dest: destFile)
43+
println "Successfully downloaded BQMS catalog jar: $destFile"
44+
} catch (Exception e) {
45+
println "Could not download $jarUrl: ${e.message}"
46+
}
47+
}
48+
}
49+
50+
repositories {
51+
flatDir {
52+
dirs "$libDir"
53+
}
54+
}
55+
56+
compileJava.dependsOn downloadBqmsJar
57+
58+
dependencies {
59+
implementation files("$libDir/$bqmsFileName")
60+
}
61+
62+
description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore"
63+
ext.summary = "A copy of the BQMS catalog."

Diff for: sdks/java/io/iceberg/build.gradle

+8
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ dependencies {
8383
exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
8484
}
8585

86+
// BigQueryMetastore catalog dep
87+
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
88+
8689
testRuntimeOnly library.java.slf4j_jdk14
8790
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
8891
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
@@ -136,6 +139,11 @@ task integrationTest(type: Test) {
136139
outputs.upToDateWhen { false }
137140

138141
include '**/*IT.class'
142+
// BQ metastore catalog doesn't support java 8
143+
if (project.findProperty('testJavaVersion') == '8' ||
144+
JavaVersion.current().equals(JavaVersion.VERSION_1_8)) {
145+
exclude '**/BigQueryMetastoreCatalogIT.class'
146+
}
139147

140148
maxParallelForks 4
141149
classpath = sourceSets.test.runtimeClasspath

Diff for: sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ class DestinationState {
9696
private final IcebergDestination icebergDestination;
9797
private final PartitionSpec spec;
9898
private final org.apache.iceberg.Schema schema;
99-
private final PartitionKey partitionKey;
99+
// used to determine the partition to which a record belongs
100+
// must not be directly used to create a writer
101+
private final PartitionKey routingPartitionKey;
100102
private final Table table;
101103
private final String stateToken = UUID.randomUUID().toString();
102104
final Cache<PartitionKey, RecordWriter> writers;
@@ -109,7 +111,7 @@ class DestinationState {
109111
this.icebergDestination = icebergDestination;
110112
this.schema = table.schema();
111113
this.spec = table.spec();
112-
this.partitionKey = new PartitionKey(spec, schema);
114+
this.routingPartitionKey = new PartitionKey(spec, schema);
113115
this.table = table;
114116
for (PartitionField partitionField : spec.fields()) {
115117
partitionFieldMap.put(partitionField.name(), partitionField);
@@ -154,12 +156,12 @@ class DestinationState {
154156
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
155157
*/
156158
boolean write(Record record) {
157-
partitionKey.partition(getPartitionableRecord(record));
159+
routingPartitionKey.partition(getPartitionableRecord(record));
158160

159-
if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
161+
if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
160162
return false;
161163
}
162-
RecordWriter writer = fetchWriterForPartition(partitionKey);
164+
RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
163165
writer.write(record);
164166
return true;
165167
}
@@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
173175
RecordWriter recordWriter = writers.getIfPresent(partitionKey);
174176

175177
if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
178+
// each writer must have its own PartitionKey object
179+
PartitionKey copy = partitionKey.copy();
176180
// calling invalidate for a non-existent key is a safe operation
177-
writers.invalidate(partitionKey);
178-
recordWriter = createWriter(partitionKey);
179-
writers.put(partitionKey, recordWriter);
181+
writers.invalidate(copy);
182+
recordWriter = createWriter(copy);
183+
writers.put(copy, recordWriter);
180184
}
181185
return recordWriter;
182186
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.iceberg.catalog;
19+
20+
import java.io.IOException;
21+
import java.util.Map;
22+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.iceberg.CatalogUtil;
25+
import org.apache.iceberg.catalog.Catalog;
26+
import org.apache.iceberg.catalog.Namespace;
27+
import org.apache.iceberg.catalog.TableIdentifier;
28+
29+
public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
30+
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
31+
static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
32+
static final long SALT = System.nanoTime();
33+
34+
@Override
35+
public String tableId() {
36+
return DATASET + "." + testName.getMethodName() + "_" + SALT;
37+
}
38+
39+
@Override
40+
public Catalog createCatalog() {
41+
return CatalogUtil.loadCatalog(
42+
BQMS_CATALOG,
43+
"bqms_" + catalogName,
44+
ImmutableMap.<String, String>builder()
45+
.put("gcp_project", options.getProject())
46+
.put("gcp_location", "us-central1")
47+
.put("warehouse", warehouse)
48+
.build(),
49+
new Configuration());
50+
}
51+
52+
@Override
53+
public void catalogCleanup() throws IOException {
54+
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
55+
// only delete tables that were created in this test run
56+
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
57+
catalog.dropTable(tableIdentifier);
58+
}
59+
}
60+
}
61+
62+
@Override
63+
public Map<String, Object> managedIcebergConfig(String tableId) {
64+
return ImmutableMap.<String, Object>builder()
65+
.put("table", tableId)
66+
.put(
67+
"catalog_properties",
68+
ImmutableMap.<String, String>builder()
69+
.put("gcp_project", options.getProject())
70+
.put("gcp_location", "us-central1")
71+
.put("warehouse", warehouse)
72+
.put("catalog-impl", BQMS_CATALOG)
73+
.build())
74+
.build();
75+
}
76+
}

Diff for: sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public Catalog createCatalog() {
6565

6666
@Override
6767
public void catalogCleanup() throws Exception {
68-
System.out.println("xxx CLEANING UP!");
6968
if (hiveMetastoreExtension != null) {
7069
hiveMetastoreExtension.cleanup();
7170
}

0 commit comments

Comments
 (0)