
Commit 571bf9d

Merge upstream/master into my branch

2 parents b591f83 + 4450031

File tree: 43 files changed, +3032 -549 lines
+1 -1

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 4
 }

Diff for: CHANGES.md

+37 -16

@@ -54,18 +54,53 @@
 * ([#X](https://github.com/apache/beam/issues/X)).
 -->
 
-# [2.64.0] - Unreleased
+# [2.65.0] - Unreleased
 
 ## Highlights
 
-* Managed API for [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/managed/Managed.html) and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.managed.html#module-apache_beam.transforms.managed) supports [key I/O connectors](https://beam.apache.org/documentation/io/connectors/) Iceberg, Kafka, and BigQuery.
 * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
 * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
 
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## New Features / Improvements
+
+* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Breaking Changes
+
+* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
+
+## Bugfixes
+
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Security Fixes
+* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Known Issues
+
+[comment]: # ( When updating known issues after release, make sure also update website blog in website/www/site/content/blog.)
+* ([#X](https://github.com/apache/beam/issues/X)).
+
+# [2.64.0] - Ongoing Release
+
+## Highlights
+
+* Managed API for [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/managed/Managed.html) and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.managed.html#module-apache_beam.transforms.managed) supports [key I/O connectors](https://beam.apache.org/documentation/io/connectors/) Iceberg, Kafka, and BigQuery.
+* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
+* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+
+## I/Os
+
 * [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105))
+* [IcebergIO] Added new CDC source for batch and streaming, available as `Managed.ICEBERG_CDC` ([#33504](https://github.com/apache/beam/pull/33504))
 * [IcebergIO] Address edge case where bundle retry following a successful data commit results in data duplication ([#34264](https://github.com/apache/beam/pull/34264))
 
 ## New Features / Improvements
@@ -83,11 +118,6 @@
 * [Python] Reshuffle now correctly respects user-specified type hints, fixing a previous bug where it might use FastPrimitivesCoder wrongly. This change could break pipelines with incorrect type hints in Reshuffle. If you have issues after upgrading, temporarily set update_compatibility_version to a previous Beam version to use the old behavior. The recommended solution is to fix the type hints in your code. ([#33932](https://github.com/apache/beam/pull/33932))
 * [Java] SparkReceiver 2 has been moved to SparkReceiver 3 that supports Spark 3.x. ([#33574](https://github.com/apache/beam/pull/33574))
 * [Python] Correct parsing of `collections.abc.Sequence` type hints was added, which can lead to pipelines failing type hint checks that were previously passing erroneously. These issues will be most commonly seen trying to consume a PCollection with a `Sequence` type hint after a GroupByKey or a CoGroupByKey. ([#33999](https://github.com/apache/beam/pull/33999).
-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
-
-## Deprecations
-
-* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Bugfixes
 
@@ -98,15 +128,6 @@
 * Fixed checkpoint recovery and streaming behavior in Spark Classic and Portable runner's Flatten transform by replacing queueStream with SingleEmitInputDStream ([#34080](https://github.com/apache/beam/pull/34080), [#18144](https://github.com/apache/beam/issues/18144), [#20426](https://github.com/apache/beam/issues/20426))
 * (Java) Fixed Read caching of UnboundedReader objects to effectively cache across multiple DoFns and avoid checkpointing unstarted reader. [#34146](https://github.com/apache/beam/pull/34146) [#33901](https://github.com/apache/beam/pull/33901)
 
-
-## Security Fixes
-* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
-
-## Known Issues
-
-[comment]: # ( When updating known issues after release, make sure also update website blog in website/www/site/content/blog.)
-* ([#X](https://github.com/apache/beam/issues/X)).
-
 # [2.63.0] - 2025-02-18
 
 ## I/Os
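
Note on the `Managed.ICEBERG_CDC` entry above: the new CDC source is consumed through the Managed API like the other managed I/Os. Below is a minimal sketch of what a read might look like, following the existing Managed read pattern; the config keys and the "output" tag are illustrative assumptions, not taken from this commit.

    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionRowTuple;
    import org.apache.beam.sdk.values.Row;

    public class IcebergCdcReadSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Hypothetical configuration; the real keys are defined by the
        // Iceberg CDC SchemaTransform introduced in #33504.
        Map<String, Object> config =
            Map.of("table", "db.events", "catalog_name", "my_catalog");
        PCollection<Row> cdcRows =
            PCollectionRowTuple.empty(p)
                .apply(Managed.read(Managed.ICEBERG_CDC).withConfig(config))
                .get("output");
        p.run().waitUntilFinish();
      }
    }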

Diff for: contributor-docs/discussion-docs/2025.md

+6 -2

@@ -15,5 +15,9 @@ limitations under the License.
 # List Of Documents Submitted To dev@beam.apache.org In 2025
 | No. | Author | Subject | Date (UTC) |
 |---|---|---|---|
-| 1 | Danny McCormick | [Beam Python Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 17:50:00 |
-| 2 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 17:50:00 |
+| 1 | Kenneth Knowles | [Apache Beam Release Acceptance Criteria - Google Sheets](https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw) | 2025-01-13 10:54:22 |
+| 2 | Danny McCormick | [Apache Beam Vendored Dependencies Release Guide](https://s.apache.org/beam-release-vendored-artifacts) | 2025-01-13 15:00:51 |
+| 3 | Danny McCormick | [Beam Python & ML Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 15:33:36 |
+| 4 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 11:56:59 |
+| 5 | Shunping Huang | [Improve Logging Dependencies in Beam Java SDK](https://docs.google.com/document/d/1IkbiM4m8D-aB3NYI1aErFZHt6M7BQ-8eCULh284Davs) | 2025-02-04 15:13:14 |
+| 6 | Ahmed Abualsaud | [Iceberg Incremental Source design](https://s.apache.org/beam-iceberg-incremental-source) | 2025-03-03 14:52:42 |

Diff for: model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto

+2

@@ -74,6 +74,8 @@ message ManagedTransforms {
         "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
     BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
         "beam:schematransform:org.apache.beam:bigquery_write:v1"];
+    ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
   }
 }
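
The `beam_urn` option attaches the URN string to each enum value; Java code then resolves the constant back to its string through the generated proto class. A short sketch of that lookup, assuming the enum is named `Urns` and the `BeamUrns` helper lives at its current location (both assumptions about surrounding code, not shown in this diff):

    import org.apache.beam.model.pipeline.v1.ExternalTransforms.ManagedTransforms;
    import org.apache.beam.sdk.util.construction.BeamUrns;

    public class UrnLookupSketch {
      public static void main(String[] args) {
        // Reads the beam_urn extension off the enum value.
        String urn = BeamUrns.getUrn(ManagedTransforms.Urns.ICEBERG_CDC_READ);
        System.out.println(urn); // beam:schematransform:org.apache.beam:iceberg_cdc_read:v1
      }
    }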

Diff for: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java

+1 -3

@@ -321,9 +321,7 @@ public MetricUpdates getUpdates() {
 
     // Add any metricKey labels to the monitoringInfoLabels.
     if (!metricName.getLabels().isEmpty()) {
-      for (Map.Entry<String, String> entry : metricName.getLabels().entrySet()) {
-        builder.setLabel(entry.getKey(), entry.getValue());
-      }
+      builder.setLabels(metricName.getLabels());
     }
     return builder;
   }
Diff for: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java

+2 -2

@@ -182,12 +182,12 @@ public static double decodeDoubleCounter(ByteString payload) {
     }
   }
 
-  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM}. */
   public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
     return inputHistogram.toProto().toByteString();
   }
 
-  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM}. */
   public static HistogramData decodeInt64Histogram(ByteString payload) {
     try {
       return new HistogramData(HistogramValue.parseFrom(payload));

Diff for: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java

+4

@@ -161,6 +161,10 @@ public List<String> getTopics() {
     return topics;
   }
 
+  public Map<String, Object> getConfigUpdates() {
+    return configUpdates;
+  }
+
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
     if (rowCountStatistics == null) {

Diff for: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java

+29 -6

@@ -21,10 +21,14 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auto.service.AutoService;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Optional;
 import org.apache.beam.sdk.extensions.sql.TableUtils;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -112,6 +116,8 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
         properties.has("format")
             ? Optional.of(properties.get("format").asText())
             : Optional.empty();
+
+    BeamKafkaTable kafkaTable = null;
     if (Schemas.isNestedSchema(schema)) {
       Optional<PayloadSerializer> serializer =
           payloadFormat.map(
@@ -120,7 +126,7 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
                       format,
                       checkArgumentNotNull(schema.getField(PAYLOAD_FIELD).getType().getRowSchema()),
                       TableUtils.convertNode2Map(properties)));
-      return new NestedPayloadKafkaTable(schema, bootstrapServers, topics, serializer);
+      kafkaTable = new NestedPayloadKafkaTable(schema, bootstrapServers, topics, serializer);
     } else {
       /*
        * CSV is handled separately because multiple rows can be produced from a single message, which
@@ -129,13 +135,30 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
        * rows.
        */
      if (payloadFormat.orElse("csv").equals("csv")) {
-        return new BeamKafkaCSVTable(schema, bootstrapServers, topics);
+        kafkaTable = new BeamKafkaCSVTable(schema, bootstrapServers, topics);
+      } else {
+        PayloadSerializer serializer =
+            PayloadSerializers.getSerializer(
+                payloadFormat.get(), schema, TableUtils.convertNode2Map(properties));
+        kafkaTable = new PayloadSerializerKafkaTable(schema, bootstrapServers, topics, serializer);
+      }
+    }
+
+    // Get Consumer Properties from Table properties
+    HashMap<String, Object> configUpdates = new HashMap<String, Object>();
+    Iterator<Entry<String, JsonNode>> tableProperties = properties.fields();
+    while (tableProperties.hasNext()) {
+      Entry<String, JsonNode> field = tableProperties.next();
+      if (field.getKey().startsWith("properties.")) {
+        configUpdates.put(field.getKey().replace("properties.", ""), field.getValue().textValue());
       }
-      PayloadSerializer serializer =
-          PayloadSerializers.getSerializer(
-              payloadFormat.get(), schema, TableUtils.convertNode2Map(properties));
-      return new PayloadSerializerKafkaTable(schema, bootstrapServers, topics, serializer);
     }
+
+    if (!configUpdates.isEmpty()) {
+      kafkaTable.updateConsumerProperties(configUpdates);
+    }
+
+    return kafkaTable;
   }
 
   @Override
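
The new block scans the table's JSON properties for keys prefixed with "properties.", strips the prefix, and hands the remainder to the Kafka consumer via updateConsumerProperties. A self-contained sketch of that extraction idiom, using plain Jackson types outside the provider (the sample property values are illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class ConsumerPropsSketch {
      public static void main(String[] args) throws Exception {
        // Table properties as a user might supply them, e.g. in TBLPROPERTIES.
        JsonNode properties = new ObjectMapper().readTree(
            "{\"properties.security.protocol\": \"SASL_SSL\", \"format\": \"csv\"}");

        // Same idiom as the diff: keep only "properties."-prefixed keys, minus the prefix.
        Map<String, Object> configUpdates = new HashMap<>();
        Iterator<Map.Entry<String, JsonNode>> fields = properties.fields();
        while (fields.hasNext()) {
          Map.Entry<String, JsonNode> field = fields.next();
          if (field.getKey().startsWith("properties.")) {
            configUpdates.put(
                field.getKey().replace("properties.", ""), field.getValue().textValue());
          }
        }
        System.out.println(configUpdates); // {security.protocol=SASL_SSL}
      }
    }

The test added in the next file exercises exactly this path with ssl.truststore.location and security.protocol.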

Diff for: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java

+48 -8

@@ -24,13 +24,15 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
 import org.apache.beam.sdk.extensions.sql.TableUtils;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -84,6 +86,32 @@ public void testBuildWithExtraTopics() {
     assertEquals(ImmutableList.of(LOCATION_TOPIC, "topic2", "topic3"), kafkaTable.getTopics());
   }
 
+  @Test
+  public void testBuildWithExtraProperties() {
+    Table table =
+        mockTableWithExtraProperties(
+            "hello",
+            ImmutableMap.of(
+                "properties.ssl.truststore.location",
+                "/path/to/kafka.client.truststore.jks",
+                "properties.security.protocol",
+                "SASL_SSL"));
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(
+        ImmutableMap.of(
+            "ssl.truststore.location",
+            "/path/to/kafka.client.truststore.jks",
+            "security.protocol",
+            "SASL_SSL"),
+        kafkaTable.getConfigUpdates());
+  }
+
   @Test
   public void testBuildBeamSqlAvroTable() {
     Table table = mockTable("hello", "avro");
@@ -157,50 +185,56 @@ public void testGetTableType() {
   }
 
   private static Table mockTable(String name) {
-    return mockTable(name, false, null, null, null, null, null, null);
+    return mockTable(name, false, null, null, null, null, null, null, null);
   }
 
   private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
-    return mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
+    return mockTable(name, false, extraBootstrapServers, null, null, null, null, null, null);
   }
 
   private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
-    return mockTable(name, false, null, extraTopics, null, null, null, null);
+    return mockTable(name, false, null, extraTopics, null, null, null, null, null);
+  }
+
+  private static Table mockTableWithExtraProperties(
+      String name, Map<String, String> extraProperties) {
+    return mockTable(name, false, null, null, extraProperties, null, null, null, null);
   }
 
   private static Table mockTable(String name, String payloadFormat) {
-    return mockTable(name, false, null, null, payloadFormat, null, null, null);
+    return mockTable(name, false, null, null, null, payloadFormat, null, null, null);
   }
 
   private static Table mockProtoTable(String name, Class<?> protoClass) {
-    return mockTable(name, false, null, null, "proto", protoClass, null, null);
+    return mockTable(name, false, null, null, null, "proto", protoClass, null, null);
   }
 
   private static Table mockThriftTable(
       String name,
       Class<? extends TBase<?, ?>> thriftClass,
       Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
     return mockTable(
-        name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+        name, false, null, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
   }
 
   private static Table mockNestedBytesTable(String name) {
-    return mockTable(name, true, null, null, null, null, null, null);
+    return mockTable(name, true, null, null, null, null, null, null, null);
   }
 
   private static Table mockNestedThriftTable(
       String name,
       Class<? extends TBase<?, ?>> thriftClass,
       Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
     return mockTable(
-        name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+        name, true, null, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
   }
 
   private static Table mockTable(
       String name,
       boolean isNested,
       @Nullable List<String> extraBootstrapServers,
       @Nullable List<String> extraTopics,
+      @Nullable Map<String, String> extraProperties,
      @Nullable String payloadFormat,
      @Nullable Class<?> protoClass,
      @Nullable Class<? extends TBase<?, ?>> thriftClass,
@@ -222,6 +256,12 @@ private static Table mockTable(
       properties.put("topics", topics);
     }
 
+    if (extraProperties != null) {
+      for (Map.Entry<String, String> property : extraProperties.entrySet()) {
+        properties.put(property.getKey(), property.getValue());
+      }
+    }
+
     if (payloadFormat != null) {
       properties.put("format", payloadFormat);
     }

Diff for: sdks/java/io/iceberg/build.gradle

+2 -1

@@ -50,6 +50,7 @@ dependencies {
   implementation library.java.slf4j_api
   implementation library.java.joda_time
   implementation "org.apache.parquet:parquet-column:$parquet_version"
+  implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
   implementation "org.apache.orc:orc-core:$orc_version"
   implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
@@ -88,7 +89,7 @@ dependencies {
   testImplementation library.java.google_api_services_bigquery
 
   testRuntimeOnly library.java.slf4j_jdk14
-  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+  testImplementation project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
   hadoopVersions.each {kv ->
     "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value"

Diff for: sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

+1 -1

@@ -82,7 +82,7 @@ public String apply(FileWriteResult input) {
         .apply(
             "Append metadata updates to tables",
             ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.getCoder()));
   }
 
   private static class AppendFilesToTablesDoFn
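
The coder for SnapshotInfo is now obtained through an accessor instead of a static CODER constant. The accessor's body is not shown in this commit; a plausible shape for it, given only as an assumption to illustrate the lazy-initialization pattern such accessors typically use:

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.schemas.NoSuchSchemaException;
    import org.apache.beam.sdk.schemas.SchemaRegistry;

    class SnapshotInfoCoders {
      private static Coder<SnapshotInfo> coder;

      // Hypothetical accessor: infer the schema coder on first use rather than
      // during class initialization, so failures surface as ordinary exceptions.
      static synchronized Coder<SnapshotInfo> getCoder() {
        if (coder == null) {
          try {
            coder = SchemaRegistry.createDefault().getSchemaCoder(SnapshotInfo.class);
          } catch (NoSuchSchemaException e) {
            throw new RuntimeException(e);
          }
        }
        return coder;
      }
    }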
