Skip to content

Commit 6f20d2b

Browse files
Merge remote-tracking branch 'origin/staging' into preview/NU-2168
2 parents 66f8fc4 + 3c1f3b2 commit 6f20d2b

File tree

76 files changed

+743
-364
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+743
-364
lines changed

build.sbt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1797,9 +1797,10 @@ lazy val openapiComponents = (project in component("openapi"))
17971797
ExclusionRule(organization = "jakarta.activation"),
17981798
ExclusionRule(organization = "jakarta.validation")
17991799
),
1800-
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
1801-
"org.scalatest" %% "scalatest" % scalaTestV % "it,test",
1802-
"org.wiremock" % "wiremock" % wireMockV % Test,
1800+
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
1801+
"org.scalatest" %% "scalatest" % scalaTestV % "it,test",
1802+
"org.wiremock" % "wiremock" % wireMockV % Test,
1803+
"org.scalatestplus" %% "mockito-5-10" % scalaTestPlusV % Test
18031804
),
18041805
)
18051806
.dependsOn(

components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenAPIServiceSpec.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ import pl.touk.nussknacker.engine.api.test.EmptyInvocationCollector.Instance
1313
import pl.touk.nussknacker.engine.api.typed.TypedMap
1414
import pl.touk.nussknacker.engine.util.ResourceLoader
1515
import pl.touk.nussknacker.engine.util.runtimecontext.TestEngineRuntimeContext
16-
import pl.touk.nussknacker.engine.util.service.EagerServiceWithStaticParametersAndReturnType
16+
import pl.touk.nussknacker.engine.util.service.AsyncExecutionTimeMeasurement
1717
import pl.touk.nussknacker.http.backend.FixedAsyncHttpClientBackendProvider
1818
import pl.touk.nussknacker.openapi.{ApiKeySecret, OpenAPIServicesConfig, SecuritySchemeName}
19-
import pl.touk.nussknacker.openapi.enrichers.{SwaggerEnricherCreator, SwaggerEnrichers}
19+
import pl.touk.nussknacker.openapi.enrichers.OpenAPIEnricher
2020
import pl.touk.nussknacker.openapi.parser.SwaggerParser
2121
import pl.touk.nussknacker.test.PatientScalaFutures
2222

@@ -36,7 +36,7 @@ class OpenAPIServiceSpec
3636
implicit val context: Context = Context("testContextId", Map.empty)
3737
val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))
3838

39-
type FixtureParam = EagerServiceWithStaticParametersAndReturnType
39+
type FixtureParam = ServiceInvoker
4040

4141
def withFixture(test: OneArgTest): Outcome = {
4242
val definition = ResourceLoader.load("/customer-swagger.json")
@@ -54,12 +54,14 @@ class OpenAPIServiceSpec
5454
service
5555
}
5656

57-
val enricher = SwaggerEnrichers
58-
.prepare(config, services, new SwaggerEnricherCreator(new FixedAsyncHttpClientBackendProvider(client)))
59-
.head
60-
.service
61-
.asInstanceOf[EagerServiceWithStaticParametersAndReturnType]
62-
enricher.open(TestEngineRuntimeContext(jobData))
57+
val enricher = OpenAPIEnricher(
58+
service = services.head,
59+
config = config,
60+
clientProvider = new FixedAsyncHttpClientBackendProvider(client),
61+
params = Params.fromRawValuesMap(Map(ParameterName("customer_id") -> "10")),
62+
getTimeMeasurement =
63+
() => new AsyncExecutionTimeMeasurement(TestEngineRuntimeContext(jobData), "openAPI", Map.empty)
64+
)
6365

6466
withFixture(test.toNoArgTest(enricher))
6567
}
@@ -69,10 +71,9 @@ class OpenAPIServiceSpec
6971
}
7072

