diff --git a/build.sbt b/build.sbt
index 25c0611af92..ce57dbb6bed 100644
--- a/build.sbt
+++ b/build.sbt
@@ -647,6 +647,7 @@ lazy val flinkDeploymentManager = (project in flink("management"))
   )
   .dependsOn(
     deploymentManagerApi % Provided,
+    liveDataCollector % Provided,
     scenarioCompilerFlinkDeps,
     flinkMiniCluster,
     commonUtils % Provided,
@@ -2023,6 +2024,20 @@ lazy val deploymentManagerApi = (project in file("designer/deployment-manager-ap
   )
   .dependsOn(extensionsApi, testUtils % Test)

+lazy val liveDataCollector = (project in file("designer/live-data-collector"))
+  .settings(commonSettings)
+  .settings(
+    name := "nussknacker-live-data-collector",
+    libraryDependencies ++= Seq(
+      "org.scalatest" %% "scalatest" % scalaTestV % Test,
+    ),
+  )
+  .dependsOn(
+    deploymentManagerApi % Provided,
+    // For testResultsVariableEncoder purpose
+    scenarioCompiler % Provided,
+  )
+
 lazy val prepareDesignerTests = taskKey[Unit]("Prepare all necessary artifacts before running designer module tests")
 lazy val prepareDesignerSlowTests = taskKey[Unit]("Prepare all necessary artifacts before running designer module slow tests")
@@ -2166,6 +2181,7 @@ lazy val designer = (project in file("designer/server"))
       processReports,
       security,
       deploymentManagerApi,
+      liveDataCollector,
       componentsApi,
       restmodel,
       listenerApi,
@@ -2317,6 +2333,7 @@ lazy val modules = List[ProjectReference](
   customHttpServiceApi,
   configLoaderApi,
   deploymentManagerApi,
+  liveDataCollector,
   designer,
   sqlComponents,
   schemedKafkaComponentsUtils,
diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/ModelConfig.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/ModelConfig.scala
index 709efa5dbd3..8e5d8ac9f40 100644
--- a/components-api/src/main/scala/pl/touk/nussknacker/engine/ModelConfig.scala
+++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/ModelConfig.scala
@@ -4,11 +4,13 @@ import com.typesafe.config.Config
 import net.ceedubs.ficus.Ficus.toFicusConfig
 import net.ceedubs.ficus.readers.AnyValReaders._
 import net.ceedubs.ficus.readers.OptionReader._
+import pl.touk.nussknacker.engine.ModelConfig.LiveDataPreviewMode
 import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy

 final case class ModelConfig(
     allowEndingScenarioWithoutSink: Boolean,
     namingStrategy: NamingStrategy,
+    liveDataPreviewMode: LiveDataPreviewMode,
     // TODO: we should parse this underlying config as ModelConfig class fields instead of passing raw config
     underlyingConfig: Config,
 ) {
@@ -23,8 +25,33 @@ object ModelConfig {
     ModelConfig(
       allowEndingScenarioWithoutSink = rawModelConfig.getOrElse[Boolean]("allowEndingScenarioWithoutSink", false),
       namingStrategy = NamingStrategy.fromConfig(rawModelConfig),
+      liveDataPreviewMode = parseLiveDataPreviewMode(rawModelConfig),
       underlyingConfig = rawModelConfig,
     )
   }

+  sealed trait LiveDataPreviewMode
+
+  object LiveDataPreviewMode {
+
+    case object Disabled extends LiveDataPreviewMode
+
+    final case class Enabled(
+        maxNumberOfSamples: Int,
+        throughputTimeWindowInSeconds: Int,
+    ) extends LiveDataPreviewMode
+
+  }
+
+  private def parseLiveDataPreviewMode(config: Config): LiveDataPreviewMode = {
+    if (config.getOrElse("liveDataPreview.enabled", false)) {
+      LiveDataPreviewMode.Enabled(
+        maxNumberOfSamples = config.getOrElse("liveDataPreview.maxNumberOfSamples", 10),
+        throughputTimeWindowInSeconds = config.getOrElse("liveDataPreview.throughputTimeWindowInSeconds", 60),
+      )
+    } else {
+      LiveDataPreviewMode.Disabled
+    }
+  }
+
 }
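Editorial note: a minimal sketch of the HOCON shape this parser expects (values are hypothetical; the changelog entry at the end of this PR documents the same keys). It assumes Typesafe Config and Ficus on the classpath, as already imported by ModelConfig.scala above:

```scala
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._

val rawModelConfig = ConfigFactory.parseString(
  """liveDataPreview {
    |  enabled: true
    |  maxNumberOfSamples: 20
    |  throughputTimeWindowInSeconds: 60
    |}""".stripMargin
)
// With this config, parseLiveDataPreviewMode would yield:
// LiveDataPreviewMode.Enabled(maxNumberOfSamples = 20, throughputTimeWindowInSeconds = 60)
rawModelConfig.getOrElse[Boolean]("liveDataPreview.enabled", false) // true
```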
diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala
index 36481ee69b5..3c077798b7f 100644
--- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala
+++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala
@@ -2,10 +2,13 @@ package pl.touk.nussknacker.engine.api.deployment

 import cats.effect.{Resource, SyncIO}
 import com.typesafe.config.Config
+import io.circe.Json
 import pl.touk.nussknacker.engine.api.definition.EngineScenarioCompilationDependencies
+import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError}
 import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
 import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
 import pl.touk.nussknacker.engine.newdeployment
+import pl.touk.nussknacker.engine.testmode.TestProcess.{NodeTransition, TestResults}

 import java.time.Instant
 import scala.concurrent.Future
@@ -33,6 +36,8 @@ trait DeploymentManager extends AutoCloseable {

   def scenarioCompilationDependenciesResource: Resource[SyncIO, EngineScenarioCompilationDependencies]

+  def liveDataPreviewSupport: LiveDataPreviewSupport
+
   protected final def notImplemented: Future[Nothing] = Future.failed(new NotImplementedError())

 }
@@ -89,3 +94,30 @@ trait SchedulingSupported extends SchedulingSupport {
 }

 case object NoSchedulingSupport extends SchedulingSupport
+
+sealed trait LiveDataPreviewSupport
+
+trait LiveDataPreviewSupported extends LiveDataPreviewSupport {
+
+  def getLiveData(
+      processIdWithName: ProcessIdWithName,
+  ): Future[Either[LiveDataError, LiveData]]
+
+}
+
+object LiveDataPreviewSupported {
+
+  final case class LiveData(
+      liveDataSamples: TestResults[Json],
+      nodeTransitionThroughput: Map[NodeTransition, BigDecimal],
+  )
+
+  sealed trait LiveDataError
+
+  object LiveDataError {
+    case object NoLiveDataAvailableForScenario extends LiveDataError
+  }
+
+}
+
+case object NoLiveDataPreviewSupport extends LiveDataPreviewSupport
diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala
index 14d06a34722..91d7592ff83 100644
--- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala
+++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala
@@ -19,6 +19,7 @@ class CachingProcessStateDeploymentManager(
     override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport,
     override val deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport,
     override val schedulingSupport: SchedulingSupport,
+    override val liveDataPreviewSupport: LiveDataPreviewSupport,
 ) extends DeploymentManager {

   private val cache: AsyncCache[ProcessName, List[DeploymentStatusDetails]] = Caffeine
@@ -74,6 +75,7 @@ object CachingProcessStateDeploymentManager extends LazyLogging {
           delegate.deploymentSynchronisationSupport,
           delegate.deploymentsStatusesQueryForAllScenariosSupport,
           delegate.schedulingSupport,
+          delegate.liveDataPreviewSupport,
         )
       }
       .getOrElse {
diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala
index 3c9d5b91283..edf63bc6116 100644
--- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala
+++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala
@@ -60,6 +60,8 @@ class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymen

   override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

+  override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport
+
   override def close(): Unit = {}

   override def scenarioCompilationDependenciesResource: Resource[SyncIO, EngineScenarioCompilationDependencies] =
diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala
index 31d5408798a..9e84a252b2e 100644
--- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala
+++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala
@@ -32,6 +32,7 @@ class CachingProcessStateDeploymentManagerSpec
       NoDeploymentSynchronisationSupport,
       NoDeploymentsStatusesQueryForAllScenariosSupport,
       NoSchedulingSupport,
+      NoLiveDataPreviewSupport,
     )

     val results = List(
@@ -52,6 +53,7 @@ class CachingProcessStateDeploymentManagerSpec
       NoDeploymentSynchronisationSupport,
       NoDeploymentsStatusesQueryForAllScenariosSupport,
       NoSchedulingSupport,
+      NoLiveDataPreviewSupport,
     )

     val firstInvocation = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.CanBeCached)
@@ -71,6 +73,7 @@ class CachingProcessStateDeploymentManagerSpec
       NoDeploymentSynchronisationSupport,
       NoDeploymentsStatusesQueryForAllScenariosSupport,
       NoSchedulingSupport,
+      NoLiveDataPreviewSupport,
     )

     val resultForFresh = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh)
diff --git a/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListener.scala b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListener.scala
new file mode 100644
index 00000000000..869eef3a4fd
--- /dev/null
+++ b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListener.scala
@@ -0,0 +1,104 @@
+package pl.touk.nussknacker.engine.livedata
+
+import io.circe.Json
+import pl.touk.nussknacker.engine.api._
+import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
+import pl.touk.nussknacker.engine.api.process.ProcessName
+import pl.touk.nussknacker.engine.testmode.TestInterpreterRunner
+import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
+
+import scala.util.Try
+
+// This class must be serializable, which means that after deserialization we lose the reference
+// to the original instance. The actual data is stored in the LiveDataCollectingListenerHolder,
+// and all instances of LiveDataCollectingListener can access it.
+class LiveDataCollectingListener private[livedata] (
+    processName: ProcessName,
+    maxNumberOfSamples: Int,
+    throughputTimeWindowInSeconds: Int,
+) extends ProcessListener
+    with Serializable {
+
+  private val variableEncoder: Any => io.circe.Json = TestInterpreterRunner.testResultsVariableEncoder
+
+  private def storage = LiveDataCollectingListenerHolder.storage(
+    processName = processName,
+    maxNumberOfSamples = maxNumberOfSamples,
+    throughputTimeWindowInSeconds = throughputTimeWindowInSeconds,
+  )
+
+  override def nodeEntered(nodeId: String, context: Context, processMetaData: MetaData): Unit = {
+    updateResults(context, _.updateNodeResult(nodeId, context, variableEncoder))
+  }
+
+  override def transitionToNextNode(
+      nodeId: String,
+      nextNodeId: String,
+      context: Context,
+      processMetaData: MetaData,
+  ): Unit = {
+    storage.registerTransitionBetweenNodes(nodeId, Some(nextNodeId))
+    updateResults(context, _.updateNodeOutputResult(nodeId, Some(nextNodeId), context, variableEncoder))
+  }
+
+  override def processingFinishedInNode(
+      nodeId: String,
+      context: Context,
+      processMetaData: MetaData,
+  ): Unit = {
+    storage.registerTransitionBetweenNodes(nodeId, None)
+    updateResults(context, _.updateNodeOutputResult(nodeId, None, context, variableEncoder))
+  }
+
+  override def endEncountered(
+      nodeId: String,
+      ref: String,
+      context: Context,
+      processMetaData: MetaData
+  ): Unit = ()
+
+  override def deadEndEncountered(
+      lastNodeId: String,
+      context: Context,
+      processMetaData: MetaData
+  ): Unit = {}
+
+  override def expressionEvaluated(
+      nodeId: String,
+      expressionId: String,
+      expression: String,
+      context: Context,
+      processMetaData: MetaData,
+      result: Any
+  ): Unit = {
+    updateResults(context, _.updateExpressionResult(nodeId, context, expressionId, result, variableEncoder))
+  }
+
+  override def serviceInvoked(
+      nodeId: String,
+      id: String,
+      context: Context,
+      processMetaData: MetaData,
+      result: Try[Any]
+  ): Unit = {}
+
+  override def exceptionThrown(exceptionInfo: NuExceptionInfo): Unit = {
+    updateResults(exceptionInfo.context, _.updateExceptionResult(exceptionInfo, variableEncoder))
+  }
+
+  override final def close(): Unit = ()
+
+  private def updateResults(context: Context, action: TestResults[Json] => TestResults[Json]): Unit = {
+    // TODO: The ContextId will be refactored into a case class with fields, but for now we have to
+    // use a regex to extract the initial context id as generated by the source.
+    val pattern = raw"^(.+?-.+?-\d+-\d+)(-.+)?".r
+    val initialContextIdOpt = context.id match {
+      case pattern(initialContextId, _*) => Some(initialContextId)
+      case _                             => None
+    }
+    initialContextIdOpt match {
+      case Some(initialContextId) => storage.updateResults(initialContextId, action)
+      case None                   => ()
+    }
+  }
+
+}
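Editorial note: a worked example of the regex above, with hypothetical context ids (ids of the form `name-nodeId-<n>-<n>`, optionally followed by suffixes appended downstream). Only the pattern itself comes from the PR:

```scala
val pattern = raw"^(.+?-.+?-\d+-\d+)(-.+)?".r

"scenario_2-sourceId-0-42" match {
  case pattern(initial, _*) => initial // "scenario_2-sourceId-0-42"
}
"scenario_2-sourceId-0-42-branch-1" match {
  case pattern(initial, _*) => initial // still "scenario_2-sourceId-0-42"
}
```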
diff --git a/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListenerHolder.scala b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListenerHolder.scala
new file mode 100644
index 00000000000..ffc081dce3c
--- /dev/null
+++ b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListenerHolder.scala
@@ -0,0 +1,56 @@
+package pl.touk.nussknacker.engine.livedata
+
+import com.github.benmanes.caffeine.cache.Caffeine
+import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError}
+import pl.touk.nussknacker.engine.api.process.ProcessName
+
+import java.time.Clock
+import scala.compat.java8.FunctionConverters._
+
+object LiveDataCollectingListenerHolder {
+
+  private implicit val clock: Clock = Clock.systemUTC()
+
+  private val listenerStorages =
+    Caffeine
+      .newBuilder()
+      .expireAfterAccess(java.time.Duration.ofHours(1))
+      .build[String, LiveDataCollectingListenerStorage]()
+
+  def createListenerFor(
+      processName: ProcessName,
+      maxNumberOfSamples: Int,
+      throughputTimeWindowInSeconds: Int,
+  ): LiveDataCollectingListener = {
+    cleanResults(processName)
+    new LiveDataCollectingListener(processName, maxNumberOfSamples, throughputTimeWindowInSeconds)
+  }
+
+  def getLiveDataPreview(processName: ProcessName): Either[LiveDataError, LiveData] = {
+    Option(listenerStorages.getIfPresent(processName.value)).map(_.getLiveData) match {
+      case Some(liveData) => Right(liveData)
+      case None           => Left(LiveDataError.NoLiveDataAvailableForScenario)
+    }
+  }
+
+  private[livedata] def storage(
+      processName: ProcessName,
+      maxNumberOfSamples: Int,
+      throughputTimeWindowInSeconds: Int,
+  ): LiveDataCollectingListenerStorage = {
+    listenerStorages.get(
+      processName.value,
+      asJavaFunction((_: String) =>
+        new LiveDataCollectingListenerStorage(maxNumberOfSamples, throughputTimeWindowInSeconds)
+      )
+    )
+  }
+
+  // We want to store and present the live data from the most recent deployment:
+  // - data from an old run is kept until the listener storage cache expires (currently hardcoded to 1 hour),
+  // - or until a new deployment happens - then we discard all the old data.
+  private def cleanResults(processName: ProcessName): Unit = {
+    listenerStorages.invalidate(processName.value)
+  }
+
+}
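Editorial note: the intended wiring of the listener and the holder is easiest to see end to end. A minimal sketch, assuming the engine runs in the same JVM as the holder (which is why the changelog below limits the feature to the Flink minicluster for now); only the names defined in this PR are real:

```scala
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.livedata.LiveDataCollectingListenerHolder

val processName = ProcessName("scenario_2")

// On (re)deployment: drops the data of the previous run and creates a fresh listener,
// which should then be registered as a ProcessListener of the running scenario.
val listener = LiveDataCollectingListenerHolder.createListenerFor(
  processName = processName,
  maxNumberOfSamples = 20,
  throughputTimeWindowInSeconds = 60,
)

// Later, e.g. from a DeploymentManager serving the /liveData endpoint:
LiveDataCollectingListenerHolder.getLiveDataPreview(processName)
// => Left(NoLiveDataAvailableForScenario) until the listener has recorded something
```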
diff --git a/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListenerStorage.scala b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListenerStorage.scala
new file mode 100644
index 00000000000..5ee05dd9d6a
--- /dev/null
+++ b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/LiveDataCollectingListenerStorage.scala
@@ -0,0 +1,38 @@
+package pl.touk.nussknacker.engine.livedata
+
+import io.circe.Json
+import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.LiveData
+import pl.touk.nussknacker.engine.testmode.TestProcess._
+
+import java.time.{Clock, Instant}
+
+private[livedata] class LiveDataCollectingListenerStorage(
+    maxNumberOfSamples: Int,
+    throughputTimeWindowInSeconds: Int,
+)(implicit clock: Clock) {
+
+  private val results = new RingBuffer[String, TestResults[Json]](maxNumberOfSamples)
+  private val transitionsCounter = new SlidingWindowCounter[NodeTransition](Instant.now, throughputTimeWindowInSeconds)
+
+  def getLiveData: LiveData = {
+    LiveData(
+      liveDataSamples = TestResults.aggregate(results.values),
+      nodeTransitionThroughput = transitionsCounter.getThroughput,
+    )
+  }
+
+  def updateResults(contextId: String, action: TestResults[Json] => TestResults[Json]): Unit = {
+    results.update(
+      contextId,
+      {
+        case Some(resultsSoFar) => action(resultsSoFar)
+        case None               => action(TestResults.empty[Json])
+      }
+    )
+  }
+
+  def registerTransitionBetweenNodes(from: String, to: Option[String]): Unit = {
+    transitionsCounter.add(NodeTransition(from, to))
+  }
+
+}
diff --git a/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/RingBuffer.scala b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/RingBuffer.scala
new file mode 100644
index 00000000000..3da6e41841a
--- /dev/null
+++ b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/RingBuffer.scala
@@ -0,0 +1,25 @@
+package pl.touk.nussknacker.engine.livedata
+
+import java.util
+import java.util.{Map => JMap}
+import scala.jdk.CollectionConverters._
+
+private[livedata] class RingBuffer[K, V](maxSize: Int) {
+
+  private val underlying = new util.LinkedHashMap[K, V](2 * maxSize, 0.75f, false) {
+    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > maxSize
+  }
+
+  def values: List[V] = underlying.synchronized {
+    underlying.values().asScala.toList
+  }
+
+  def update(key: K, f: Option[V] => V): V = underlying.synchronized {
+    underlying.compute(key, (_: K, output: V) => f(Option(output)))
+  }
+
+  def clear(): Unit = underlying.synchronized {
+    underlying.clear()
+  }
+
+}
diff --git a/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/SlidingWindowCounter.scala b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/SlidingWindowCounter.scala
new file mode 100644
index 00000000000..13f8424f75c
--- /dev/null
+++ b/designer/live-data-collector/src/main/scala/pl/touk/nussknacker/engine/livedata/SlidingWindowCounter.scala
@@ -0,0 +1,56 @@
+package pl.touk.nussknacker.engine.livedata
+
+import java.time.{Clock, Instant}
+import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import scala.jdk.CollectionConverters._
+
+private[livedata] class SlidingWindowCounter[T](
+    counterCreatedAt: Instant,
+    windowSizeSeconds: Int
+)(implicit clock: Clock) {
+
+  private val buckets = new ConcurrentSkipListMap[Long, ConcurrentLinkedQueue[T]]()
+
+  def add(event: T): Unit = {
+    val currentEpochSecond = now()
+    cleanOldBuckets(currentEpochSecond)
+    val bucket = buckets.computeIfAbsent(currentEpochSecond, _ => new ConcurrentLinkedQueue[T]())
+    bucket.add(event)
+  }
+
+  def getThroughput: Map[T, BigDecimal] = {
+    val currentEpochSecond = now()
+    cleanOldBuckets(currentEpochSecond)
+
+    // We want to calculate the correct throughput just after the scenario is started
+    val windowStart = Math.max(counterCreatedAt.getEpochSecond, currentEpochSecond - windowSizeSeconds)
+    val windowEnd = currentEpochSecond
+    val samplingInterval = windowEnd - windowStart + 1
+
+    buckets.asScala.values
+      .flatMap(_.asScala)
+      .toList
+      .groupBy(identity)
+      .view
+      .map { case (key, value) => (key, value.size) }
+      .toMap
+      .map { case (transition, count) =>
+        transition -> (BigDecimal(count) / samplingInterval).setScale(4, BigDecimal.RoundingMode.HALF_EVEN)
+      }
+  }
+
+  private val lastCleaned = new AtomicLong(0)
+
+  private def cleanOldBuckets(now: Long): Unit = synchronized {
+    // Clean old buckets at most once per second, not on each call
+    if (lastCleaned.getAndSet(now) != now) {
+      buckets.headMap(cutoff(now)).clear()
+    }
+  }
+
+  private def cutoff(now: Long): Long = now - windowSizeSeconds + 1
+
+  private def now(): Long = clock.instant().getEpochSecond
+
+}
diff --git a/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/MutableClock.scala b/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/MutableClock.scala
new file mode 100644
index 00000000000..3d77da31575
--- /dev/null
+++ b/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/MutableClock.scala
@@ -0,0 +1,23 @@
+package pl.touk.nussknacker.engine.livedata
+
+import java.time.{Clock, Instant, ZoneId}
+import java.util.concurrent.atomic.AtomicLong
+
+class MutableClock(initialTime: Instant) extends Clock {
+
+  private val epochSecond =
+    new AtomicLong(initialTime.getEpochSecond)
+
+  def advanceBySeconds(seconds: Long): Unit =
+    epochSecond.getAndUpdate(_ + seconds)
+
+  def setTime(newInstant: Instant): Unit =
+    epochSecond.set(newInstant.getEpochSecond)
+
+  override def getZone: ZoneId = ZoneId.systemDefault()
+
+  override def withZone(zone: ZoneId): Clock = this
+
+  override def instant(): Instant = Instant.ofEpochSecond(epochSecond.get())
+
+}
diff --git a/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/RingBufferSpec.scala b/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/RingBufferSpec.scala
new file mode 100644
index 00000000000..ad0326611e5
--- /dev/null
+++ b/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/RingBufferSpec.scala
@@ -0,0 +1,42 @@
+package pl.touk.nussknacker.engine.livedata
+
+import org.scalatest.funsuite.AnyFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+
+class RingBufferSpec extends AnyFunSuiteLike with Matchers {
+
+  test("create and update value for a single key") {
+    val ringBuffer = new RingBuffer[String, Int](10)
+    ringBuffer.update("first", _ => 1)
+    ringBuffer.values shouldBe List(1)
+    ringBuffer.update("first", previousValue => previousValue.get + 2)
+    ringBuffer.values shouldBe List(3)
+  }
+
+  test("create and update values for multiple keys") {
+    val ringBuffer = new RingBuffer[String, Int](10)
+    ringBuffer.update("first", _ => 1)
+    ringBuffer.values shouldBe List(1)
+    ringBuffer.update("second", _ => 5)
+    ringBuffer.values shouldBe List(1, 5)
+    ringBuffer.update("first", previousValue => previousValue.get + 2)
+    ringBuffer.values shouldBe List(3, 5)
+    ringBuffer.update("second", previousValue => previousValue.get - 2)
+    ringBuffer.values shouldBe List(3, 3)
+  }
+
+  test("evict oldest value") {
+    val ringBuffer = new RingBuffer[String, Int](10)
+    1 to 10 foreach { i =>
+      ringBuffer.update(s"$i", _ => i)
+    }
+    ringBuffer.values shouldBe (1 to 10).toList
+    ringBuffer.update("11", _ => 11)
+    ringBuffer.values shouldBe (2 to 11).toList
+    ringBuffer.update("12", _ => 12)
+    ringBuffer.values shouldBe (3 to 12).toList
+    ringBuffer.update("12", _ => 13)
+    ringBuffer.values shouldBe List(3, 4, 5, 6, 7, 8, 9, 10, 11, 13)
+  }
+
+}
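Editorial note: before the counter spec below, a quick worked example of the throughput formula. The sampling interval is clamped to the counter's lifetime, so the numbers are meaningful right after a scenario starts. A sketch using the test `MutableClock` defined above (values are hypothetical; `SlidingWindowCounter` is package-private, so this would live in the same package, as the spec does):

```scala
import java.time.Instant

implicit val clock: MutableClock = new MutableClock(Instant.parse("2025-01-01T00:00:00Z"))
val counter = new SlidingWindowCounter[String](clock.instant(), windowSizeSeconds = 60)

counter.add("source->filter")
clock.advanceBySeconds(9)
// windowStart = max(counterCreatedAt, now - 60) = counterCreatedAt,
// so samplingInterval = 9 - 0 + 1 = 10 seconds and the single event yields:
counter.getThroughput // Map("source->filter" -> 0.1000), i.e. 1 event / 10 s
```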
diff --git a/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/SlidingWindowCounterSpec.scala b/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/SlidingWindowCounterSpec.scala
new file mode 100644
index 00000000000..4f0162a5f69
--- /dev/null
+++ b/designer/live-data-collector/src/test/scala/pl/touk/nussknacker/engine/livedata/SlidingWindowCounterSpec.scala
@@ -0,0 +1,65 @@
+package pl.touk.nussknacker.engine.livedata
+
+import org.scalatest.funsuite.AnyFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+
+import java.time.Instant
+
+class SlidingWindowCounterSpec extends AnyFunSuiteLike with Matchers {
+
+  private val startTime =
+    Instant.parse("2025-04-06T13:18:00Z")
+
+  test("return counts when 1 event per second added for each event type") {
+    implicit val mutableClock: MutableClock = new MutableClock(startTime)
+    val counter = new SlidingWindowCounter[Int](startTime, 10)
+
+    1 to 10 foreach { _ =>
+      1 to 10 foreach { i =>
+        counter.add(i)
+      }
+      counter.getThroughput.toSet shouldBe Set(
+        1 -> BigDecimal(1),
+        2 -> BigDecimal(1),
+        3 -> BigDecimal(1),
+        4 -> BigDecimal(1),
+        5 -> BigDecimal(1),
+        6 -> BigDecimal(1),
+        7 -> BigDecimal(1),
+        8 -> BigDecimal(1),
+        9 -> BigDecimal(1),
+        10 -> BigDecimal(1),
+      )
+      mutableClock.advanceBySeconds(1)
+    }
+  }
+
+  test("throughput is correctly calculated for single event during consecutive seconds") {
+    implicit val mutableClock: MutableClock = new MutableClock(startTime)
+    val counter = new SlidingWindowCounter[Int](startTime, 10)
+
+    // Empty result before any event is received
+    counter.getThroughput.toSet shouldBe Set.empty
+
+    // Calculated as 1 event per second in the first second
+    counter.add(0)
+    counter.getThroughput.toSet shouldBe Set(0 -> BigDecimal(1))
+
+    mutableClock.advanceBySeconds(1)
+    // Calculated as 0.5000 events per second when another second passes
+    counter.getThroughput.toSet shouldBe Set(0 -> BigDecimal(0.5))
+
+    mutableClock.advanceBySeconds(2)
+    // Calculated as 0.2500 events per second in the fourth second
+    counter.getThroughput.toSet shouldBe Set(0 -> BigDecimal(0.25))
+
+    mutableClock.advanceBySeconds(6)
+    // Calculated as 0.1000 events per second in the tenth second
+    counter.getThroughput.toSet shouldBe Set(0 -> BigDecimal(0.1))
+
+    mutableClock.advanceBySeconds(1)
+    // Empty result, because the event moved outside the time window
+    counter.getThroughput.toSet shouldBe Set.empty
+  }
+
+}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioLiveDataApiHttpService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioLiveDataApiHttpService.scala
new file mode 100644
index 00000000000..29e3183b963
--- /dev/null
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioLiveDataApiHttpService.scala
@@ -0,0 +1,98 @@
+package pl.touk.nussknacker.ui.api
+
+import cats.data.EitherT
+import com.typesafe.scalalogging.LazyLogging
+import pl.touk.nussknacker.engine.api.deployment.{DeploymentManager, LiveDataPreviewSupported, NoLiveDataPreviewSupport}
+import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.LiveData
+import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.LiveDataError.NoLiveDataAvailableForScenario
+import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName}
+import pl.touk.nussknacker.security.Permission
+import pl.touk.nussknacker.security.Permission.Permission
+import pl.touk.nussknacker.ui.api.BaseHttpService.CustomAuthorizationError
+import pl.touk.nussknacker.ui.api.description.scenarioLiveData.Dtos._
+import pl.touk.nussknacker.ui.api.description.scenarioLiveData.Dtos.LiveDataError._
+import pl.touk.nussknacker.ui.api.description.scenarioLiveData.ScenarioLiveDataApiEndpoints
+import pl.touk.nussknacker.ui.api.utils.ScenarioHttpServiceExtensions
+import pl.touk.nussknacker.ui.process.ProcessService
+import pl.touk.nussknacker.ui.process.ProcessService.GetScenarioWithDetailsOptions
+import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
+import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
+import pl.touk.nussknacker.ui.process.test.ScenarioTestService
+import pl.touk.nussknacker.ui.security.api.{AuthManager, LoggedUser}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class ScenarioLiveDataApiHttpService(
+    authManager: AuthManager,
+    scenarioAuthorizer: AuthorizeProcess,
+    processingTypeToScenarioTestServices: ProcessingTypeDataProvider[ScenarioTestService, _],
+    dmDispatcher: DeploymentManagerDispatcher,
+    protected override val scenarioService: ProcessService
+)(override protected implicit val executionContext: ExecutionContext)
+    extends BaseHttpService(authManager)
+    with ScenarioHttpServiceExtensions
+    with LazyLogging {
+
+  override protected type BusinessErrorType = LiveDataError
+  override protected def noScenarioError(scenarioName: ProcessName): LiveDataError = NoScenario
+  override protected def noPermissionError: LiveDataError with CustomAuthorizationError = NoPermission
+
+  private val endpoints = new ScenarioLiveDataApiEndpoints(authManager.authenticationEndpointInput())
+
+  expose {
+    endpoints.scenarioLiveDataEndpoint
+      .serverSecurityLogic(authorizeKnownUser[LiveDataError])
+      .serverLogicEitherT { implicit loggedUser => scenarioName =>
+        for {
+          scenarioWithDetails <- getScenarioWithDetailsByName(
+            scenarioName,
+            GetScenarioWithDetailsOptions.withScenarioGraph
+          )
+          processIdWithName = ProcessIdWithName(scenarioWithDetails.processIdUnsafe, scenarioName)
+          _ <- isAuthorized(processIdWithName.id, Permission.Read)
+          deploymentManager <- EitherT[Future, LiveDataError, DeploymentManager] {
+            dmDispatcher.deploymentManager(processIdWithName).map {
+              case Some(deploymentManager) => Right(deploymentManager)
+              case None                    => Left(NoScenario)
+            }
+          }
+          liveDataPreview <- EitherT[Future, LiveDataError, LiveData] {
+            deploymentManager.liveDataPreviewSupport match {
+              case supported: LiveDataPreviewSupported =>
+                supported.getLiveData(processIdWithName).map {
+                  case Right(liveData)                      => Right(liveData)
+                  case Left(NoLiveDataAvailableForScenario) => Left(LiveDataNotAvailable)
+                }
+              case NoLiveDataPreviewSupport =>
+                Future.successful(Left(LiveDataNotSupported))
+            }
+          }
+          scenarioTestService = processingTypeToScenarioTestServices.forProcessingTypeUnsafe(
+            scenarioWithDetails.processingType
+          )
+          resultsWithCounts = scenarioTestService.resultsWithCounts(
+            liveDataPreview.liveDataSamples,
+            scenarioWithDetails.scenarioGraphUnsafe,
+            scenarioWithDetails.processVersionUnsafe,
+            scenarioWithDetails.isFragment
+          )
+        } yield LiveDataDto.from(
+          resultsWithCounts,
+          Some(liveDataPreview.nodeTransitionThroughput),
+        )
+      }
+  }
+
+  private def isAuthorized(scenarioId: ProcessId, permission: Permission)(
+      implicit loggedUser: LoggedUser
+  ): EitherT[Future, LiveDataError, Unit] =
+    EitherT(
+      scenarioAuthorizer
+        .check(scenarioId, permission, loggedUser)
+        .map[Either[LiveDataError, Unit]] {
+          case true  => Right(())
+          case false => Left(noPermissionError)
+        }
+    )
+
+}
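Editorial note: the service above only dispatches on the capability trait; a DeploymentManager opts in by returning a `LiveDataPreviewSupported` instance. A minimal sketch of the opt-in side (hypothetical manager, assuming it can reach `LiveDataCollectingListenerHolder` in-process; only the trait and holder names come from this PR):

```scala
override def liveDataPreviewSupport: LiveDataPreviewSupport =
  new LiveDataPreviewSupported {
    override def getLiveData(
        processIdWithName: ProcessIdWithName
    ): Future[Either[LiveDataError, LiveData]] =
      // The holder keeps data per scenario name in this JVM, so this is a cheap lookup.
      Future.successful(LiveDataCollectingListenerHolder.getLiveDataPreview(processIdWithName.name))
  }
```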
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/Dtos.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/Dtos.scala
new file mode 100644
index 00000000000..a1b77271bd1
--- /dev/null
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/Dtos.scala
@@ -0,0 +1,133 @@
+package pl.touk.nussknacker.ui.api.description.scenarioLiveData
+
+import io.circe._
+import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
+import pl.touk.nussknacker.engine.api.typed.typing
+import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
+import pl.touk.nussknacker.engine.testmode.TestProcess._
+import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions
+import pl.touk.nussknacker.ui.api.BaseHttpService.CustomAuthorizationError
+import pl.touk.nussknacker.ui.process.test.ResultsWithCounts
+import pl.touk.nussknacker.ui.processreport.NodeCount
+import sttp.tapir.{Codec, CodecFormat, Schema}
+
+import scala.collection.compat._
+
+object Dtos {
+
+  import sttp.tapir.json.circe._
+  lazy val typingResultEncoder: Encoder[TypingResult] = TypingResult.encoder
+
+  final case class LiveDataDto(
+      results: LiveDataSamplesDto,
+      counts: Map[String, NodeCount],
+      nodeTransitionThroughput: Option[List[NodeTransitionThroughputDto]],
+  )
+
+  object LiveDataDto {
+
+    def from(
+        resultsWithCounts: ResultsWithCounts,
+        nodeTransitionThroughput: Option[Map[NodeTransition, BigDecimal]],
+    ): LiveDataDto = {
+      LiveDataDto(
+        results = LiveDataSamplesDto.from(resultsWithCounts.results),
+        counts = resultsWithCounts.counts,
+        nodeTransitionThroughput = nodeTransitionThroughput.map(NodeTransitionThroughput.from),
+      )
+    }
+
+  }
+
+  final case class LiveDataSamplesDto(
+      nodeTransitionResults: List[NodeTransitionResult],
+      invocationResults: Map[String, List[ExpressionInvocationResult[Json]]],
+      externalInvocationResults: Map[String, List[ExternalInvocationResult[Json]]],
+      exceptions: List[ExceptionResult[Json]]
+  )
+
+  object LiveDataSamplesDto {
+
+    def from(testResults: TestResults[Json]): LiveDataSamplesDto = {
+      lazy val nodeTransitionResults = testResults.nodeTransitionResults.map { case (nodeTransition, results) =>
+        NodeTransitionResult(
+          sourceNodeId = nodeTransition.sourceNodeId,
+          destinationNodeId = nodeTransition.destinationNodeId,
+          results = results,
+        )
+      }.toList
+      LiveDataSamplesDto(
+        nodeTransitionResults = nodeTransitionResults,
+        invocationResults = testResults.invocationResults,
+        externalInvocationResults = testResults.externalInvocationResults,
+        exceptions = testResults.exceptions,
+      )
+    }
+
+  }
+
+  final case class NodeTransitionResult(
+      sourceNodeId: String,
+      destinationNodeId: Option[String],
+      results: List[ResultContext[Json]]
+  )
+
+  final case class NodeTransitionThroughputDto(
+      sourceNodeId: String,
+      destinationNodeId: Option[String],
+      throughput: BigDecimal,
+  )
+
+  object NodeTransitionThroughput {
+
+    def from(nodeTransitionThroughput: Map[NodeTransition, BigDecimal]): List[NodeTransitionThroughputDto] = {
+      nodeTransitionThroughput.map { case (k, v) =>
+        NodeTransitionThroughputDto(k.sourceNodeId, k.destinationNodeId, v)
+      }.toList
+    }
+
+  }
+
+  implicit def resultContextSchema: Schema[ResultContext[Json]] = Schema.derived
+  implicit def expressionInvocationResultSchema: Schema[ExpressionInvocationResult[Json]] = Schema.derived
+  implicit def externalInvocationResultSchema: Schema[ExternalInvocationResult[Json]] = Schema.derived
+  implicit def throwableSchema: Schema[Throwable] = Schema.string
+  implicit def exceptionResultSchema: Schema[ExceptionResult[Json]] = Schema.derived
+  implicit def nodeTransitionResultSchema: Schema[NodeTransitionResult] = Schema.derived
+  implicit def testResultsSchema: Schema[LiveDataSamplesDto] = Schema.derived
+  implicit def nodeCountSchema: Schema[NodeCount] = Schema.anyObject
+  implicit def nodeTransitionThroughputDtoSchema: Schema[NodeTransitionThroughputDto] = Schema.derived
+  implicit def resultsWithCountsSchema: Schema[LiveDataDto] = Schema.derived
+  implicit def typingResultDecoder: Decoder[TypingResult] = Decoder.decodeJson.map(_ => typing.Unknown)
+  implicit def scenarioGraphSchema: Schema[ScenarioGraph] = Schema.anyObject
+
+  sealed trait LiveDataError
+
+  object LiveDataError {
+
+    case object NoPermission extends LiveDataError with CustomAuthorizationError
+
+    case object NoScenario extends LiveDataError
+
+    implicit val noScenarioErrorCodec: Codec[String, NoScenario.type, CodecFormat.TextPlain] =
+      BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[NoScenario.type] { _ =>
+        s"Scenario not found"
+      }
+
+    case object LiveDataNotSupported extends LiveDataError
+
+    implicit val liveDataNotSupportedErrorCodec: Codec[String, LiveDataNotSupported.type, CodecFormat.TextPlain] =
+      BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[LiveDataNotSupported.type] { _ =>
+        s"Live data preview is not supported by this scenario"
+      }
+
+    case object LiveDataNotAvailable extends LiveDataError
+
+    implicit val liveDataNotAvailableErrorCodec: Codec[String, LiveDataNotAvailable.type, CodecFormat.TextPlain] =
+      BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[LiveDataNotAvailable.type] { _ =>
+        s"There is currently no live data available for this scenario"
+      }
+
+  }
+
+}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/LiveDataCodecs.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/LiveDataCodecs.scala
new file mode 100644
index 00000000000..0e1b784defe
--- /dev/null
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/LiveDataCodecs.scala
@@ -0,0 +1,62 @@
+package pl.touk.nussknacker.ui.api.description.scenarioLiveData
+
+import io.circe.{Decoder, DecodingFailure, Encoder, Json}
+import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
+import pl.touk.nussknacker.engine.testmode.TestProcess.{
+  ExceptionResult,
+  ExpressionInvocationResult,
+  ExternalInvocationResult,
+  ResultContext
+}
+import pl.touk.nussknacker.ui.api.description.scenarioLiveData.Dtos.{
+  LiveDataDto,
+  LiveDataSamplesDto,
+  NodeTransitionResult,
+  NodeTransitionThroughputDto
+}
+
+object LiveDataCodecs {
+
+  import io.circe.syntax._
+  import pl.touk.nussknacker.engine.api.CirceUtil._
+
+  implicit val liveDataDtoEncoder: Encoder[LiveDataDto] =
+    deriveConfiguredEncoder[LiveDataDto].mapJson(_.dropNullValues)
+
+  implicit val liveDataDtoDecoder: Decoder[LiveDataDto] =
+    Decoder.failed(DecodingFailure("Not implemented", List.empty))
+
+  implicit val liveDataSamplesDtoEncoder: Encoder[LiveDataSamplesDto] = new Encoder[LiveDataSamplesDto]() {
+
+    implicit val nodeResult: Encoder[ResultContext[Json]]                              = deriveConfiguredEncoder
+    implicit val expressionInvocationResult: Encoder[ExpressionInvocationResult[Json]] = deriveConfiguredEncoder
+    implicit val externalInvocationResult: Encoder[ExternalInvocationResult[Json]]     = deriveConfiguredEncoder
+    implicit val nodeTransitionResult: Encoder[NodeTransitionResult]                   = deriveConfiguredEncoder
+    implicit val exceptionResultEncoder: Encoder[ExceptionResult[Json]]                = deriveConfiguredEncoder
+    implicit val throwableEncoder: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage))
+
+    override def apply(a: LiveDataSamplesDto): Json = a match {
+      case LiveDataSamplesDto(
+            nodeTransitionResults,
+            invocationResults,
+            externalInvocationResults,
+            exceptions
+          ) =>
+        Json.obj(
+          "nodeTransitionResults" -> nodeTransitionResults.asJson,
+          "invocationResults" -> invocationResults.map { case (node, list) => node -> list.sortBy(_.contextId) }.asJson,
+          "externalInvocationResults" -> externalInvocationResults.map { case (node, list) =>
+            node -> list.sortBy(_.contextId)
+          }.asJson,
+          "exceptions" -> exceptions.sortBy(_.context.id).asJson
+        )
+    }
+
+  }
+
+  implicit val nodeTransitionThroughputDto: Encoder[NodeTransitionThroughputDto] = deriveConfiguredEncoder
+
+  implicit val liveDataSamplesDtoDecoder: Decoder[LiveDataSamplesDto] =
+    Decoder.failed(DecodingFailure("Not implemented", List.empty))
+
+}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/ScenarioLiveDataApiEndpoints.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/ScenarioLiveDataApiEndpoints.scala
new file mode 100644
index 00000000000..4de204c4fff
--- /dev/null
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioLiveData/ScenarioLiveDataApiEndpoints.scala
@@ -0,0 +1,66 @@
+package pl.touk.nussknacker.ui.api.description.scenarioLiveData
+
+import pl.touk.nussknacker.engine.api.process.ProcessName
+import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions
+import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions.SecuredEndpoint
+import pl.touk.nussknacker.security.AuthCredentials
+import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
+import pl.touk.nussknacker.ui.api.description.scenarioLiveData.Dtos.LiveDataError._
+import sttp.model.StatusCode
+import sttp.model.StatusCode._
+import sttp.tapir._
+import sttp.tapir.EndpointIO.Example
+import sttp.tapir.json.circe.jsonBody
+
+class ScenarioLiveDataApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions {
+
+  import Dtos._
+  import LiveDataCodecs._
+
+  def scenarioLiveDataEndpoint: SecuredEndpoint[
+    ProcessName,
+    LiveDataError,
+    LiveDataDto,
+    Any
+  ] =
+    baseNuApiEndpoint
+      .summary("Preview of the data samples currently processed by the scenario")
+      .tag("Live data")
+      .get
+      .in("liveData" / path[ProcessName]("scenarioName"))
+      .out(statusCode(Ok).and(jsonBody[LiveDataDto]))
+      .errorOut(errorOutput)
+      .withSecurity(auth)
+
+  private val errorOutput = oneOf[LiveDataError](
+    oneOfVariantFromMatchType[NoScenario.type](
+      NotFound,
+      plainBody[NoScenario.type]
+        .examples(
+          List(
+            Example.of(
+              summary = Some("Scenario not found"),
+              value = NoScenario
+            ),
+          )
+        )
+    ),
+    oneOfVariant[LiveDataNotSupported.type](
+      NotImplemented,
+      plainBody[LiveDataNotSupported.type]
+        .examples(
+          List(
+            Example.of(
+              summary = Some("Live data preview is not supported by this scenario"),
+              value = LiveDataNotSupported
+            ),
+          )
+        )
+    ),
+    oneOfVariant[LiveDataNotAvailable.type](
+      StatusCode.NoContent,
+      emptyOutputAs(LiveDataNotAvailable).description("There is currently no live data available for this scenario")
+    )
+  )
+
+}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala
index eae34662d13..f3c63e6a526 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala
@@ -261,6 +261,8 @@ class PeriodicDeploymentManager private[periodic] (

   override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

+  override def liveDataPreviewSupport: LiveDataPreviewSupport = delegate.liveDataPreviewSupport
+
   override def scenarioCompilationDependenciesResource: Resource[SyncIO, EngineScenarioCompilationDependencies] =
     delegate.scenarioCompilationDependenciesResource
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala
index 47e9a62c6ab..df805a24893 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala
@@ -42,6 +42,8 @@ object InvalidDeploymentManagerStub extends DeploymentManager {

   override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

+  override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport
+
   override def scenarioCompilationDependenciesResource: Resource[SyncIO, EngineScenarioCompilationDependencies] =
     Resource.pure(EngineScenarioCompilationDependencies.empty)
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala
index 897a4b6ad57..5f54b8df816 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala
@@ -4,6 +4,7 @@ import cats.data.EitherT
 import cats.syntax.either._
 import com.carrotsearch.sizeof.RamUsageEstimator
 import com.typesafe.scalalogging.LazyLogging
+import io.circe.Json
 import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion}
 import pl.touk.nussknacker.engine.api.definition.Parameter
 import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
@@ -169,6 +170,16 @@ class ScenarioTestService(
     } yield ResultsWithCounts(testResults, computeCounts(canonical, isFragment, testResults))).value
   }

+  def resultsWithCounts(
+      testResults: TestResults[Json],
+      scenarioGraph: ScenarioGraph,
+      processVersion: ProcessVersion,
+      isFragment: Boolean,
+  )(implicit user: LoggedUser): ResultsWithCounts = {
+    val canonical = toCanonicalProcess(scenarioGraph, processVersion, isFragment)
+    ResultsWithCounts(testResults, computeCounts(canonical, isFragment, testResults))
+  }
+
   def validateSampleSize[E](size: Int)(tooManySamplesError: Int => E): Either[E, Unit] = {
     Either.cond(
       size <= testDataSettings.maxSamplesCount,
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/TapirHttpServiceFactory.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/TapirHttpServiceFactory.scala
index e36d1bfc36f..1166015efb2 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/TapirHttpServiceFactory.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/TapirHttpServiceFactory.scala
@@ -119,6 +119,14 @@ object TapirHttpServiceFactory {
       scenarioService = processService,
     )

+    val liveDataApiHttpService = new ScenarioLiveDataApiHttpService(
+      authManager = authManager,
+      scenarioAuthorizer = processAuthorizer,
+      processingTypeToScenarioTestServices = processingTypeServicesProvider.mapValues(_.scenarioTestService),
+      scenarioService = processService,
+      dmDispatcher = dmDispatcher,
+    )
+
     val actionInfoHttpService = new ActionInfoHttpService(
       authManager = authManager,
       processingTypeToActionInfoService = processingTypeServicesProvider.mapValues(_.actionInfoService),
@@ -231,6 +239,7 @@ object TapirHttpServiceFactory {
       migrationApiHttpService,
       nodesApiHttpService,
       testingApiHttpService,
+      liveDataApiHttpService,
       actionInfoHttpService,
       notificationApiHttpService,
       scenarioActivityApiHttpService,
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala
index 8b76015b910..19e2bbf351d 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala
@@ -148,6 +148,9 @@ object FlinkScenarioJobRunnerStub extends FlinkScenarioJobRunner {
   ): Future[Option[JobID]] =
     Future.failed(new IllegalAccessException("This implementation shouldn't be used"))

+  override def liveDataPreviewSupport: LiveDataPreviewSupport =
+    throw new IllegalAccessException("This implementation shouldn't be used")
+
 }

 object MockDeploymentManager {
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/livedata/ScenarioLiveDataApiHttpServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/livedata/ScenarioLiveDataApiHttpServiceSpec.scala
new file mode 100644
index 00000000000..41fe6dfb37d
--- /dev/null
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/livedata/ScenarioLiveDataApiHttpServiceSpec.scala
@@ -0,0 +1,133 @@
+package pl.touk.nussknacker.ui.api.livedata
+
+import io.circe.Json
+import io.restassured.RestAssured.given
+import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse
+import org.apache.pekko.http.scaladsl.model.StatusCodes
+import org.scalatest.freespec.AnyFreeSpecLike
+import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
+import pl.touk.nussknacker.engine.api.deployment.{LiveDataPreviewSupported, NoLiveDataPreviewSupport}
+import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError}
+import pl.touk.nussknacker.engine.api.process.ProcessIdWithName
+import pl.touk.nussknacker.engine.build.ScenarioBuilder
+import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
+import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
+import pl.touk.nussknacker.test.{
+  NuRestAssureMatchers,
+  PatientScalaFutures,
+  RestAssuredVerboseLoggingIfValidationFails,
+  WithTestHttpClient
+}
+import pl.touk.nussknacker.test.base.it.{NuItTest, WithSimplifiedConfigScenarioHelper}
+import pl.touk.nussknacker.test.config.{
+  WithBusinessCaseRestAssuredUsersExtensions,
+  WithMockableDeploymentManager,
+  WithSimplifiedDesignerConfig
+}
+
+import scala.concurrent.Future
+
+class ScenarioLiveDataApiHttpServiceSpec
+    extends AnyFreeSpecLike
+    with NuItTest
+    with WithTestHttpClient
+    with WithSimplifiedDesignerConfig
+    with WithSimplifiedConfigScenarioHelper
+    with WithMockableDeploymentManager
+    with WithBusinessCaseRestAssuredUsersExtensions
+    with NuRestAssureMatchers
+    with RestAssuredVerboseLoggingIfValidationFails
+    with PatientScalaFutures {
+
+  import pl.touk.nussknacker.engine.spel.SpelExtension._
+
+  private val exampleScenario: CanonicalProcess =
+    ScenarioBuilder
+      .streaming("scenario_2")
+      .source(
+        "Event Generator",
+        "event-generator",
+        "count" -> "1".spel,
+        "value" -> "1".spel,
+        "schedule" -> "T(java.time.Duration).parse('PT1M')".spel,
+      )
+      .emptySink("end", "dead-end")
+
+  "The endpoint for live data should" - {
+    "return present, but empty live data" in {
+      val mockedResults =
+        LiveData(TestResults[Json](Map.empty, Map.empty, Map.empty, Map.empty, List.empty), Map.empty)
+      given()
+        .applicationState {
+          createSavedScenario(exampleScenario)
+          MockableDeploymentManager.configureLiveDataPreviewSupport(
+            new LiveDataPreviewSupported {
+              override def getLiveData(
+                  processIdWithName: ProcessIdWithName
+              ): Future[Either[LiveDataError, LiveData]] = Future.successful(Right(mockedResults))
+            }
+          )
+        }
+        .when()
+        .basicAuthAllPermUser()
+        .get(s"$nuDesignerHttpAddress/api/liveData/${exampleScenario.name}")
+        .Then()
+        .statusCode(StatusCodes.OK.intValue)
+        .equalsJsonBody(
+          s"""{
+             |  "results": {
+             |    "nodeTransitionResults": [],
+             |    "invocationResults": {},
+             |    "externalInvocationResults": {},
+             |    "exceptions": []
+             |  },
+             |  "counts": {
+             |    "Event Generator": {
+             |      "all": 0,
+             |      "errors": 0,
+             |      "fragmentCounts": {}
+             |    },
+             |    "end": {
+             |      "all": 0,
+             |      "errors": 0,
+             |      "fragmentCounts": {}
+             |    }
+             |  },
+             |  "nodeTransitionThroughput": []
+             |}""".stripMargin
+        )
+    }
+    "return not present live data" in {
+      given()
+        .applicationState {
+          createSavedScenario(exampleScenario)
+          MockableDeploymentManager.configureLiveDataPreviewSupport(
+            new LiveDataPreviewSupported {
+              override def getLiveData(
+                  processIdWithName: ProcessIdWithName
+              ): Future[Either[LiveDataError, LiveData]] =
+                Future.successful(Left(LiveDataError.NoLiveDataAvailableForScenario))
+            }
+          )
+        }
+        .when()
+        .basicAuthAllPermUser()
+        .get(s"$nuDesignerHttpAddress/api/liveData/${exampleScenario.name}")
+        .Then()
+        .statusCode(StatusCodes.NoContent.intValue)
+    }
+    "return live data not supported error" in {
+      given()
+        .applicationState {
+          createSavedScenario(exampleScenario)
+          MockableDeploymentManager.configureLiveDataPreviewSupport(NoLiveDataPreviewSupport)
+        }
+        .when()
+        .basicAuthAllPermUser()
+        .get(s"$nuDesignerHttpAddress/api/liveData/${exampleScenario.name}")
+        .Then()
+        .statusCode(StatusCodes.NotImplemented.intValue)
+    }
+  }
+
+}
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/SchemalessKafkaJsonTypeTests.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/SchemalessKafkaJsonTypeTests.scala
index d436cde671e..791190a9564 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/SchemalessKafkaJsonTypeTests.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/SchemalessKafkaJsonTypeTests.scala
@@ -17,6 +17,7 @@ import io.restassured.RestAssured.`given`
 import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse
 import org.apache.kafka.clients.admin.NewTopic
 import org.scalatest.freespec.AnyFreeSpecLike
+import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
 import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
 import pl.touk.nussknacker.engine.api.json.decoders.TypingResultDecoder
 import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult}
@@ -81,6 +82,11 @@ class SchemalessKafkaJsonTypeTests
     createKafkaTopics()
   }

+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    MockableDeploymentManager.clean()
+  }
+
   "The endpoint for adhoc validate should" - {
     "return no errors on valid parameters" in {
       shouldValidateParametersProperly()
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala
index 67f00946484..4c21e6979c2 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala
@@ -66,6 +66,8 @@ class DeploymentManagerStub extends BaseDeploymentManager {

   override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

+  override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport
+
   override def deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport =
     new DeploymentsStatusesQueryForAllScenariosSupported {
diff --git a/docs-internal/api/nu-designer-openapi.yaml b/docs-internal/api/nu-designer-openapi.yaml
index 89cd4ef0a08..afa3700dba5 100644
--- a/docs-internal/api/nu-designer-openapi.yaml
+++ b/docs-internal/api/nu-designer-openapi.yaml
@@ -4588,6 +4588,88 @@ paths:
       security:
         - {}
        - httpAuth: []
+  /api/liveData/{scenarioName}:
+    get:
+      tags:
+        - Live data
+      summary: Preview of the data samples currently processed by the scenario
+      operationId: getApiLivedataScenarioname
+      parameters:
+        - name: Nu-Impersonate-User-Identity
+          in: header
+          required: false
+          schema:
+            type:
+              - string
+              - 'null'
+        - name: scenarioName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        '200':
+          description: ''
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/LiveDataDto'
+        '400':
+          description: 'Invalid value for: header Nu-Impersonate-User-Identity'
+          content:
+            text/plain:
+              schema:
+                type: string
+        '204':
+          description: There is currently no live data available for this scenario
+        '401':
+          description: ''
+          content:
+            text/plain:
+              schema:
+                type: string
+              examples:
+                CannotAuthenticateUser:
+                  value: The supplied authentication is invalid
+                ImpersonatedUserNotExistsError:
+                  value: No impersonated user data found for provided identity
+        '403':
+          description: ''
+          content:
+            text/plain:
+              schema:
+                type: string
+              examples:
+                InsufficientPermission:
+                  value: The supplied authentication is not authorized to access this
+                    resource
+                ImpersonationMissingPermission:
+                  value: The supplied authentication is not authorized to impersonate
+        '404':
+          description: ''
+          content:
+            text/plain:
+              schema:
+                type: string
+              examples:
+                Example:
+                  summary: Scenario not found
+                  value: Scenario not found
+        '501':
+          description: Impersonation is not supported for defined authentication mechanism
+          content:
+            text/plain:
+              schema:
+                anyOf:
+                  - type: string
+                  - type: string
+              examples:
+                Example:
+                  summary: Live data preview is not supported by this scenario
+                  value: Live data preview is not supported by
+                    this scenario
+      security:
+        - {}
+        - httpAuth: []
   /api/scenarioTesting/{scenarioName}/capabilities:
     post:
       tags:
@@ -5935,6 +6017,42 @@ components:
           - type: array
             items:
               $ref: '#/components/schemas/FragmentParameter'
+    LiveDataDto:
+      title: LiveDataDto
+      type: object
+      required:
+        - results
+        - counts
+      properties:
+        results:
+          $ref: '#/components/schemas/LiveDataSamplesDto'
+        counts:
+          $ref: '#/components/schemas/Map_NodeCount'
+        nodeTransitionThroughput:
+          type:
+            - array
+            - 'null'
+          items:
+            $ref: '#/components/schemas/NodeTransitionThroughputDto'
+    LiveDataSamplesDto:
+      title: LiveDataSamplesDto
+      type: object
+      required:
+        - invocationResults
+        - externalInvocationResults
+      properties:
+        nodeTransitionResults:
+          type: array
+          items:
+            $ref: '#/components/schemas/NodeTransitionResult'
+        invocationResults:
+          $ref: '#/components/schemas/Map_List_ExpressionInvocationResult_Json'
+        externalInvocationResults:
+          $ref: '#/components/schemas/Map_List_ExternalInvocationResult_Json'
+        exceptions:
+          type: array
+          items:
+            $ref: '#/components/schemas/ExceptionResult_Json'
     Map_EngineSetupName_List_String:
       title: Map_EngineSetupName_List_String
       type: object
@@ -6503,6 +6621,37 @@ components:
           type: array
           items:
             $ref: '#/components/schemas/ResultContext_Json'
+    NodeTransitionResult1:
+      title: NodeTransitionResult
+      type: object
+      required:
+        - sourceNodeId
+      properties:
+        sourceNodeId:
+          type: string
+        destinationNodeId:
+          type:
+            - string
+            - 'null'
+        results:
+          type: array
+          items:
+            $ref: '#/components/schemas/ResultContext_Json'
+    NodeTransitionThroughputDto:
+      title: NodeTransitionThroughputDto
+      type: object
+      required:
+        - sourceNodeId
+        - throughput
+      properties:
+        sourceNodeId:
+          type: string
+        destinationNodeId:
+          type:
+            - string
+            - 'null'
+        throughput:
+          type: number
     NodeTypes:
       title: NodeTypes
       type: string
@@ -6955,10 +7104,14 @@ components:
       type: object
      required:
         - id
+        - timestamp
         - variables
       properties:
         id:
           type: string
+        timestamp:
+          type: string
+          format: date-time
         variables:
           $ref: '#/components/schemas/Map_Json'
     ResultsWithCountsDto:
@@ -7477,7 +7630,7 @@ components:
             - array
             - 'null'
           items:
-            $ref: '#/components/schemas/NodeTransitionResult'
+            $ref: '#/components/schemas/NodeTransitionResult1'
           invocationResults:
             $ref: '#/components/schemas/Map_List_ExpressionInvocationResult_Json'
           externalInvocationResults:
diff --git a/docs/Changelog.md b/docs/Changelog.md
index 3e808d260ea..99072f67559 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -177,6 +177,17 @@ description: Stay informed with detailed changelogs covering new features, impro
 * [#8042](https://github.com/TouK/nussknacker/pull/8042) Merge OpenAPI components into one with multiple services.
 * [7684](https://github.com/TouK/nussknacker/pull/7684) Add action redeploy. Till now action deploy was responsible for both deploy and redeploy operations. Now they are separated in scenario workflow and UI. Kafka source has different deployment parameters for deploy and redeploy actions.
+* [#8047](https://github.com/TouK/nussknacker/pull/8047) Added functionality for collecting live data samples and node transition throughput + * live data preview is optional and, for now, available only for the Flink minicluster + * there is a new endpoint `/liveData/{scenarioName}`, which returns live data samples and throughput information + * the functionality can be configured in the `modelConfig` section of the scenario type: + ```hocon + liveDataPreview { // optional config section, functionality disabled by default + enabled: true // disabled by default + maxNumberOfSamples: 20 // max number of the latest live data samples that will be returned + throughputTimeWindowInSeconds: 60 // the time window for which the node transition throughput will be calculated + } + ``` ## 1.18 diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala index 34dd9a452d5..45c7295ac1a 100644 --- a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala +++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala @@ -109,13 +109,14 @@ class BatchDataGenerationSpec ) .Then() .statusCode(200) - .equalsJsonBody( + .matchJsonWithRegexValuesBody( s"""{ | "results": { | "nodeResults": { | "sourceId": [ | { | "id": "E2ETest-SumTransactions-sourceId-0-0", + | "timestamp": "${regexes.zuluDateRegex}", | "variables": { | "input": { | "pretty": { @@ -132,6 +133,7 @@ class BatchDataGenerationSpec | "end": [ | { | "id": "E2ETest-SumTransactions-sourceId-0-0", + | "timestamp": "${regexes.zuluDateRegex}", | "variables": { | "input": { | "pretty": { @@ -153,6 +155,7 @@ class BatchDataGenerationSpec | "results": [ | { | "id": "E2ETest-SumTransactions-sourceId-0-0", + | "timestamp": "${regexes.zuluDateRegex}", | "variables": { | "input": { | "pretty": { @@ -208,13 +211,14 @@ class BatchDataGenerationSpec ) .Then() .statusCode(200) - .equalsJsonBody( + .matchJsonWithRegexValuesBody( s"""{ | "results": { | "nodeResults": { | "sourceId": [ | { | "id": "E2ETest-SumTransactions-sourceId-0-0", + | "timestamp": "${regexes.zuluDateRegex}", | "variables": { | "input": { | "pretty": { @@ -231,6 +235,7 @@ class BatchDataGenerationSpec | "end": [ | { | "id": "E2ETest-SumTransactions-sourceId-0-0", + | "timestamp": "${regexes.zuluDateRegex}", | "variables": { | "input": { | "pretty": { @@ -252,6 +257,7 @@ class BatchDataGenerationSpec | "results": [ | { | "id": "E2ETest-SumTransactions-sourceId-0-0", + | "timestamp": "${regexes.zuluDateRegex}", | "variables": { | "input": { | "pretty": { diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index 2c3395bbfd3..b1f68dd46eb 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -197,6 +197,8 @@ class DevelopmentDeploymentManager( override def schedulingSupport: SchedulingSupport = NoSchedulingSupport + override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport + override def scenarioCompilationDependenciesResource: Resource[SyncIO,
EngineScenarioCompilationDependencies] = Resource.pure(EngineScenarioCompilationDependencies.empty) diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index b85f26e5ca4..9efcc91f665 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -151,6 +151,10 @@ object MockableDeploymentManagerProvider { override def schedulingSupport: SchedulingSupport = NoSchedulingSupport + override def liveDataPreviewSupport: LiveDataPreviewSupport = { + configurator.liveDataPreviewSupport.get() + } + override def managerSpecificScenarioActivities( processIdWithName: ProcessIdWithName, after: Option[Instant], @@ -176,6 +180,8 @@ object MockableDeploymentManagerProvider { new AtomicReference[Map[DeploymentId, Try[Option[ExternalDeploymentId]]]](Map.empty) private[MockableDeploymentManagerProvider] val managerSpecificScenarioActivities = new AtomicReference[List[ScenarioActivity]](List.empty) + private[MockableDeploymentManagerProvider] val liveDataPreviewSupport = + new AtomicReference[LiveDataPreviewSupport](NoLiveDataPreviewSupport) def configureScenarioStatuses(scenarioStates: Map[ScenarioName, BasicStatusDetails]): Unit = { scenarioStatuses.set(scenarioStates) @@ -193,11 +199,16 @@ object MockableDeploymentManagerProvider { managerSpecificScenarioActivities.set(scenarioActivities) } + def configureLiveDataPreviewSupport(support: LiveDataPreviewSupport): Unit = { + liveDataPreviewSupport.set(support) + } + def clean(): Unit = { scenarioStatuses.set(Map.empty) deploymentResults.set(Map.empty) testResults.set(Map.empty) managerSpecificScenarioActivities.set(List.empty) + liveDataPreviewSupport.set(NoLiveDataPreviewSupport) } } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala index 8aaa925b8bb..a3848ac55d2 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala @@ -11,13 +11,16 @@ import org.scalatest.prop.Tables.Table import pl.touk.nussknacker.engine.ScenarioCompilationDependencies import pl.touk.nussknacker.engine.api.{FragmentSpecificData, JobData, MetaData, ProcessVersion, VariableConstants} import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, ExpressionParserCompilationError} +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{ + CannotCreateObjectError, + ExpressionParserCompilationError +} import pl.touk.nussknacker.engine.api.definition.EngineScenarioCompilationDependencies import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import 
pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.canonicalgraph.{CanonicalProcess, canonicalnode} +import pl.touk.nussknacker.engine.canonicalgraph.{canonicalnode, CanonicalProcess} import pl.touk.nussknacker.engine.compile.{CompilationResult, FragmentResolver, ProcessValidator} import pl.touk.nussknacker.engine.definition.component.parameter.editor.ParameterTypeEditorDeterminer import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala index 24e1636129d..17b798fe83f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala @@ -40,18 +40,20 @@ class FlinkProcessCompilerDataFactory( runtimeMode: RuntimeMode, configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig], nodesData: NodesDeploymentData, + processListeners: List[ProcessListener], ) extends Serializable { import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ - def this(modelData: ModelData, deploymentData: DeploymentData) = this( + def this(modelData: ModelData, deploymentData: DeploymentData, processListeners: List[ProcessListener]) = this( modelData.configCreator, modelData.extractModelDefinitionFun, modelData.modelConfig, runtimeMode = RuntimeMode.Live, modelData.additionalConfigsFromProvider, - nodesData = deploymentData.nodesData + nodesData = deploymentData.nodesData, + processListeners = processListeners, ) def prepareCompilerData( @@ -76,7 +78,7 @@ class FlinkProcessCompilerDataFactory( modelConfig.underlyingConfig.as[DefaultServiceExecutionContextPreparer]("asyncExecutionConfig") ) val defaultListeners = prepareDefaultListeners(usedNodes) ++ creator.listeners(modelConfig) - val listenersToUse = adjustListeners(defaultListeners, modelConfig) + val listenersToUse = adjustListeners(defaultListeners, modelConfig) ++ processListeners val (definitionWithTypes, dictRegistry) = definitions(modelConfig, userCodeClassLoader) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala index a8744203f02..26a5d70c0b6 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.process.compiler import pl.touk.nussknacker.engine.{ModelConfig, RuntimeMode} import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun -import pl.touk.nussknacker.engine.api.{NodeId, Params} +import pl.touk.nussknacker.engine.api.{NodeId, Params, ProcessListener} import pl.touk.nussknacker.engine.api.component.{ ComponentAdditionalConfig, ComponentType, @@ -36,6 +36,7 @@ abstract class StubbedFlinkProcessCompilerDataFactory( runtimeMode: RuntimeMode, configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, 
ComponentAdditionalConfig], nodesDeploymentData: NodesDeploymentData, + processListeners: List[ProcessListener], ) extends FlinkProcessCompilerDataFactory( creator, extractModelDefinition, @@ -43,6 +44,7 @@ abstract class StubbedFlinkProcessCompilerDataFactory( runtimeMode, configsFromProviderWithDictionaryEditor, nodesDeploymentData, + processListeners, ) { override protected def adjustDefinitions( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala index 823ef2c3e1b..b06ab1a7da7 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala @@ -42,6 +42,7 @@ object TestFlinkProcessCompilerDataFactory { RuntimeMode.Test, modelData.additionalConfigsFromProvider, NodesDeploymentData.empty, + List.empty, ) { override protected def adjustListeners( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala index c33344941a6..065fc5574f9 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala @@ -19,7 +19,8 @@ object VerificationFlinkProcessCompilerDataFactory { modelData.modelConfig, runtimeMode = RuntimeMode.Live, modelData.additionalConfigsFromProvider, - NodesDeploymentData.empty + NodesDeploymentData.empty, + List.empty, ) { override protected def adjustListeners( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJob.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJob.scala index 1b72c6066fa..81263456ef6 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJob.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJob.scala @@ -4,7 +4,7 @@ import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import pl.touk.nussknacker.engine.{BaseModelData, ModelData} import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt -import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.{ProcessListener, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} @@ -18,13 +18,15 @@ object FlinkScenarioJob { scenario: CanonicalProcess, processVersion: ProcessVersion, deploymentData: DeploymentData, - env: StreamExecutionEnvironment + env: StreamExecutionEnvironment, + processListeners: List[ProcessListener], ): JobExecutionResult = new FlinkScenarioJob(modelData.asInvokableModelData).run( scenario, processVersion, deploymentData, - env + env, + processListeners, ) } @@ -35,9 +37,10 @@ class 
FlinkScenarioJob(modelData: ModelData) { scenario: CanonicalProcess, processVersion: ProcessVersion, deploymentData: DeploymentData, - env: StreamExecutionEnvironment + env: StreamExecutionEnvironment, + processListeners: List[ProcessListener], ): JobExecutionResult = { - val compilerFactory = new FlinkProcessCompilerDataFactory(modelData, deploymentData) + val compilerFactory = new FlinkProcessCompilerDataFactory(modelData, deploymentData, processListeners) val executionConfigPreparer = ExecutionConfigPreparer.defaultChain(modelData) val registrar = FlinkProcessRegistrar(compilerFactory, FlinkJobConfig.parse(modelData.modelConfig), executionConfigPreparer) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioMain.scala index 58622acea5d..46d20a06cc1 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioMain.scala @@ -43,6 +43,7 @@ class FlinkScenarioMain(preprocessArgs: Array[String] => Array[String]) extends processVersion, deploymentData, StreamExecutionEnvironment.getExecutionEnvironment, + List.empty, ) } catch { // marker exception for graph optimalization diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala index 00fed6e2ddb..016e8b7fdab 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala @@ -32,7 +32,7 @@ class FlinkScenarioUnitTestJob(modelData: ModelData) { val version = ProcessVersion.empty val registrar = FlinkProcessRegistrar( - new FlinkProcessCompilerDataFactory(modelData, deploymentData), + new FlinkProcessCompilerDataFactory(modelData, deploymentData, List.empty), FlinkJobConfig.parse(modelData.modelConfig), ExecutionConfigPreparer.unOptimizedChain(modelData) ) diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJobSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJobSpec.scala index 84fb4bf0a1c..bc74d97d8b7 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJobSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioJobSpec.scala @@ -37,7 +37,8 @@ class FlinkScenarioJobSpec extends AnyFlatSpec with Matchers with Inside with Be process, ProcessVersion.empty, DeploymentData.empty, - env + env, + List.empty, ) flinkMiniClusterWithServices.waitForJobIsFinished(executionResult.getJobID) } diff --git a/engine/flink/management/src/it/resources/application.conf b/engine/flink/management/src/it/resources/application.conf index dbf412e135a..6a3552b7d3c 100644 --- a/engine/flink/management/src/it/resources/application.conf +++ b/engine/flink/management/src/it/resources/application.conf @@ -7,4 +7,9 @@ modelConfig { // TODO: we should test more common case - with rocksdb turned on. 
See designer/README.md for details how enable: false } + liveDataPreview { + enabled: true + maxNumberOfSamples: 20 + throughputTimeWindowInSeconds: 60 + } } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala index b50d09d8bce..30d575e88ba 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala @@ -1,6 +1,8 @@ package pl.touk.nussknacker.engine.management.streaming import com.typesafe.scalalogging.StrictLogging +import io.circe.Json +import io.circe.syntax.EncoderOps import org.apache.flink.api.common.JobID import org.scalatest.funsuite.AnyFunSuiteLike import org.scalatest.matchers.should.Matchers @@ -16,9 +18,12 @@ import pl.touk.nussknacker.engine.classloader.ModelClassLoaderFactory import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory +import pl.touk.nussknacker.engine.livedata.LiveDataCollectingListenerHolder +import pl.touk.nussknacker.engine.testmode.TestProcess._ import java.net.URI import java.nio.file.{Files, Paths} +import java.time.Instant import java.util.UUID import scala.concurrent.ExecutionContext.Implicits._ @@ -66,6 +71,96 @@ trait BaseFlinkDeploymentManagerSpec extends AnyFunSuiteLike with Matchers with } } + test("deploy scenario in running flink with event generator") { + val processName = ProcessName("runningFlinkEventGenerator") + + val version = VersionId(15) + val process = SampleProcess.prepareProcessWithEventGeneratorSource(processName) + val deploymentId = DeploymentId("with-event-generator") + + LiveDataCollectingListenerHolder.createListenerFor( + processName = processName, + maxNumberOfSamples = 20, + throughputTimeWindowInSeconds = 60 + ) + val externalDeploymentIdOpt = deployProcessAndWaitIfRunning( + process = process, + processVersion = ProcessVersion(version, processName, processId, List.empty, "user1", Some(13)), + deploymentId = deploymentId + ) + try { + deploymentStatus(processName) shouldBe List( + DeploymentStatusDetails( + status = SimpleStateStatus.Running, + deploymentId = Some(deploymentId), + version = Some(version) + ) + ) + + eventually { + if (useMiniClusterForDeployment) { + // Wait until first live data samples are collected + val liveDataOpt = LiveDataCollectingListenerHolder.getLiveDataPreview(processName).toOption + liveDataOpt shouldBe defined + val liveDataSamples = liveDataOpt.get.liveDataSamples + + // Wait until first 15 live data samples are collected + liveDataSamples.nodeResults.get("start").map(_.size) shouldBe Some(15) + + val (liveDataWithMockedTimestamp, mockedTimestamp) = withFixedTimestamp(liveDataSamples) + + externalDeploymentIdOpt shouldBe defined + val expected = new TestResults[Json]( + nodeResults = Map( + "start" -> + (0 to 14).map { idx => + ResultContext( + s"runningFlinkEventGenerator-start-0-$idx", + mockedTimestamp, + Map("input" -> Json.obj("pretty" -> "abrakadabra".asJson)), + ) + }.toList, + "endSend" -> + (0 to 14).map { idx => + ResultContext( + 
s"runningFlinkEventGenerator-start-0-$idx", + mockedTimestamp, + Map("input" -> Json.obj("pretty" -> "abrakadabra".asJson)), + ) + }.toList, + ), + nodeTransitionResults = Map( + NodeTransition("start", Some("endSend")) -> + (0 to 14).map { idx => + ResultContext( + s"runningFlinkEventGenerator-start-0-$idx", + mockedTimestamp, + Map("input" -> Json.obj("pretty" -> "abrakadabra".asJson)), + ) + }.toList, + ), + invocationResults = Map( + "endSend" -> + (0 to 14).map { idx => + ExpressionInvocationResult( + s"runningFlinkEventGenerator-start-0-$idx", + "Value", + Json.obj("pretty" -> "message".asJson) + ) + }.toList, + ), + externalInvocationResults = Map(), + exceptions = List(), + ) + liveDataWithMockedTimestamp shouldBe expected + } + externalDeploymentIdOpt shouldBe defined + } + } finally { + cancelProcess(processName) + } + } + test("use deploymentId passed as a jobId") { val processName = ProcessName("jobWithDeploymentIdAsAUuid") @@ -354,4 +449,43 @@ trait BaseFlinkDeploymentManagerSpec extends AnyFunSuiteLike with Matchers with private def deploymentStatus(name: ProcessName): List[DeploymentStatusDetails] = deploymentManager.getScenarioDeploymentsStatuses(name).futureValue.value + + // Test results (and live data results represented by the same data structures) contain timestamps + // In order to assert them in tests, we substitute them with fixed Instant. + private def withFixedTimestamp(testResults: TestResults[Json]): (TestResults[Json], Instant) = { + val fixedInstant = Instant.now + ( + TestResults[Json]( + nodeResults = withFixedTimestamp(testResults.nodeResults, fixedInstant), + nodeTransitionResults = withFixedTimestamp(testResults.nodeTransitionResults, fixedInstant), + invocationResults = testResults.invocationResults, + externalInvocationResults = testResults.externalInvocationResults, + exceptions = testResults.exceptions.map(withFixedTimestamp(_, fixedInstant)), + ), + fixedInstant + ) + } + + private def withFixedTimestamp( + exceptionResult: ExceptionResult[Json], + fixedTimestamp: Instant + ): ExceptionResult[Json] = { + ExceptionResult( + withFixedTimestamp(exceptionResult.context, fixedTimestamp), + exceptionResult.nodeId, + exceptionResult.throwable + ) + } + + private def withFixedTimestamp[K]( + results: Map[K, List[ResultContext[Json]]], + fixedTimestamp: Instant + ): Map[K, List[ResultContext[Json]]] = { + results.map { case (k, v) => (k, v.map(withFixedTimestamp(_, fixedTimestamp))) } + } + + private def withFixedTimestamp(resultContext: ResultContext[Json], fixedTimestamp: Instant): ResultContext[Json] = { + ResultContext(resultContext.id, fixedTimestamp, resultContext.variables) + } + } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkDeploymentManagerScenarioTestingSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkDeploymentManagerScenarioTestingSpec.scala index 91fa566f74c..ee1aa3cfc1f 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkDeploymentManagerScenarioTestingSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkDeploymentManagerScenarioTestingSpec.scala @@ -13,7 +13,6 @@ import pl.touk.nussknacker.engine.api.deployment.DMTestScenarioCommand import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.ScenarioBuilder 
-import pl.touk.nussknacker.engine.testmode.TestProcess.ResultContext import pl.touk.nussknacker.test.{KafkaConfigProperties, VeryPatientScalaFutures, WithConfig} import java.util.UUID @@ -65,10 +64,10 @@ class FlinkDeploymentManagerScenarioTestingSpec val process = SampleProcess.prepareProcess(processName) whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) { r => - r.nodeResults shouldBe Map( - "startProcess" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), - "nightFilter" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), - "endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))) + r.nodeResults.map { case (key, values) => (key, values.map(v => (v.id, v.variables))) } shouldBe Map( + "startProcess" -> List((s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), + "nightFilter" -> List((s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), + "endSend" -> List((s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), ) } } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/SampleProcess.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/SampleProcess.scala index 3cd15b344cd..abef25f07dc 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/SampleProcess.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/SampleProcess.scala @@ -19,6 +19,21 @@ object SampleProcess { .emptySink("endSend", "sendSms", "Value" -> "'message'".spel) } + def prepareProcessWithEventGeneratorSource(name: ProcessName, parallelism: Option[Int] = None): CanonicalProcess = { + val baseProcessBuilder = ScenarioBuilder.streaming(name.value) + parallelism + .map(baseProcessBuilder.parallelism) + .getOrElse(baseProcessBuilder) + .source( + "start", + "event-generator", + "schedule" -> "T(java.time.Duration).ofSeconds(1)".spel, + "count" -> "1".spel, + "value" -> s"'abrakadabra'".spel + ) + .emptySink("endSend", "sendSms", "Value" -> "'message'".spel) + } + def kafkaProcess(name: ProcessName, topic: String): CanonicalProcess = { ScenarioBuilder .streaming(name.value) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index 118219333cf..f9ec4f12328 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -376,6 +376,9 @@ class FlinkDeploymentManager( .map(new FlinkScenarioCompilationDependencies(_)) } + override def liveDataPreviewSupport: LiveDataPreviewSupport = + jobRunner.liveDataPreviewSupport + } object FlinkDeploymentManager { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkMiniClusterScenarioJobRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkMiniClusterScenarioJobRunner.scala index 9e07dcbd9e6..e4a593dea01 100644 --- 
a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkMiniClusterScenarioJobRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkMiniClusterScenarioJobRunner.scala @@ -4,8 +4,17 @@ import org.apache.flink.api.common.{JobExecutionResult, JobID} import org.apache.flink.configuration.{Configuration, PipelineOptionsInternal} import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import pl.touk.nussknacker.engine.BaseModelDataProvider -import pl.touk.nussknacker.engine.api.deployment.DMRunDeploymentCommand +import pl.touk.nussknacker.engine.ModelConfig.LiveDataPreviewMode +import pl.touk.nussknacker.engine.api.deployment.{ + DMRunDeploymentCommand, + LiveDataPreviewSupport, + LiveDataPreviewSupported, + NoLiveDataPreviewSupport +} +import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError} +import pl.touk.nussknacker.engine.api.process.ProcessIdWithName import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterWithServices +import pl.touk.nussknacker.engine.livedata.LiveDataCollectingListenerHolder import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.DeploymentIdOps import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker @@ -40,13 +49,27 @@ class FlinkMiniClusterScenarioJobRunner( conf.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString) } env.configure(conf) + val liveDataCollectingListener = + modelDataProvider.getCurrentModelData().modelConfig.liveDataPreviewMode match { + case LiveDataPreviewMode.Disabled => + None + case LiveDataPreviewMode.Enabled(maxNumberOfSamples, throughputTimeWindowInSeconds) => + Some( + LiveDataCollectingListenerHolder.createListenerFor( + command.processVersion.processName, + maxNumberOfSamples, + throughputTimeWindowInSeconds + ) + ) + } val jobID = jobInvoker .invokeStaticMethod( modelDataProvider.getCurrentModelData(), command.canonicalProcess, command.processVersion, command.deploymentData, - env + env, + liveDataCollectingListener.toList, ) .getJobID Some(jobID) @@ -54,4 +77,16 @@ class FlinkMiniClusterScenarioJobRunner( } } + override def liveDataPreviewSupport: LiveDataPreviewSupport = { + modelDataProvider.getCurrentModelData().modelConfig.liveDataPreviewMode match { + case LiveDataPreviewMode.Enabled(_, _) => + new LiveDataPreviewSupported { + override def getLiveData(processIdWithName: ProcessIdWithName): Future[Either[LiveDataError, LiveData]] = + Future(LiveDataCollectingListenerHolder.getLiveDataPreview(processIdWithName.name)) + } + case LiveDataPreviewMode.Disabled => + NoLiveDataPreviewSupport + } + } + } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkScenarioJobRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkScenarioJobRunner.scala index bb46821ca13..a8aa25bb466 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkScenarioJobRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkScenarioJobRunner.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.management.jobrunner import org.apache.flink.api.common.JobID -import pl.touk.nussknacker.engine.api.deployment.DMRunDeploymentCommand +import pl.touk.nussknacker.engine.api.deployment.{DMRunDeploymentCommand, LiveDataPreviewSupport} import scala.concurrent.Future @@ -12,4 +12,6 @@ 
trait FlinkScenarioJobRunner { savepointPathOpt: Option[String] ): Future[Option[JobID]] + def liveDataPreviewSupport: LiveDataPreviewSupport + } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/RemoteFlinkScenarioJobRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/RemoteFlinkScenarioJobRunner.scala index 1f16b6491ad..171de9ff72d 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/RemoteFlinkScenarioJobRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/RemoteFlinkScenarioJobRunner.scala @@ -4,7 +4,11 @@ import io.circe.syntax.EncoderOps import org.apache.flink.api.common.JobID import pl.touk.nussknacker.engine.BaseModelDataProvider import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.deployment.DMRunDeploymentCommand +import pl.touk.nussknacker.engine.api.deployment.{ + DMRunDeploymentCommand, + LiveDataPreviewSupport, + NoLiveDataPreviewSupport +} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.DeploymentIdOps @@ -39,6 +43,9 @@ class RemoteFlinkScenarioJobRunner(modelDataProvider: BaseModelDataProvider, cli ) } + override def liveDataPreviewSupport: LiveDataPreviewSupport = + NoLiveDataPreviewSupport + } object RemoteFlinkScenarioJobRunner { diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala index 6658a24218d..c00dd3a9c32 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala @@ -88,7 +88,7 @@ abstract class FlinkWithKafkaSuite val modelData = LocalModelData(config, components, configCreator = creator) registrar = FlinkProcessRegistrar( - new FlinkProcessCompilerDataFactory(modelData, DeploymentData.empty), + new FlinkProcessCompilerDataFactory(modelData, DeploymentData.empty, List.empty), FlinkJobConfig.parse(modelData.modelConfig), executionConfigPreparerChain(modelData, schemaRegistryClientProvider) ) diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala index b49258f36b4..cad06e3996c 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala @@ -41,6 +41,7 @@ import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentCl import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.process.runner.SimpleProcessConfigCreator import pl.touk.nussknacker.engine.testing.LocalModelData +import pl.touk.nussknacker.engine.testmode.TestProcess import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.test.VeryPatientScalaFutures @@ -145,7 +146,7 @@ class 
FlinkMiniClusterScenarioTestRunnerSpec ) .futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input), nodeResult(1, "input" -> input2)) nodeResults("filter1") shouldBe List(nodeResult(0, "input" -> input), nodeResult(1, "input" -> input2)) @@ -208,7 +209,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) .futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input)) nodeResults("out") shouldBe List(nodeResult(0, "input" -> input)) @@ -241,7 +242,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) .futureValue - results.nodeResults("splitId1") shouldBe List( + nodeResultsWithoutTimestamp(results)("splitId1") shouldBe List( nodeResult( 0, "input" -> @@ -276,7 +277,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) .futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input), nodeResult(1, "input" -> input2)) nodeResults("cid") shouldBe List(nodeResult(0, "input" -> input), nodeResult(1, "input" -> input2)) @@ -331,7 +332,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) .futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults(sourceNodeId) should have length 5 } @@ -359,7 +360,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) .futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults(sourceNodeId) should have length 4 nodeResults("out") should have length 2 @@ -418,7 +419,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) ).futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults(sourceNodeId) should have length 4 nodeResults("out") should have length 2 @@ -474,7 +475,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData).futureValue - results.nodeResults(sourceNodeId) should have size 3 + nodeResultsWithoutTimestamp(results)(sourceNodeId) should have size 3 results.externalInvocationResults("out") shouldBe List( ExternalInvocationResult( @@ -505,7 +506,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData).futureValue - results.nodeResults(sourceNodeId) should have size 1 + nodeResultsWithoutTimestamp(results)(sourceNodeId) should have size 1 results.externalInvocationResults("out") shouldBe List( ExternalInvocationResult( @@ -745,7 +746,7 @@ class FlinkMiniClusterScenarioTestRunnerSpec .runTests(process, scenarioTestData) .futureValue - val nodeResults = results.nodeResults + val nodeResults = nodeResultsWithoutTimestamp(results) nodeResults("source1") shouldBe List( nodeResult(0, "source1", "input" -> recordA), nodeResult(1, "source1", "input" -> recordD) @@ -923,19 +924,19 @@ class FlinkMiniClusterScenarioTestRunnerSpec ) } - private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] = + private def nodeResult(count: Int, vars: (String, Any)*): (String, Map[String, Json]) = nodeResult(count, sourceNodeId, vars: _*) - private def nodeResult(count: Int, sourceId: String, vars: (String, Any)*): ResultContext[Json] = - 
ResultContext(s"$scenarioName-$sourceId-$firstSubtaskIndex-$count", Map(vars: _*).mapValuesNow(variable)) + private def nodeResult(count: Int, sourceId: String, vars: (String, Any)*): (String, Map[String, Json]) = + (s"$scenarioName-$sourceId-$firstSubtaskIndex-$count", Map(vars: _*).mapValuesNow(variable)) private def nodeResult( count: Int, sourceId: String, branchId: String, vars: (String, Any)* - ): ResultContext[Json] = - ResultContext( + ): (String, Map[String, Json]) = + ( s"$scenarioName-$sourceId-$firstSubtaskIndex-$count-$branchId", Map(vars: _*).mapValuesNow(variable) ) @@ -953,6 +954,9 @@ class FlinkMiniClusterScenarioTestRunnerSpec Json.obj("pretty" -> toJson(value)) } + private def nodeResultsWithoutTimestamp[T](results: TestProcess.TestResults[T]) = + results.nodeResults.map { case (key, values) => (key, values.map(v => (v.id, v.variables))) } + } object FlinkMiniClusterScenarioTestRunnerSpec { diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala index 96a0c9e4bf2..ce6df5a9f2a 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala @@ -36,6 +36,7 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.Confluen import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData +import pl.touk.nussknacker.engine.testmode.TestProcess import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.json.ToJsonEncoder import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties, VeryPatientScalaFutures} @@ -199,14 +200,14 @@ class SchemedKafkaScenarioTestingSpec val scenarioTestData = ScenarioTestData("fragment1", parameterExpressions) val results = testRunner.runTests(fragment, scenarioTestData).futureValue - results.nodeResults("fragment1").loneElement shouldBe ResultContext( + nodeResults(results, "fragment1").loneElement shouldBe ( "fragment1-fragment1-0-0", Map( "in" -> Json.fromFields(Seq("pretty" -> Json.fromString("some-text-id"))) ) ) - results.nodeResults("fragmentEnd").loneElement shouldBe ResultContext( + nodeResults(results, "fragmentEnd").loneElement shouldBe ( "fragment1-fragment1-0-0", Map( "in" -> Json.fromFields(Seq("pretty" -> Json.fromString("some-text-id"))), @@ -229,6 +230,9 @@ class SchemedKafkaScenarioTestingSpec schemaRegistryMockClient.register(subject, parsedSchema) } + private def nodeResults[T](results: TestProcess.TestResults[T], key: String) = + results.nodeResults(key).map(r => (r.id, r.variables)) + } object SchemedKafkaScenarioTestingSpec { diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index d96191a4e6c..acc79e95181 100644 --- 
a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -204,6 +204,8 @@ class EmbeddedDeploymentManager( override def schedulingSupport: SchedulingSupport = NoSchedulingSupport + override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport + override def processStateDefinitionManager: ProcessStateDefinitionManager = EmbeddedProcessStateDefinitionManager override def close(): Unit = { diff --git a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala index b22356f1fa3..99c93980c0e 100644 --- a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala +++ b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala @@ -388,6 +388,8 @@ class K8sDeploymentManager( NoDeploymentsStatusesQueryForAllScenariosSupport override def schedulingSupport: SchedulingSupport = NoSchedulingSupport + + override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport } object K8sDeploymentManager { diff --git a/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala b/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala index 15238597f80..661c3003d0c 100644 --- a/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala +++ b/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala @@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.requestresponse.{ Response } import pl.touk.nussknacker.engine.testing.LocalModelData +import pl.touk.nussknacker.engine.testmode.TestProcess import pl.touk.nussknacker.engine.testmode.TestProcess._ import scala.concurrent.ExecutionContext.Implicits.global @@ -57,9 +58,9 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA val firstId = contextIds.nextContextId() val secondId = contextIds.nextContextId() - results.nodeResults("filter1").toSet shouldBe Set( - ResultContext(firstId, Map("input" -> variable(Request1("a", "b")))), - ResultContext(secondId, Map("input" -> variable(Request1("c", "d")))) + nodeResults(results, "filter1").toSet shouldBe Set( + (firstId, Map("input" -> variable(Request1("a", "b")))), + (secondId, Map("input" -> variable(Request1("c", "d")))) ) results.invocationResults("filter1").toSet shouldBe Set( @@ -105,10 +106,8 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA ) results.exceptions should have size 1 - results.exceptions.head.context shouldBe ResultContext( - firstId, - Map("input" -> variable(Request1("a", "b"))) - ) + results.exceptions.head.context.id shouldBe firstId + results.exceptions.head.context.variables shouldBe Map("input" -> variable(Request1("a", "b"))) results.exceptions.head.nodeId shouldBe Some("occasionallyThrowFilter") results.exceptions.head.throwable.getMessage shouldBe """Expression [#input.field1() == 'a' ? 
1/{0, 1}[0] == 0 : true] evaluation failed, message: / by zero""" } @@ -132,8 +131,8 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA scenarioTestData = scenarioTestData, ) - results.nodeResults("endNodeIID").toSet shouldBe Set( - ResultContext(firstId, Map("input" -> variable(Request1("a", "b")))) + nodeResults(results, "endNodeIID").toSet shouldBe Set( + (firstId, Map("input" -> variable(Request1("a", "b")))) ) results.externalInvocationResults("endNodeIID").toSet shouldBe Set( @@ -178,7 +177,7 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA val unionContextIds = results.nodeResults("union1").map(_.id) unionContextIds should contain only (s"$sourceContextId-$branch1NodeId", s"$sourceContextId-$branch2NodeId") unionContextIds should contain theSameElementsAs unionContextIds.toSet - results.nodeResults("union1") shouldBe results.nodeResults("collect1") + nodeResults(results, "union1") shouldBe nodeResults(results, "collect1") val endNodeIdInvocationResult = results.externalInvocationResults("endNodeIID").loneElement endNodeIdInvocationResult.contextId shouldBe contextIdGenForNodeId(process, "collect1").nextContextId() @@ -219,4 +218,7 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA Json.obj("pretty" -> toJson(value)) } + private def nodeResults[T](results: TestProcess.TestResults[T], key: String) = + results.nodeResults(key).map(r => (r.id, r.variables)) + } diff --git a/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala b/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala index 2f5d2fccf5c..430edd958bd 100644 --- a/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala +++ b/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.lite import io.circe.Json import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} +import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} @@ -12,11 +12,8 @@ import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.graph.expression.Expression.Language import pl.touk.nussknacker.engine.lite.sample.SampleInputWithListAndMap import pl.touk.nussknacker.engine.spel.SpelExtension._ -import pl.touk.nussknacker.engine.testmode.TestProcess.{ - ExpressionInvocationResult, - ExternalInvocationResult, - ResultContext -} +import pl.touk.nussknacker.engine.testmode.TestProcess +import pl.touk.nussknacker.engine.testmode.TestProcess.{ExpressionInvocationResult, ExternalInvocationResult} import scala.jdk.CollectionConverters._ @@ -41,20 +38,20 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { val results = sample.test(scenario, processVersionFor(scenario), scenarioTestData) - results.nodeResults("start") shouldBe List( - ResultContext("A", Map("input" -> variable(2))), - ResultContext("B", Map("input" -> variable(1))), - ResultContext("C", Map("input" -> variable(3))) + nodeResults(results, "start") shouldBe List( + ("A", Map("input" -> variable(2))), + ("B", Map("input" -> 
variable(1))), + ("C", Map("input" -> variable(3))) ) - results.nodeResults("sum") shouldBe List( - ResultContext("A", Map("input" -> 2, "out1" -> 2).mapValuesNow(variable)), - ResultContext("C", Map("input" -> 3, "out1" -> 3).mapValuesNow(variable)) + nodeResults(results, "sum") shouldBe List( + ("A", Map("input" -> 2, "out1" -> 2).mapValuesNow(variable)), + ("C", Map("input" -> 3, "out1" -> 3).mapValuesNow(variable)) ) - results.nodeResults("end") shouldBe List( - ResultContext("A", Map("input" -> 2, "out1" -> 2, "sum" -> 2).mapValuesNow(variable)), - ResultContext("C", Map("input" -> 3, "out1" -> 3, "sum" -> 5).mapValuesNow(variable)) + nodeResults(results, "end") shouldBe List( + ("A", Map("input" -> 2, "out1" -> 2, "sum" -> 2).mapValuesNow(variable)), + ("C", Map("input" -> 3, "out1" -> 3, "sum" -> 5).mapValuesNow(variable)) ) results.invocationResults("sum") shouldBe List( @@ -85,11 +82,11 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { val results = sample.test(scenario, processVersionFor(scenario), scenarioTestData) - results.nodeResults("source1") shouldBe List( - ResultContext("A", Map("input" -> variable(1))), - ResultContext("B", Map("input" -> variable(2))) + nodeResults(results, "source1") shouldBe List( + ("A", Map("input" -> variable(1))), + ("B", Map("input" -> variable(2))) ) - results.nodeResults("source2") shouldBe List(ResultContext("C", Map("input" -> variable(3)))) + nodeResults(results, "source2") shouldBe List(("C", Map("input" -> variable(3)))) results.externalInvocationResults("end1") shouldBe List( ExternalInvocationResult("A", "end1", variable(1)), @@ -111,8 +108,8 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { val scenarioTestData = ScenarioTestData("source1", parameterExpressions) val results = sample.test(scenario, processVersionFor(scenario), scenarioTestData) - results.nodeResults("source1") shouldBe List( - ResultContext( + nodeResults(results, "source1") shouldBe List( + ( "some-ctx-id", Map( "input" -> variable( @@ -146,8 +143,8 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { val scenarioTestData = ScenarioTestData("source1", parameterExpressions) val results = sample.test(scenario, processVersionFor(scenario), scenarioTestData) - results.nodeResults("source1") shouldBe List( - ResultContext( + nodeResults(results, "source1") shouldBe List( + ( "some-ctx-id", Map( "input" -> variable( @@ -182,9 +179,9 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { val scenarioTestData = ScenarioTestData("fragment1", parameterExpressions) val results = sample.test(fragment, processVersionFor(fragment), scenarioTestData) - results.nodeResults("fragment1") shouldBe List(ResultContext("fragment1", Map("in" -> variable("some-text-id")))) - results.nodeResults("fragmentEnd") shouldBe List( - ResultContext("fragment1", Map("in" -> variable("some-text-id"), "out" -> variable("some-text-id"))) + nodeResults(results, "fragment1") shouldBe List(("fragment1", Map("in" -> variable("some-text-id")))) + nodeResults(results, "fragmentEnd") shouldBe List( + ("fragment1", Map("in" -> variable("some-text-id"), "out" -> variable("some-text-id"))) ) results.invocationResults("fragmentEnd") shouldBe List( ExpressionInvocationResult("fragment1", "out", variable("some-text-id")) @@ -203,16 +200,16 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { val scenarioTestData = ScenarioTestData("fragment1", parameterExpressions) val results = sample.test(fragment, 
processVersionFor(fragment), scenarioTestData) - results.nodeResults("fragment1") shouldBe List(ResultContext("fragment1", Map("in" -> variable(0)))) - results.nodeResults("fragmentEnd") shouldBe List(ResultContext("fragment1", Map("in" -> variable(0)))) - results.exceptions.map(e => (e.context, e.nodeId, e.throwable.getMessage)) shouldBe List( + nodeResults(results, "fragment1") shouldBe List(("fragment1", Map("in" -> variable(0)))) + nodeResults(results, "fragmentEnd") shouldBe List(("fragment1", Map("in" -> variable(0)))) + results.exceptions.map(e => ((e.context.id, e.context.variables), e.nodeId, e.throwable.getMessage)) shouldBe List( ( - ResultContext("fragment1", Map("in" -> variable(0))), + ("fragment1", Map("in" -> variable(0))), Some("fragmentEnd"), "Expression [4 / #in] evaluation failed, message: / by zero" ), ( - ResultContext("fragment1", Map("in" -> variable(0))), + ("fragment1", Map("in" -> variable(0))), Some("fragmentEnd"), "Expression [8 / #in] evaluation failed, message: / by zero" ) @@ -235,4 +232,7 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { ProcessVersion.empty.copy(processName = scenario.metaData.name) } + private def nodeResults[T](results: TestProcess.TestResults[T], key: String) = + results.nodeResults(key).map(r => (r.id, r.variables)) + } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala index e76496291d5..1fbbdd1a990 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala @@ -3,6 +3,8 @@ package pl.touk.nussknacker.engine.testmode import pl.touk.nussknacker.engine.api.{Context, ContextId} import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo +import java.time.Instant + object TestProcess { case class TestResults[T]( @@ -16,7 +18,7 @@ object TestProcess { def updateNodeResult(nodeId: String, context: Context, variableEncoder: Any => T): TestResults[T] = copy(nodeResults = nodeResults + (nodeId -> (nodeResults.getOrElse(nodeId, List()) :+ ResultContext - .fromContext(context, variableEncoder))) + .fromContext(context, Instant.now(), variableEncoder))) ) def updateNodeOutputResult( @@ -29,7 +31,7 @@ object TestProcess { nodeTransitionResults + (NodeTransition(nodeId, nextNodeIdOpt) -> (nodeTransitionResults .getOrElse(NodeTransition(nodeId, nextNodeIdOpt), List()) :+ ResultContext - .fromContext(context, variableEncoder))) + .fromContext(context, Instant.now(), variableEncoder))) ) } @@ -63,7 +65,9 @@ object TestProcess { exceptionInfo: NuExceptionInfo, variableEncoder: Any => T ): TestResults[T] = - copy(exceptions = exceptions :+ ExceptionResult.fromNuExceptionInfo(exceptionInfo, variableEncoder)) + copy(exceptions = + exceptions :+ ExceptionResult.fromNuExceptionInfo(exceptionInfo, Instant.now(), variableEncoder) + ) // when evaluating e.g. keyBy expression can be invoked more than once... // TODO: is it the best way to handle it?? 
@@ -76,6 +80,30 @@ object TestProcess { } + object TestResults { + + def empty[T]: TestResults[T] = TestResults[T](Map.empty, Map.empty, Map.empty, Map.empty, List.empty) + + def aggregate[T](testResults: Iterable[TestResults[T]]): TestResults[T] = { + TestResults[T]( + nodeResults = mergeMaps(testResults.map(_.nodeResults)), + nodeTransitionResults = mergeMaps(testResults.map(_.nodeTransitionResults)), + invocationResults = mergeMaps(testResults.map(_.invocationResults)), + externalInvocationResults = mergeMaps(testResults.map(_.externalInvocationResults)), + exceptions = testResults.flatMap(_.exceptions).toList, + ) + } + + private def mergeMaps[K, V](listOfMaps: Iterable[Map[K, List[V]]]): Map[K, List[V]] = { + listOfMaps.foldLeft(Map.empty[K, List[V]]) { case (acc, map) => + map.foldLeft(acc) { case (innerAcc, (key, value)) => + innerAcc.updated(key, innerAcc.getOrElse(key, Nil) ++ value) + } + } + } + + } + final case class NodeTransition(sourceNodeId: String, destinationNodeId: Option[String]) case class ExpressionInvocationResult[T](contextId: String, name: String, value: T) @@ -86,10 +114,11 @@ object TestProcess { def fromNuExceptionInfo[T]( exceptionInfo: NuExceptionInfo, + timestamp: Instant, variableEncoder: Any => T ): ExceptionResult[T] = ExceptionResult( - ResultContext.fromContext(exceptionInfo.context, variableEncoder), + ResultContext.fromContext(exceptionInfo.context, timestamp, variableEncoder), exceptionInfo.nodeComponentInfo.map(_.nodeId), exceptionInfo.throwable ) @@ -99,11 +128,11 @@ object TestProcess { case class ExceptionResult[T](context: ResultContext[T], nodeId: Option[String], throwable: Throwable) object ResultContext { - def fromContext[T](context: Context, variableEncoder: Any => T): ResultContext[T] = - ResultContext(context.id, context.variables.map { case (k, v) => k -> variableEncoder(v) }) + def fromContext[T](context: Context, timestamp: Instant, variableEncoder: Any => T): ResultContext[T] = + ResultContext(context.id, timestamp, context.variables.map { case (k, v) => k -> variableEncoder(v) }) } - case class ResultContext[T](id: String, variables: Map[String, T]) { + case class ResultContext[T](id: String, timestamp: Instant, variables: Map[String, T]) { def variableTyped[U <: T](name: String): Option[U] = variables.get(name).map(_.asInstanceOf[U]) } diff --git a/nussknacker-dist/src/universal/conf/dev-application.conf b/nussknacker-dist/src/universal/conf/dev-application.conf index d502b0f66f0..b37870a2826 100644 --- a/nussknacker-dist/src/universal/conf/dev-application.conf +++ b/nussknacker-dist/src/universal/conf/dev-application.conf @@ -131,6 +131,11 @@ scenarioTypes { } } allowEndingScenarioWithoutSink: true + liveDataPreview { + enabled: true + maxNumberOfSamples: 20 + throughputTimeWindowInSeconds: 60 + } } category: "Default" diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestInterpreterRunner.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestInterpreterRunner.scala index 45a922b788e..942974e81b9 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestInterpreterRunner.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestInterpreterRunner.scala @@ -11,7 +11,7 @@ object TestInterpreterRunner { * occur a situation where designer with its classLoader tries to encode value with e.g. 
diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala
index e824fa45ffb..0a7d74b397f 100644
--- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala
+++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala
@@ -59,7 +59,8 @@ object FlinkProcessCompilerDataFactoryWithTestComponents {
       modelConfig,
       runtimeMode,
       configsFromProviderWithDictionaryEditor,
-      nodesData
+      nodesData,
+      List.empty,
     ) {

       override protected def adjustDefinitions(
diff --git a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala
index 6f76aeaa1dd..48bf44bc818 100644
--- a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala
+++ b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala
@@ -20,6 +20,7 @@
 import pl.touk.nussknacker.engine.testmode.TestProcess.ExceptionResult
 import pl.touk.nussknacker.engine.util.test._
 import pl.touk.nussknacker.engine.util.test.TestScenarioRunner.RunnerListResult

+import java.time.Instant
 import scala.reflect.ClassTag

 object LiteTestScenarioRunner {
@@ -87,7 +88,7 @@ class LiteTestScenarioRunner(
     runWithDataReturningDetails(scenario, data, NodesDeploymentData.empty)
       .map { case (errors, endResults) =>
         RunListResult(
-          errors.map(ExceptionResult.fromNuExceptionInfo(_, identity)),
+          errors.map(ExceptionResult.fromNuExceptionInfo(_, Instant.now(), identity)),
           endResults.map(_.result.asInstanceOf[OUTPUT])
         )
       }
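The LiteTestScenarioRunner change above only threads a capture timestamp into ExceptionResult.fromNuExceptionInfo. A rough sketch of the resulting call shape, with types as they appear in the TestProcess hunk, identity standing in for a real variable encoder, and a hypothetical helper name:

  import java.time.Instant

  import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
  import pl.touk.nussknacker.engine.testmode.TestProcess.ExceptionResult

  // Each error is stamped with the time it was wrapped, not the time it was thrown.
  def wrapErrors(errors: List[NuExceptionInfo]): List[ExceptionResult[Any]] =
    errors.map(ExceptionResult.fromNuExceptionInfo(_, Instant.now(), identity))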
diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/NuRestAssureExtensions.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/NuRestAssureExtensions.scala
index 5ac86b90909..49a6c49638d 100644
--- a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/NuRestAssureExtensions.scala
+++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/NuRestAssureExtensions.scala
@@ -4,7 +4,7 @@
 import io.restassured.http.ContentType
 import io.restassured.response.ValidatableResponse
 import io.restassured.specification.RequestSpecification
 import org.hamcrest.core.IsEqual
-import pl.touk.nussknacker.test.NuRestAssureMatchers.equalsJson
+import pl.touk.nussknacker.test.NuRestAssureMatchers.{equalsJson, matchJsonWithRegexValues}

 import java.nio.charset.StandardCharsets
 import scala.jdk.CollectionConverters._
@@ -104,6 +104,17 @@ trait NuRestAssureExtensions {

   }

+  implicit class MatchJsonWithRegexValues[T <: ValidatableResponse](validatableResponse: T) {
+
+    def matchJsonWithRegexValuesBody(json: String): ValidatableResponse = {
+      validatableResponse
+        .body(
+          matchJsonWithRegexValues(json)
+        )
+    }
+
+  }
+
   implicit class EqualsPlainBody[T <: ValidatableResponse](validatableResponse: T) {

     def equalsPlainBody(body: String): ValidatableResponse = {