Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -202,6 +203,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return processContext.causedByDrain();
}

@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
return processContext.valueKind();
}

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return processContext.tracker;
Expand Down Expand Up @@ -407,6 +413,11 @@ public CausedByDrain causedByDrain() {
return element.causedByDrain();
}

@Override
public ValueKind valueKind() {
return element.getValueKind();
}

@Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
Expand Down Expand Up @@ -442,6 +453,7 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
noteOutput();
outputReceiver.output(
tag,
WindowedValues.of(
Expand All @@ -456,6 +468,32 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
element.getValueKind()));
}

@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public <T> void outputWithKind(TupleTag<T> tag, T value, ValueKind kind) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator)
.observeTimestamp(element.getTimestamp());
}
outputReceiver.output(
tag,
WindowedValues.of(
value,
element.getTimestamp(),
element.getWindows(),
element.getPaneInfo(),
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
element.getOpenTelemetryContext(),
kind));
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
Expand Down Expand Up @@ -438,6 +439,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public void outputWindowedValue(
OutputT output,
Expand Down Expand Up @@ -471,6 +477,22 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
}

@Override
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
builderSupplier
.builder(output)
.setTimestamp(elem.getTimestamp())
.setWindows(elem.getWindows())
.setPaneInfo(elem.getPaneInfo())
.setValueKind(kind)
.setReceiver(
wv -> {
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
})
.output();
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down Expand Up @@ -506,6 +528,11 @@ public Instant timestamp() {
return elem.getRecordOffset();
}

@Override
public ValueKind valueKind() {
return elem.getValueKind();
}

public Collection<? extends BoundedWindow> windows() {
return elem.getWindows();
}
Expand Down Expand Up @@ -597,6 +624,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return elem.causedByDrain();
}

@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
return elem.getValueKind();
}

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -895,6 +927,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return causedByDrain;
}

@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
}

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
return timerId;
Expand Down Expand Up @@ -1041,6 +1078,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public void outputWindowedValue(
OutputT output,
Expand All @@ -1063,6 +1105,19 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
checkTimestamp(timestamp(), timestamp);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The variable timestamp is not defined in the scope of the outputWithKind method, which will lead to a compilation error. This appears to be a copy-paste error from the outputWithTimestamp method. Since the output timestamp is implicitly the context's firing timestamp (timestamp()), this check is redundant and should be removed.

builderSupplier
.builder(output)
.setTimestamp(timestamp())
.setWindows(Collections.singleton(window()))
.setPaneInfo(PaneInfo.NO_FIRING)
.setValueKind(kind)
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
.output();
}

Comment thread
ahmedabu98 marked this conversation as resolved.
@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down Expand Up @@ -1228,6 +1283,11 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
"Cannot access time domain outside of @ProcessTimer method.");
}

@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
}

@Override
public KeyT key() {
return key;
Expand Down Expand Up @@ -1330,6 +1390,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public void outputWindowedValue(
OutputT output,
Expand All @@ -1352,6 +1417,19 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
checkTimestamp(this.timestamp, timestamp);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The variable timestamp is not defined in this method, likely due to a copy-paste error from outputWithTimestamp. In this context, the output timestamp is fixed to this.timestamp, making this check redundant. It should be removed to avoid compilation issues.

builderSupplier
.builder(output)
.setTimestamp(this.timestamp)
.setWindows(Collections.singleton(window()))
.setPaneInfo(PaneInfo.NO_FIRING)
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
.setValueKind(kind)
.output();
}

Comment thread
ahmedabu98 marked this conversation as resolved.
@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,11 @@ public T getValue() {
return value;
}

@Override
public ValueKind getValueKind() {
return ValueKind.INSERT;
}

@Override
public CausedByDrain causedByDrain() {
return CausedByDrain.NORMAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void processElement(
.setTimestamp(kv.getValue().getTimestamp())
.setWindow(kv.getValue().getWindow())
.setPaneInfo(kv.getValue().getPaneInfo())
.setValueKind(kv.getValue().getValueKind())
.output();
}
}));
Expand Down
Loading
Loading