7173
test("service returns customers") { service =>
72-
implicit val contextId: ContextId = ContextId("1")
7374
val valueWithChosenFields =
7475
service
75-
.invoke(Map(ParameterName("customer_id") -> "10"))
76+
.invoke(context)
7677
.futureValue
7778
.asInstanceOf[TypedMap]
7879
.asScala

components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package pl.touk.nussknacker.openapi.functional
22

3+
import cats.data.Validated
34
import cats.data.Validated.Valid
45
import com.typesafe.config.ConfigFactory
56
import com.typesafe.scalalogging.LazyLogging
@@ -13,9 +14,10 @@ import pl.touk.nussknacker.engine.build.ScenarioBuilder
1314
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
1415
import pl.touk.nussknacker.engine.graph.expression.Expression
1516
import pl.touk.nussknacker.engine.util.test.{ClassBasedTestScenarioRunner, RunResult, TestScenarioRunner}
16-
import pl.touk.nussknacker.openapi.{OpenAPIServicesConfig, SingleBodyParameter}
17-
import pl.touk.nussknacker.openapi.enrichers.SwaggerEnricher
18-
import pl.touk.nussknacker.openapi.parser.SwaggerParser
17+
import pl.touk.nussknacker.openapi.{OpenAPIServicesConfig, SingleBodyParameter, SwaggerService}
18+
import pl.touk.nussknacker.openapi.discovery.OpenApiDefinitionDiscovery
19+
import pl.touk.nussknacker.openapi.enrichers.OpenAPIEnricherFactory
20+
import pl.touk.nussknacker.openapi.parser.{ServiceParseError, SwaggerParser}
1921
import pl.touk.nussknacker.test.{ValidatedValuesDetailedMessage, VeryPatientScalaFutures}
2022
import sttp.client3.{Response, SttpBackend}
2123
import sttp.client3.testing.SttpBackendStub
@@ -71,14 +73,18 @@ class OpenApiScenarioIntegrationTest
7173
if headers.exists(_.name == HeaderNames.ContentType)
7274
&& !headers.contains(Header(HeaderNames.ContentType, MediaType.ApplicationJson.toString())) =>
7375
Response("Unsupported media type", StatusCode.UnsupportedMediaType)
74-
case _ => Response.ok((s"""{"name": "Robert Wright", "id": 10, "category": "GOLD"}"""))
76+
case _ if request.method.method == "GET" && request.uri.path.last == "10" =>
77+
Response.ok((s"""{"name": "Robert Wright", "id": 10, "category": "GOLD"}"""))
78+
case _ if request.method.method == "POST" =>
79+
Response.ok((s"""{"name": "Robert Wright", "id": 10, "category": "GOLD"}"""))
80+
case _ => Response(null, StatusCode.BadRequest)
7581
}
7682
}
7783

