Live data preview and node transition throughput on Flink minicluster #8047

Merged 49 commits into staging from live-data on May 27, 2025
Commits (49):
b7f6fe9  live data draft (mgoworko, May 7, 2025)
549018e  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 12, 2025)
caf5ad6  config and API refactor (mgoworko, May 13, 2025)
75b40ae  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 13, 2025)
a8a7772  qs (mgoworko, May 13, 2025)
f0f3713  fix scala 2.12 (mgoworko, May 13, 2025)
5fab9d1  qs (mgoworko, May 13, 2025)
29eb20a  qs (mgoworko, May 13, 2025)
922bc1b  qs (mgoworko, May 14, 2025)
875ed75  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 14, 2025)
b18a021  tests (mgoworko, May 16, 2025)
a353c19  tests and frequency (mgoworko, May 16, 2025)
c63a84f  tests and frequency (mgoworko, May 16, 2025)
8acef21  qs (mgoworko, May 16, 2025)
c5a5483  qs (mgoworko, May 16, 2025)
94c6242  qs (mgoworko, May 16, 2025)
8fe11c8  qs (mgoworko, May 18, 2025)
66f3af5  qs (mgoworko, May 18, 2025)
07ffb12  qs (mgoworko, May 18, 2025)
e2e24fd  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 18, 2025)
83cc4fe  qs (mgoworko, May 18, 2025)
140a62b  qs (mgoworko, May 18, 2025)
8ed4c8b  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 21, 2025)
d1d7004  qs (mgoworko, May 21, 2025)
fd4ce84  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 22, 2025)
8063a7f  qs (mgoworko, May 22, 2025)
3a8111e  qs (mgoworko, May 23, 2025)
f3bf584  qs (mgoworko, May 23, 2025)
53ef524  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 23, 2025)
35e7599  review changes (mgoworko, May 23, 2025)
9bde15c  review changes (mgoworko, May 23, 2025)
38bbde3  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 23, 2025)
68dfadf  review changes (mgoworko, May 23, 2025)
e0ccee6  review changes (mgoworko, May 23, 2025)
f1e1b92  qs (mgoworko, May 23, 2025)
47be346  qs (mgoworko, May 23, 2025)
936f68f  qs (mgoworko, May 25, 2025)
cbf2ef9  qs (mgoworko, May 25, 2025)
38f530c  qs (mgoworko, May 25, 2025)
9a4db61  improve regex (mgoworko, May 26, 2025)
66a1db4  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 26, 2025)
1d8061d  improve regex (mgoworko, May 26, 2025)
b4daff0  improve regex (mgoworko, May 26, 2025)
1359c22  separate live data API and logn sliding window (mgoworko, May 26, 2025)
bdebf1f  Merge remote-tracking branch 'origin/staging' into live-data (mgoworko, May 26, 2025)
238f7e7  qs (mgoworko, May 26, 2025)
61632ef  qs (mgoworko, May 26, 2025)
e6c7ee3  qs (mgoworko, May 26, 2025)
0cb1d87  qs (mgoworko, May 27, 2025)
17 changes: 17 additions & 0 deletions build.sbt
@@ -647,6 +647,7 @@ lazy val flinkDeploymentManager = (project in flink("management"))
)
.dependsOn(
deploymentManagerApi % Provided,
liveDataCollector % Provided,
scenarioCompilerFlinkDeps,
flinkMiniCluster,
commonUtils % Provided,
@@ -2023,6 +2024,20 @@ lazy val deploymentManagerApi = (project in file("designer/deployment-manager-api"))
)
.dependsOn(extensionsApi, testUtils % Test)

lazy val liveDataCollector = (project in file("designer/live-data-collector"))
.settings(commonSettings)
.settings(
name := "nussknacker-live-data-collector",
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % scalaTestV % Test,
),
)
.dependsOn(
deploymentManagerApi % Provided,
// Needed for testResultsVariableEncoder
scenarioCompiler % Provided,
)

lazy val prepareDesignerTests = taskKey[Unit]("Prepare all necessary artifacts before running designer module tests")
lazy val prepareDesignerSlowTests =
taskKey[Unit]("Prepare all necessary artifacts before running designer module slow tests")
@@ -2166,6 +2181,7 @@ lazy val designer = (project in file("designer/server"))
processReports,
security,
deploymentManagerApi,
liveDataCollector,
componentsApi,
restmodel,
listenerApi,
@@ -2317,6 +2333,7 @@ lazy val modules = List[ProjectReference](
customHttpServiceApi,
configLoaderApi,
deploymentManagerApi,
liveDataCollector,
designer,
sqlComponents,
schemedKafkaComponentsUtils,
ModelConfig.scala
@@ -4,11 +4,13 @@ import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus.toFicusConfig
import net.ceedubs.ficus.readers.AnyValReaders._
import net.ceedubs.ficus.readers.OptionReader._
import pl.touk.nussknacker.engine.ModelConfig.LiveDataPreviewMode
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy

final case class ModelConfig(
allowEndingScenarioWithoutSink: Boolean,
namingStrategy: NamingStrategy,
liveDataPreviewMode: LiveDataPreviewMode,
// TODO: we should parse the underlying config into ModelConfig fields instead of passing the raw config
underlyingConfig: Config,
) {
@@ -23,8 +25,33 @@ object ModelConfig {
ModelConfig(
allowEndingScenarioWithoutSink = rawModelConfig.getOrElse[Boolean]("allowEndingScenarioWithoutSink", false),
namingStrategy = NamingStrategy.fromConfig(rawModelConfig),
liveDataPreviewMode = parseLiveDataPreviewMode(rawModelConfig),
underlyingConfig = rawModelConfig,
)
}

sealed trait LiveDataPreviewMode

object LiveDataPreviewMode {

case object Disabled extends LiveDataPreviewMode

final case class Enabled(
maxNumberOfSamples: Int,
throughputTimeWindowInSeconds: Int,
) extends LiveDataPreviewMode

}

private def parseLiveDataPreviewMode(config: Config): LiveDataPreviewMode = {
if (config.getOrElse("liveDataPreview.enabled", false)) {
LiveDataPreviewMode.Enabled(
maxNumberOfSamples = config.getOrElse("liveDataPreview.maxNumberOfSamples", 10),
throughputTimeWindowInSeconds = config.getOrElse("liveDataPreview.throughputTimeWindowInSeconds", 60),
)
} else {
LiveDataPreviewMode.Disabled
}
}

}
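
For reference, parseLiveDataPreviewMode above maps a HOCON block like the one below onto LiveDataPreviewMode.Enabled, and falls back to Disabled when liveDataPreview.enabled is absent or false. A minimal sketch, assuming only the keys and defaults visible in this diff; the values are illustrative:

import com.typesafe.config.ConfigFactory

// Illustrative config; the key names come from parseLiveDataPreviewMode above.
val raw = ConfigFactory.parseString(
  """liveDataPreview {
    |  enabled: true
    |  maxNumberOfSamples: 20              # defaults to 10 when absent
    |  throughputTimeWindowInSeconds: 120  # defaults to 60 when absent
    |}""".stripMargin
)
// Parsing raw as above yields:
// LiveDataPreviewMode.Enabled(maxNumberOfSamples = 20, throughputTimeWindowInSeconds = 120)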
DeploymentManager.scala
@@ -2,10 +2,13 @@ package pl.touk.nussknacker.engine.api.deployment

import cats.effect.{Resource, SyncIO}
import com.typesafe.config.Config
import io.circe.Json
import pl.touk.nussknacker.engine.api.definition.EngineScenarioCompilationDependencies
import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.testmode.TestProcess.{NodeTransition, TestResults}

import java.time.Instant
import scala.concurrent.Future
@@ -33,6 +36,8 @@ trait DeploymentManager extends AutoCloseable {

def scenarioCompilationDependenciesResource: Resource[SyncIO, EngineScenarioCompilationDependencies]

def liveDataPreviewSupport: LiveDataPreviewSupport

protected final def notImplemented: Future[Nothing] =
Future.failed(new NotImplementedError())
}
@@ -89,3 +94,30 @@ trait SchedulingSupported extends SchedulingSupport {
}

case object NoSchedulingSupport extends SchedulingSupport

sealed trait LiveDataPreviewSupport

trait LiveDataPreviewSupported extends LiveDataPreviewSupport {

def getLiveData(
processIdWithName: ProcessIdWithName,
): Future[Either[LiveDataError, LiveData]]

}

object LiveDataPreviewSupported {

final case class LiveData(
liveDataSamples: TestResults[Json],
nodeTransitionThroughput: Map[NodeTransition, BigDecimal],
)

sealed trait LiveDataError

object LiveDataError {
case object NoLiveDataAvailableForScenario extends LiveDataError
}

}

case object NoLiveDataPreviewSupport extends LiveDataPreviewSupport
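
To make the capability concrete: a deployment manager advertises live data preview by returning a LiveDataPreviewSupported implementation from liveDataPreviewSupport instead of NoLiveDataPreviewSupport. A minimal sketch of the consumer side, assuming the manager can reach the LiveDataCollectingListenerHolder introduced later in this diff in-process (as on the Flink minicluster); the object name is hypothetical:

import scala.concurrent.Future

import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported
import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError}
import pl.touk.nussknacker.engine.api.process.ProcessIdWithName
import pl.touk.nussknacker.engine.livedata.LiveDataCollectingListenerHolder

// Hypothetical example, not part of this PR: answers live data queries by
// delegating to the in-memory holder keyed by scenario name.
object ExampleLiveDataPreviewSupport extends LiveDataPreviewSupported {

  override def getLiveData(
      processIdWithName: ProcessIdWithName,
  ): Future[Either[LiveDataError, LiveData]] =
    Future.successful(LiveDataCollectingListenerHolder.getLiveDataPreview(processIdWithName.name))

}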
CachingProcessStateDeploymentManager.scala
@@ -19,6 +19,7 @@ class CachingProcessStateDeploymentManager(
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport,
override val deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport,
override val schedulingSupport: SchedulingSupport,
override val liveDataPreviewSupport: LiveDataPreviewSupport,
) extends DeploymentManager {

private val cache: AsyncCache[ProcessName, List[DeploymentStatusDetails]] = Caffeine
@@ -74,6 +75,7 @@ object CachingProcessStateDeploymentManager extends LazyLogging {
delegate.deploymentSynchronisationSupport,
delegate.deploymentsStatusesQueryForAllScenariosSupport,
delegate.schedulingSupport,
delegate.liveDataPreviewSupport,
)
}
.getOrElse {
DeploymentManagerStub.scala
@@ -60,6 +60,8 @@ class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymentManager

override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

override def liveDataPreviewSupport: LiveDataPreviewSupport = NoLiveDataPreviewSupport

override def close(): Unit = {}

override def scenarioCompilationDependenciesResource: Resource[SyncIO, EngineScenarioCompilationDependencies] =
CachingProcessStateDeploymentManagerSpec.scala
@@ -32,6 +32,7 @@ class CachingProcessStateDeploymentManagerSpec
NoDeploymentSynchronisationSupport,
NoDeploymentsStatusesQueryForAllScenariosSupport,
NoSchedulingSupport,
NoLiveDataPreviewSupport,
)

val results = List(
@@ -52,6 +53,7 @@ class CachingProcessStateDeploymentManagerSpec
NoDeploymentSynchronisationSupport,
NoDeploymentsStatusesQueryForAllScenariosSupport,
NoSchedulingSupport,
NoLiveDataPreviewSupport,
)

val firstInvocation = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.CanBeCached)
@@ -71,6 +73,7 @@ class CachingProcessStateDeploymentManagerSpec
NoDeploymentSynchronisationSupport,
NoDeploymentsStatusesQueryForAllScenariosSupport,
NoSchedulingSupport,
NoLiveDataPreviewSupport,
)

val resultForFresh = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh)
LiveDataCollectingListener.scala
@@ -0,0 +1,104 @@
package pl.touk.nussknacker.engine.livedata

import io.circe.Json
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.testmode.TestInterpreterRunner
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults

import scala.util.Try

// This class must be serializable, which means a deserialized copy loses the reference to the original instance.
// The actual data is therefore stored in LiveDataCollectingListenerHolder, which every LiveDataCollectingListener instance can access.
class LiveDataCollectingListener private[livedata] (
processName: ProcessName,
maxNumberOfSamples: Int,
throughputTimeWindowInSeconds: Int,
) extends ProcessListener
with Serializable {

private val variableEncoder: Any => io.circe.Json = TestInterpreterRunner.testResultsVariableEncoder

private def storage = LiveDataCollectingListenerHolder.storage(
processName = processName,
maxNumberOfSamples = maxNumberOfSamples,
throughputTimeWindowInSeconds = throughputTimeWindowInSeconds,
)

override def nodeEntered(nodeId: String, context: Context, processMetaData: MetaData): Unit = {
updateResults(context, _.updateNodeResult(nodeId, context, variableEncoder))
}

override def transitionToNextNode(
nodeId: String,
nextNodeId: String,
context: Context,
processMetaData: MetaData,
): Unit = {
storage.registerTransitionBetweenNodes(nodeId, Some(nextNodeId))
updateResults(context, _.updateNodeOutputResult(nodeId, Some(nextNodeId), context, variableEncoder))
}

override def processingFinishedInNode(
nodeId: String,
context: Context,
processMetaData: MetaData,
): Unit = {
storage.registerTransitionBetweenNodes(nodeId, None)
updateResults(context, _.updateNodeOutputResult(nodeId, None, context, variableEncoder))
}

override def endEncountered(
nodeId: String,
ref: String,
context: Context,
processMetaData: MetaData
): Unit = ()

override def deadEndEncountered(
lastNodeId: String,
context: Context,
processMetaData: MetaData
): Unit = {}

override def expressionEvaluated(
nodeId: String,
expressionId: String,
expression: String,
context: Context,
processMetaData: MetaData,
result: Any
): Unit = {
updateResults(context, _.updateExpressionResult(nodeId, context, expressionId, result, variableEncoder))
}

override def serviceInvoked(
nodeId: String,
id: String,
context: Context,
processMetaData: MetaData,
result: Try[Any]
): Unit = {}

override def exceptionThrown(exceptionInfo: NuExceptionInfo): Unit = {
updateResults(exceptionInfo.context, _.updateExceptionResult(exceptionInfo, variableEncoder))
}

override final def close(): Unit = ()

private def updateResults(context: Context, action: TestResults[Json] => TestResults[Json]): Unit = {
// TODO: ContextId will be refactored into a case class with explicit fields;
// until then we use a regex to extract the initial context id as generated by the source.
val pattern = raw"^(.+?-.+?-\d+-\d+)(-.+)?".r
val initialContextIdOpt = context.id match {
case pattern(initialContextId, _*) => Some(initialContextId)
case _ => None
}
initialContextIdOpt match {
case Some(initialContextId) => storage.updateResults(initialContextId, action)
case None => ()
}
}

}
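
The regex in updateResults assumes source-generated context ids of the shape <scenario>-<node>-<n>-<n>, optionally followed by suffixes appended further down the scenario graph. A quick sketch with made-up ids (Scala's Regex extractor matches the entire string):

val pattern = raw"^(.+?-.+?-\d+-\d+)(-.+)?".r

// Made-up context ids, only to illustrate what the first group captures:
"scenario1-source1-0-42" match { case pattern(root, _*) => root }
// root == "scenario1-source1-0-42"
"scenario1-source1-0-42-branch-3" match { case pattern(root, _*) => root }
// root == "scenario1-source1-0-42"
// An id that does not match yields None above, and the sample is skipped.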
LiveDataCollectingListenerHolder.scala
@@ -0,0 +1,56 @@
package pl.touk.nussknacker.engine.livedata

import com.github.benmanes.caffeine.cache.Caffeine
import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.{LiveData, LiveDataError}
import pl.touk.nussknacker.engine.api.process.ProcessName

import java.time.Clock
import scala.compat.java8.FunctionConverters._

object LiveDataCollectingListenerHolder {

private implicit val clock: Clock = Clock.systemUTC()

private val listenerStorages =
Caffeine
.newBuilder()
.expireAfterAccess(java.time.Duration.ofHours(1))
.build[String, LiveDataCollectingListenerStorage]()

def createListenerFor(
processName: ProcessName,
maxNumberOfSamples: Int,
throughputTimeWindowInSeconds: Int,
): LiveDataCollectingListener = {
cleanResults(processName)
new LiveDataCollectingListener(processName, maxNumberOfSamples, throughputTimeWindowInSeconds)
}

def getLiveDataPreview(processName: ProcessName): Either[LiveDataError, LiveData] = {
Option(listenerStorages.getIfPresent(processName.value)).map(_.getLiveData) match {
case Some(liveData) => Right(liveData)
case None => Left(LiveDataError.NoLiveDataAvailableForScenario)
}
}

private[livedata] def storage(
processName: ProcessName,
maxNumberOfSamples: Int,
throughputTimeWindowInSeconds: Int,
): LiveDataCollectingListenerStorage = {
listenerStorages.get(
processName.value,
asJavaFunction((_: String) =>
new LiveDataCollectingListenerStorage(maxNumberOfSamples, throughputTimeWindowInSeconds)
)
)
}

// We want to store and present live data from the most recent deployment only:
// - data from an old run is kept until the listener storage cache expires (currently hardcoded to 1 hour),
// - or until a new deployment happens, at which point we discard all old data.
private def cleanResults(processName: ProcessName): Unit = {
listenerStorages.invalidate(processName.value)
}

}
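
Taken together with the listener above, the intended lifecycle looks roughly like this (a sketch; the scenario name and limits are illustrative):

import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.livedata.LiveDataCollectingListenerHolder

val processName = ProcessName("my-scenario") // illustrative name

// On (re)deployment: discard previously collected data for this scenario and
// hand out a serializable listener that writes into the shared holder.
val listener = LiveDataCollectingListenerHolder.createListenerFor(
  processName,
  maxNumberOfSamples = 10,
  throughputTimeWindowInSeconds = 60,
)

// Later, e.g. from the designer: Right(liveData) once the running job has
// processed events, Left(NoLiveDataAvailableForScenario) before that.
val preview = LiveDataCollectingListenerHolder.getLiveDataPreview(processName)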
LiveDataCollectingListenerStorage.scala
@@ -0,0 +1,38 @@
package pl.touk.nussknacker.engine.livedata

import io.circe.Json
import pl.touk.nussknacker.engine.api.deployment.LiveDataPreviewSupported.LiveData
import pl.touk.nussknacker.engine.testmode.TestProcess._

import java.time.{Clock, Instant}

private[livedata] class LiveDataCollectingListenerStorage(
maxNumberOfSamples: Int,
throughputTimeWindowInSeconds: Int,
)(implicit clock: Clock) {

private val results = new RingBuffer[String, TestResults[Json]](maxNumberOfSamples)
private val transitionsCounter = new SlidingWindowCounter[NodeTransition](Instant.now, throughputTimeWindowInSeconds)

def getLiveData: LiveData = {
LiveData(
liveDataSamples = TestResults.aggregate(results.values),
nodeTransitionThroughput = transitionsCounter.getThroughput,
)
}

def updateResults(contextId: String, action: TestResults[Json] => TestResults[Json]): Unit = {
results.update(
contextId,
{
case Some(resultsSoFar) => action(resultsSoFar)
case None => action(TestResults.empty[Json])
}
)
}

def registerTransitionBetweenNodes(from: String, to: Option[String]): Unit = {
transitionsCounter.add(NodeTransition(from, to))
}

}
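
RingBuffer and SlidingWindowCounter are defined elsewhere in this PR and are not part of this excerpt. As a rough mental model only (an assumption, not the PR's actual implementation), the counter can be pictured as per-second buckets per key, with throughput computed as events in the window divided by the window length:

import java.time.Clock

import scala.collection.concurrent.TrieMap

// Assumed sketch, NOT the PR's implementation: (key, epoch second) -> count
// buckets; stale-bucket eviction is omitted for brevity.
class NaiveSlidingWindowCounter[K](windowSeconds: Int)(implicit clock: Clock) {

  private val buckets = TrieMap.empty[(K, Long), Long]

  def add(key: K): Unit = {
    val bucket = (key, clock.instant().getEpochSecond)
    // Read-modify-write is not atomic here; good enough for a sketch.
    buckets.update(bucket, buckets.getOrElse(bucket, 0L) + 1L)
  }

  def getThroughput: Map[K, BigDecimal] = {
    val cutoff = clock.instant().getEpochSecond - windowSeconds
    buckets.toSeq
      .collect { case ((key, second), count) if second > cutoff => (key, count) }
      .groupBy(_._1)
      .map { case (key, entries) => key -> BigDecimal(entries.map(_._2).sum) / windowSeconds }
  }

}

Under this model, nodeTransitionThroughput in getLiveData reports, for each NodeTransition, the average number of transitions per second over the last throughputTimeWindowInSeconds.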