Skip to content

[NU-2149] End scenario without sink #8004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class InterpreterSetup[T: ClassTag] {
ComponentDefinitionExtractionMode.FinalDefinition
),
ModelDefinitionBuilder.emptyExpressionConfig,
ClassExtractionSettings.Default
ClassExtractionSettings.Default,
allowEndingScenarioWithoutSink = false,
)
val definitionsWithTypes = ModelDefinitionWithClasses(definitions)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package pl.touk.nussknacker.engine

import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus.toFicusConfig
import net.ceedubs.ficus.readers.AnyValReaders._
import net.ceedubs.ficus.readers.OptionReader._

/** Typed view over the model part of a scenario type's configuration.
  *
  * @param allowEndingScenarioWithoutSink when true, scenarios may end with a node other than a sink
  * @param underlyingConfig the raw config the typed fields were read from
  */
final case class ModelConfig(
    allowEndingScenarioWithoutSink: Boolean,
    // TODO: we should parse this underlying config as ModelConfig class fields instead of passing raw config
    underlyingConfig: Config,
)

object ModelConfig {

  // Config path of the opt-in flag; when absent, the legacy behaviour (sink required) applies.
  private val allowEndingScenarioWithoutSinkPath = "allowEndingScenarioWithoutSink"

  /** Extracts the typed model settings from `modelConfig`, keeping the raw config alongside. */
  def parse(modelConfig: Config): ModelConfig =
    ModelConfig(
      allowEndingScenarioWithoutSink = modelConfig.getOrElse[Boolean](allowEndingScenarioWithoutSinkPath, false),
      underlyingConfig = modelConfig,
    )

}
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package pl.touk.nussknacker.engine.api.process

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.ModelConfig
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy

// TODO: Rename to ModelDependencies + rename config to modelConfig
final case class ProcessObjectDependencies private (config: Config, namingStrategy: NamingStrategy) extends Serializable
final case class ProcessObjectDependencies private (modelConfig: ModelConfig, namingStrategy: NamingStrategy)
extends Serializable {
def config: Config = modelConfig.underlyingConfig
}

