Skip to content

Commit ca90d04

Browse files
authored
[NU-2120] Data sample from Kafka Source in Ad-Hoc test input (#7937)
1 parent 9059793 commit ca90d04

File tree

8 files changed

+165
-13
lines changed

8 files changed

+165
-13
lines changed

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/EventGeneratorSourceTestingApiHttpServiceSpec.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,15 @@ trait EventGeneratorSourceTestingApiHttpServiceSpec extends TestingApiHttpServic
6464
|]
6565
|""".stripMargin
6666

67+
override protected def expectedTestParametersJson: String = {
68+
s"""
69+
|[
70+
| {
71+
| "sourceId": "$exampleScenarioSourceId",
72+
| "parameters": []
73+
| }
74+
|]
75+
|""".stripMargin
76+
}
77+
6778
}

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/GenericSourceWithCustomVariablesTestingApiHttpServiceSpec.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,48 @@ class GenericSourceWithCustomVariablesTestingApiHttpServiceSpec extends TestingA
8787
| }
8888
|]""".stripMargin
8989

90+
override protected def expectedTestParametersJson: String = {
91+
s"""
92+
|[
93+
| {
94+
| "sourceId": "$exampleScenarioSourceId",
95+
| "parameters": [
96+
| {
97+
| "name": "elements",
98+
| "typ": {
99+
| "display": "List[String]",
100+
| "type": "TypedClass",
101+
| "refClazzName": "java.util.List",
102+
| "params": [
103+
| {
104+
| "display": "String",
105+
| "type": "TypedClass",
106+
| "refClazzName": "java.lang.String",
107+
| "params": []
108+
| }
109+
| ]
110+
| },
111+
| "editors": [
112+
| {
113+
| "type": "SpelParameterEditor"
114+
| }
115+
| ],
116+
| "defaultValue": {
117+
| "language":"spel",
118+
| "expression":"{}"
119+
| },
120+
| "additionalVariables": {},
121+
| "variablesToHide": [],
122+
| "branchParam": false,
123+
| "hintText": null,
124+
| "label": "elements",
125+
| "requiredParam": true,
126+
| "category": "Standard"
127+
| }
128+
| ]
129+
| }
130+
|]
131+
|""".stripMargin
132+
}
133+
90134
}

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/JsonSchemalessAdHocTestsSpec.scala

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import com.dimafeng.testcontainers.{
77
MultipleContainers,
88
SchemaRegistryContainer
99
}
10-
import com.typesafe.config.Config
10+
import com.typesafe.config.{Config, ConfigValueFactory}
1111
import com.typesafe.config.ConfigValueFactory.fromMap
1212
import com.typesafe.scalalogging.StrictLogging
1313
import io.circe.syntax.EncoderOps
@@ -70,6 +70,12 @@ class JsonSchemalessAdHocTestsSpec
7070
}
7171
}
7272

73+
"The endpoint for adhoc test parameters should" - {
74+
"return test parameters" in {
75+
shouldProperlyGetTestParameters()
76+
}
77+
}
78+
7379
protected val kafkaContainer: KafkaContainer =
7480
KafkaContainer().configure { self =>
7581
self.setNetwork(network)
@@ -90,6 +96,10 @@ class JsonSchemalessAdHocTestsSpec
9096
"scenarioTypes.streaming.modelConfig.components.kafka.config.kafkaProperties",
9197
fromMap(Map("bootstrap.servers" -> "localhost:8070", "schema.registry.url" -> "http://localhost:8069").asJava)
9298
)
99+
.withValue(
100+
"scenarioTypes.streaming.modelConfig.components.kafka.config.useDataSampleParamForSchemalessJsonTopicBasedKafkaSource",
101+
ConfigValueFactory.fromAnyRef(true)
102+
)
93103

