diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index ebd88442b211..2b19e2b2b9fd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -202,6 +203,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return processContext.causedByDrain(); } + @Override + public ValueKind valueKind(DoFn doFn) { + return processContext.valueKind(); + } + @Override public RestrictionTracker restrictionTracker() { return processContext.tracker; @@ -407,6 +413,11 @@ public CausedByDrain causedByDrain() { return element.causedByDrain(); } + @Override + public ValueKind valueKind() { + return element.getValueKind(); + } + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; @@ -442,6 +453,7 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { + noteOutput(); outputReceiver.output( tag, WindowedValues.of( @@ -456,6 +468,32 @@ public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) element.getValueKind())); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + + @Override + public void outputWithKind(TupleTag tag, T value, ValueKind kind) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator) + .observeTimestamp(element.getTimestamp()); + } + outputReceiver.output( + tag, + WindowedValues.of( + value, + element.getTimestamp(), + element.getWindows(), + element.getPaneInfo(), + element.getRecordId(), + element.getRecordOffset(), + element.causedByDrain(), + element.getOpenTelemetryContext(), + kind)); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index d553a7be2d44..c05b5da8a271 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -438,6 +439,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -471,6 +477,22 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo()); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + builderSupplier + .builder(output) + .setTimestamp(elem.getTimestamp()) + .setWindows(elem.getWindows()) + .setPaneInfo(elem.getPaneInfo()) + .setValueKind(kind) + .setReceiver( + wv -> { + checkTimestamp(elem.getTimestamp(), wv.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, wv); + }) + .output(); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -506,6 +528,11 @@ public Instant timestamp() { return elem.getRecordOffset(); } + @Override + public ValueKind valueKind() { + return elem.getValueKind(); + } + public Collection windows() { return elem.getWindows(); } @@ -597,6 +624,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return elem.causedByDrain(); } + @Override + public ValueKind valueKind(DoFn doFn) { + return elem.getValueKind(); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException( @@ -895,6 +927,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return causedByDrain; } + @Override + public ValueKind valueKind(DoFn doFn) { + throw new UnsupportedOperationException("ValueKind parameters are not supported."); + } + @Override public String timerId(DoFn doFn) { return timerId; @@ -1041,6 +1078,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -1063,6 +1105,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkTimestamp(timestamp(), timestamp); + builderSupplier + .builder(output) + .setTimestamp(timestamp()) + .setWindows(Collections.singleton(window())) + .setPaneInfo(PaneInfo.NO_FIRING) + .setValueKind(kind) + .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv)) + .output(); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -1228,6 +1283,11 @@ public TimeDomain timeDomain(DoFn doFn) { "Cannot access time domain outside of @ProcessTimer method."); } + @Override + public ValueKind valueKind(DoFn doFn) { + throw new UnsupportedOperationException("ValueKind parameters are not supported."); + } + @Override public KeyT key() { return key; @@ -1330,6 +1390,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -1352,6 +1417,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkTimestamp(this.timestamp, timestamp); + builderSupplier + .builder(output) + .setTimestamp(this.timestamp) + .setWindows(Collections.singleton(window())) + .setPaneInfo(PaneInfo.NO_FIRING) + .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv)) + .setValueKind(kind) + .output(); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index b15b3f3834d2..b80d53c96cb8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1381,6 +1381,11 @@ public T getValue() { return value; } + @Override + public ValueKind getValueKind() { + return ValueKind.INSERT; + } + @Override public CausedByDrain causedByDrain() { return CausedByDrain.NORMAL; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java index 47ff5b764910..15a26530f3de 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java @@ -141,6 +141,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setValueKind(kv.getValue().getValueKind()) .output(); } })); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ab3b62a0aa1b..883dd67a7d6c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -154,6 +154,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; @@ -171,11 +172,17 @@ import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValues; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueKind; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -191,6 +198,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -2777,4 +2785,133 @@ public void process() {} })); p.run(); } + + @Test + @Category({ValidatesRunner.class}) + public void testValueKindParameterAndOutputWithKind() { + boolean isRunnerV2 = false; + @Nullable + List experiments = + pipeline.getOptions().as(DataflowPipelineOptions.class).getExperiments(); + if (experiments != null + && (experiments.contains("use_unified_worker") || experiments.contains("use_runner_v2"))) { + isRunnerV2 = true; + } + // Skipp runner v2 because its Create uses a splittable DoFn, which contains a shuffle. + // ValueKind is not supported in Dataflow shuffle yet + assumeFalse(isRunnerV2); + + PCollection input = pipeline.apply(Create.of("a", "b", "c", "d", "e")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple tuple = + input.apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, + @Timestamp org.joda.time.Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + ProcessContext c, + MultiOutputReceiver outputReceiver) { + switch (element) { + case "a": + c.output(element); // default: INSERT + return; + case "b": + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + return; + case "c": + c.outputWindowedValue( + WindowedValues.of( + element, + timestamp, + Collections.singleton(window), + paneInfo, + null, + null, + CausedByDrain.NORMAL, + null, + ValueKind.UPDATE_AFTER)); + return; + case "d": + c.outputWithKind(sideTag, element, ValueKind.UPDATE_AFTER); + return; + case "e": + outputReceiver.get(sideTag).outputWithKind(element, ValueKind.DELETE); + } + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection main = + tuple + .get(mainTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PCollection side = + tuple + .get(sideTag) + .apply( + "ReadKind-SideTag", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(main).containsInAnyOrder("a:INSERT", "b:UPDATE_BEFORE", "c:UPDATE_AFTER"); + PAssert.that(side).containsInAnyOrder("d:UPDATE_AFTER", "e:DELETE"); + pipeline.run(); + } + + @Test + @Ignore("enable once when element metadata is supported in shuffle") + @Category({ValidatesRunner.class}) + public void testValueKindPreservedAcrossShuffle() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "value"))); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply(Reshuffle.of()) + .apply( + "ReadKind", + ParDo.of( + new DoFn, String>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c, ValueKind kind) { + c.output(element.getValue() + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); + pipeline.run(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index 3ab46f0ddb42..c49e66226287 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ValueKindUtil; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder; import org.apache.beam.sdk.values.WindowedValue; @@ -226,6 +227,7 @@ public long add(WindowedValue data) throws IOException { data.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN ? BeamFnApi.Elements.DrainMode.Enum.DRAINING : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING) + .setValueKind(ValueKindUtil.toProto(data.getValueKind())) .build(); ByteString metadata = encodeMetadata( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 7136538753db..3a863174d386 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -350,6 +350,7 @@ public void testFixedWindowsWithDraining() throws Exception { equalTo(window(10, 20)), anything(), is(CausedByDrain.CAUSED_BY_DRAIN)))); + WindowedValues.WindowedValueCoder.setMetadataNotSupported(); } @Test diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 864b903b25f9..55c9e6e28345 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.checkerframework.checker.nullness.qual.Nullable; @@ -188,6 +189,25 @@ public abstract class WindowedContext { */ public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + /** + * Adds the given element to the main output {@code PCollection}, with the given {@link + * ValueKind}. + * + *

Once passed to {@code outputWithKind} the element should not be modified in any way. + * + *

If invoked from {@link ProcessElement}, the output element will have the same windowing + * metadata as the input element. + * + *

If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to + * determine what windows the element should be in, throwing an exception if the {@code + * WindowFn} attempts to access any information about the input element. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWithKind(OutputT output, ValueKind kind); + /** * Adds the given element to the main output {@code PCollection}, with the given windowing * metadata. @@ -289,6 +309,17 @@ public abstract void outputWindowedValue( public abstract void outputWindowedValue(TupleTag tag, WindowedValue windowedValue); public abstract void outputWindowedValue(WindowedValue windowedValue); + + /** + * Adds the given element to the main output {@code PCollection} with the given {@link + * ValueKind}. + * + *

Once passed to {@code outputWithKind} the element should not be modified in any way. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWithKind(TupleTag tag, T output, ValueKind kind); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ @@ -338,6 +369,9 @@ public abstract class ProcessContext extends WindowedContext { @Pure public abstract org.apache.beam.sdk.values.CausedByDrain causedByDrain(); + + @Pure + public abstract ValueKind valueKind(); } /** Information accessible when running a {@link DoFn.OnTimer} method. */ @@ -419,6 +453,10 @@ default void outputWithTimestamp(T value, Instant timestamp) { builder(value).setTimestamp(timestamp).output(); } + default void outputWithKind(T value, ValueKind valueKind) { + builder(value).setValueKind(valueKind).output(); + } + default void outputWindowedValue( T value, Instant timestamp, @@ -426,6 +464,20 @@ default void outputWindowedValue( PaneInfo paneInfo) { builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output(); } + + default void outputWindowedValue( + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + builder(value) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setValueKind(valueKind) + .output(); + } } /** Receives tagged output for a multi-output function. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 05d7b7b0d920..2bdc724b8e0e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; @@ -503,7 +504,8 @@ public void output(TupleTag tag, T output, Instant timestamp, BoundedWind null, null, CausedByDrain.NORMAL, - null)); + null, + ValueKind.INSERT)); } }; } @@ -607,6 +609,11 @@ public CausedByDrain causedByDrain() { return element.getCausedByDrain(); } + @Override + public ValueKind valueKind() { + return element.getValueKind(); + } + @Override public PipelineOptions getPipelineOptions() { return options; @@ -622,6 +629,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -648,7 +660,24 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp null, null, CausedByDrain.NORMAL, - element.getOpenTelemetryContext())); + element.getOpenTelemetryContext(), + ValueKind.INSERT)); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + output, + element.getTimestamp(), + element.getWindow(), + element.getPaneInfo(), + null, + null, + CausedByDrain.NORMAL, + element.getOpenTelemetryContext(), + kind)); } @Override @@ -669,7 +698,8 @@ public void outputWindowedValue(TupleTag tag, WindowedValue windowedVa windowedValue.getRecordId(), windowedValue.getRecordOffset(), windowedValue.causedByDrain(), - windowedValue.getOpenTelemetryContext())); + windowedValue.getOpenTelemetryContext(), + windowedValue.getValueKind())); } } @@ -684,7 +714,15 @@ public void outputWindowedValue( getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, w, paneInfo, null, null, CausedByDrain.NORMAL, null)); + output, + timestamp, + w, + paneInfo, + null, + null, + CausedByDrain.NORMAL, + null, + ValueKind.INSERT)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java index 44f27824382d..5463365e4c6e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java @@ -188,6 +188,7 @@ public void processElement( .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) .setCausedByDrain(kv.getValue().getCausedByDrain()) + .setValueKind(kv.getValue().getValueKind()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java index 1e3203744d05..b1288c054142 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueKind; import org.joda.time.Duration; import org.joda.time.Instant; @@ -148,6 +149,7 @@ public void processElement( BoundedWindow window, PaneInfo paneInfo, CausedByDrain causedByDrain, + ValueKind valueKind, OutputReceiver>> r) { r.output( KV.of( @@ -160,7 +162,8 @@ public void processElement( pc.currentRecordId(), pc.currentRecordOffset(), causedByDrain, - null))); + null, + valueKind))); } })) .setCoder( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 0a8d058107b8..594bdfc85039 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -190,6 +190,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setValueKind(kv.getValue().getValueKind()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 9adbe3a12cf4..3ebabb6e3c37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -97,6 +97,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ValueKindParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; @@ -134,6 +135,7 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { public static final String CURRENT_RECORD_ID_PARAMETER_METHOD = "currentRecordId"; public static final String CURRENT_RECORD_OFFSET_PARAMETER_METHOD = "currentRecordOffset"; public static final String FIRE_TIMESTAMP_PARAMETER_METHOD = "fireTimestamp"; + public static final String VALUE_KIND_PARAMETER_METHOD = "valueKind"; public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer"; public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver"; public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain"; @@ -1117,6 +1119,15 @@ public StackManipulation dispatch(CausedByDrainParameter p) { CAUSED_BY_DRAIN_PARAMETER_METHOD, DoFn.class))); } + @Override + public StackManipulation dispatch(ValueKindParameter p) { + return new StackManipulation.Compound( + pushDelegate, + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription( + VALUE_KIND_PARAMETER_METHOD, DoFn.class))); + } + @Override public StackManipulation dispatch(BundleFinalizerParameter p) { return simpleExtraContextParameter(BUNDLE_FINALIZER_PARAMETER_METHOD); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 1f122f1bf661..eaabdff907c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -232,6 +233,9 @@ interface ArgumentProvider { /** Provide a reference to the caused by drain. */ CausedByDrain causedByDrain(DoFn doFn); + /** Provide a reference to the {@link ValueKind}. */ + ValueKind valueKind(DoFn doFn); + /** Provide a reference to the time domain for a timer firing. */ TimeDomain timeDomain(DoFn doFn); @@ -364,6 +368,12 @@ public CausedByDrain causedByDrain(DoFn doFn) { String.format("CausedByDrain unsupported in %s", getErrorContext())); } + @Override + public ValueKind valueKind(DoFn doFn) { + throw new UnsupportedOperationException( + String.format("ValueKind unsupported in %s", getErrorContext())); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException( @@ -573,6 +583,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return delegate.causedByDrain(doFn); } + @Override + public ValueKind valueKind(DoFn doFn) { + return delegate.valueKind(doFn); + } + @Override public TimeDomain timeDomain(DoFn doFn) { return delegate.timeDomain(doFn); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 33fccc2e1cde..51dadd178a6f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -351,6 +351,8 @@ public ResultT match(Cases cases) { return cases.dispatch((CurrentRecordOffsetParameter) this); } else if (this instanceof FireTimestampParameter) { return cases.dispatch((FireTimestampParameter) this); + } else if (this instanceof ValueKindParameter) { + return cases.dispatch((ValueKindParameter) this); } else if (this instanceof KeyParameter) { return cases.dispatch((KeyParameter) this); } else { @@ -417,6 +419,8 @@ public interface Cases { ResultT dispatch(CausedByDrainParameter p); + ResultT dispatch(ValueKindParameter p); + ResultT dispatch(KeyParameter p); /** A base class for a visitor with a default method for cases it is not interested in. */ @@ -534,6 +538,11 @@ public ResultT dispatch(CausedByDrainParameter p) { return dispatchDefault(p); } + @Override + public ResultT dispatch(ValueKindParameter p) { + return dispatchDefault(p); + } + @Override public ResultT dispatch(StateParameter p) { return dispatchDefault(p); @@ -591,6 +600,8 @@ public ResultT dispatch(KeyParameter p) { new AutoValue_DoFnSignature_Parameter_BundleFinalizerParameter(); private static final CausedByDrainParameter CAUSED_BY_DRAIN_PARAMETER = new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter(); + private static final ValueKindParameter VALUE_KIND_PARAMETER = + new AutoValue_DoFnSignature_Parameter_ValueKindParameter(); private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER = new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter(); private static final CurrentRecordIdParameter CURRENT_RECORD_ID_PARAMETER = @@ -625,6 +636,11 @@ public static CausedByDrainParameter causedByDrainParameter() { return CAUSED_BY_DRAIN_PARAMETER; } + /** Returns a {@link ValueKindParameter}. */ + public static ValueKindParameter valueKindParameter() { + return VALUE_KIND_PARAMETER; + } + /** Returns a {@link CurrentRecordIdParameter}. */ public static CurrentRecordIdParameter currentRecordIdParameter() { return CURRENT_RECORD_ID_PARAMETER; @@ -802,6 +818,16 @@ public abstract static class CausedByDrainParameter extends Parameter { CausedByDrainParameter() {} } + /** + * Descriptor for a {@link Parameter} of type {@link org.apache.beam.sdk.values.ValueKind}. + * + *

All such descriptors are equal. + */ + @AutoValue + public abstract static class ValueKindParameter extends Parameter { + ValueKindParameter() {} + } + /** * Descriptor for a {@link Parameter} of type {@link DoFn.RecordId}. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index ec624696fc7c..0bd2c1c888f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -98,6 +98,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.TypeParameter; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -143,6 +144,7 @@ private DoFnSignatures() {} Parameter.CurrentRecordIdParameter.class, Parameter.CurrentRecordOffsetParameter.class, Parameter.CausedByDrainParameter.class, + Parameter.ValueKindParameter.class, Parameter.BundleFinalizerParameter.class); private static final ImmutableList> @@ -162,6 +164,7 @@ private DoFnSignatures() {} Parameter.CurrentRecordIdParameter.class, Parameter.CurrentRecordOffsetParameter.class, Parameter.CausedByDrainParameter.class, + Parameter.ValueKindParameter.class, Parameter.BundleFinalizerParameter.class); private static final ImmutableList> ALLOWED_SETUP_PARAMETERS = @@ -1386,6 +1389,11 @@ private static Parameter analyzeExtraParameter( rawType.equals(CausedByDrain.class), "CausedByDrain argument must have type org.apache.beam.sdk.values.CausedByDrain."); return Parameter.causedByDrainParameter(); + } else if (ValueKind.class.isAssignableFrom(rawType)) { + methodErrors.checkArgument( + rawType.equals(ValueKind.class), + "ValueKind argument must have type org.apache.beam.sdk.values.ValueKind."); + return Parameter.valueKindParameter(); } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) { String sideInputId = getSideInputId(param.getAnnotations()); paramErrors.checkArgument( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 44212f97ad28..6d4cc7c93e92 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; @@ -520,6 +521,11 @@ public CausedByDrain causedByDrain() { return outerContext.causedByDrain(); } + @Override + public ValueKind valueKind() { + return outerContext.valueKind(); + } + @Override public Object sideInput(String tagId) { PCollectionView view = sideInputMapping.get(tagId); @@ -549,6 +555,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return outerContext.causedByDrain(); } + @Override + public ValueKind valueKind(DoFn doFn) { + return outerContext.valueKind(); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException(); @@ -619,6 +630,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outerContext.outputWithTimestamp(output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outerContext.outputWithKind(output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -638,6 +654,11 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outerContext.outputWithTimestamp(tag, output, timestamp); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + outerContext.outputWithKind(tag, output, kind); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 33700a9dc0d2..43db94dffb61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -397,7 +397,6 @@ public static WindowedValue of( ValueKind.INSERT); } - /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( T value, Instant timestamp, @@ -1045,7 +1044,7 @@ public static ParamWindowedValueCoder getParamWindowedValueCoder(Coder /** Abstract class for {@code WindowedValue} coder. */ public abstract static class WindowedValueCoder extends StructuredCoder> { final Coder valueCoder; - private static boolean metadataSupported = false; + private static volatile boolean metadataSupported = false; public static void setMetadataSupported() { metadataSupported = true; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java new file mode 100644 index 000000000000..31f8d0ca7fdb --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; +import java.util.Collections; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn.StateId; +import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueKind; +import org.apache.beam.sdk.values.WindowedValues; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ValueKind} support in {@link DoFn}. */ +@RunWith(JUnit4.class) +public class ValueKindTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testValueKindParameterInDoFn() { + PCollection input = pipeline.apply(Create.of("a", "b")); + + PCollection output = + input.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:INSERT", "b:INSERT"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testOutputWithKind() { + PCollection input = pipeline.apply(Create.of("a", "b")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + if ("a".equals(element)) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } else { + c.outputWithKind(element, ValueKind.UPDATE_AFTER); + } + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE", "b:UPDATE_AFTER"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testDefaultValueKind() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "StandardOutput", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.output(element); // Should preserve input kind! + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_MainOutput() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "OutputWindowedValue", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + // Should preserve input kind! + c.outputWindowedValue( + element, c.timestamp(), Collections.singleton(window), c.pane()); + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_TaggedOutput() { + PCollection input = pipeline.apply(Create.of("a")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple outputTuple = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "OutputWindowedValueTagged", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + c.outputWindowedValue( + sideTag, + element, + c.timestamp(), + Collections.singleton(window), + c.pane()); + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection output = + outputTuple + .get(sideTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_Object() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "OutputWindowedValueObject", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + c.outputWindowedValue( + WindowedValues.of( + element, + c.timestamp(), + Collections.singleton(window), + c.pane(), + null, + null, + CausedByDrain.NORMAL, + null, + ValueKind.UPDATE_BEFORE)); + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_TaggedObject() { + PCollection input = pipeline.apply(Create.of("a")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple outputTuple = + input.apply( + "OutputWindowedValueTaggedObject", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + c.outputWindowedValue( + sideTag, + WindowedValues.of( + element, + c.timestamp(), + Collections.singleton(window), + c.pane(), + null, + null, + CausedByDrain.NORMAL, + null, + ValueKind.UPDATE_BEFORE)); + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection output = + outputTuple + .get(sideTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedAcrossTags() { + PCollection input = pipeline.apply(Create.of("a")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple outputTuple = + input.apply( + "OutputWithKindTagged", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(sideTag, element, ValueKind.UPDATE_BEFORE); + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection output = + outputTuple + .get(sideTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindInSplittableDoFn() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "SplittableDoFn", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, + RestrictionTracker tracker, + ProcessContext c, + ValueKind kind) { + if (tracker.tryClaim(tracker.currentRestriction().getFrom())) { + c.output(element + ":" + kind); + } + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(@Element String element) { + return new OffsetRange(0, 1); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testOutputWithKindInWindowExpiration() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "a"))); + PCollection> windowedInput = + input.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))); + + PCollection output = + windowedInput.apply( + "StatefulParDo", + ParDo.of( + new DoFn, String>() { + @StateId("dummy") + private final StateSpec> spec = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + // Do nothing, just let state be created + } + + @OnWindowExpiration + public void onWindowExpiration( + OutputReceiver receiver, + BoundedWindow window, + @Timestamp Instant timestamp) { + receiver.outputWindowedValue( + "expired", + timestamp, + Collections.singleton(window), + PaneInfo.NO_FIRING, + ValueKind.UPDATE_BEFORE); + } + })); + + PAssert.that(output).containsInAnyOrder("expired"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testOutputWithKindInOnTimer() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "a"))); + + PCollection output = + input.apply( + "TimerParDo", + ParDo.of( + new DoFn, String>() { + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + @Element KV element, + @TimerId("timer") Timer timer, + ProcessContext c) { + timer.set(c.timestamp().plus(Duration.standardSeconds(1))); + } + + @OnTimer("timer") + public void onTimer(OnTimerContext c) { + c.outputWithKind("timed_out", ValueKind.UPDATE_BEFORE); + } + })); + + PAssert.that(output).containsInAnyOrder("timed_out"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInReshuffle() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "value"))); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply(Reshuffle.of()) + .apply( + "ReadKind", + ParDo.of( + new DoFn, String>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c, ValueKind kind) { + c.output(element.getValue() + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInGroupByKeyWithReify() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "value"))); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply(Reify.windowsInValue()) + .apply(GroupByKey.create()) + .apply( + "ReadKind", + ParDo.of( + new DoFn>>, String>() { + @ProcessElement + public void processElement( + @Element KV>> element, + ProcessContext c) { + for (ValueInSingleWindow value : element.getValue()) { + c.output(value.getValue() + ":" + value.getValueKind()); + } + } + })); + + PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); + pipeline.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 3689b1be7dba..54a3715dd50b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -128,6 +128,35 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, value.causedByDrain()); Assert.assertNotNull(value.getOpenTelemetryContext()); Assert.assertEquals(ValueKind.DELETE, value.getValueKind()); + WindowedValues.WindowedValueCoder.setMetadataNotSupported(); + } + + @Test + public void testWindowedValueWithValueKindCoder() throws CoderException { + WindowedValues.WindowedValueCoder.setMetadataSupported(); + Instant timestamp = new Instant(1234); + WindowedValue value = + WindowedValues.builder() + .setValue("abc") + .setTimestamp(timestamp) + .setWindows( + Arrays.asList(new IntervalWindow(timestamp, timestamp.plus(Duration.millis(1000))))) + .setPaneInfo(PaneInfo.NO_FIRING) + .setValueKind(ValueKind.UPDATE_BEFORE) + .build(); + + Coder> windowedValueCoder = + WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); + + byte[] encodedValue = CoderUtils.encodeToByteArray(windowedValueCoder, value); + WindowedValue decodedValue = + CoderUtils.decodeFromByteArray(windowedValueCoder, encodedValue); + + Assert.assertEquals(value.getValue(), decodedValue.getValue()); + Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); + Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); + Assert.assertEquals(value.getValueKind(), decodedValue.getValueKind()); + WindowedValues.WindowedValueCoder.setMetadataNotSupported(); } @Test diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 0fbc92c1f7d8..45353ccd85c4 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -108,6 +108,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder; @@ -1736,6 +1737,13 @@ private class WindowObservingProcessBundleContext public OutputBuilder builder(OutputT value) { return WindowedValues.builder() .setValue(value) + .setTimestamp(currentElement.getTimestamp()) + .setPaneInfo(currentElement.getPaneInfo()) + .setWindows(currentElement.getWindows()) + .setRecordOffset(currentElement.getRecordOffset()) + .setRecordId(currentElement.getRecordId()) + .setCausedByDrain(currentElement.causedByDrain()) + .setValueKind(currentElement.getValueKind()) .setReceiver(windowedValue -> outputTo(mainOutputConsumer, windowedValue)); } @@ -1762,6 +1770,32 @@ public void output(TupleTag tag, T output) { output, currentElement.getTimestamp(), currentWindow, currentElement.getPaneInfo())); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + builder(output).setValueKind(kind).output(); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + outputWindowedValue( + tag, + output, + currentElement.getTimestamp(), + currentElement.getWindows(), + currentElement.getPaneInfo(), + kind); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, valueKind); + } + @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all @@ -1813,6 +1847,16 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { + outputWindowedValue(tag, output, timestamp, windows, paneInfo, ValueKind.INSERT); + } + + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all // runners can provide proper timestamps. FnDataReceiver> consumer = @@ -1820,7 +1864,18 @@ public void outputWindowedValue( if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + outputTo( + consumer, + WindowedValues.of( + output, + timestamp, + windows, + paneInfo, + null, + null, + CausedByDrain.NORMAL, + null, + valueKind)); } @Override @@ -1912,6 +1967,47 @@ public void output(TupleTag tag, T output) { outputTo(consumer, currentElement.withValue(output)); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + builder(output).setValueKind(kind).output(); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + builder(output) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setValueKind(valueKind) + .output(); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + currentElement.getTimestamp(), + currentElement.getWindows(), + currentElement.getPaneInfo(), + null, + null, + CausedByDrain.NORMAL, + currentElement.getOpenTelemetryContext(), + kind)); + } + @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { builder(output).setValue(output).setTimestamp(timestamp).output(); @@ -2286,10 +2382,20 @@ public CausedByDrain causedByDrain() { return currentElement.causedByDrain(); } + @Override + public ValueKind valueKind() { + return currentElement.getValueKind(); + } + @Override public CausedByDrain causedByDrain(DoFn doFn) { return currentElement.causedByDrain(); } + + @Override + public ValueKind valueKind(DoFn doFn) { + return currentElement.getValueKind(); + } } /** @@ -2328,6 +2434,43 @@ public void outputWindowedValue( OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + OutputReceiver.super.outputWithKind(output, kind); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo, valueKind); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkOnWindowExpirationTimestamp(currentTimer.getHoldTimestamp()); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + currentTimer.getHoldTimestamp(), + currentWindow, + currentTimer.getPaneInfo(), + null, + null, + currentTimer.causedByDrain(), + null, + kind)); + } + @Override public BoundedWindow window() { return currentWindow; @@ -2659,6 +2802,43 @@ public void outputWindowedValue( OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + OutputReceiver.super.outputWithKind(output, kind); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo, valueKind); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkTimerTimestamp(currentTimer.getHoldTimestamp()); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + currentTimer.getHoldTimestamp(), + currentWindow, + currentTimer.getPaneInfo(), + null, + null, + causedByDrain, + null, + kind)); + } + @Override public void output(TupleTag tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp());