Skip to content

Commit 769bc2e

Browse files
author
Pawel Czajka
committed
comment added
1 parent 50c3ff2 commit 769bc2e

File tree

1 file changed

+3
-1
lines changed
  • engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate

1 file changed

+3
-1
lines changed

engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.
2323
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.FireOnEachEvent
2424

2525
import java.lang
26-
import java.util.concurrent.atomic.AtomicReference
2726

2827
object ExtendedWindowOperator {
2928
type Input[A] = ValueWithContext[StringKeyedValue[A]]
@@ -137,11 +136,14 @@ private class ValueEmittingWindowFunction(
137136
elements.forEach { element =>
138137
val contextOpt = contextHolderRef.nuWindowContext match {
139138
case OnElementWindowContext(contextToPreserve, timestampToOverride) =>
139+
// this means, that end of this window pane was triggered by an element
140+
140141
// in current flink implementation out is always of type TimestampedCollector
141142
out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestampToOverride)
142143
contextToPreserve
143144

144145
case OnTimerWindowContext =>
146+
// this means, that end of this window pane was triggered by a timer
145147
None
146148
}
147149

0 commit comments

Comments
 (0)