7884
it should "should enrich scenario with data" in withSwagger(stubbedBackend) { testScenarioRunner =>
7985
// given
8086
val data = List("10")
81-
val scenario = scenarioWithEnricher(("customer_id", "#input".spel))
87+
val scenario = scenarioWithEnricher(("Service", "'getCustomer'".spel), ("customer_id", "#input".spel))
8288

8389
// when
8490
val result = testScenarioRunner.runWithData(scenario, data)
@@ -92,8 +98,11 @@ class OpenApiScenarioIntegrationTest
9298
it should "call enricher with primitive request body" in withPrimitiveRequestBody(stubbedBackend) {
9399
testScenarioRunner =>
94100
// given
95-
val data = List("10")
96-
val scenario = scenarioWithEnricher((SingleBodyParameter.name, "#input".spel))
101+
val data = List("10")
102+
val scenario = scenarioWithEnricher(
103+
("Service", "'POST-customer'".spel),
104+
(SingleBodyParameter.name, "#input".spel)
105+
)
97106

98107
// when
99108
val result = testScenarioRunner.runWithData(scenario, data)
@@ -107,8 +116,10 @@ class OpenApiScenarioIntegrationTest
107116
it should "call enricher with request body" in withRequestBody(stubbedBackend) { testScenarioRunner =>
108117
// given
109118
val data = List("10")
110-
val scenario =
111-
scenarioWithEnricher((SingleBodyParameter.name, """{{additionalKey:"sss", primaryKey:"dfgdf"}}""".spel))
119+
val scenario = scenarioWithEnricher(
120+
("Service", "'POST-customer'".spel),
121+
(SingleBodyParameter.name, """{{additionalKey:"sss", primaryKey:"dfgdf"}}""".spel)
122+
)
112123

113124
// when
114125
val result = testScenarioRunner.runWithData(scenario, data)
@@ -125,8 +136,11 @@ class OpenApiScenarioIntegrationTest
125136
}
126137
) { testScenarioRunner =>
127138
// given
128-
val data = List("10")
129-
val scenario = scenarioWithEnricher((SingleBodyParameter.name, "#input".spel))
139+
val data = List("10")
140+
val scenario = scenarioWithEnricher(
141+
("Service", "'POST-customer'".spel),
142+
(SingleBodyParameter.name, "#input".spel)
143+
)
130144

131145
// when
132146
val result = testScenarioRunner.runWithData(scenario, data)
@@ -140,7 +154,7 @@ class OpenApiScenarioIntegrationTest
140154
.streaming("openapi-test")
141155
.parallelism(1)
142156
.source("start", TestScenarioRunner.testDataSource)
143-
.enricher("customer", "customer", "getCustomer", params: _*)
157+
.enricher("customer", "customer", "openAPI", params: _*)
144158
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#customer".spel)
145159
}
146160

@@ -169,9 +183,21 @@ class OpenApiScenarioIntegrationTest
169183
val services = SwaggerParser.parse(definition, openAPIsConfig).collect { case Valid(service) =>
170184
service
171185
}
172-
val stubbedGetCustomerOpenApiService =
173-
new SwaggerEnricher(url, services.head, Map.empty, (_: ExecutionContext) => sttpBackend, Nil)
174-
ComponentDefinition("getCustomer", stubbedGetCustomerOpenApiService)
186+
val stubbedGetCustomerOpenApiService = new OpenAPIEnricherFactory(
187+
openAPIsConfig,
188+
(_: ExecutionContext) => sttpBackend,
189+
new MockOpenApiDefinitionDiscovery(services)
190+
)
191+
ComponentDefinition("openAPI", stubbedGetCustomerOpenApiService)
192+
}
193+
194+
class MockOpenApiDefinitionDiscovery(services: List[SwaggerService]) extends OpenApiDefinitionDiscovery {
195+
196+
override def getServices(
197+
openAPIsConfig: OpenAPIServicesConfig
198+
): List[Validated[ServiceParseError, SwaggerService]] =
199+
services.map(s => Valid(s))
200+
175201
}
176202

177203
}

components/openapi/src/main/scala/pl/touk/nussknacker/openapi/OpenAPIComponentProvider.scala

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,29 @@
11
package pl.touk.nussknacker.openapi
22

3-
import cats.data.Validated
4-
import cats.data.Validated.{Invalid, Valid}
5-
import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
3+
import com.typesafe.config.Config
64
import com.typesafe.scalalogging.LazyLogging
7-
import io.circe.syntax.EncoderOps
8-
import net.ceedubs.ficus.Ficus._
95
import pl.touk.nussknacker.engine.ModelConfig
10-
import pl.touk.nussknacker.engine.api.CirceUtil
116
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
127
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments._
138
import pl.touk.nussknacker.openapi.OpenAPIServicesConfig._
14-
import pl.touk.nussknacker.openapi.discovery.SwaggerOpenApiDefinitionDiscovery
15-
import pl.touk.nussknacker.openapi.enrichers.{SwaggerEnricherCreator, SwaggerEnrichers}
16-
import pl.touk.nussknacker.openapi.parser.ServiceParseError
17-
18-
import scala.jdk.CollectionConverters._
19-
import scala.util.control.NonFatal
9+
import pl.touk.nussknacker.openapi.discovery.{CachingOpenApiDefinitionDiscovery, SwaggerOpenApiDefinitionDiscovery}
10+
import pl.touk.nussknacker.openapi.enrichers.OpenAPIEnricherFactory
2011