94104
lazy val defaultKafkaConfig: KafkaConfig = KafkaConfig(
95105
kafkaProperties = Some(Map("bootstrap.servers" -> kafkaContainer.bootstrapServers)),
@@ -110,17 +120,18 @@ object JsonSchemalessAdHocTestsSpec {
110120

111121
private[JsonSchemalessAdHocTestsSpec] trait WithSchemalessAdHocTestParameters extends WithAdHocTestParameters {
112122

113-
protected def exampleScenarioSourceId: String = "start"
123+
override protected def exampleScenarioSourceId: String = "start"
114124

115-
protected def exampleScenario: CanonicalProcess = {
125+
override protected def exampleScenario: CanonicalProcess = {
116126
ScenarioBuilder
117127
.streaming("without-schema")
118128
.parallelism(1)
119129
.source(
120130
exampleScenarioSourceId,
121131
"kafka",
122132
"Topic" -> s"'$sourceTopicName'".spel,
123-
"Content type" -> "'JSON'".spel
133+
"Content type" -> "'JSON'".spel,
134+
"Data sample" -> Expression.json("{\"name\": \"Tom\"}")
124135
)
125136
.emptySink(
126137
"end",
@@ -134,18 +145,18 @@ object JsonSchemalessAdHocTestsSpec {
134145
)
135146
}
136147

137-
protected def validParameters: TestSourceParameters =
148+
override protected def validParameters: TestSourceParameters =
138149
TestSourceParameters(exampleScenarioSourceId, Map(inputParameterName -> Expression.json(validJson)))
139150

140-
protected def invalidParameters: TestSourceParameters =
151+
override protected def invalidParameters: TestSourceParameters =
141152
TestSourceParameters(exampleScenarioSourceId, Map(inputParameterName -> Expression.json(invalidJson)))
142153

143-
protected def parametersProvidedForDryRun: String = AdhocTestParametersRequest(
154+
override protected def parametersProvidedForDryRun: String = AdhocTestParametersRequest(
144155
sourceParameters = validParameters,
145156
scenarioGraph = toScenarioGraph(exampleScenario)
146157
).asJson.toString()
147158

148-
protected def expectedValidationErrorsOnInvalidParametersJson: String =
159+
override protected def expectedValidationErrorsOnInvalidParametersJson: String =
149160
s"""
150161
|[
151162
| {
@@ -158,6 +169,43 @@ object JsonSchemalessAdHocTestsSpec {
158169
| }
159170
|]""".stripMargin
160171

172+
override protected def expectedTestParametersJson: String = {
173+
s"""
174+
|[
175+
| {
176+
| "sourceId": "$exampleScenarioSourceId",
177+
| "parameters": [
178+
| {
179+
| "name": "Value",
180+
| "typ": {
181+
| "display": "Unknown",
182+
| "type": "Unknown",
183+
| "refClazzName": "java.lang.Object",
184+
| "params": []
185+
| },
186+
| "editors": [
187+
| {
188+
| "type": "JsonParameterEditor"
189+
| }
190+
| ],
191+
| "defaultValue": {
192+
| "language": "json",
193+
| "expression": "{\\n \\"name\\" : \\"Tom\\"\\n}"
194+
| },
195+
| "additionalVariables": {},
196+
| "variablesToHide": [],
197+
| "branchParam": false,
198+
| "hintText": null,
199+
| "label": "Value",
200+
| "requiredParam": true,
201+
| "category": "Standard"
202+
| }
203+
| ]
204+
| }
205+
|]
206+
|""".stripMargin
207+
}
208+
161209
private val validJson = """|[
162210
| {
163211
| "products": [

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/TestingApiHttpServiceSpec.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,12 @@ trait TestingApiHttpServiceSpec
344344
}
345345
}
346346

347+
"The endpoint for adhoc test parameters should" - {
348+
"return test parameters" in {
349+
shouldProperlyGetTestParameters()
350+
}
351+
}
352+
347353
private def exampleScenarioGraphStr = Encoder[ScenarioGraph].apply(exampleScenarioGraph).toString()
348354

349355
private def canonicalGraphStr(canonical: CanonicalProcess) =

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/WithAdHocTestParameters.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ trait WithAdHocTestParameters {
1919

2020
protected def expectedValidationErrorsOnInvalidParametersJson: String
2121

22+
protected def expectedTestParametersJson: String
23+
2224
protected def exampleScenarioGraph: ScenarioGraph = CanonicalProcessConverter.toScenarioGraph(exampleScenario)
2325

2426
}

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/WithAdHocTestsLogic.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,22 @@ trait WithAdHocTestsLogic {
8080
)
8181
}
8282

83+
def shouldProperlyGetTestParameters(): Unit = {
84+
val request = exampleScenarioGraph.asJson.noSpaces
85+
86+
given()
87+
.applicationState {
88+
createSavedScenario(exampleScenario)
89+
}
90+
.when()
91+
.basicAuthAllPermUser()
92+
.jsonBody(request)
93+
.post(s"$nuDesignerHttpAddress/api/scenarioTesting/${exampleScenario.name}/parameters")
94+
.Then()
95+
.statusCode(200)
96+
.equalsJsonBody(
97+
expectedTestParametersJson
98+
)
99+
}
100+
83101
}

docs/Changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
Those endpoints are automatically added to the Nussknacker OpenAPI documentation.
147147
* `CustomHttpServiceProvider` providing Pekko route was renamed to `PekkoCustomHttpServiceProvider`
148148
* [#7922](https://github.com/TouK/nussknacker/pull/7922) Fixed the hiding of components configured with `disabled` flag in `componentsUiConfig` section.
149+
* [#7937](https://github.com/TouK/nussknacker/pull/7937) Data sample from Kafka Source as initial input for Ad-Hoc test
149150

150151
## 1.18
151152

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecor
2525
import pl.touk.nussknacker.engine.kafka.source._
2626
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.{KafkaSourceImplFactory, KafkaTestParametersInfo}
2727
import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData}
28-
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.schemaVersionParamName
28+
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.{
29+
schemaVersionParamName,
30+
sinkValueParamName
31+
}
2932
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._
3033
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.SchemaBasedSerializableConsumerRecord
3134
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupport
@@ -223,6 +226,17 @@ class UniversalKafkaSourceFactory(
223226
schemaBasedMessagesSerdeProvider.deserializationSchemaFactory.create[Any, Any](kafkaConfig, None, None)
224227
val recordFormatter =
225228
schemaBasedMessagesSerdeProvider.recordFormatterFactory.create[Any, Any](kafkaConfig, formatterSchema)
229+
230+
val defaultValuesForTestParameters: Map[ParameterName, Expression] =
231+
if (params.isPresent(dataSampleParamName)) {
232+
params
233+
.extract[Json](dataSampleParamName)
234+
.map { dataSample => sinkValueParamName -> Expression.json(dataSample.spaces2) }
235+
.toMap
236+
} else {
237+
Map.empty
238+
}
239+
226240
implProvider.createSource(
227241
params,
228242
dependencies,
@@ -232,21 +246,22 @@ class UniversalKafkaSourceFactory(
232246
deserializationSchema,
233247
recordFormatter,
234248
kafkaContextInitializer,
235-
prepareKafkaTestParametersInfo(valueSchemaUsedInRuntime, preparedTopic.original),
249+
prepareKafkaTestParametersInfo(valueSchemaUsedInRuntime, preparedTopic.original, defaultValuesForTestParameters),
236250
modelDependencies.namingStrategy
237251
)
238252
}
239253

240254
private def prepareKafkaTestParametersInfo(
241255
runtimeSchemaOpt: Option[RuntimeSchemaData[ParsedSchema]],
242-
topic: TopicName.ForSource
256+
topic: TopicName.ForSource,
257+
defaultValuesForTestParameters: Map[ParameterName, Expression]
243258
)(
244259
implicit nodeId: NodeId
245260
): KafkaTestParametersInfo = {
246261
Validated
247262
.fromOption(
248263
runtimeSchemaOpt,
249-
NonEmptyList.one(CustomNodeError(nodeId.id, "Cannot generate test parameters: no runtime schema found", None))
264+
NonEmptyList.one(CustomNodeError("Cannot generate test parameters: no runtime schema found", None))
250265
)
251266
.andThen { runtimeSchema =>
252267
val parsedSchema = runtimeSchema.schema
@@ -256,7 +271,14 @@ class UniversalKafkaSourceFactory(
256271
.extractParameterForTests(parsedSchema)
257272
.map(_.toParameters)
258273
.map { params =>
259-
KafkaTestParametersInfo(params, prepareTestRecord(runtimeSchema, universalSchemaSupport, topic))
274+
val enrichedParams = params.map {
275+
case param if defaultValuesForTestParameters.contains(param.name) =>
276+
param.copy(
277+
defaultValue = Some(defaultValuesForTestParameters(param.name))
278+
)
279+
case other => other
280+
}
281+
KafkaTestParametersInfo(enrichedParams, prepareTestRecord(runtimeSchema, universalSchemaSupport, topic))
260282
}
261283
}
262284
.valueOr(e => throw new RuntimeException(e.toList.mkString("")))

0 commit comments

Comments
 (0)