Skip to content

Commit 84b196e

Browse files
authored
Merge branch 'master' into address-37990-shardedkey-consolidation
2 parents 3ea385d + b4c4571 commit 84b196e

559 files changed

Lines changed: 12540 additions & 2436 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
---
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
name: adding-new-metadata
20+
description: Guide on how to add and propagate new metadata fields in Apache Beam's WindowedValue, extending protos, windmill persistence, and runner interfaces to avoid metadata loss.
21+
---
22+
23+
# Adding New Metadata to WindowedValue
24+
25+
This skill provides a comprehensive guide on adding new metadata (e.g., CDC metadata, drain mode flags, OpenTelemetry trace context) to Apache Beam's `WindowedValue` and ensuring it propagates correctly through the execution engine. Failing to propagate metadata in all necessary places will result in metadata loss during pipeline execution.
26+
27+
## 1. Extending the Proto Model
28+
29+
When adding new metadata that must cross worker boundaries or be serialized by the Fn API, the proto definitions must be updated.
30+
31+
* **Key Files:** `model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto`
32+
* **Action:** Add the new metadata field to the appropriate message (`ElementMetadata`).
33+
* **Note:** Add proper documentation in proto. Type of the field can be different from the type in WindowedValue, see OpenTelemetry trace context for example.
34+
35+
## 2. WindowedValue Interface and Implementations
36+
37+
The `WindowedValue` is the core container for elements flowing through a Beam pipeline. It holds the value, timestamp, windows, pane info, and any additional metadata.
38+
39+
### Core Interface Updates
40+
* **Key File:** `sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java`
41+
* **Action:** Add getter methods for your new metadata.
42+
43+
### Concrete Implementations
44+
You must update **all** concrete implementations of `WindowedValue` to store and return the new metadata. If you miss one, metadata will be silently dropped.
45+
* `ValueInGlobalWindow`
46+
* `ValueInSingleWindow`
47+
* `ValueInEmptyWindows` (often used inside runners, like Dataflow's worker package)
48+
* **Action:** Update constructors, factory methods (`of()`), fields in these classes and coders.
49+
50+
### OutputBuilder vs. Context Output
51+
* **IMPORTANT:** Do **not** add new arguments to legacy methods like `context.outputWindowedValue(...)` or `WindowedValue.of(value, timestamp, windows, pane)`. This causes brittleness and breaks the API for every new metadata field.
52+
* **Action:** Modify `OutputBuilder` (`sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java`) to accept the new metadata (e.g., `.withDrainMode(...)`, `.withTraceContext(...)`). Use the builder pattern when constructing outputs to propagate offset and record IDs smoothly.
53+
54+
## 3. Windmill Persistence (Dataflow Streaming Engine) Runner v1
55+
56+
For the Dataflow streaming runner, metadata must survive serialization to and from the Windmill backend.
57+
58+
* **Serialization (Sink):**
59+
* **File:** `runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java`
60+
* **Action:** Extract the metadata from the `WindowedValue`, and add it to already created ElementMetadata proto builder.
61+
* **Deserialization (Reader):**
62+
* **Files:** `runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java` and `WindowingWindmillReader.java`
63+
* **Action:** Extract the metadata from ElementMetadata proto and reconstruct the `WindowedValue` using the updated factory methods/builders that include the metadata. This is incremental work, as plenty of metadata is already extracted from the proto.
64+
65+
## 4. Propagation Across Core Classes
66+
67+
Metadata must be explicitly copied or forwarded whenever a `WindowedValue` is transformed, buffered, or processed.
68+
69+
### DoFn Runners (Java Core)
70+
You must ensure that when a DoFn processes an element and outputs a new element, the appropriate metadata from the *input* is propagated to the *output* (unless explicitly changed by the logic).
71+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java`
72+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java`
73+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java`
74+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java`
75+
* `sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java`
76+
77+
**Action:** When these runners call `outputWindowedValue()`, they should extract the metadata from the input or current context and attach it using the `OutputBuilder` or the new `WindowedValue` interfaces.
78+
79+
### Grouping and Reducing
80+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java`
81+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java`
82+
* **Action:** Ensure that during GroupByKey/Combine operations, if metadata needs to be preserved (e.g., `CausedByDrain`), it is correctly passed into the `ReduceFnContextFactory` and propagated when outputting the grouped results.
83+
84+
### Splittable DoFns (SDF)
85+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java`
86+
* `sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java`
87+
88+
### Timers
89+
If metadata needs to survive timer firings (e.g., knowing an `@OnTimer` fired because of a system drain), it must be added to Timer data structures. This is a bit of uncharted area which was only implemented for CausedByDrain metadata that comes from backend, not from persisted metadata. In order to persist all WindowedValue metadata across timer, more work has to be done, below are some pointers:
90+
* `runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java` and implementations (e.g., `WindmillTimerInternals.java` in Dataflow).
91+
* `runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java` (or generic `TimerData`).
92+
* **Action:** Add the field to `TimerData`, next to `CausedByDrain`. Propagate it when setting the timer and expose it when the timer fires so it bubbles up.
93+
* Eventually, metadata from Timer lands in WindowedValue, so it can be exposed to users. Keep field names, types, and getters similar to WindowedValue as much as possible, as common interface may be introduced eventually.
94+
95+
## 5. Exposing Metadata to the User (DoFn Signatures)
96+
97+
User needs to access the metadata in their `DoFn` (e.g., `@ProcessElement public void process(ProcessContext c, CausedByDrain drain) { ... }`), you must update the reflection and bytecode generation logic.
98+
99+
* **Files:**
100+
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java`
101+
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java`
102+
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java`
103+
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java`
104+
* **Action:** Add logic to detect the new parameter type in the DoFn method signature. Generate bytecode using ByteBuddy to extract the property from the `WindowedValue` or context and pass it as an argument during method invocation.
105+
106+
## Checklist for Adding New Metadata
107+
108+
1. [ ] Define the metadata in `beam_fn_api.proto` (if applicable).
109+
2. [ ] Add getters to the `WindowedValue` interface.
110+
3. [ ] Update `ValueInGlobalWindow`, `ValueInSingleWindow`, `ValueInEmptyWindows` to store the metadata.
111+
4. [ ] Update `OutputBuilder` to accept the metadata.
112+
5. [ ] Update `WindmillSink` to serialize the metadata to the backend.
113+
6. [ ] Update `UngroupedWindmillReader` and `WindowingWindmillReader` to deserialize the metadata.
114+
7. [ ] Update `WindmillKeyedWorkItem`.
115+
8. [ ] Update `SimpleDoFnRunner`, `StatefulDoFnRunner`, and `FnApiDoFnRunner` to propagate the metadata from input to output.
116+
9. [ ] Update `ReduceFnRunner` and `OutputAndTimeBoundedSplittableProcessElementInvoker` for complex transform propagation.
117+
10. [ ] If required by timers, update `TimerData` and `TimerInternals`.
118+
11. [ ] If exposed to the user, update `DoFnSignatures` and `ByteBuddyDoFnInvokerFactory`.
119+
12. [ ] Update other runners (Flink, Spark, Samza) to ensure they propagate the new `WindowedValue` fields correctly in their specific operators/runners.

.asf.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ github:
5151

5252
protected_branches:
5353
master: {}
54+
release-2.73: {}
55+
release-2.72.0-postrelease: {}
5456
release-2.72: {}
5557
release-2.71.0-postrelease: {}
5658
release-2.71: {}

.github/ACTIONS.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Currently, we have both GitHub-hosted and self-hosted runners for running the Gi
2828
### Getting Started with self-hosted runners
2929
* Refer to [this README](./gh-actions-self-hosted-runners/README.md) for the steps for creating your own self-hosted runners for testing your workflows.
3030
* Depending on your workflow's needs, it must specify the following `runs-on` tags to run in the specified operating system:
31-
* Ubuntu 20.04 self-hosted runner: `[self-hosted, ubuntu-20.04]`
31+
* Ubuntu 24.04 self-hosted runner: `[self-hosted, ubuntu-24.04, main]` (also `small`, `highmem`, or `highmem22` pool labels as needed)
3232
* Windows Server 2019 self-hosted runner: `[self-hosted, windows-server-2019]`
3333
* MacOS GitHub-hosted runner: `macos-latest`
3434
* Every workflow that tests the source code, needs to have the workflow trigger `pull_request_target` instead of `pull_request`.
@@ -48,7 +48,7 @@ Currently, we have both GitHub-hosted and self-hosted runners for running the Gi
4848
node-version: 16
4949
```
5050
* You can find the GitHub-hosted runner installations in the following links:
51-
* [Ubuntu-20.04](https://github.com/actions/runner-images/blob/main/images/linux/Ubuntu2004-Readme.md#installed-apt-packages)
51+
* [Ubuntu-24.04](https://github.com/actions/runner-images/blob/main/images/ubuntu/Ubuntu2404-Readme.md#installed-apt-packages)
5252
* [Windows-2019](https://github.com/actions/runner-images/blob/main/images/win/Windows2019-Readme.md)
5353
5454
#### GitHub Actions Example
@@ -60,7 +60,7 @@ on:
6060
permissions: read-all
6161
jobs:
6262
github-actions-example:
63-
runs-on: [self-hosted, ubuntu-20.04]
63+
runs-on: [self-hosted, ubuntu-24.04, main]
6464
steps:
6565
- name: Check out repository code
6666
uses: actions/checkout@v2

.github/actions/setup-default-test-properties/test-properties.json

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
{
22
"PythonTestProperties": {
3-
"ALL_SUPPORTED_VERSIONS": ["3.10", "3.11", "3.12", "3.13"],
3+
"ALL_SUPPORTED_VERSIONS": ["3.10", "3.11", "3.12", "3.13", "3.14"],
44
"LOWEST_SUPPORTED": ["3.10"],
5-
"HIGHEST_SUPPORTED": ["3.13"],
6-
"ESSENTIAL_VERSIONS": ["3.10", "3.13"],
7-
"CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS": ["3.10", "3.12", "3.13"],
5+
"HIGHEST_SUPPORTED": ["3.14"],
6+
"ESSENTIAL_VERSIONS": ["3.10", "3.14"],
7+
"CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS": ["3.10", "3.12", "3.13", "3.14"],
88
"CROSS_LANGUAGE_VALIDATES_RUNNER_DATAFLOW_USING_SQL_PYTHON_VERSIONS": ["3.11"],
9-
"VALIDATES_CONTAINER_DATAFLOW_PYTHON_VERSIONS": ["3.10", "3.11", "3.12", "3.13"],
9+
"VALIDATES_CONTAINER_DATAFLOW_PYTHON_VERSIONS": ["3.10", "3.11", "3.12", "3.13", "3.14"],
1010
"LOAD_TEST_PYTHON_VERSION": "3.10",
1111
"CHICAGO_TAXI_EXAMPLE_FLINK_PYTHON_VERSION": "3.10",
1212
"DEFAULT_INTERPRETER": "python3.10",

.github/actions/setup-environment-action/action.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ runs:
4848
steps:
4949
- name: Install Python
5050
if: ${{ inputs.python-version != '' }}
51-
uses: actions/setup-python@v5
51+
uses: actions/setup-python@v6
5252
with:
5353
python-version: ${{ inputs.python-version == 'default' && '3.10' || inputs.python-version }}
5454
cache: ${{ inputs.python-cache && 'pip' || 'none' }}

.github/gh-actions-self-hosted-runners/arc/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ main_runner = {
4545
max_node_count = "5" # Main runner pool maximal node count
4646
min_replicas = "5" # Min number of runner PODs in the main pool . Do not confuse with Nodes
4747
max_replicas = "20" # Max number of runner PODs in the main pool . Do not confuse with Nodes
48-
webhook_scaling # Enable webhook scaling for main pool
48+
webhook_scaling = true # Enable webhook scaling for main pool
4949
}
5050
environment = "environment_name" # Name of the environment. Used as a prefix like dev- stag- anything-
5151
ingress_domain = "fqdn" # FQDN for webhook ingress
@@ -66,7 +66,7 @@ machine_type = "e2-standard-2" # Macihne type for the pool
6666
min_node_count = 1 # Minimal node count
6767
max_node_count = 2 # Maximal node count
6868
min_replicas = 1 # Minimal replica count
69-
min_replicas = 2 # Maximal replica count
69+
max_replicas = 2 # Maximal replica count
7070
webhook_scaling = true # Enable webhook based scaling
7171
runner_image = "gcr.io/someimage:sometag" # Image to use
7272
labels = ["self-hosted", "testrunner"] # Label set for runner pool. Used in `on`

.github/gh-actions-self-hosted-runners/arc/environments/beam.env

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ additional_runner_pools = [{
8686
{
8787
name = "highmem-runner-22"
8888
machine_type = "c3-highmem-22"
89-
runner_image = "us-central1-docker.pkg.dev/apache-beam-testing/beam-github-actions/beam-arc-runner:d7cd81a1649bc665581951d2330c4b8acd19ed72"
89+
runner_image = "us-central1-docker.pkg.dev/apache-beam-testing/beam-github-actions/beam-arc-runner:latest"
9090
min_node_count = "0"
9191
max_node_count = "8"
9292
min_replicas = "0"
@@ -96,7 +96,7 @@ additional_runner_pools = [{
9696
cpu = "7.5"
9797
memory = "100Gi"
9898
}
99-
labels = ["self-hosted", "ubuntu-20.04", "highmem22"]
99+
labels = ["self-hosted", "ubuntu-24.04", "highmem22"]
100100
enable_selector = true
101101
enable_taint = true
102102
},

.github/gh-actions-self-hosted-runners/arc/gke.tf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,15 @@ resource "google_container_node_pool" "main-actions-runner-pool" {
4747
]
4848
service_account = data.google_service_account.service_account.email
4949
tags = ["actions-runner-pool"]
50+
labels = {
51+
"runner-pool" = var.main_runner.name
52+
}
5053
}
54+
lifecycle {
55+
ignore_changes = [
56+
node_config[0].resource_labels,
57+
]
58+
}
5159
}
5260

5361
resource "google_container_node_pool" "additional_runner_pools" {
@@ -88,7 +96,12 @@ resource "google_container_node_pool" "additional_runner_pools" {
8896
}
8997
}
9098
}
99+
lifecycle {
100+
ignore_changes = [
101+
node_config[0].resource_labels,
102+
]
91103
}
104+
}
92105

93106

94107
resource "google_compute_global_address" "actions-runner-ip" {

.github/gh-actions-self-hosted-runners/arc/helm.tf

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ resource "helm_release" "cert-manager" {
2323
repository = "https://charts.jetstack.io"
2424
chart = "cert-manager"
2525

26-
atomic = "true"
27-
timeout = 100
26+
atomic = true
27+
timeout = 600
2828

2929
set = [
3030
{
@@ -38,12 +38,12 @@ resource "helm_release" "cert-manager" {
3838
resource "helm_release" "arc" {
3939
name = "arc"
4040
namespace = "arc"
41-
create_namespace = "true"
41+
create_namespace = true
4242
repository = "https://actions-runner-controller.github.io/actions-runner-controller"
4343
chart = "actions-runner-controller"
4444

45-
atomic = "true"
46-
timeout = 120
45+
atomic = true
46+
timeout = 600
4747

4848
set = [
4949
for k, v in local.arc_values : {

.github/gh-actions-self-hosted-runners/arc/images/Dockerfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ RUN docker buildx install && docker buildx version
2424

2525

2626
USER root
27+
# Native build toolchain for Python C extensions.
28+
RUN apt-get update && \
29+
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends build-essential time && \
30+
rm -rf /var/lib/apt/lists/*
2731
#Install Node
2832
RUN curl -OL https://nodejs.org/dist/v22.14.0/node-v22.14.0-linux-x64.tar.xz && \
2933
tar -C /usr/local -xf node-v22.14.0-linux-x64.tar.xz && \

0 commit comments

Comments
 (0)