object ProcessObjectDependencies {

def withConfig(modelConfig: Config): ProcessObjectDependencies = {
ProcessObjectDependencies(modelConfig, NamingStrategy.fromConfig(modelConfig))
def apply(underlyingConfig: Config, namingStrategy: NamingStrategy): ProcessObjectDependencies = {
new ProcessObjectDependencies(ModelConfig.parse(underlyingConfig), namingStrategy)
}

def withConfig(config: Config): ProcessObjectDependencies = {
ProcessObjectDependencies(ModelConfig.parse(config), NamingStrategy.fromConfig(config))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ class AdditionalInfoProviders(typeToConfig: ProcessingTypeDataProvider[ModelData
ScalaServiceLoader
.load[AdditionalInfoProvider](pt.modelClassLoader)
.headOption
.map(_.nodeAdditionalInfo(pt.modelConfig))
.map(_.nodeAdditionalInfo(pt.modelConfig.underlyingConfig))
)

private val propertiesProviders: ProcessingTypeDataProvider[Option[MetaData => Future[Option[AdditionalInfo]]], _] =
typeToConfig.mapValues(pt =>
ScalaServiceLoader
.load[AdditionalInfoProvider](pt.modelClassLoader)
.headOption
.map(_.propertiesAdditionalInfo(pt.modelConfig))
.map(_.propertiesAdditionalInfo(pt.modelConfig.underlyingConfig))
)

def prepareAdditionalInfoForNode(nodeData: NodeData, processingType: ProcessingType)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DefinitionsService(
}

import net.ceedubs.ficus.Ficus._
val scenarioPropertiesDocsUrl = modelData.modelConfig.getAs[String]("scenarioPropertiesDocsUrl")
val scenarioPropertiesDocsUrl = modelData.modelConfig.underlyingConfig.getAs[String]("scenarioPropertiesDocsUrl")

prepareUIDefinitions(
withStaticDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ object ProcessingTypeData {
componentDefinitionExtractionMode: ComponentDefinitionExtractionMode
) = {
// TODO: consider using ParameterName for property names instead of String (for scenario and fragment properties)
val scenarioProperties = deploymentData.deploymentScenarioPropertiesConfig ++ modelData.modelConfig
val scenarioProperties = deploymentData.deploymentScenarioPropertiesConfig ++ modelData.modelConfig.underlyingConfig
.getOrElse[Map[String, ScenarioPropertyConfig]](
"scenarioPropertiesConfig",
Map.empty
)
val fragmentProperties = modelData.modelConfig
val fragmentProperties = modelData.modelConfig.underlyingConfig
.getOrElse[Map[String, ScenarioPropertyConfig]]("fragmentPropertiesConfig", Map.empty)

val staticDefinitionForDynamicComponents =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ object ProcessTestData {

// TODO: merge with this below
val sampleScenario: CanonicalProcess = {
def endWithMessage(idSuffix: String, message: String): SubsequentNode = {
def endWithMessage(idSuffix: String, message: String): Option[SubsequentNode] = {
GraphBuilder
.buildVariable("message" + idSuffix, "output", "message" -> s"'$message'".spel)
.emptySink(
Expand Down Expand Up @@ -496,7 +496,7 @@ object ProcessTestData {
.to(endWithMessage)
}

private def endWithMessage: SubsequentNode = {
private def endWithMessage: Option[SubsequentNode] = {
val idSuffix = "suffix"
val endMessage = "#test #{#input} #test \n#{\"abc\".toString + {1,2,3}.toString + \"abc\"}\n#test\n#{\"ab{}c\"}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ConfigurationTest extends AnyFunSuite with WithTestDeploymentManagerClassL
)
}

private lazy val modelDataConfig = modelData.modelConfig
private lazy val modelDataConfig = modelData.modelConfig.underlyingConfig

private def classLoader = {
getClass.getClassLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ class DefinitionsServiceSpec extends AnyFunSuite with Matchers with PatientScala
val processingType = Streaming

val alignedComponentsDefinitionProvider = new AlignedComponentsDefinitionProvider(
new BuiltInComponentsDefinitionsPreparer(ComponentsUiConfigParser.parse(model.modelConfig)),
new BuiltInComponentsDefinitionsPreparer(ComponentsUiConfigParser.parse(model.modelConfig.underlyingConfig)),
new FragmentComponentDefinitionExtractor(
getClass.getClassLoader,
model.modelDefinitionWithClasses.classDefinitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.api.component._
import pl.touk.nussknacker.engine.api.component.Component.AllowedProcessingModes
import pl.touk.nussknacker.engine.api.component.ComponentType._
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, ProcessingType}
import pl.touk.nussknacker.engine.api.process.{ProcessingType, ProcessObjectDependencies}
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.definition.component.defaultconfig.DefaultsComponentGroupName._
import pl.touk.nussknacker.engine.definition.component.defaultconfig.DefaultsComponentIcon
Expand All @@ -26,23 +26,33 @@ import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.restmodel.component.{ComponentLink, ComponentListElement, NodeUsageData}
import pl.touk.nussknacker.restmodel.component.NodeUsageData.{FragmentUsageData, ScenarioUsageData}
import pl.touk.nussknacker.security.Permission
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, PatientScalaFutures, ValidatedValuesDetailedMessage}
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.mock.{MockDeploymentManager, MockFetchingProcessRepository}
import pl.touk.nussknacker.test.utils.domain.{TestFactory, TestProcessingTypeDataProviderFactory}
import pl.touk.nussknacker.test.utils.domain.TestProcessUtil.createFragmentEntity
import pl.touk.nussknacker.ui.api.ScenarioStatusPresenter
import pl.touk.nussknacker.ui.config.{ComponentLinkConfig, DesignerConfig}
import pl.touk.nussknacker.ui.config.ComponentLinkConfig._
import pl.touk.nussknacker.ui.definition.AlignedComponentsDefinitionProvider
import pl.touk.nussknacker.ui.definition.component.ComponentListQueryOptions.{FetchAllWithUsages, FetchAllWithoutUsages, FetchNonFragmentsWithUsages, FetchNonFragmentsWithoutUsages}
import pl.touk.nussknacker.ui.definition.component.ComponentListQueryOptions.{
FetchAllWithoutUsages,
FetchAllWithUsages,
FetchNonFragmentsWithoutUsages,
FetchNonFragmentsWithUsages
}
import pl.touk.nussknacker.ui.definition.component.ComponentModelData._
import pl.touk.nussknacker.ui.definition.component.ComponentTestProcessData._
import pl.touk.nussknacker.ui.definition.component.DynamicComponentProvider._
import pl.touk.nussknacker.ui.process.DBProcessService
import pl.touk.nussknacker.ui.process.deployment.scenariostatus.ScenarioStatusProvider
import pl.touk.nussknacker.ui.process.fragment.DefaultFragmentRepository
import pl.touk.nussknacker.ui.process.processingtype.{DeploymentData, ProcessingTypeData, ScenarioParametersService, ValueWithRestriction}
import pl.touk.nussknacker.ui.process.processingtype.{
DeploymentData,
ProcessingTypeData,
ScenarioParametersService,
ValueWithRestriction
}
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
import pl.touk.nussknacker.ui.security.api._
Expand Down
3 changes: 3 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@
* [#7964](https://github.com/TouK/nussknacker/pull/7964) Add JsonTemplate language and editor.
* [#8006](https://github.com/TouK/nussknacker/pull/8006) Add JsonTemplate editor to Event Generator source and Kafka sink value.
* [#7970](https://github.com/TouK/nussknacker/pull/7970) Added "limits.maxActiveScenariosCount" setting defined per processing type and "globalLimits.maxActiveScenariosCount" to limit active scenarios globally
* [#8004](https://github.com/TouK/nussknacker/pull/8004) Scenarios no longer have to end with a final `Sink` node
* set `modelConfig.allowEndingScenarioWithoutSink` of the scenario type in the `scenarioTypes` config section to `true` to allow ending scenarios with nodes other than sinks
* the flag is optional; its default value is `false` (no change in behavior)

## 1.18

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package pl.touk.nussknacker.engine.flink.api

import _root_.java.util
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import io.circe.Encoder
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters
import pl.touk.nussknacker.engine.ModelConfig
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
Expand Down Expand Up @@ -50,10 +50,10 @@ object NkGlobalParameters extends LazyLogging {
modelInfo: ModelInfo,
deploymentId: String, // TODO: Pass here DeploymentId?
processVersion: ProcessVersion,
modelConfig: Config,
modelConfig: ModelConfig,
additionalInformation: Map[String, String]
): NkGlobalParameters = {
val configGlobalParameters = modelConfig.getAs[ConfigGlobalParameters]("globalParameters")
val configGlobalParameters = modelConfig.underlyingConfig.getAs[ConfigGlobalParameters]("globalParameters")
NkGlobalParameters(
modelInfo,
deploymentId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package pl.touk.nussknacker.engine.flink

import com.typesafe.config.ConfigFactory
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.process.SourceFactory
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.test.ScalatestMiniClusterJobStatusCheckingOps.miniClusterWithServicesToOps
import pl.touk.nussknacker.engine.flink.util.source.EmitWatermarkAfterEachElementCollectionSource
import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.AggregateWindowsConfig
import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener
import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder}
import pl.touk.nussknacker.engine.testmode.TestProcess.{NodeTransition, TestResults}
import pl.touk.nussknacker.engine.util.config.DocsConfig
import pl.touk.nussknacker.test.ProcessUtils.convertToAnyShouldWrapper

import java.time.{Duration, Instant}
import scala.jdk.CollectionConverters._
import scala.util.Try

trait FlinkMiniClusterTestRunner { _: FlinkSpec =>

protected def sourcesWithMockedData: Map[String, List[Int]]

protected def withCollectingTestResults(
canonicalProcess: CanonicalProcess,
assertions: TestResults[Any] => Unit,
allowEndingScenarioWithoutSink: Boolean = false,
): Unit = {
ResultsCollectingListenerHolder.withListener { collectingListener =>
val model = modelData(collectingListener, AggregateWindowsConfig.Default, allowEndingScenarioWithoutSink)
flinkMiniCluster.withDetachedStreamExecutionEnvironment { env =>
val executionResult = new FlinkScenarioUnitTestJob(model).run(canonicalProcess, env)
flinkMiniCluster.waitForJobIsFinished(executionResult.getJobID)
assertions(collectingListener.results)
}
}
}

private def modelData(
collectingListener: => ResultsCollectingListener[Any],
aggregateWindowsConfig: AggregateWindowsConfig,
allowEndingScenarioWithoutSink: Boolean,
): LocalModelData = {
def sourceComponent(data: List[Int]) = SourceFactory.noParamUnboundedStreamFactory[Int](
EmitWatermarkAfterEachElementCollectionSource
.create[Int](data, _ => Instant.now.toEpochMilli, Duration.ofHours(1))
)
val config =
if (allowEndingScenarioWithoutSink) {
ConfigFactory.parseString("""allowEndingScenarioWithoutSink: true""")
} else {
ConfigFactory.empty()
}
LocalModelData(
config,
sourcesWithMockedData.toList.map { case (name, data) =>
ComponentDefinition(name, sourceComponent(data))
} :::
FlinkBaseUnboundedComponentProvider.create(
DocsConfig.Default,
aggregateWindowsConfig
) ::: FlinkBaseComponentProvider.Components,
configCreator = new ConfigCreatorWithCollectingListener(collectingListener),
)
}

protected def transitionVariables(
testResults: TestResults[Any],
fromNodeId: String,
toNodeId: Option[String]
): Set[Map[String, Any]] =
testResults
.nodeTransitionResults(NodeTransition(fromNodeId, toNodeId))
.map(_.variables)
.toSet[Map[String, Any]]
.map(_.map { case (key, value) => (key, scalaMap(value)) })

private def scalaMap(value: Any): Any = {
value match {
case hashMap: java.util.HashMap[_, _] => hashMap.asScala.toMap
case other => other
}
}

protected def assertNumberOfSamplesThatFinishedInNode(testResults: TestResults[Any], sinkId: String, expected: Int) =
testResults.nodeTransitionResults.get(NodeTransition(sinkId, None)).map(_.length) shouldBe Some(expected)

protected def catchExceptionMessage(f: => Any): String = Try(f).failed.get.getMessage

}
Loading