Skip to content

Commit 883f015

Browse files
authored
[BEAM-5149] Add support for the Java SDK harness to merge windows.
2 parents 9b68f92 + def076e commit 883f015

File tree

5 files changed

+329
-9
lines changed

5 files changed

+329
-9
lines changed

model/pipeline/src/main/proto/beam_runner_api.proto

+28-2
Original file line numberDiff line numberDiff line change
@@ -202,9 +202,35 @@ message StandardPTransforms {
202202
// Payload: TestStreamPayload
203203
TEST_STREAM = 5 [(beam_urn) = "urn:beam:transform:teststream:v1"];
204204

205-
// Represents mapping of main input window into side input window.
206-
// Payload: serialized WindowMappingFn.
205+
// Represents mapping of main input window onto side input window.
206+
//
207+
// Side input window mapping function:
208+
// Input: KV<nonce, MainInputWindow>
209+
// Output: KV<nonce, SideInputWindow>
210+
//
211+
// For each main input window, the side input window is returned. The
212+
// nonce is used by a runner to associate each input with its output.
213+
// The nonce is represented as an opaque set of bytes.
214+
//
215+
// Payload: WindowMappingFn from SideInputSpec.
207216
MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];
217+
218+
// Used to merge windows during a GroupByKey.
219+
//
220+
// Window merging function:
221+
// Input: KV<nonce, iterable<OriginalWindow>>
222+
// Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>
223+
//
224+
// For each set of original windows, a list of all unmerged windows is
225+
// output alongside a map of merged window to set of consumed windows.
226+
// All original windows must be contained in either the unmerged original
227+
// window set or one of the consumed original window sets. Each original
228+
// window can only be part of one output set. The nonce is used by a runner
229+
// to associate each input with its output. The nonce is represented as an
230+
// opaque set of bytes.
231+
//
232+
// Payload: WindowFn from WindowingStrategy.
233+
MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
208234
}
209235
enum DeprecatedPrimitives {
210236
// Represents the operation to read a Bounded or Unbounded source.

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime.
199199
// This URN says that the WindowFn is just a UDF blob the Java SDK understands
200200
// TODO: standardize such things
201201
public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
202+
public static final String GLOBAL_WINDOWS_URN =
203+
BeamUrns.getUrn(GlobalWindowsPayload.Enum.PROPERTIES);
204+
public static final String FIXED_WINDOWS_URN =
205+
BeamUrns.getUrn(FixedWindowsPayload.Enum.PROPERTIES);
206+
public static final String SLIDING_WINDOWS_URN =
207+
BeamUrns.getUrn(SlidingWindowsPayload.Enum.PROPERTIES);
208+
public static final String SESSION_WINDOWS_URN = BeamUrns.getUrn(SessionsPayload.Enum.PROPERTIES);
202209

203210
/**
204211
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -210,7 +217,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
210217
if (windowFn instanceof GlobalWindows) {
211218
return SdkFunctionSpec.newBuilder()
212219
.setEnvironmentId(components.getOnlyEnvironmentId())
213-
.setSpec(FunctionSpec.newBuilder().setUrn(getUrn(GlobalWindowsPayload.Enum.PROPERTIES)))
220+
.setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_URN))
214221
.build();
215222
} else if (windowFn instanceof FixedWindows) {
216223
FixedWindowsPayload fixedWindowsPayload =
@@ -222,7 +229,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
222229
.setEnvironmentId(components.getOnlyEnvironmentId())
223230
.setSpec(
224231
FunctionSpec.newBuilder()
225-
.setUrn(getUrn(FixedWindowsPayload.Enum.PROPERTIES))
232+
.setUrn(FIXED_WINDOWS_URN)
226233
.setPayload(fixedWindowsPayload.toByteString()))
227234
.build();
228235
} else if (windowFn instanceof SlidingWindows) {
@@ -236,7 +243,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
236243
.setEnvironmentId(components.getOnlyEnvironmentId())
237244
.setSpec(
238245
FunctionSpec.newBuilder()
239-
.setUrn(getUrn(SlidingWindowsPayload.Enum.PROPERTIES))
246+
.setUrn(SLIDING_WINDOWS_URN)
240247
.setPayload(slidingWindowsPayload.toByteString()))
241248
.build();
242249
} else if (windowFn instanceof Sessions) {
@@ -248,7 +255,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
248255
.setEnvironmentId(components.getOnlyEnvironmentId())
249256
.setSpec(
250257
FunctionSpec.newBuilder()
251-
.setUrn(getUrn(SessionsPayload.Enum.PROPERTIES))
258+
.setUrn(SESSION_WINDOWS_URN)
252259
.setPayload(sessionsPayload.toByteString()))
253260
.build();
254261
} else {

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,17 @@
3232
import org.apache.beam.sdk.values.KV;
3333

3434
/**
35-
* Maps windows using a window mapping fn. The input is {@link KV} with the key being a nonce and
36-
* the value being a window, the output must be a {@link KV} with the key being the same nonce as
37-
* the input and the value being the mapped window.
35+
* Represents mapping of main input window onto side input window.
36+
*
37+
* <p>Side input window mapping function:
38+
*
39+
* <ul>
40+
* <li>Input: {@code KV<nonce, MainInputWindow>}
41+
* <li>Output: {@code KV<nonce, SideInputWindow>}
42+
* </ul>
43+
*
44+
* <p>For each main input window, the side input window is returned. The nonce is used by a runner
45+
* to associate each input with its output. The nonce is represented as an opaque set of bytes.
3846
*/
3947
public class WindowMappingFnRunner {
4048
static final String URN = BeamUrns.getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.fn.harness;
19+
20+
import com.google.auto.service.AutoService;
21+
import com.google.common.collect.ImmutableMap;
22+
import com.google.common.collect.Sets;
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.Map;
29+
import org.apache.beam.model.pipeline.v1.RunnerApi;
30+
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
31+
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
32+
import org.apache.beam.runners.core.construction.BeamUrns;
33+
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
34+
import org.apache.beam.sdk.fn.function.ThrowingFunction;
35+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
36+
import org.apache.beam.sdk.transforms.windowing.WindowFn;
37+
import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
38+
import org.apache.beam.sdk.values.KV;
39+
40+
/**
41+
* Merges windows using a {@link org.apache.beam.sdk.transforms.windowing.WindowFn}.
42+
*
43+
* <p>Window merging function:
44+
*
45+
* <ul>
46+
* <li>Input: {@code KV<nonce, iterable<OriginalWindow>>}
47+
* <li>Output: {@code KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow,
48+
* iterable<ConsumedOriginalWindow>>>>}
49+
* </ul>
50+
*
51+
* <p>For each set of original windows, a list of all unmerged windows is output alongside a map of
52+
* merged window to set of consumed windows. All original windows must be contained in either the
53+
* unmerged original window set or one of the consumed original window sets. Each original window
54+
* can only be part of one output set. The nonce is used by a runner to associate each input with
55+
* its output. The nonce is represented as an opaque set of bytes.
56+
*/
57+
public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
58+
static final String URN = BeamUrns.getUrn(StandardPTransforms.Primitives.MERGE_WINDOWS);
59+
60+
/**
61+
* A registrar which provides a factory to handle merging windows based upon the {@link WindowFn}.
62+
*/
63+
@AutoService(PTransformRunnerFactory.Registrar.class)
64+
public static class Registrar implements PTransformRunnerFactory.Registrar {
65+
66+
@Override
67+
public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
68+
return ImmutableMap.of(
69+
URN,
70+
MapFnRunners.forValueMapFnFactory(WindowMergingFnRunner::createMapFunctionForPTransform));
71+
}
72+
}
73+
74+
static <T, W extends BoundedWindow>
75+
ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>>>
76+
createMapFunctionForPTransform(String ptransformId, PTransform ptransform)
77+
throws IOException {
78+
RunnerApi.SdkFunctionSpec payload =
79+
RunnerApi.SdkFunctionSpec.parseFrom(ptransform.getSpec().getPayload());
80+
81+
WindowFn<?, W> windowFn =
82+
(WindowFn<?, W>) WindowingStrategyTranslation.windowFnFromProto(payload);
83+
return WindowMergingFnRunner.<T, W>create(windowFn)::mergeWindows;
84+
}
85+
86+
static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W> create(WindowFn<?, W> windowFn) {
87+
if (windowFn.isNonMerging()) {
88+
return new NonMergingWindowFnRunner();
89+
} else {
90+
return new MergingViaWindowFnRunner(windowFn);
91+
}
92+
}
93+
94+
/**
95+
* Returns the set of unmerged windows and a mapping from merged windows to sets of original
96+
* windows.
97+
*/
98+
abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
99+
KV<T, Iterable<W>> windowsToMerge) throws Exception;
100+
101+
/////////////////////////////////////////////////////////////////////////////////////////////////
102+
103+
/**
104+
* An optimized version of window merging where the {@link WindowFn} does not do any window
105+
* merging.
106+
*
107+
* <p>Note that this is likely to never be invoked and the identity mapping will be handled
108+
* directly by runners. We have this here because runners may not perform this optimization.
109+
*/
110+
private static class NonMergingWindowFnRunner<T, W extends BoundedWindow>
111+
extends WindowMergingFnRunner<T, W> {
112+
@Override
113+
KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
114+
KV<T, Iterable<W>> windowsToMerge) {
115+
return KV.of(
116+
windowsToMerge.getKey(), KV.of(windowsToMerge.getValue(), Collections.emptyList()));
117+
}
118+
}
119+
120+
/** An implementation which uses a {@link WindowFn} to merge windows. */
121+
private static class MergingViaWindowFnRunner<T, W extends BoundedWindow>
122+
extends WindowMergingFnRunner<T, W> {
123+
private final WindowFn<?, W> windowFn;
124+
private final WindowFn<?, W>.MergeContext mergeContext;
125+
private Collection<W> currentWindows;
126+
private List<KV<W, Collection<W>>> mergedWindows;
127+
128+
private MergingViaWindowFnRunner(WindowFn<?, W> windowFn) {
129+
this.windowFn = windowFn;
130+
this.mergedWindows = new ArrayList<>();
131+
this.currentWindows = new ArrayList<>();
132+
this.mergeContext =
133+
windowFn.new MergeContext() {
134+
135+
@Override
136+
public Collection<W> windows() {
137+
return currentWindows;
138+
}
139+
140+
@Override
141+
public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
142+
mergedWindows.add(KV.of(mergeResult, toBeMerged));
143+
}
144+
};
145+
}
146+
147+
@Override
148+
KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
149+
KV<T, Iterable<W>> windowsToMerge) throws Exception {
150+
currentWindows = Sets.newHashSet(windowsToMerge.getValue());
151+
windowFn.mergeWindows((MergeContext) mergeContext);
152+
for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
153+
currentWindows.removeAll(mergedWindow.getValue());
154+
}
155+
return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
156+
}
157+
}
158+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.fn.harness;
19+
20+
import static org.hamcrest.Matchers.containsInAnyOrder;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertThat;
23+
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.Iterables;
26+
import com.google.common.collect.Sets;
27+
import java.util.Collections;
28+
import org.apache.beam.model.pipeline.v1.RunnerApi;
29+
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
30+
import org.apache.beam.runners.core.construction.SdkComponents;
31+
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
32+
import org.apache.beam.sdk.fn.function.ThrowingFunction;
33+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
34+
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
35+
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
36+
import org.apache.beam.sdk.transforms.windowing.Sessions;
37+
import org.apache.beam.sdk.transforms.windowing.WindowFn;
38+
import org.apache.beam.sdk.values.KV;
39+
import org.joda.time.Duration;
40+
import org.joda.time.Instant;
41+
import org.junit.Test;
42+
import org.junit.runner.RunWith;
43+
import org.junit.runners.JUnit4;
44+
45+
/** Tests for {@link WindowMergingFnRunner}. */
46+
@RunWith(JUnit4.class)
47+
public class WindowMergingFnRunnerTest {
48+
@Test
49+
public void testWindowMergingWithNonMergingWindowFn() throws Exception {
50+
ThrowingFunction<
51+
KV<Object, Iterable<BoundedWindow>>,
52+
KV<
53+
Object,
54+
KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow, Iterable<BoundedWindow>>>>>>
55+
mapFunction =
56+
WindowMergingFnRunner.createMapFunctionForPTransform(
57+
"ptransformId", createMergeTransformForWindowFn(new GlobalWindows()));
58+
59+
KV<Object, Iterable<BoundedWindow>> input =
60+
KV.of(
61+
"abc",
62+
ImmutableList.of(new IntervalWindow(Instant.now(), Duration.standardMinutes(1))));
63+
64+
assertEquals(
65+
KV.of(input.getKey(), KV.of(input.getValue(), Collections.emptyList())),
66+
mapFunction.apply(input));
67+
}
68+
69+
@Test
70+
public void testWindowMergingWithMergingWindowFn() throws Exception {
71+
ThrowingFunction<
72+
KV<Object, Iterable<BoundedWindow>>,
73+
KV<
74+
Object,
75+
KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow, Iterable<BoundedWindow>>>>>>
76+
mapFunction =
77+
WindowMergingFnRunner.createMapFunctionForPTransform(
78+
"ptransformId",
79+
createMergeTransformForWindowFn(Sessions.withGapDuration(Duration.millis(5L))));
80+
81+
// 7, 8 and 10 should all be merged. 1 and 20 should remain in the original set.
82+
BoundedWindow[] expectedToBeMerged =
83+
new BoundedWindow[] {
84+
new IntervalWindow(new Instant(9L), new Instant(11L)),
85+
new IntervalWindow(new Instant(10L), new Instant(10L)),
86+
new IntervalWindow(new Instant(7L), new Instant(10L))
87+
};
88+
Iterable<BoundedWindow> expectedToBeUnmerged =
89+
Sets.newHashSet(
90+
new IntervalWindow(new Instant(1L), new Instant(1L)),
91+
new IntervalWindow(new Instant(20L), new Instant(20L)));
92+
KV<Object, Iterable<BoundedWindow>> input =
93+
KV.of(
94+
"abc",
95+
ImmutableList.<BoundedWindow>builder()
96+
.add(expectedToBeMerged)
97+
.addAll(expectedToBeUnmerged)
98+
.build());
99+
100+
KV<Object, KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow, Iterable<BoundedWindow>>>>>
101+
output = mapFunction.apply(input);
102+
assertEquals(input.getKey(), output.getKey());
103+
assertEquals(expectedToBeUnmerged, output.getValue().getKey());
104+
KV<BoundedWindow, Iterable<BoundedWindow>> mergedOutput =
105+
Iterables.getOnlyElement(output.getValue().getValue());
106+
assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), mergedOutput.getKey());
107+
assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMerged));
108+
}
109+
110+
private static <W extends BoundedWindow> RunnerApi.PTransform createMergeTransformForWindowFn(
111+
WindowFn<?, W> windowFn) throws Exception {
112+
SdkComponents components = SdkComponents.create();
113+
components.registerEnvironment(Environment.newBuilder().setUrl("java").build());
114+
RunnerApi.FunctionSpec functionSpec =
115+
RunnerApi.FunctionSpec.newBuilder()
116+
.setUrn(WindowMergingFnRunner.URN)
117+
.setPayload(WindowingStrategyTranslation.toProto(windowFn, components).toByteString())
118+
.build();
119+
return RunnerApi.PTransform.newBuilder().setSpec(functionSpec).build();
120+
}
121+
}

0 commit comments

Comments
 (0)