Skip to content

Commit 7832399

Browse files
author
Pawel Czajka
committed
some fixes
1 parent ba360ec commit 7832399

File tree

3 files changed

+27
-38
lines changed

3 files changed

+27
-38
lines changed

engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
2020
import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue}
2121
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.{stateDescriptorName, Input}
2222
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations
23-
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.{ClosingEndEventTrigger, FireOnEachEvent}
23+
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.FireOnEachEvent
2424

2525
import java.lang
2626
import java.util.concurrent.atomic.AtomicReference
@@ -31,17 +31,6 @@ object ExtendedWindowOperator {
3131
// WindowOperatorBuilder.WINDOW_STATE_NAME - should be the same for compatibility
3232
val stateDescriptorName = "window-contents"
3333

34-
def fireOnEachEventTriggerWrapper[T, W <: Window](delegate: Trigger[_ >: T, W]): FireOnEachEvent[T, W] = {
35-
FireOnEachEvent(delegate)
36-
}
37-
38-
def closingEndEventTriggerWrapper[T, W <: Window](
39-
delegate: Trigger[_ >: T, W],
40-
endFunction: T => Boolean
41-
): ClosingEndEventTrigger[T, W] = {
42-
ClosingEndEventTrigger(delegate, endFunction)
43-
}
44-
4534
implicit class OnEventOperatorKeyedStream[A](stream: KeyedStream[Input[A], String])(
4635
implicit fctx: FlinkCustomNodeContext
4736
) {
@@ -55,7 +44,7 @@ object ExtendedWindowOperator {
5544
assigner,
5645
types,
5746
aggregateFunction,
58-
fireOnEachEventTriggerWrapper[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger),
47+
FireOnEachEvent(trigger),
5948
preserveContext = true
6049
)
6150

@@ -145,24 +134,25 @@ private class ValueEmittingWindowFunction(
145134
out: Collector[ValueWithContext[AnyRef]]
146135
): Unit = {
147136
elements.forEach { element =>
148-
contextHolderRef.get match {
137+
val contextOpt = contextHolderRef.get match {
149138
case OnElementWindowContext(contextToPreserve, timestampToOverride) =>
139+
// in current flink implementation out is always of type TimestampedCollector
150140
out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestampToOverride)
151-
out.collect(
152-
ValueWithContext(
153-
element,
154-
KeyEnricher.enrichWithKey(
155-
contextToPreserve.getOrElse(api.Context(contextIdGenerator.nextContextId())),
156-
key
157-
)
158-
)
159-
)
141+
contextToPreserve
160142

161143
case OnTimerWindowContext =>
162-
out.collect(
163-
ValueWithContext(element, KeyEnricher.enrichWithKey(api.Context(contextIdGenerator.nextContextId()), key))
164-
)
144+
None
165145
}
146+
147+
out.collect(
148+
ValueWithContext(
149+
element,
150+
KeyEnricher.enrichWithKey(
151+
contextOpt.getOrElse(api.Context(contextIdGenerator.nextContextId())),
152+
key
153+
)
154+
)
155+
)
166156
}
167157
}
168158

engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import pl.touk.nussknacker.engine.flink.api.process._
1515
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
1616
import pl.touk.nussknacker.engine.flink.util.richflink._
1717
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.OnEventOperatorKeyedStream
18+
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.ClosingEndEventTrigger
1819
import pl.touk.nussknacker.engine.util.KeyedValue
1920

2021
import scala.collection.immutable.SortedMap
@@ -176,11 +177,11 @@ object transformers {
176177
implicit val fctx: FlinkCustomNodeContext = ctx
177178
val typeInfos = AggregatorTypeInformations(ctx, aggregator, aggregateBy)
178179

179-
val baseTrigger = {
180-
ExtendedWindowOperator.closingEndEventTriggerWrapper[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow](
180+
val baseTrigger =
181+
ClosingEndEventTrigger[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow](
181182
EventTimeTrigger.create(),
182-
_.value.value._2)
183-
}
183+
_.value.value._2
184+
)
184185
val groupByValue = aggregateBy.product(endSessionCondition)
185186

186187
val keyedStream = start
@@ -193,7 +194,8 @@ object transformers {
193194
case SessionWindowTrigger.OnEvent =>
194195
keyedStream.extendedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger)
195196
case SessionWindowTrigger.OnEnd =>
196-
keyedStream.extendedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false)
197+
keyedStream
198+
.extendedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false)
197199
}).setUidWithName(ctx, ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators)
198200
})
199201
)

engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@ object triggers {
3030
case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W]) extends DelegatingTrigger[T, W](delegate) {
3131

3232
override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
33-
val previousResult = super.onElement(element, timestamp, window, ctx)
34-
val result = previousResult match {
33+
val result = super.onElement(element, timestamp, window, ctx)
34+
result match {
3535
case TriggerResult.CONTINUE => TriggerResult.FIRE
3636
case TriggerResult.PURGE => TriggerResult.FIRE_AND_PURGE
3737
case fire => fire
3838
}
39-
result
4039
}
4140

4241
override def onProcessingTime(time: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
@@ -51,10 +50,8 @@ object triggers {
5150

5251
}
5352

54-
case class ClosingEndEventTrigger[T, W <: Window](
55-
delegate: Trigger[_ >: T, W],
56-
endFunction: T => Boolean
57-
) extends DelegatingTrigger[T, W](delegate) {
53+
case class ClosingEndEventTrigger[T, W <: Window](delegate: Trigger[_ >: T, W], endFunction: T => Boolean)
54+
extends DelegatingTrigger[T, W](delegate) {
5855

5956
override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
6057
if (endFunction(element)) {

0 commit comments

Comments
 (0)