2112
class OpenAPIComponentProvider extends ComponentProvider with LazyLogging {
2213

2314
override def providerName: String = "openAPI"
2415

25-
override def resolveConfigForExecution(config: Config): Config = {
26-
// we need to load config to resolve url which can be potentially a system env variable
27-
val openAPIsConfig = ConfigFactory.load(config).rootAs[OpenAPIServicesConfig]
28-
val services =
29-
try {
30-
SwaggerOpenApiDefinitionDiscovery.discoverOpenAPIServices(openAPIsConfig)
31-
} catch {
32-
case NonFatal(ex) =>
33-
logger.error("OpenAPI service resolution failed. Will be used empty services lists", ex)
34-
List.empty
35-
}
36-
val servicesToUse = services.collect { case Valid(service) =>
37-
ConfigFactory.parseString(service.asJson.spaces2).root()
38-
}
39-
logErrors(services)
40-
config.withValue("services", ConfigValueFactory.fromIterable(servicesToUse.asJava))
41-
}
42-
43-
private def logErrors(services: List[Validated[ServiceParseError, SwaggerService]]): Unit = {
44-
val errors = services.collect { case Invalid(serviceError) =>
45-
s"${serviceError.name.value} (${serviceError.errors.toList.mkString(", ")})"
46-
}
47-
if (errors.nonEmpty) {
48-
logger.warn(s"Failed to parse following services: ${errors.mkString(", ")}")
49-
}
50-
}
16+
override def resolveConfigForExecution(config: Config): Config = config
5117

5218
override def create(componentProviderConfig: Config, modelConfig: ModelConfig): List[ComponentDefinition] = {
53-
val openAPIsConfig = componentProviderConfig.rootAs[OpenAPIServicesConfig]
54-
val serviceDefinitionConfig = componentProviderConfig.getList("services").render(ConfigRenderOptions.concise())
55-
val swaggerServices =
56-
CirceUtil.decodeJsonUnsafe[List[SwaggerService]](serviceDefinitionConfig, "Failed to parse service config")
57-
val creator = prepareBaseEnricherCreator(openAPIsConfig)
58-
59-
SwaggerEnrichers
60-
.prepare(openAPIsConfig, swaggerServices, creator)
61-
.map(service => ComponentDefinition(service.name.value, service.service, docsUrl = service.documentation))
62-
.toList
63-
}
64-
65-
protected def prepareBaseEnricherCreator(config: OpenAPIServicesConfig): SwaggerEnricherCreator = {
66-
SwaggerEnricherCreator(config.httpClientConfig)
19+
val openAPIsConfig = componentProviderConfig.rootAs[OpenAPIServicesConfig]
20+
val openApiDefinitionDiscovery =
21+
new CachingOpenApiDefinitionDiscovery(SwaggerOpenApiDefinitionDiscovery, openAPIsConfig)
22+
ComponentDefinition(
23+
"openAPI",
24+
OpenAPIEnricherFactory(openAPIsConfig, openApiDefinitionDiscovery),
25+
label = Some("OpenAPI")
26+
) :: Nil
6727
}
6828

6929
override def isCompatible(version: NussknackerVersion): Boolean = true

components/openapi/src/main/scala/pl/touk/nussknacker/openapi/OpenAPIServicesConfig.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import pl.touk.nussknacker.http.backend.{DefaultHttpClientConfig, HttpClientConf
77
import sttp.model.StatusCode
88

99
import java.net.URL
10+
import scala.concurrent.duration.{DurationInt, FiniteDuration}
1011
import scala.util.matching.Regex
1112

1213
final case class OpenAPIServicesConfig(
@@ -19,7 +20,8 @@ final case class OpenAPIServicesConfig(
1920
// For backward compatibility it is called security. We should probably rename it and bundle together with secret
2021
private val security: Map[SecuritySchemeName, Secret] = Map.empty,
2122
private val secret: Option[Secret] = None,
22-
httpClientConfig: HttpClientConfig = DefaultHttpClientConfig()
23+
httpClientConfig: HttpClientConfig = DefaultHttpClientConfig(),
24+
openApiServicesDiscoveryCacheTtl: FiniteDuration = 30 seconds
2325
) {
2426
def securityConfig: SecurityConfig =
2527
new SecurityConfig(secretBySchemeName = security, commonSecretForAnyScheme = secret)
Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,41 @@
11
package pl.touk.nussknacker.openapi.discovery
22

33
import cats.data.Validated
4+
import cats.data.Validated.{Invalid, Valid}
45
import com.typesafe.scalalogging.LazyLogging
56
import org.asynchttpclient.DefaultAsyncHttpClient
67
import pl.touk.nussknacker.engine.util.ResourceLoader
8+
import pl.touk.nussknacker.engine.util.cache.SingleValueCache
79
import pl.touk.nussknacker.http.backend.HttpClientConfig
810
import pl.touk.nussknacker.openapi.{OpenAPIServicesConfig, SwaggerService}
911
import pl.touk.nussknacker.openapi.parser.{ServiceParseError, SwaggerParser}
10-
import sttp.client3.{basicRequest, SttpBackend}
12+
import sttp.client3.{SttpBackend, basicRequest}
1113
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
1214
import sttp.model.Uri
1315

1416
import java.io.File
1517
import scala.concurrent.{Await, Future}
1618
import scala.concurrent.duration.DurationInt
1719

20+
trait OpenApiDefinitionDiscovery extends LazyLogging {
21+
def getServices(openAPIsConfig: OpenAPIServicesConfig): List[Validated[ServiceParseError, SwaggerService]]
22+
23+
def getValidServices(openAPIsConfig: OpenAPIServicesConfig): List[SwaggerService] = {
24+
val services = getServices(openAPIsConfig)
25+
logErrors(services)
26+
services.collect { case Valid(service) => service }
27+
}
28+
29+
private def logErrors(services: List[Validated[ServiceParseError, SwaggerService]]): Unit = {
30+
val errors = services.collect { case Invalid(serviceError) =>
31+
s"${serviceError.name.value} (${serviceError.errors.toList.mkString(", ")})"
32+
}
33+
if (errors.nonEmpty) {
34+
logger.warn(s"Failed to parse following services: ${errors.mkString(", ")}")
35+
}
36+
}
37+
}
38+
1839
object SwaggerOpenApiDefinitionDiscovery
1940
extends SwaggerOpenApiDefinitionDiscovery()(
2041
AsyncHttpClientFutureBackend.usingClient(
@@ -33,9 +54,10 @@ object SwaggerOpenApiDefinitionDiscovery
3354
)
3455
)
3556

36-
class SwaggerOpenApiDefinitionDiscovery(implicit val httpBackend: SttpBackend[Future, Any]) extends LazyLogging {
57+
class SwaggerOpenApiDefinitionDiscovery(implicit val httpBackend: SttpBackend[Future, Any]) extends LazyLogging
58+
with OpenApiDefinitionDiscovery {
3759

38-
def discoverOpenAPIServices(
60+
override def getServices(
3961
openAPIsConfig: OpenAPIServicesConfig
4062
): List[Validated[ServiceParseError, SwaggerService]] = {
4163
val discoveryUrl = openAPIsConfig.url
@@ -49,5 +71,20 @@ class SwaggerOpenApiDefinitionDiscovery(implicit val httpBackend: SttpBackend[Fu
4971
}
5072
SwaggerParser.parse(definition, openAPIsConfig)
5173
}
74+
}
5275

76+
class CachingOpenApiDefinitionDiscovery(
77+
discovery: OpenApiDefinitionDiscovery,
78+
openAPIsConfig: OpenAPIServicesConfig,
79+
) extends OpenApiDefinitionDiscovery {
80+
@transient private lazy val servicesCache = new SingleValueCache[List[Validated[ServiceParseError, SwaggerService]]](
81+
expireAfterAccess = None,
82+
expireAfterWrite = Some(openAPIsConfig.openApiServicesDiscoveryCacheTtl)
83+
)
84+
85+
override def getServices(
86+
openAPIsConfig: OpenAPIServicesConfig
87+
): List[Validated[ServiceParseError, SwaggerService]] = servicesCache.getOrCreate {
88+
discovery.getServices(openAPIsConfig)
89+
}
5390
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package pl.touk.nussknacker.openapi.enrichers
2+
3+
import java.net.{MalformedURLException, URL}
4+
5+
object InvocationBaseUrl {
6+
7+
def determineInvocationBaseUrl(
8+
definitionUrl: URL,
9+
rootUrl: Option[URL],
10+
serversFromDefinition: List[String]
11+
): URL = {
12+
def relativeToDefinitionUrl(serversUrlPart: String): URL = {
13+
try {
14+
new URL(definitionUrl, serversUrlPart)
15+
} catch {
16+
case _: MalformedURLException =>
17+
new URL(serversUrlPart)
18+
}
19+
}
20+
// Regarding https://spec.openapis.org/oas/v3.1.0#fixed-fields default server url is /
21+
rootUrl.getOrElse(relativeToDefinitionUrl(serversFromDefinition.headOption.getOrElse("/")))
22+
}
23+
24+
}

0 commit comments

Comments
 (0)