2065 time machine rsr wip #7997

Closed
wants to merge 20 commits
@@ -10,28 +10,27 @@ import org.scalatest.prop.TableDrivenPropertyChecks.forAll
import org.scalatest.prop.Tables.Table
import pl.touk.nussknacker.engine.api.{FragmentSpecificData, JobData, MetaData, ProcessVersion, VariableConstants}
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{
CannotCreateObjectError,
ExpressionParserCompilationError
}
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, ExpressionParserCompilationError}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.{canonicalnode, CanonicalProcess}
import pl.touk.nussknacker.engine.canonicalgraph.{CanonicalProcess, canonicalnode}
import pl.touk.nussknacker.engine.compile.{CompilationResult, FragmentResolver, ProcessValidator}
import pl.touk.nussknacker.engine.definition.component.parameter.editor.ParameterTypeEditorDeterminer
import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.test.ScalatestMiniClusterJobStatusCheckingOps.miniClusterWithServicesToOps
import pl.touk.nussknacker.engine.flink.util.source.EmitWatermarkAfterEachElementCollectionSource
import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.TestRecordHours.hoursToMillis
import pl.touk.nussknacker.engine.graph.evaluatedparam.{Parameter => NodeParameter}
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.graph.node.{CustomNode, FragmentInputDefinition, FragmentOutputDefinition}
import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
import pl.touk.nussknacker.engine.graph.variable.Field
import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener
import pl.touk.nussknacker.engine.process.helpers.SampleNodes.CustomTimestampExtractingTransformation
import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.engine.testing.LocalModelData
@@ -46,6 +45,8 @@ import scala.jdk.CollectionConverters._

class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Inside {

private val eventTimeExtractionComponentName = "customTimestampExtractingTransformation"

def modelData(
collectingListener: => ResultsCollectingListener[Any],
list: List[TestRecord] = List(),
@@ -59,7 +60,8 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
ComponentDefinition("start", sourceComponent) :: FlinkBaseUnboundedComponentProvider.create(
DocsConfig.Default,
aggregateWindowsConfig
) ::: FlinkBaseComponentProvider.Components,
) ::: FlinkBaseComponentProvider.Components
::: List(ComponentDefinition(eventTimeExtractionComponentName, CustomTimestampExtractingTransformation)),
configCreator = new ConfigCreatorWithCollectingListener(collectingListener)
)
}
@@ -351,6 +353,13 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
runScenario(model, testScenario)
val aggregateVariables = collectingListener.fragmentResultEndVariable[Number](id)
aggregateVariables shouldBe List(3, 5)

collectingListener.endVariablesForKey(id).flatMap(_.variableTyped[TestRecordHours]("input")) shouldBe Nil
collectingListener
.endVariablesForKey(id)
.map(
_.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get
) shouldBe List(2 * 3600 * 1000 - 1, 4 * 3600 * 1000 - 1)
}
}
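
Note: the asserted timestamps follow from Flink's window semantics: an event-time window fires at TimeWindow.maxTimestamp, i.e. the window end minus 1 ms. A minimal sketch of the arithmetic, assuming the 2-hour tumbling windows this test appears to use:

// Sketch only: expected emission timestamps for 2-hour tumbling event-time windows.
// Flink fires a window at TimeWindow.maxTimestamp == windowEnd - 1 ms.
val windowSizeMs: Long = 2 * 3600L * 1000
def expectedEmissionTimestamp(windowIndex: Int): Long = (windowIndex + 1) * windowSizeMs - 1
assert(expectedEmissionTimestamp(0) == 2 * 3600L * 1000 - 1) // first asserted value
assert(expectedEmissionTimestamp(1) == 4 * 3600L * 1000 - 1) // second asserted value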

@@ -488,10 +497,12 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
val id = "1"

ResultsCollectingListenerHolder.withListener { collectingListener =>
val inputRecords =
List(TestRecordHours(id, 0, 1, "a"), TestRecordHours(id, 1, 2, "b"), TestRecordHours(id, 2, 5, "b"))
val model =
modelData(
collectingListener,
List(TestRecordHours(id, 0, 1, "a"), TestRecordHours(id, 1, 2, "b"), TestRecordHours(id, 2, 5, "b"))
inputRecords
)
val testScenario = tumbling(
"#AGG.list",
@@ -508,6 +519,10 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
val aggregateVariables = nodeResults.map(_.variableTyped[java.util.List[Number]]("fragmentResult").get)
// TODO: reverse order in aggregate
aggregateVariables shouldBe List(asList(1), asList(2, 1), asList(5))
nodeResults.map(
_.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get
) shouldBe inputRecords.map(e => e.timestamp)
nodeResults.map(_.variableTyped[TestRecordHours]("input").get) shouldBe inputRecords
}
}

@@ -743,6 +758,14 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins

val nodeResults = collectingListener.endVariablesForKey(id)
nodeResults.flatMap(_.variableTyped[TestRecordHours]("input")) shouldBe Nil

nodeResults.map(
_.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get
) shouldBe List(
hoursToMillis(5) - 1, // 3h last event time + (2h - 1ms) timeout
hoursToMillis(6), // 6h event time from the event which evaluated the stop condition to true
hoursToMillis(8) - 1 // 6h last event time + (2h - 1ms) timeout
)
}
}
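
Note: the same maxTimestamp rule explains the session-window values above. A minimal sketch, assuming the 2-hour session gap used in this test:

// Sketch only: emission timestamps for an event-time session window with a 2-hour gap.
// A session closed by timeout fires at lastEventTime + gap - 1 ms (the window's maxTimestamp);
// a session closed by the end condition fires at the closing event's own timestamp.
val sessionGapMs: Long = 2 * 3600L * 1000
def timedOutSessionEmission(lastEventMs: Long): Long = lastEventMs + sessionGapMs - 1
assert(timedOutSessionEmission(3 * 3600L * 1000) == 5 * 3600L * 1000 - 1) // 3h last event
assert(timedOutSessionEmission(6 * 3600L * 1000) == 8 * 3600L * 1000 - 1) // 6h last event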

@@ -773,6 +796,10 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
asList(5)
)
outputVariables.map(_.variableTyped[TestRecordHours]("input").get) shouldBe testRecords
outputVariables.map(
_.variableTyped[Long](CustomTimestampExtractingTransformation.timestampVariableName).get
) shouldBe
testRecords.map(e => e.timestamp)
}
}

@@ -994,6 +1021,10 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
definition.afterAggregateExpression.spel
)
}
.customNodeNoOutput(
"custom",
eventTimeExtractionComponentName,
)
.emptySink("end", "dead-end")
}

@@ -1066,7 +1097,11 @@ trait TestRecord {
}

case class TestRecordHours(id: String, timeHours: Int, eId: Int, str: String) extends TestRecord {
override def timestamp: Long = timeHours * 3600L * 1000
override def timestamp: Long = hoursToMillis(timeHours)
}

object TestRecordHours {
def hoursToMillis(hours: Int): Long = hours * 3600L * 1000
}

case class TestRecordWithTimestamp(id: String, timestamp: Long, eId: Int, str: String) extends TestRecord
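
Side note: the 3600L literal in hoursToMillis matters, because it promotes the whole multiplication to Long before an Int overflow can occur. A quick illustration:

// Long arithmetic from the first multiplication onwards, so no Int overflow:
def hoursToMillis(hours: Int): Long = hours * 3600L * 1000
assert(hoursToMillis(600) == 2160000000L)
// Pure Int arithmetic would wrap around here, since 600 * 3600 * 1000 > Int.MaxValue:
assert(600 * 3600 * 1000 != 2160000000L)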
@@ -4,10 +4,12 @@ import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.{AggregateFunction, OpenContext, RuntimeContext}
import org.apache.flink.api.common.state.AggregatingStateDescriptor
import org.apache.flink.streaming.api.datastream.{KeyedStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.operators.TimestampedCollector
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
@@ -17,54 +19,83 @@ import pl.touk.nussknacker.engine.api.ValueWithContext
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext}
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue}
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.{
elementHolder,
stateDescriptorName,
Input
}
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.{Input, elementHolder, overriddenResultEventTimeHolder, stateDescriptorName}
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers.AggregatorTypeInformations
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.FireOnEachEvent
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.{ClosingEndEventTrigger, FireOnEachEvent}

import java.lang

object OnEventTriggerWindowOperator {
type Input[A] = ValueWithContext[StringKeyedValue[A]]
object ExtendedWindowOperator {
type Input[A] = EventWithTimestamp[ValueWithContext[StringKeyedValue[A]]]

// We use a ThreadLocal to pass the context from WindowOperator.processElement to the ProcessWindowFunction
// without modifying too much Flink code. This assumes that the window is triggered only on an event.
val elementHolder = new ThreadLocal[api.Context]
// This ThreadLocal likewise passes the overridden result event time between two distant places
// in Flink code without modifying it. It holds java.lang.Long (rather than scala.Long) so that
// an unset value reads back as null instead of an unboxed 0.
val overriddenResultEventTimeHolder = new ThreadLocal[java.lang.Long]

// WindowOperatorBuilder.WINDOW_STATE_NAME - should be the same for compatibility
val stateDescriptorName = "window-contents"

private def overrideResultEventTime(timestamp: Long): Unit = {
overriddenResultEventTimeHolder.set(timestamp)
}

def fireOnEachEventTriggerWrapper[T, W <: Window](delegate: Trigger[_ >: T, W]): FireOnEachEvent[T, W] = {
FireOnEachEvent(delegate, timestamp => overrideResultEventTime(timestamp))
}

def closingEndEventTriggerWrapper[T, W <: Window](delegate: Trigger[_ >: T, W], endFunction: T => Boolean): ClosingEndEventTrigger[T, W] = {
ClosingEndEventTrigger(delegate, endFunction, timestamp => overrideResultEventTime(timestamp))
}

implicit class OnEventOperatorKeyedStream[A](stream: KeyedStream[Input[A], String])(
implicit fctx: FlinkCustomNodeContext
) {

def eventTriggerWindow(
def extendedEventTriggerWindow(
assigner: WindowAssigner[_ >: Input[A], TimeWindow],
types: AggregatorTypeInformations,
aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef],
trigger: Trigger[_ >: Input[A], TimeWindow]
): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = stream.transform(
): SingleOutputStreamOperator[ValueWithContext[AnyRef]] =
extendedWindow(
assigner,
types,
aggregateFunction,
fireOnEachEventTriggerWrapper[EventWithTimestamp[ValueWithContext[StringKeyedValue[A]]], TimeWindow](trigger),
preserveContext = true
)

def extendedWindow(
assigner: WindowAssigner[_ >: Input[A], TimeWindow],
types: AggregatorTypeInformations,
aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef],
trigger: Trigger[_ >: Input[A], TimeWindow],
preserveContext: Boolean,
): SingleOutputStreamOperator[ValueWithContext[AnyRef]] = stream.transform(
assigner.getClass.getSimpleName,
types.returnedValueTypeInfo,
new OnEventTriggerWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger)
new ExtendedWindowOperator(stream, fctx, assigner, types, aggregateFunction, trigger, preserveContext)
)

}

}

case class EventWithTimestamp[E](event: E, timestamp: Long)

// Pairs each element with its assigned event time so that the window operator downstream
// can restore the timestamp when it emits results.
class SomeProcessFunction[KEY, IN] extends KeyedProcessFunction[KEY, IN, EventWithTimestamp[IN]] {
override def processElement(value: IN, ctx: KeyedProcessFunction[KEY, IN, EventWithTimestamp[IN]]#Context, out: Collector[EventWithTimestamp[IN]]): Unit = {
out.collect(EventWithTimestamp(value, ctx.timestamp()))
}
}
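
A hedged sketch of how this wrapper could be wired in front of the window operator; all names below are illustrative, not from this PR. Since ctx.timestamp() returns the element's assigned event time, each record enters the window already paired with it:

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

// Hypothetical wiring (assumed names): pair each element with its event time before windowing.
object WrapWithTimestampSketch extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val strategy = WatermarkStrategy
.forMonotonousTimestamps[(String, Long)]()
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
})
env
.fromElements("a" -> 1000L, "b" -> 2000L)
.assignTimestampsAndWatermarks(strategy)
.keyBy(_._1)
.process(new SomeProcessFunction[String, (String, Long)]) // emits EventWithTimestamp(value, eventTime)
.print()
env.execute("wrap-with-timestamp-sketch")
}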

@silent("deprecated")
class OnEventTriggerWindowOperator[A](
class ExtendedWindowOperator[A](
stream: KeyedStream[Input[A], String],
fctx: FlinkCustomNodeContext,
assigner: WindowAssigner[_ >: Input[A], TimeWindow],
types: AggregatorTypeInformations,
aggregateFunction: AggregateFunction[Input[A], AnyRef, AnyRef],
trigger: Trigger[_ >: Input[A], TimeWindow]
) extends WindowOperator[String, Input[A], AnyRef, ValueWithContext[AnyRef], TimeWindow](
trigger: Trigger[_ >: Input[A], TimeWindow],
preserveContext: Boolean,
) extends WindowOperator[String, Input[A], EventWithTimestamp[AnyRef], ValueWithContext[AnyRef], TimeWindow](
assigner,
assigner.getWindowSerializer(stream.getExecutionConfig),
stream.getKeySelector,
@@ -77,17 +108,20 @@ class OnEventTriggerWindowOperator[A](
new InternalSingleValueProcessWindowFunction(
new ValueEmittingWindowFunction(fctx.convertToEngineRuntimeContext, fctx.nodeId)
),
FireOnEachEvent[ValueWithContext[StringKeyedValue[A]], TimeWindow](trigger),
trigger,
0L, // lateness,
null // tag
) {

override def processElement(element: StreamRecord[ValueWithContext[StringKeyedValue[A]]]): Unit = {
elementHolder.set(element.getValue.context)
override def processElement(element: StreamRecord[EventWithTimestamp[ValueWithContext[StringKeyedValue[A]]]]): Unit = {
if (preserveContext) {
elementHolder.set(element.getValue.event.context)
}
try {
super.processElement(element)
} finally {
elementHolder.remove()
overriddenResultEventTimeHolder.remove()
}
}

@@ -96,7 +130,7 @@ class OnEventTriggerWindowOperator[A](
private class ValueEmittingWindowFunction(
convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext,
nodeId: String
) extends ProcessWindowFunction[AnyRef, ValueWithContext[AnyRef], String, TimeWindow] {
) extends ProcessWindowFunction[EventWithTimestamp[AnyRef], ValueWithContext[AnyRef], String, TimeWindow] {

@transient
private var contextIdGenerator: ContextIdGenerator = _
@@ -107,13 +141,21 @@ private class ValueEmittingWindowFunction(

override def process(
key: String,
context: ProcessWindowFunction[AnyRef, ValueWithContext[AnyRef], String, TimeWindow]#Context,
elements: lang.Iterable[AnyRef],
context: ProcessWindowFunction[EventWithTimestamp[AnyRef], ValueWithContext[AnyRef], String, TimeWindow]#Context,
elements: lang.Iterable[EventWithTimestamp[AnyRef]],
out: Collector[ValueWithContext[AnyRef]]
): Unit = {
elements.forEach { element =>
val ctx = Option(elementHolder.get()).getOrElse(api.Context(contextIdGenerator.nextContextId()))
out.collect(ValueWithContext(element, KeyEnricher.enrichWithKey(ctx, key)))

out match {
// out should always be a TimestampedCollector here
case timestamped: TimestampedCollector[_] =>
// restore the event time that the trigger recorded for this firing
Option(overriddenResultEventTimeHolder.get()).foreach(timestamp => timestamped.setAbsoluteTimestamp(timestamp))
case _ =>
}

out.collect(ValueWithContext(element.event, KeyEnricher.enrichWithKey(ctx, key)))
}
}

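A note on the hand-off above: processElement and the window function run on the same task thread (the window fires synchronously, on an event), so a ThreadLocal set before super.processElement is visible inside process() and can be cleared in finally. A minimal, self-contained sketch of the pattern, with illustrative names:

object ThreadLocalHandoffSketch extends App {
// java.lang.Long so an unset value reads back as null, matching the operator above.
val holder = new ThreadLocal[java.lang.Long]
// Stands in for the window function that fires synchronously inside processElement.
def windowFunction(): Unit =
Option(holder.get()).foreach(ts => println(s"restored event time: $ts"))
def processElement(eventTime: Long): Unit = {
holder.set(eventTime) // set before delegating
try windowFunction() // super.processElement(...) in the real operator
finally holder.remove() // always clear, so nothing leaks to the next element
}
processElement(42L)
}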
@@ -8,14 +8,13 @@ import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindo
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import pl.touk.nussknacker.engine.api.{Context => NkContext, NodeId, _}
import pl.touk.nussknacker.engine.api.{Context => NkContext, _}
import pl.touk.nussknacker.engine.api.context.ContextTransformation
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
import pl.touk.nussknacker.engine.flink.api.process._
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
import pl.touk.nussknacker.engine.flink.util.richflink._
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.OnEventTriggerWindowOperator.OnEventOperatorKeyedStream
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers.ClosingEndEventTrigger
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.ExtendedWindowOperator.OnEventOperatorKeyedStream
import pl.touk.nussknacker.engine.util.KeyedValue

import scala.collection.immutable.SortedMap
@@ -124,7 +123,7 @@ object transformers {
(tumblingWindowTrigger match {
case TumblingWindowTrigger.OnEvent =>
keyedStream
.eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create())
.extendedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create())
case TumblingWindowTrigger.OnEnd =>
keyedStream
.window(windowDefinition)
@@ -177,11 +176,11 @@ object transformers {
implicit val fctx: FlinkCustomNodeContext = ctx
val typeInfos = AggregatorTypeInformations(ctx, aggregator, aggregateBy)

val baseTrigger =
ClosingEndEventTrigger[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow](
val baseTrigger = {
ExtendedWindowOperator.closingEndEventTriggerWrapper[ValueWithContext[KeyedValue[String, (AnyRef, java.lang.Boolean)]], TimeWindow](
EventTimeTrigger.create(),
_.value.value._2
)
_.value.value._2)
}
val groupByValue = aggregateBy.product(endSessionCondition)

val keyedStream = start
@@ -192,22 +191,9 @@

(sessionWindowTrigger match {
case SessionWindowTrigger.OnEvent =>
keyedStream.eventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger)
keyedStream.extendedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger)
case SessionWindowTrigger.OnEnd =>
keyedStream
.window(windowDefinition)
.trigger(baseTrigger)
.aggregate(
new UnwrappingAggregateFunction[(AnyRef, java.lang.Boolean)](
aggregator,
aggregateBy.returnType,
_._1
),
EnrichingWithKeyFunction(fctx),
typeInfos.storedTypeInfo,
typeInfos.returnTypeInfo,
typeInfos.returnedValueTypeInfo
)
keyedStream.extendedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false)
}).setUidWithName(ctx, ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators)
})
)
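
Design note: OnEvent firing happens inside processElement, where the triggering element's Nussknacker context is available to preserve; OnEnd firing comes from an event-time timer, where no element is being processed, which is presumably why the call above passes preserveContext = false and fresh context IDs get generated instead. The two call shapes side by side, as wired in this diff:

// On-event firing: keep the triggering element's context (preserveContext = true internally).
keyedStream.extendedEventTriggerWindow(windowDefinition, typeInfos, aggregatingFunction, EventTimeTrigger.create())
// On-end firing: a timer fires, so there is no element context to preserve.
keyedStream.extendedWindow(windowDefinition, typeInfos, aggregatingFunction, baseTrigger, preserveContext = false)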