From 84c6cee6e1fe1b322df093b51b539648529a2887 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Wed, 26 Mar 2025 11:34:38 +0100 Subject: [PATCH 01/17] some comments --- .../flink/util/transformer/aggregate/sampleTransformers.scala | 1 + .../engine/flink/util/transformer/aggregate/transformers.scala | 1 + .../engine/flink/util/transformer/aggregate/triggers.scala | 1 + 3 files changed, 3 insertions(+) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala index 4abce37274f..24e8e6e17ce 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala @@ -76,6 +76,7 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. 
*/ + // TODO_PAWEL jest to jednak ten class TumblingAggregateTransformer(config: AggregateWindowsConfig) extends CustomStreamTransformer with UnboundedStreamComponent diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 8f9299abd45..4dbb0403102 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -124,6 +124,7 @@ object transformers { (tumblingWindowTrigger match { case TumblingWindowTrigger.OnEvent => keyedStream + // TODO_PAWEL tutaj cos jest nie teges .eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) case TumblingWindowTrigger.OnEnd => keyedStream diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index d3201e428c1..e1af7afa38c 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -27,6 +27,7 @@ object triggers { // Window won't be emitted on end, but after each event. This would be useful e.g. when we want to have // daily (i.e. 
for current day) aggregate for each incoming event, but we're not interested in daily summary on each midnight + // TODO_PAWEL case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W]) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { From 41cdec1aa79037137f11ff92c103ae81dec69ae8 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Wed, 26 Mar 2025 14:48:08 +0100 Subject: [PATCH 02/17] some comments --- .../flink/util/transformer/aggregate/TransformersTest.scala | 2 ++ .../flink/util/transformer/aggregate/sampleTransformers.scala | 3 ++- .../engine/flink/util/transformer/aggregate/triggers.scala | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index cb93d162386..6635327d3af 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -746,6 +746,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins } } + // TODO_PAWEL jest ok tu jest jakis test test("sum session aggregate on event with context") { val id = "1" @@ -920,6 +921,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins ) } + // TODO_PAWEL jest ok tu jest jakis test private def session( aggregator: String, aggregateBy: String, diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala 
b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala index 24e8e6e17ce..635225a0faa 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala @@ -76,7 +76,7 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ - // TODO_PAWEL jest to jednak ten + // TODO_PAWEL inny ktory byc moze nie dziala class TumblingAggregateTransformer(config: AggregateWindowsConfig) extends CustomStreamTransformer with UnboundedStreamComponent @@ -138,6 +138,7 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ + // TODO_PAWEL o tym mowil zbyszek ze nie dziala. object SessionWindowAggregateTransformer extends CustomStreamTransformer with UnboundedStreamComponent diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index e1af7afa38c..5aa1bbe3082 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -27,7 +27,7 @@ object triggers { // Window won't be emitted on end, but after each event. This would be useful e.g. when we want to have // daily (i.e. 
for current day) aggregate for each incoming event, but we're not interested in daily summary on each midnight - // TODO_PAWEL + // TODO_PAWEL, jednak to nie jest chyba to case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W]) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { From e2ec54d6f78cec71c61466d05d2272a54f9a4c29 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Thu, 27 Mar 2025 12:33:08 +0100 Subject: [PATCH 03/17] some comments, cos tam --- .../flink/util/transformer/aggregate/TransformersTest.scala | 4 +++- .../transformer/aggregate/OnEventTriggerWindowOperator.scala | 5 +++++ .../util/transformer/aggregate/sampleTransformers.scala | 1 + .../flink/util/transformer/aggregate/transformers.scala | 2 ++ .../engine/flink/util/transformer/aggregate/triggers.scala | 2 +- 5 files changed, 12 insertions(+), 2 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index 6635327d3af..79137c4306c 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -751,7 +751,8 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins val id = "1" ResultsCollectingListenerHolder.withListener { collectingListener => - val testRecords = + val testRecords = { + // TODO_PAWEL to jest List( TestRecordHours(id, 0, 1, "a"), TestRecordHours(id, 2, 2, "d"), @@ -761,6 +762,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins // stop condition 
TestRecordHours(id, 6, 5, "a") ) + } val model = modelData(collectingListener, testRecords) val testScenario = session("#AGG.list", "#input.eId", SessionWindowTrigger.OnEvent, "#input.str == 'stop'") diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala index afbf12e31b1..c96b164e672 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala @@ -41,6 +41,7 @@ object OnEventTriggerWindowOperator { implicit fctx: FlinkCustomNodeContext ) { + // TODO_PAWEL jest ok to jak ta funkcja jes tuzyta jest pewnie problemem def eventTriggerWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, @@ -111,6 +112,10 @@ private class ValueEmittingWindowFunction( elements: lang.Iterable[AnyRef], out: Collector[ValueWithContext[AnyRef]] ): Unit = { + // TODO_PAWEL tu mozna by ten out scastowac na timestampedcollector i mu ustawic timestamp, tylko problem jest taki ze nie wiadomo jaki + // tutaj niby widze, ze te elements maja typ StreamRecord, mozna z nich wiec wziac timestamp + // wiec moge ustawic np timestamp ostatniego z nich, ale nie zawsze. czasem jest juz ustawiony prawidlowy w tym 'out' timestampcollector + // tylko jak mam odroznic sytuacje gdy window jest jakby przerwany wczesniej, w tym triggerze. mam jakis stan trzymac? 
elements.forEach { element => val ctx = Option(elementHolder.get()).getOrElse(api.Context(contextIdGenerator.nextContextId())) out.collect(ValueWithContext(element, KeyEnricher.enrichWithKey(ctx, key))) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala index 635225a0faa..e097b27d6dc 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala @@ -19,6 +19,7 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ + // TODO_PAWEL ten dzial prawidlowo object SlidingAggregateTransformerV2 extends CustomStreamTransformer with UnboundedStreamComponent diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 4dbb0403102..4f6551b901a 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -125,6 +125,7 @@ object transformers { case TumblingWindowTrigger.OnEvent => keyedStream // TODO_PAWEL tutaj cos jest nie teges + // TODO_PAWEL important tu jest ten co dziala .eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) case 
TumblingWindowTrigger.OnEnd => keyedStream @@ -193,6 +194,7 @@ object transformers { (sessionWindowTrigger match { case SessionWindowTrigger.OnEvent => + // TODO_PAWEL important a tu jest to co nie dziala keyedStream.eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) case SessionWindowTrigger.OnEnd => keyedStream diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index 5aa1bbe3082..95d7f760da5 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -27,7 +27,7 @@ object triggers { // Window won't be emitted on end, but after each event. This would be useful e.g. when we want to have // daily (i.e. 
for current day) aggregate for each incoming event, but we're not interested in daily summary on each midnight - // TODO_PAWEL, jednak to nie jest chyba to + // TODO_PAWEL, jednak to nie jest chyba to, a jednak moze jest case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W]) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { From ee793e18107200b7bbced831a3d577cfd00398da Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Thu, 27 Mar 2025 14:26:20 +0100 Subject: [PATCH 04/17] some comments removed --- .../flink/util/transformer/aggregate/TransformersTest.scala | 5 +---- .../transformer/aggregate/OnEventTriggerWindowOperator.scala | 1 - .../util/transformer/aggregate/sampleTransformers.scala | 4 ++-- .../flink/util/transformer/aggregate/transformers.scala | 3 --- .../engine/flink/util/transformer/aggregate/triggers.scala | 1 - 5 files changed, 3 insertions(+), 11 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index 79137c4306c..3b66b3afeda 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -751,8 +751,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins val id = "1" ResultsCollectingListenerHolder.withListener { collectingListener => - val testRecords = { - // TODO_PAWEL to jest + val testRecords = List( TestRecordHours(id, 0, 1, "a"), TestRecordHours(id, 2, 2, "d"), @@ -762,7 +761,6 @@ class TransformersTest extends AnyFunSuite with 
FlinkSpec with Matchers with Ins // stop condition TestRecordHours(id, 6, 5, "a") ) - } val model = modelData(collectingListener, testRecords) val testScenario = session("#AGG.list", "#input.eId", SessionWindowTrigger.OnEvent, "#input.str == 'stop'") @@ -923,7 +921,6 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins ) } - // TODO_PAWEL jest ok tu jest jakis test private def session( aggregator: String, aggregateBy: String, diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala index c96b164e672..7e35dec103e 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala @@ -41,7 +41,6 @@ object OnEventTriggerWindowOperator { implicit fctx: FlinkCustomNodeContext ) { - // TODO_PAWEL jest ok to jak ta funkcja jes tuzyta jest pewnie problemem def eventTriggerWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala index e097b27d6dc..31ee294c1c9 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala +++ 
b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala @@ -77,7 +77,7 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ - // TODO_PAWEL inny ktory byc moze nie dziala + // TODO_PAWEL inny ktory tez nie dziala class TumblingAggregateTransformer(config: AggregateWindowsConfig) extends CustomStreamTransformer with UnboundedStreamComponent @@ -139,7 +139,7 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ - // TODO_PAWEL o tym mowil zbyszek ze nie dziala. + // TODO_PAWEL o tym mowil zbyszek ze nie dziala, i rzeczywiscie nie dziala object SessionWindowAggregateTransformer extends CustomStreamTransformer with UnboundedStreamComponent diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 4f6551b901a..8f9299abd45 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -124,8 +124,6 @@ object transformers { (tumblingWindowTrigger match { case TumblingWindowTrigger.OnEvent => keyedStream - // TODO_PAWEL tutaj cos jest nie teges - // TODO_PAWEL important tu jest ten co dziala .eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) case TumblingWindowTrigger.OnEnd => keyedStream @@ -194,7 +192,6 @@ object transformers { (sessionWindowTrigger match { case SessionWindowTrigger.OnEvent => - // TODO_PAWEL 
important a tu jest to co nie dziala keyedStream.eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) case SessionWindowTrigger.OnEnd => keyedStream diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index 95d7f760da5..d3201e428c1 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -27,7 +27,6 @@ object triggers { // Window won't be emitted on end, but after each event. This would be useful e.g. when we want to have // daily (i.e. for current day) aggregate for each incoming event, but we're not interested in daily summary on each midnight - // TODO_PAWEL, jednak to nie jest chyba to, a jednak moze jest case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W]) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { From 2b50a5f98b186b0e960693c6ac1327c71c0e258c Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Thu, 27 Mar 2025 15:45:08 +0100 Subject: [PATCH 05/17] changed code on each event case --- .../aggregate/OnEventTriggerWindowOperator.scala | 16 +++++++++++----- .../util/transformer/aggregate/triggers.scala | 9 ++++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala 
index 7e35dec103e..393a0bfd314 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala @@ -5,6 +5,7 @@ import org.apache.flink.api.common.functions.{AggregateFunction, OpenContext, Ru import org.apache.flink.api.common.state.AggregatingStateDescriptor import org.apache.flink.streaming.api.datastream.{KeyedStream, SingleOutputStreamOperator} import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow @@ -17,11 +18,7 @@ import pl.touk.nussknacker.engine.api.ValueWithContext import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext} import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue} -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.{ - elementHolder, - stateDescriptorName, - Input -} +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.{Input, elementHolder, stateDescriptorName, timestampToOverrideHolder} import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.FireOnEachEvent @@ -33,6 +30,7 @@ object OnEventTriggerWindowOperator { // We use ThreadLocal to pass context from WindowOperator.processElement to ProcessWindowFunction // without modifying too much Flink code. 
This assumes that window is triggered only on event val elementHolder = new ThreadLocal[api.Context] + val timestampToOverrideHolder = new ThreadLocal[Long] // WindowOperatorBuilder.WINDOW_STATE_NAME - should be the same for compatibility val stateDescriptorName = "window-contents" @@ -88,6 +86,7 @@ class OnEventTriggerWindowOperator[A]( super.processElement(element) } finally { elementHolder.remove() + timestampToOverrideHolder.remove() } } @@ -117,6 +116,13 @@ private class ValueEmittingWindowFunction( // tylko jak mam odroznic sytuacje gdy window jest jakby przerwany wczesniej, w tym triggerze. mam jakis stan trzymac? elements.forEach { element => val ctx = Option(elementHolder.get()).getOrElse(api.Context(contextIdGenerator.nextContextId())) + + out match { + case timedOut: TimestampedCollector[_] => + Option(timestampToOverrideHolder.get()).foreach(timestamp => timedOut.setAbsoluteTimestamp(timestamp)) + case _ => + } + out.collect(ValueWithContext(element, KeyEnricher.enrichWithKey(ctx, key))) } } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index d3201e428c1..5c13b1f18d1 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.Window +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.timestampToOverrideHolder object triggers { @@ -31,11 +32,17 @@ object 
triggers { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { val result = super.onElement(element, timestamp, window, ctx) - result match { + val changedResult = result match { case TriggerResult.CONTINUE => TriggerResult.FIRE case TriggerResult.PURGE => TriggerResult.FIRE_AND_PURGE case fire => fire } + + if (!result.isFire && changedResult.isFire) { + timestampToOverrideHolder.set(timestamp) + } + + changedResult } override def onProcessingTime(time: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { From ed51f16a2311d3540d28f5cbf97b882e00bc189e Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Thu, 27 Mar 2025 15:46:29 +0100 Subject: [PATCH 06/17] changed code on each event case --- .../transformer/aggregate/OnEventTriggerWindowOperator.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala index 393a0bfd314..f37c06521ac 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala @@ -120,6 +120,7 @@ private class ValueEmittingWindowFunction( out match { case timedOut: TimestampedCollector[_] => Option(timestampToOverrideHolder.get()).foreach(timestamp => timedOut.setAbsoluteTimestamp(timestamp)) + // TODO_PAWEL maybe we should throw in other case? 
case _ => } From ed9ded4bba149ca5189330c0db6f803fb9a130ec Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Thu, 27 Mar 2025 15:49:21 +0100 Subject: [PATCH 07/17] simplify --- .../flink/util/transformer/aggregate/transformers.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 8f9299abd45..42122698b31 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -198,11 +198,7 @@ object transformers { .window(windowDefinition) .trigger(baseTrigger) .aggregate( - new UnwrappingAggregateFunction[(AnyRef, java.lang.Boolean)]( - aggregator, - aggregateBy.returnType, - _._1 - ), + aggregatingFunction, EnrichingWithKeyFunction(fctx), typeInfos.storedTypeInfo, typeInfos.returnTypeInfo, From 89a064f3f52584e7d00140efe48b70237f3b3ea4 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Thu, 27 Mar 2025 16:22:10 +0100 Subject: [PATCH 08/17] added hacked operator --- ...rator.scala => HackedWindowOperator.scala} | 22 +++++++++++++------ .../transformer/aggregate/transformers.scala | 19 +++++----------- .../util/transformer/aggregate/triggers.scala | 3 ++- 3 files changed, 22 insertions(+), 22 deletions(-) rename engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/{OnEventTriggerWindowOperator.scala => HackedWindowOperator.scala} (83%) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala 
b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala similarity index 83% rename from engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala rename to engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala index f37c06521ac..367a22656c5 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala @@ -18,18 +18,19 @@ import pl.touk.nussknacker.engine.api.ValueWithContext import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext} import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue} -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.{Input, elementHolder, stateDescriptorName, timestampToOverrideHolder} +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.{Input, elementHolder, stateDescriptorName, timestampToOverrideHolder} import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.FireOnEachEvent import java.lang -object OnEventTriggerWindowOperator { +object HackedWindowOperator { type Input[A] = ValueWithContext[StringKeyedValue[A]] // We use ThreadLocal to pass context from WindowOperator.processElement to ProcessWindowFunction // without modifying too much Flink code. 
This assumes that window is triggered only on event val elementHolder = new ThreadLocal[api.Context] + // TODO_PAWEL make this and other holder private here, and pass lambda to set it to trigger, add callback to trigger in other words. Yes, it is possible val timestampToOverrideHolder = new ThreadLocal[Long] // WindowOperatorBuilder.WINDOW_STATE_NAME - should be the same for compatibility @@ -39,15 +40,22 @@ object OnEventTriggerWindowOperator { implicit fctx: FlinkCustomNodeContext ) { - def eventTriggerWindow( + def hackedEventTriggerWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], trigger: Trigger[_ >: Input[A], TimeWindow] - ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = stream.transform( + ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = hackedWindow(assigner, types, aggregateFunction, FireOnEachEvent[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger)) + + def hackedWindow( + assigner: WindowAssigner[_ >: Input[A], TimeWindow], + types: AggregatorTypeInformations, + aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], + trigger: Trigger[_ >: Input[A], TimeWindow] + ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = stream.transform( assigner.getClass.getSimpleName, types.returnedValueTypeInfo, - new OnEventTriggerWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger) + new HackedWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger) ) } @@ -55,7 +63,7 @@ object OnEventTriggerWindowOperator { } @silent("deprecated") -class OnEventTriggerWindowOperator[A]( +class HackedWindowOperator[A]( stream: KeyedStream[Input[A], String], fctx: FlinkCustomNodeContext, assigner: WindowAssigner[_ >: Input[A], TimeWindow], @@ -75,7 +83,7 @@ class OnEventTriggerWindowOperator[A]( new InternalSingleValueProcessWindowFunction( new 
ValueEmittingWindowFunction(fctx.convertToEngineRuntimeContext, fctx.nodeId) ), - FireOnEachEvent[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger), + trigger, 0L, // lateness, null // tag ) { diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 42122698b31..c337ae26614 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -8,13 +8,13 @@ import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindo import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import pl.touk.nussknacker.engine.api.{Context => NkContext, NodeId, _} +import pl.touk.nussknacker.engine.api.{Context => NkContext, _} import pl.touk.nussknacker.engine.api.context.ContextTransformation import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport import pl.touk.nussknacker.engine.flink.api.process._ import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection import pl.touk.nussknacker.engine.flink.util.richflink._ -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.OnEventOperatorKeyedStream +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.OnEventOperatorKeyedStream import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.ClosingEndEventTrigger import pl.touk.nussknacker.engine.util.KeyedValue @@ -124,7 +124,7 @@ object 
transformers { (tumblingWindowTrigger match { case TumblingWindowTrigger.OnEvent => keyedStream - .eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) + .hackedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) case TumblingWindowTrigger.OnEnd => keyedStream .window(windowDefinition) @@ -192,18 +192,9 @@ object transformers { (sessionWindowTrigger match { case SessionWindowTrigger.OnEvent => - keyedStream.eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) + keyedStream.hackedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) case SessionWindowTrigger.OnEnd => - keyedStream - .window(windowDefinition) - .trigger(baseTrigger) - .aggregate( - aggregatingFunction, - EnrichingWithKeyFunction(fctx), - typeInfos.storedTypeInfo, - typeInfos.returnTypeInfo, - typeInfos.returnedValueTypeInfo - ) + keyedStream.hackedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) }).setUidWithName(ctx, ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators) }) ) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index 5c13b1f18d1..f4dd285f14b 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.Window -import 
pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.timestampToOverrideHolder +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.timestampToOverrideHolder object triggers { @@ -62,6 +62,7 @@ object triggers { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { if (endFunction(element)) { + timestampToOverrideHolder.set(timestamp) TriggerResult.FIRE_AND_PURGE } else super.onElement(element, timestamp, window, ctx) From c58d14fa3b755e86e472af70c643d1f39811d112 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Mar 2025 09:12:44 +0100 Subject: [PATCH 09/17] some comments removed --- .../flink/util/transformer/aggregate/TransformersTest.scala | 1 - .../flink/util/transformer/aggregate/sampleTransformers.scala | 3 --- 2 files changed, 4 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index 3b66b3afeda..cb93d162386 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -746,7 +746,6 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins } } - // TODO_PAWEL jest ok tu jest jakis test test("sum session aggregate on event with context") { val id = "1" diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala 
b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala index 31ee294c1c9..4abce37274f 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/sampleTransformers.scala @@ -19,7 +19,6 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ - // TODO_PAWEL ten dzial prawidlowo object SlidingAggregateTransformerV2 extends CustomStreamTransformer with UnboundedStreamComponent @@ -77,7 +76,6 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. */ - // TODO_PAWEL inny ktory tez nie dziala class TumblingAggregateTransformer(config: AggregateWindowsConfig) extends CustomStreamTransformer with UnboundedStreamComponent @@ -139,7 +137,6 @@ object sampleTransformers { * * You should define `#AGG` global variable, because it is used in editors picked for `aggregateBy` parameter. 
*/ - // TODO_PAWEL o tym mowil zbyszek ze nie dziala, i rzeczywiscie nie dziala object SessionWindowAggregateTransformer extends CustomStreamTransformer with UnboundedStreamComponent From a85cfa3d565bf575100ff138ed8e448c26361e93 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Mar 2025 09:23:27 +0100 Subject: [PATCH 10/17] fix test --- .../aggregate/HackedWindowOperator.scala | 14 +++++++++----- .../util/transformer/aggregate/transformers.scala | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala index 367a22656c5..8394cb454af 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala @@ -45,17 +45,18 @@ object HackedWindowOperator { types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], trigger: Trigger[_ >: Input[A], TimeWindow] - ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = hackedWindow(assigner, types, aggregateFunction, FireOnEachEvent[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger)) + ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = hackedWindow(assigner, types, aggregateFunction, FireOnEachEvent[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger), preserveContext = true) def hackedWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], - trigger: Trigger[_ >: Input[A], TimeWindow] + trigger: Trigger[_ >: Input[A], TimeWindow], + 
preserveContext: Boolean, ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = stream.transform( assigner.getClass.getSimpleName, types.returnedValueTypeInfo, - new HackedWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger) + new HackedWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger, preserveContext) ) } @@ -69,7 +70,8 @@ class HackedWindowOperator[A]( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], - trigger: Trigger[_ >: Input[A], TimeWindow] + trigger: Trigger[_ >: Input[A], TimeWindow], + preserveContext: Boolean, ) extends WindowOperator[String, Input[A], AnyRef, ValueWithContext[AnyRef], TimeWindow]( assigner, assigner.getWindowSerializer(stream.getExecutionConfig), @@ -89,7 +91,9 @@ class HackedWindowOperator[A]( ) { override def processElement(element: StreamRecord[ValueWithContext[StringKeyedValue[A]]]): Unit = { - elementHolder.set(element.getValue.context) + if (preserveContext) { + elementHolder.set(element.getValue.context) + } try { super.processElement(element) } finally { diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index c337ae26614..5c2c87e7d42 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -194,7 +194,7 @@ object transformers { case SessionWindowTrigger.OnEvent => keyedStream.hackedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) case SessionWindowTrigger.OnEnd => - 
keyedStream.hackedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) + keyedStream.hackedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false) }).setUidWithName(ctx, ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators) }) ) From f26332e023f87aa047d2ee6686da635b5e6024a6 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Mar 2025 10:37:42 +0100 Subject: [PATCH 11/17] small refactor --- .../aggregate/HackedWindowOperator.scala | 20 +++++++++++++++---- .../transformer/aggregate/transformers.scala | 8 ++++---- .../util/transformer/aggregate/triggers.scala | 9 ++++----- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala index 8394cb454af..56b3179409c 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner import org.apache.flink.streaming.api.windowing.triggers.Trigger -import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord @@ -20,7 +20,7 @@ import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue} import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.{Input, elementHolder, stateDescriptorName, timestampToOverrideHolder} import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.FireOnEachEvent +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.{ClosingEndEventTrigger, FireOnEachEvent} import java.lang @@ -30,12 +30,23 @@ object HackedWindowOperator { // We use ThreadLocal to pass context from WindowOperator.processElement to ProcessWindowFunction // without modifying too much Flink code. This assumes that window is triggered only on event val elementHolder = new ThreadLocal[api.Context] - // TODO_PAWEL make this and other holder private here, and pass lambda to set it to trigger, add callback to trigger in other words. 
Yes, it is possible val timestampToOverrideHolder = new ThreadLocal[Long] // WindowOperatorBuilder.WINDOW_STATE_NAME - should be the same for compatibility val stateDescriptorName = "window-contents" + private def overrideResultEventTime(timestamp: Long): Unit = { + timestampToOverrideHolder.set(timestamp) + } + + def fireOnEachEventTriggerWrapper[T, W <: Window](delegate: Trigger[_ >: T, W]): FireOnEachEvent[T, W] = { + FireOnEachEvent(delegate, timestamp => overrideResultEventTime(timestamp)) + } + + def closingEndEventTriggerWrapper[T, W <: Window](delegate: Trigger[_ >: T, W], endFunction: T => Boolean): ClosingEndEventTrigger[T, W] = { + ClosingEndEventTrigger(delegate, endFunction, timestamp => overrideResultEventTime(timestamp)) + } + implicit class OnEventOperatorKeyedStream[A](stream: KeyedStream[Input[A], String])( implicit fctx: FlinkCustomNodeContext ) { @@ -45,7 +56,8 @@ object HackedWindowOperator { types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], trigger: Trigger[_ >: Input[A], TimeWindow] - ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = hackedWindow(assigner, types, aggregateFunction, FireOnEachEvent[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger), preserveContext = true) + ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = hackedWindow(assigner, types, aggregateFunction, + fireOnEachEventTriggerWrapper[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger), preserveContext = true) def hackedWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index 5c2c87e7d42..b374d8a38cb 100644 --- 
a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -177,11 +177,11 @@ object transformers { implicit val fctx: FlinkCustomNodeContext = ctx val typeInfos = AggregatorTypeInformations(ctx, aggregator, aggregateBy) - val baseTrigger = - ClosingEndEventTrigger[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow]( + val baseTrigger = { + HackedWindowOperator.closingEndEventTriggerWrapper[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow]( EventTimeTrigger.create(), - _.value.value._2 - ) + _.value.value._2) + } val groupByValue = aggregateBy.product(endSessionCondition) val keyedStream = start diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index f4dd285f14b..761c2696cdf 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.Window -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.timestampToOverrideHolder object triggers { @@ -28,7 +27,7 @@ object triggers { // Window won't be emitted on end, but after each event. This would be useful e.g. when we want to have // daily (i.e. 
for current day) aggregate for each incoming event, but we're not interested in daily summary on each midnight - case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W]) extends DelegatingTrigger[T, W](delegate) { + case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W], onOverrideFireOnElementAtTimestamp: Long => Unit) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { val result = super.onElement(element, timestamp, window, ctx) @@ -39,7 +38,7 @@ object triggers { } if (!result.isFire && changedResult.isFire) { - timestampToOverrideHolder.set(timestamp) + onOverrideFireOnElementAtTimestamp(timestamp) } changedResult @@ -57,12 +56,12 @@ object triggers { } - case class ClosingEndEventTrigger[T, W <: Window](delegate: Trigger[_ >: T, W], endFunction: T => Boolean) + case class ClosingEndEventTrigger[T, W <: Window](delegate: Trigger[_ >: T, W], endFunction: T => Boolean, onEndEventAtTimestamp: Long => Unit) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { if (endFunction(element)) { - timestampToOverrideHolder.set(timestamp) + onEndEventAtTimestamp(timestamp) TriggerResult.FIRE_AND_PURGE } else super.onElement(element, timestamp, window, ctx) From b05b47622867159122aabb1ed86d97ef1bc7c841 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Fri, 28 Mar 2025 10:51:08 +0100 Subject: [PATCH 12/17] some more refactors --- ...tor.scala => ExtendedWindowOperator.scala} | 30 +++++++++---------- .../transformer/aggregate/transformers.scala | 11 ++++--- .../util/transformer/aggregate/triggers.scala | 8 ++--- 3 files changed, 23 insertions(+), 26 deletions(-) rename engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/{HackedWindowOperator.scala => ExtendedWindowOperator.scala} 
(82%) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala similarity index 82% rename from engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala rename to engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala index 56b3179409c..5f9bed46a2a 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HackedWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala @@ -18,25 +18,27 @@ import pl.touk.nussknacker.engine.api.ValueWithContext import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext} import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue} -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.{Input, elementHolder, stateDescriptorName, timestampToOverrideHolder} +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.{Input, elementHolder, stateDescriptorName, overriddenResultEventTimeHolder} import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.{ClosingEndEventTrigger, FireOnEachEvent} import java.lang -object HackedWindowOperator { +object ExtendedWindowOperator { type Input[A] = ValueWithContext[StringKeyedValue[A]] // We use 
ThreadLocal to pass context from WindowOperator.processElement to ProcessWindowFunction // without modifying too much Flink code. This assumes that window is triggered only on event val elementHolder = new ThreadLocal[api.Context] - val timestampToOverrideHolder = new ThreadLocal[Long] + // This ThreadLocal is also used to pass information between two distant places in flink code without + // modifying it + val overriddenResultEventTimeHolder = new ThreadLocal[Long] // WindowOperatorBuilder.WINDOW_STATE_NAME - should be the same for compatibility val stateDescriptorName = "window-contents" private def overrideResultEventTime(timestamp: Long): Unit = { - timestampToOverrideHolder.set(timestamp) + overriddenResultEventTimeHolder.set(timestamp) } def fireOnEachEventTriggerWrapper[T, W <: Window](delegate: Trigger[_ >: T, W]): FireOnEachEvent[T, W] = { @@ -51,15 +53,15 @@ object HackedWindowOperator { implicit fctx: FlinkCustomNodeContext ) { - def hackedEventTriggerWindow( + def extendedEventTriggerWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], trigger: Trigger[_ >: Input[A], TimeWindow] - ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = hackedWindow(assigner, types, aggregateFunction, + ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = extendedWindow(assigner, types, aggregateFunction, fireOnEachEventTriggerWrapper[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger), preserveContext = true) - def hackedWindow( + def extendedWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], types: AggregatorTypeInformations, aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], @@ -68,7 +70,7 @@ object HackedWindowOperator { ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = stream.transform( assigner.getClass.getSimpleName, types.returnedValueTypeInfo, - new HackedWindowOperator(stream, fctx, assigner, types, 
aggregateFunction, trigger, preserveContext) + new ExtendedWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger, preserveContext) ) } @@ -76,7 +78,7 @@ object HackedWindowOperator { } @silent("deprecated") -class HackedWindowOperator[A]( +class ExtendedWindowOperator[A]( stream: KeyedStream[Input[A], String], fctx: FlinkCustomNodeContext, assigner: WindowAssigner[_ >: Input[A], TimeWindow], @@ -110,7 +112,7 @@ class HackedWindowOperator[A]( super.processElement(element) } finally { elementHolder.remove() - timestampToOverrideHolder.remove() + overriddenResultEventTimeHolder.remove() } } @@ -134,17 +136,13 @@ private class ValueEmittingWindowFunction( elements: lang.Iterable[AnyRef], out: Collector[ValueWithContext[AnyRef]] ): Unit = { - // TODO_PAWEL tu mozna by ten out scastowac na timestampedcollector i mu ustawic timestamp, tylko problem jest taki ze nie wiadomo jaki - // tutaj niby widze, ze te elements maja typ StreamRecord, mozna z nich wiec wziac timestamp - // wiec moge ustawic np timestamp ostatniego z nich, ale nie zawsze. czasem jest juz ustawiony prawidlowy w tym 'out' timestampcollector - // tylko jak mam odroznic sytuacje gdy window jest jakby przerwany wczesniej, w tym triggerze. mam jakis stan trzymac? elements.forEach { element => val ctx = Option(elementHolder.get()).getOrElse(api.Context(contextIdGenerator.nextContextId())) out match { + // it should always be an instance of this class case timedOut: TimestampedCollector[_] => - Option(timestampToOverrideHolder.get()).foreach(timestamp => timedOut.setAbsoluteTimestamp(timestamp)) - // TODO_PAWEL maybe we should throw in other case? 
+ Option(overriddenResultEventTimeHolder.get()).foreach(timestamp => timedOut.setAbsoluteTimestamp(timestamp)) case _ => } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index b374d8a38cb..25ea4da81e2 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -14,8 +14,7 @@ import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport import pl.touk.nussknacker.engine.flink.api.process._ import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection import pl.touk.nussknacker.engine.flink.util.richflink._ -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.HackedWindowOperator.OnEventOperatorKeyedStream -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.ClosingEndEventTrigger +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.OnEventOperatorKeyedStream import pl.touk.nussknacker.engine.util.KeyedValue import scala.collection.immutable.SortedMap @@ -124,7 +123,7 @@ object transformers { (tumblingWindowTrigger match { case TumblingWindowTrigger.OnEvent => keyedStream - .hackedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) + .extendedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create()) case TumblingWindowTrigger.OnEnd => keyedStream .window(windowDefinition) @@ -178,7 +177,7 @@ object transformers { val typeInfos = AggregatorTypeInformations(ctx, aggregator, aggregateBy) val baseTrigger = { - 
HackedWindowOperator.closingEndEventTriggerWrapper[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow]( + ExtendedWindowOperator.closingEndEventTriggerWrapper[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow]( EventTimeTrigger.create(), _.value.value._2) } @@ -192,9 +191,9 @@ object transformers { (sessionWindowTrigger match { case SessionWindowTrigger.OnEvent => - keyedStream.hackedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) + keyedStream.extendedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger) case SessionWindowTrigger.OnEnd => - keyedStream.hackedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false) + keyedStream.extendedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false) }).setUidWithName(ctx, ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators) }) ) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index 761c2696cdf..77515049602 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -30,18 +30,18 @@ object triggers { case class FireOnEachEvent[T, W <: Window](delegate: Trigger[_ >: T, W], onOverrideFireOnElementAtTimestamp: Long => Unit) extends DelegatingTrigger[T, W](delegate) { override def onElement(element: T, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { - val result = super.onElement(element, timestamp, window, ctx) - val changedResult = result match { + val 
previousResult = super.onElement(element, timestamp, window, ctx) + val result = previousResult match { case TriggerResult.CONTINUE => TriggerResult.FIRE case TriggerResult.PURGE => TriggerResult.FIRE_AND_PURGE case fire => fire } - if (!result.isFire && changedResult.isFire) { + if (!previousResult.isFire && result.isFire) { onOverrideFireOnElementAtTimestamp(timestamp) } - changedResult + result } override def onProcessingTime(time: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = { From 5ce64ea63a60d0c37e64234579f627ad930fa343 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Mon, 31 Mar 2025 13:56:19 +0200 Subject: [PATCH 13/17] wip implementing test --- build.sbt | 1 + .../aggregate/TransformersTest.scala | 16 ++++++---- .../engine/process/helpers/SampleNodes.scala | 31 ++++++++++++++++--- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/build.sbt b/build.sbt index 928e6ec8cce..6d498886c1d 100644 --- a/build.sbt +++ b/build.sbt @@ -1827,6 +1827,7 @@ lazy val flinkBaseUnboundedComponents = (project in flink("components/base-unbou lazy val flinkBaseComponentsTests = (project in flink("components/base-tests")) .settings(commonSettings) .settings( + // TODO_PAWEL ten name := "nussknacker-flink-base-components-tests", libraryDependencies ++= Seq( "org.apache.flink" % "flink-connector-files" % flinkV % Test, diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index cb93d162386..aabea30af4f 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -10,15 +10,12 @@ import 
org.scalatest.prop.TableDrivenPropertyChecks.forAll import org.scalatest.prop.Tables.Table import pl.touk.nussknacker.engine.api.{FragmentSpecificData, JobData, MetaData, ProcessVersion, VariableConstants} import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{ - CannotCreateObjectError, - ExpressionParserCompilationError -} +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, ExpressionParserCompilationError} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.canonicalgraph.{canonicalnode, CanonicalProcess} +import pl.touk.nussknacker.engine.canonicalgraph.{CanonicalProcess, canonicalnode} import pl.touk.nussknacker.engine.compile.{CompilationResult, FragmentResolver, ProcessValidator} import pl.touk.nussknacker.engine.definition.component.parameter.editor.ParameterTypeEditorDeterminer import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider @@ -32,6 +29,7 @@ import pl.touk.nussknacker.engine.graph.node.{CustomNode, FragmentInputDefinitio import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.variable.Field import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener +import pl.touk.nussknacker.engine.process.helpers.SampleNodes.{CustomFilterContextTransformation, CustomTimestampExtractingTransformation} import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData @@ -59,7 +57,9 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins 
ComponentDefinition("start", sourceComponent) :: FlinkBaseUnboundedComponentProvider.create( DocsConfig.Default, aggregateWindowsConfig - ) ::: FlinkBaseComponentProvider.Components, + // TODO_PAWEL tutaj trzeba go sobie dodac ten transformer definition. + ) ::: FlinkBaseComponentProvider.Components + ::: List(ComponentDefinition("customTimestampExtractingTransformation", CustomTimestampExtractingTransformation)), configCreator = new ConfigCreatorWithCollectingListener(collectingListener) ) } @@ -994,6 +994,10 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins definition.afterAggregateExpression.spel ) } + .customNodeNoOutput( + "custom", + "customTimestampExtractingTransformation", + ) .emptySink("end", "dead-end") } diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala index 0b632d5cac8..96d5cb38e0d 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala @@ -9,6 +9,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.co.{CoMapFunction, RichCoFlatMapFunction} import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, OneInputStreamOperator} @@ -16,6 +17,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.runtime.streamrecord.{RecordAttributes, StreamRecord} import org.apache.flink.util.Collector +import pl.touk.nussknacker.engine.api import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent import pl.touk.nussknacker.engine.api.context._ @@ -28,15 +30,12 @@ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext} import pl.touk.nussknacker.engine.api.test.{TestData, TestRecord, TestRecordParser} import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector -import pl.touk.nussknacker.engine.api.typed.{typing, ReturningType, TypedMap} +import pl.touk.nussknacker.engine.api.typed.{ReturningType, TypedMap, typing} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown} import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport import pl.touk.nussknacker.engine.flink.api.datastream.DataStreamImplicits._ import pl.touk.nussknacker.engine.flink.api.process._ -import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{ - StandardTimestampWatermarkHandler, - TimestampWatermarkHandler -} +import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler} import pl.touk.nussknacker.engine.flink.util.sink.EmptySink import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.process.SimpleJavaEnum @@ -313,6 +312,28 @@ object SampleNodes { } + object CustomTimestampExtractingTransformation extends CustomStreamTransformer with Serializable { + + @MethodToInvoke(returnType = classOf[Void]) + def execute(): ContextTransformation = { + ContextTransformation + .definedBy(Valid(_)) + .implementedBy(FlinkCustomStreamTransformation { + (start: DataStream[Context], context: FlinkCustomNodeContext) => + start + .process(new ProcessFunction[Context, 
Context] { + override def processElement(value: api.Context, ctx: ProcessFunction[api.Context, api.Context]#Context, out: Collector[api.Context]): Unit = { + val timestamp = ctx.timestamp() + out.collect(value) + } + }) + + .map(ValueWithContext[AnyRef](null, _), context.valueWithContextInfo.forUnknown) + }) + } + + } + object CustomContextClear extends CustomStreamTransformer with Serializable { @MethodToInvoke(returnType = classOf[Void]) From eb8cc60f8a7889537f2a6a843da55ba33df90325 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Mon, 31 Mar 2025 14:34:18 +0200 Subject: [PATCH 14/17] added tests --- .../aggregate/TransformersTest.scala | 38 +++++++++++++++---- .../engine/process/helpers/SampleNodes.scala | 20 +++++++--- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index aabea30af4f..d7e08e359db 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -10,12 +10,15 @@ import org.scalatest.prop.TableDrivenPropertyChecks.forAll import org.scalatest.prop.Tables.Table import pl.touk.nussknacker.engine.api.{FragmentSpecificData, JobData, MetaData, ProcessVersion, VariableConstants} import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, ExpressionParserCompilationError} +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{ + CannotCreateObjectError, + ExpressionParserCompilationError +} import 
pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.canonicalgraph.{CanonicalProcess, canonicalnode} +import pl.touk.nussknacker.engine.canonicalgraph.{canonicalnode, CanonicalProcess} import pl.touk.nussknacker.engine.compile.{CompilationResult, FragmentResolver, ProcessValidator} import pl.touk.nussknacker.engine.definition.component.parameter.editor.ParameterTypeEditorDeterminer import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider @@ -29,7 +32,10 @@ import pl.touk.nussknacker.engine.graph.node.{CustomNode, FragmentInputDefinitio import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.variable.Field import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.helpers.SampleNodes.{CustomFilterContextTransformation, CustomTimestampExtractingTransformation} +import pl.touk.nussknacker.engine.process.helpers.SampleNodes.{ + CustomFilterContextTransformation, + CustomTimestampExtractingTransformation +} import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData @@ -44,6 +50,8 @@ import scala.jdk.CollectionConverters._ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Inside { + private val eventTimeExtractionComponentName = "customTimestampExtractingTransformation" + def modelData( collectingListener: => ResultsCollectingListener[Any], list: List[TestRecord] = List(), @@ -59,7 +67,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins aggregateWindowsConfig // TODO_PAWEL tutaj trzeba go sobie 
dodac ten transformer definition. ) ::: FlinkBaseComponentProvider.Components - ::: List(ComponentDefinition("customTimestampExtractingTransformation", CustomTimestampExtractingTransformation)), + ::: List(ComponentDefinition(eventTimeExtractionComponentName, CustomTimestampExtractingTransformation)), configCreator = new ConfigCreatorWithCollectingListener(collectingListener) ) } @@ -488,10 +496,12 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins val id = "1" ResultsCollectingListenerHolder.withListener { collectingListener => + val inputRecords = + List(TestRecordHours(id, 0, 1, "a"), TestRecordHours(id, 1, 2, "b"), TestRecordHours(id, 2, 5, "b")) val model = modelData( collectingListener, - List(TestRecordHours(id, 0, 1, "a"), TestRecordHours(id, 1, 2, "b"), TestRecordHours(id, 2, 5, "b")) + inputRecords ) val testScenario = tumbling( "#AGG.list", @@ -508,6 +518,9 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins val aggregateVariables = nodeResults.map(_.variableTyped[java.util.List[Number]]("fragmentResult").get) // TODO: reverse order in aggregate aggregateVariables shouldBe List(asList(1), asList(2, 1), asList(5)) + nodeResults.map( + _.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get + ) shouldBe inputRecords.map(e => e.timestamp) } } @@ -743,6 +756,13 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins val nodeResults = collectingListener.endVariablesForKey(id) nodeResults.flatMap(_.variableTyped[TestRecordHours]("input")) shouldBe Nil + + val timeMultiplier = 3600 * 1000 + + nodeResults.map( + _.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get + // session timeout is 2 + ) shouldBe List((3 + 2) * timeMultiplier - 1, 6 * timeMultiplier, (6 + 2) * timeMultiplier - 1) } } @@ -773,6 +793,10 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins asList(5) ) 
outputVariables.map(_.variableTyped[TestRecordHours]("input").get) shouldBe testRecords + outputVariables.map( + _.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get + ) shouldBe + testRecords.map(e => e.timestamp) } } @@ -994,9 +1018,9 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins definition.afterAggregateExpression.spel ) } - .customNodeNoOutput( + .customNodeNoOutput( "custom", - "customTimestampExtractingTransformation", + eventTimeExtractionComponentName, ) .emptySink("end", "dead-end") } diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala index 96d5cb38e0d..cbad9a301b7 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala @@ -30,12 +30,15 @@ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext} import pl.touk.nussknacker.engine.api.test.{TestData, TestRecord, TestRecordParser} import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector -import pl.touk.nussknacker.engine.api.typed.{ReturningType, TypedMap, typing} +import pl.touk.nussknacker.engine.api.typed.{typing, ReturningType, TypedMap} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown} import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport import pl.touk.nussknacker.engine.flink.api.datastream.DataStreamImplicits._ import pl.touk.nussknacker.engine.flink.api.process._ -import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler} +import 
pl.touk.nussknacker.engine.flink.api.timestampwatermark.{ + StandardTimestampWatermarkHandler, + TimestampWatermarkHandler +} import pl.touk.nussknacker.engine.flink.util.sink.EmptySink import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.process.SimpleJavaEnum @@ -313,6 +316,7 @@ object SampleNodes { } object CustomTimestampExtractingTransformation extends CustomStreamTransformer with Serializable { + val timestampVariableName = "eventTimeTimestamp" @MethodToInvoke(returnType = classOf[Void]) def execute(): ContextTransformation = { @@ -322,12 +326,16 @@ object SampleNodes { (start: DataStream[Context], context: FlinkCustomNodeContext) => start .process(new ProcessFunction[Context, Context] { - override def processElement(value: api.Context, ctx: ProcessFunction[api.Context, api.Context]#Context, out: Collector[api.Context]): Unit = { - val timestamp = ctx.timestamp() - out.collect(value) + override def processElement( + value: api.Context, + ctx: ProcessFunction[api.Context, api.Context]#Context, + out: Collector[api.Context] + ): Unit = { + out.collect( + value.withVariable(timestampVariableName, ctx.timestamp()) + ) } }) - .map(ValueWithContext[AnyRef](null, _), context.valueWithContextInfo.forUnknown) }) } From cfdffba7a7c5555cffad0b37988d326b2aa61730 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Mon, 31 Mar 2025 14:54:59 +0200 Subject: [PATCH 15/17] more tests --- build.sbt | 1 - .../transformer/aggregate/TransformersTest.scala | 14 +++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index 635a7cfa6aa..8dc0d1449b0 100644 --- a/build.sbt +++ b/build.sbt @@ -1834,7 +1834,6 @@ lazy val flinkBaseUnboundedComponents = (project in flink("components/base-unbou lazy val flinkBaseComponentsTests = (project in flink("components/base-tests")) .settings(commonSettings) .settings( - // TODO_PAWEL ten name := "nussknacker-flink-base-components-tests", libraryDependencies 
++= Seq( "org.apache.flink" % "flink-connector-files" % flinkV % Test, diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index d7e08e359db..96368fff243 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -32,10 +32,7 @@ import pl.touk.nussknacker.engine.graph.node.{CustomNode, FragmentInputDefinitio import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.variable.Field import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.helpers.SampleNodes.{ - CustomFilterContextTransformation, - CustomTimestampExtractingTransformation -} +import pl.touk.nussknacker.engine.process.helpers.SampleNodes.CustomTimestampExtractingTransformation import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData @@ -65,7 +62,6 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins ComponentDefinition("start", sourceComponent) :: FlinkBaseUnboundedComponentProvider.create( DocsConfig.Default, aggregateWindowsConfig - // TODO_PAWEL tutaj trzeba go sobie dodac ten transformer definition. 
) ::: FlinkBaseComponentProvider.Components ::: List(ComponentDefinition(eventTimeExtractionComponentName, CustomTimestampExtractingTransformation)), configCreator = new ConfigCreatorWithCollectingListener(collectingListener) @@ -359,6 +355,13 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins runScenario(model, testScenario) val aggregateVariables = collectingListener.fragmentResultEndVariable[Number](id) aggregateVariables shouldBe List(3, 5) + + collectingListener.endVariablesForKey(id).flatMap(_.variableTyped[TestRecordHours]("input")) shouldBe Nil + collectingListener + .endVariablesForKey(id) + .map( + _.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get + ) shouldBe List(2 * 3600 * 1000 - 1, 4 * 3600 * 1000 - 1) } } @@ -521,6 +524,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins nodeResults.map( _.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get ) shouldBe inputRecords.map(e => e.timestamp) + nodeResults.map(_.variableTyped[TestRecordHours]("input").get) shouldBe inputRecords } } From 1d16f51f4813b734bb3d2a83a009b8dbfe0cc254 Mon Sep 17 00:00:00 2001 From: Pawel Czajka Date: Tue, 1 Apr 2025 09:14:16 +0200 Subject: [PATCH 16/17] empty From f6f42659736b4e6fc859ba89256f3d034d2ed199 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Solarski?= Date: Mon, 28 Apr 2025 10:16:47 +0200 Subject: [PATCH 17/17] wip --- .../aggregate/TransformersTest.scala | 23 ++++++++------ .../aggregate/ExtendedWindowOperator.scala | 31 ++++++++++++------- .../util/transformer/aggregate/triggers.scala | 8 ++--- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala 
index 96368fff243..18f643152a0 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -10,15 +10,12 @@ import org.scalatest.prop.TableDrivenPropertyChecks.forAll import org.scalatest.prop.Tables.Table import pl.touk.nussknacker.engine.api.{FragmentSpecificData, JobData, MetaData, ProcessVersion, VariableConstants} import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{ - CannotCreateObjectError, - ExpressionParserCompilationError -} +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, ExpressionParserCompilationError} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.canonicalgraph.{canonicalnode, CanonicalProcess} +import pl.touk.nussknacker.engine.canonicalgraph.{CanonicalProcess, canonicalnode} import pl.touk.nussknacker.engine.compile.{CompilationResult, FragmentResolver, ProcessValidator} import pl.touk.nussknacker.engine.definition.component.parameter.editor.ParameterTypeEditorDeterminer import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider @@ -26,6 +23,7 @@ import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.test.ScalatestMiniClusterJobStatusCheckingOps.miniClusterWithServicesToOps import pl.touk.nussknacker.engine.flink.util.source.EmitWatermarkAfterEachElementCollectionSource import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider +import 
pl.touk.nussknacker.engine.flink.util.transformer.aggregate.TestRecordHours.hoursToMillis
 import pl.touk.nussknacker.engine.graph.evaluatedparam.{Parameter => NodeParameter}
 import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.graph.node.{CustomNode, FragmentInputDefinition, FragmentOutputDefinition}
@@ -761,12 +759,13 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
 
       val nodeResults = collectingListener.endVariablesForKey(id)
       nodeResults.flatMap(_.variableTyped[TestRecordHours]("input")) shouldBe Nil
 
-      val timeMultiplier = 3600 * 1000
-
       nodeResults.map(
         _.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get
-        // session timeout is 2
-      ) shouldBe List((3 + 2) * timeMultiplier - 1, 6 * timeMultiplier, (6 + 2) * timeMultiplier - 1)
+      ) shouldBe List(
+        hoursToMillis(5) - 1, // 3h last event time + (2h - 1ms) timeout
+        hoursToMillis(6), // 6h event time from the event which evaluated the stop condition to true
+        hoursToMillis(8) - 1 // 6h last event time + (2h - 1ms) timeout
+      )
     }
   }
@@ -1098,7 +1097,11 @@ trait TestRecord {
 }
 
 case class TestRecordHours(id: String, timeHours: Int, eId: Int, str: String) extends TestRecord {
-  override def timestamp: Long = timeHours * 3600L * 1000
+  override def timestamp: Long = hoursToMillis(timeHours)
+}
+
+object TestRecordHours {
+  def hoursToMillis(hours: Int): Long = hours * 3600L * 1000
 }
 
 case class TestRecordWithTimestamp(id: String, timestamp: Long, eId: Int, str: String) extends TestRecord
diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala
index 5f9bed46a2a..9220324e69b 100644
--- 
a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ExtendedWindowOperator.scala @@ -4,6 +4,7 @@ import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.{AggregateFunction, OpenContext, RuntimeContext} import org.apache.flink.api.common.state.AggregatingStateDescriptor import org.apache.flink.streaming.api.datastream.{KeyedStream, SingleOutputStreamOperator} +import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner @@ -18,14 +19,14 @@ import pl.touk.nussknacker.engine.api.ValueWithContext import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext} import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue} -import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.{Input, elementHolder, stateDescriptorName, overriddenResultEventTimeHolder} +import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.{Input, elementHolder, overriddenResultEventTimeHolder, stateDescriptorName} import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.{ClosingEndEventTrigger, FireOnEachEvent} import java.lang object ExtendedWindowOperator { - type Input[A] = ValueWithContext[StringKeyedValue[A]] + type Input[A] = EventWithTimestamp[ValueWithContext[StringKeyedValue[A]]] // We use ThreadLocal to pass 
context from WindowOperator.processElement to ProcessWindowFunction // without modifying too much Flink code. This assumes that window is triggered only on event @@ -59,7 +60,7 @@ object ExtendedWindowOperator { aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], trigger: Trigger[_ >: Input[A], TimeWindow] ): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = extendedWindow(assigner, types, aggregateFunction, - fireOnEachEventTriggerWrapper[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger), preserveContext = true) + fireOnEachEventTriggerWrapper[EventWithTimestamp[ValueWithContext[StringKeyedValue[A]]], TimeWindow](trigger), preserveContext = true) def extendedWindow( assigner: WindowAssigner[_ >: Input[A], TimeWindow], @@ -77,6 +78,14 @@ object ExtendedWindowOperator { } +case class EventWithTimestamp[E](event: E, timestamp: Long) + +class SomeProcessFunction[KEY, IN, OUT] extends KeyedProcessFunction[KEY, IN, EventWithTimestamp[OUT]] { + override def processElement(value: IN, ctx: KeyedProcessFunction[KEY, IN, EventWithTimestamp[OUT]]#Context, out: Collector[EventWithTimestamp[OUT]]): Unit = { + out.collect(EventWithTimestamp(value, ctx.timestamp())) + } +} + @silent("deprecated") class ExtendedWindowOperator[A]( stream: KeyedStream[Input[A], String], @@ -86,7 +95,7 @@ class ExtendedWindowOperator[A]( aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef], trigger: Trigger[_ >: Input[A], TimeWindow], preserveContext: Boolean, -) extends WindowOperator[String, Input[A], AnyRef, ValueWithContext[AnyRef], TimeWindow]( +) extends WindowOperator[String, Input[A], EventWithTimestamp[AnyRef], ValueWithContext[AnyRef], TimeWindow]( assigner, assigner.getWindowSerializer(stream.getExecutionConfig), stream.getKeySelector, @@ -104,9 +113,9 @@ class ExtendedWindowOperator[A]( null // tag ) { - override def processElement(element: StreamRecord[ValueWithContext[StringKeyedValue[A]]]): Unit = { + override def processElement(element: 
StreamRecord[EventWithTimestamp[ValueWithContext[StringKeyedValue[A]]]]): Unit = { if (preserveContext) { - elementHolder.set(element.getValue.context) + elementHolder.set(element.getValue.event.context) } try { super.processElement(element) @@ -121,7 +130,7 @@ class ExtendedWindowOperator[A]( private class ValueEmittingWindowFunction( convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext, nodeId: String -) extends ProcessWindowFunction[AnyRef, ValueWithContext[AnyRef], String, TimeWindow] { +) extends ProcessWindowFunction[EventWithTimestamp[AnyRef], ValueWithContext[AnyRef], String, TimeWindow] { @transient private var contextIdGenerator: ContextIdGenerator = _ @@ -132,8 +141,8 @@ private class ValueEmittingWindowFunction( override def process( key: String, - context: ProcessWindowFunction[AnyRef, ValueWithContext[AnyRef], String, TimeWindow]#Context, - elements: lang.Iterable[AnyRef], + context: ProcessWindowFunction[EventWithTimestamp[AnyRef], ValueWithContext[AnyRef], String, TimeWindow]#Context, + elements: lang.Iterable[EventWithTimestamp[AnyRef]], out: Collector[ValueWithContext[AnyRef]] ): Unit = { elements.forEach { element => @@ -142,11 +151,11 @@ private class ValueEmittingWindowFunction( out match { // it should always be an instance of this class case timedOut: TimestampedCollector[_] => - Option(overriddenResultEventTimeHolder.get()).foreach(timestamp => timedOut.setAbsoluteTimestamp(timestamp)) + Option(overriddenResultEventTimeHolder.get()).foreach(timestamp => timedOut.setAbsoluteTimestamp(element.timestamp)) case _ => } - out.collect(ValueWithContext(element, KeyEnricher.enrichWithKey(ctx, key))) + out.collect(ValueWithContext(element.event, KeyEnricher.enrichWithKey(ctx, key))) } } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala 
b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala index 77515049602..1249b72463f 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/triggers.scala @@ -36,10 +36,10 @@ object triggers { case TriggerResult.PURGE => TriggerResult.FIRE_AND_PURGE case fire => fire } - - if (!previousResult.isFire && result.isFire) { - onOverrideFireOnElementAtTimestamp(timestamp) - } +// +// if (!previousResult.isFire && result.isFire) { +// onOverrideFireOnElementAtTimestamp(timestamp) +// } result }