Skip to content

Commit 1b06aa0

Browse files
review fixes
1 parent 51fa815 commit 1b06aa0

File tree

15 files changed

+449
-220
lines changed

15 files changed

+449
-220
lines changed

designer/server/src/main/scala/pl/touk/nussknacker/ui/limits/LimitsService.scala

+19-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ class LimitsService(
1616
scenarioStatusProvider: ScenarioStatusProvider
1717
) {
1818

19-
// todo: check locking
20-
2119
def checkScenarioLimitsBeforeDeployment(
2220
scenarioProcessingType: ProcessingType
2321
)(implicit user: LoggedUser): IO[Either[LimitError, Unit]] = {
@@ -49,7 +47,25 @@ class LimitsService(
4947
}
5048
}
5149

52-
private def checkGlobalLimits(): EitherT[IO, ActiveScenariosLimitExceededError, Unit] = ???
50+
private def checkGlobalLimits()(implicit user: LoggedUser): EitherT[IO, ActiveScenariosLimitExceededError, Unit] = {
51+
globalLimitsConfig.activeScenariosLimit match {
52+
case Some(activeScenariosLimit) =>
53+
EitherT { // todo: user nu user?
54+
scenarioStatusProvider
55+
.getActiveScenariosCountFor(activeScenariosLimitProvider.all.keys)
56+
.map { activeScenariosCount =>
57+
Either.cond(
58+
test = activeScenariosCount < activeScenariosLimit.value,
59+
right = (),
60+
left = ActiveScenariosLimitExceededError(activeScenariosLimit.value)
61+
)
62+
}
63+
}
64+
case None =>
65+
EitherT.right(IO.unit)
66+
}
67+
}
68+
5369
}
5470

5571
object LimitsService {

designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/scenariostatus/ScenarioStatusProvider.scala

+18-13
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,25 @@ class ScenarioStatusProvider(
7979
)
8080
}
8181

82-
def getActiveScenariosCountFor(processingType: ProcessingType)(implicit user: LoggedUser): IO[Int] = IO.fromFuture {
83-
IO {
84-
implicit val freshnessStatus: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
85-
deploymentStatusesProvider
86-
.getBulkQueriedDeploymentStatusesForSupportedManagers(processingType :: Nil)
87-
.map { statuses =>
88-
statuses.getAllDeploymentStatuses
89-
.count { status =>
90-
status.status == SimpleStateStatus.DuringDeploy ||
91-
status.status == SimpleStateStatus.Running ||
92-
status.status == SimpleStateStatus.Restarting
93-
}
94-
}
82+
def getActiveScenariosCountFor(processingTypes: Iterable[ProcessingType])(implicit user: LoggedUser): IO[Int] =
83+
IO.fromFuture {
84+
IO {
85+
implicit val freshnessStatus: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
86+
deploymentStatusesProvider
87+
.getBulkQueriedDeploymentStatusesForSupportedManagers(processingTypes)
88+
.map { statuses =>
89+
statuses.getAllDeploymentStatuses
90+
.count { status =>
91+
status.status == SimpleStateStatus.DuringDeploy ||
92+
status.status == SimpleStateStatus.Running ||
93+
status.status == SimpleStateStatus.Restarting
94+
}
95+
}
96+
}
9597
}
98+
99+
def getActiveScenariosCountFor(processingType: ProcessingType)(implicit user: LoggedUser): IO[Int] = {
100+
getActiveScenariosCountFor(processingType :: Nil)
96101
}
97102

98103
private def getNonFragmentScenarioStatus[ScenarioShape, F[_]: Traverse](

designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala

+8-8
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ trait NuResourcesTest
157157
protected val deploymentService: DeploymentService =
158158
new DeploymentService(
159159
dispatcher = dmDispatcher,
160-
processValidator = processValidatorByProcessingType,
161-
scenarioResolver = scenarioResolverByProcessingType,
160+
processValidator = processValidatorByProcessingType(),
161+
scenarioResolver = scenarioResolverByProcessingType(),
162162
actionService = actionService,
163163
additionalComponentConfigs = mapProcessingTypeDataProvider(),
164164
limitsService = new LimitsService(
@@ -259,9 +259,9 @@ trait NuResourcesTest
259259
new DBProcessService(
260260
processStateProvider,
261261
scenarioStatusPresenter,
262-
newProcessPreparerByProcessingType,
262+
newProcessPreparerByProcessingType(),
263263
processingTypeDataProvider.mapCombined(_.parametersService),
264-
processResolverByProcessingType,
264+
processResolverByProcessingType(),
265265
dbioRunner,
266266
futureFetchingScenarioRepository,
267267
actionRepository,
@@ -278,12 +278,12 @@ trait NuResourcesTest
278278
): ScenarioTestService =
279279
new ScenarioTestService(
280280
testInfoProvider,
281-
processResolver,
281+
processResolver(),
282282
designerConfig.testDataSettings,
283283
new PreliminaryScenarioTestDataSerDe(designerConfig.testDataSettings),
284-
new ProcessCounter(TestFactory.prepareSampleFragmentRepository),
284+
new ProcessCounter(TestFactory.prepareSampleFragmentRepository()),
285285
new ScenarioTestExecutorServiceImpl(
286-
new ScenarioResolver(sampleResolver, Streaming.stringify),
286+
new ScenarioResolver(sampleResolver(), Streaming.stringify),
287287
deploymentManager
288288
)
289289
)
@@ -582,7 +582,7 @@ trait NuResourcesTest
582582
processName: ProcessName,
583583
isFragment: Boolean = false,
584584
): ProcessId = {
585-
val emptyProcess = newProcessPreparer.prepareEmptyProcess(processName, isFragment)
585+
val emptyProcess = newProcessPreparer().prepareEmptyProcess(processName, isFragment)
586586
saveAndGetId(emptyProcess, Category1, isFragment).futureValue
587587
}
588588

designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala

+74-50
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
1616
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
1717
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioParameters
1818
import pl.touk.nussknacker.security.Permission
19-
import pl.touk.nussknacker.test.config.WithSimplifiedDesignerConfig.{TestCategory, TestProcessingType}
19+
import pl.touk.nussknacker.test.config.WithSimplifiedDesignerConfig.TestCategory
2020
import pl.touk.nussknacker.test.config.WithSimplifiedDesignerConfig.TestProcessingType.Streaming
2121
import pl.touk.nussknacker.test.mock.{StubFragmentRepository, TestAdditionalUIConfigProvider}
2222
import pl.touk.nussknacker.ui.api.{RouteWithoutUser, RouteWithUser}
@@ -65,57 +65,73 @@ object TestFactory {
6565

6666
val possibleValues: List[FixedExpressionValue] = List(FixedExpressionValue("a", "a"))
6767

68-
val processValidator: UIProcessValidator = ProcessTestData.testProcessValidator(fragmentResolver = sampleResolver)
68+
def processValidator(processingTypes: List[String] = List(Streaming.stringify)): UIProcessValidator =
69+
ProcessTestData.testProcessValidator(fragmentResolver = sampleResolver(processingTypes))
6970

70-
val flinkProcessValidator: UIProcessValidator = ProcessTestData.testProcessValidator(
71-
fragmentResolver = sampleResolver,
72-
scenarioProperties = FlinkStreamingPropertiesConfig.properties
73-
)
71+
def flinkProcessValidator(processingTypes: List[String] = List(Streaming.stringify)): UIProcessValidator =
72+
ProcessTestData.testProcessValidator(
73+
fragmentResolver = sampleResolver(processingTypes),
74+
scenarioProperties = FlinkStreamingPropertiesConfig.properties
75+
)
7476

75-
val processValidatorByProcessingType: ProcessingTypeDataProvider[UIProcessValidator, _] =
76-
mapProcessingTypeDataProvider(Streaming.stringify -> flinkProcessValidator)
77+
def processValidatorByProcessingType(
78+
processingTypes: List[String] = List(Streaming.stringify)
79+
): ProcessingTypeDataProvider[UIProcessValidator, _] = {
80+
val validator = flinkProcessValidator(processingTypes)
81+
mapProcessingTypeDataProvider(processingTypes.map(pt => (pt, validator)): _*)
82+
}
7783

78-
val processResolver = new UIProcessResolver(
79-
processValidator,
84+
def processResolver(processingTypes: List[String] = List(Streaming.stringify)) = new UIProcessResolver(
85+
processValidator(processingTypes),
8086
ProcessDictSubstitutor(new SimpleDictRegistry(Map.empty))
8187
)
8288

83-
val processResolverByProcessingType: ProcessingTypeDataProvider[UIProcessResolver, _] =
84-
mapProcessingTypeDataProvider(Streaming.stringify -> processResolver)
85-
86-
val scenarioParametersService: ScenarioParametersService = {
87-
val combinations = Map(
88-
TestProcessingType.Streaming.stringify ->
89-
ScenarioParametersWithEngineSetupErrors(
90-
ScenarioParameters(
91-
ProcessingMode.UnboundedStream,
92-
TestCategory.Category1.stringify,
93-
EngineSetupName("Flink")
94-
),
95-
List.empty
96-
)
89+
def processResolverByProcessingType(
90+
processingTypes: List[String] = List(Streaming.stringify)
91+
): ProcessingTypeDataProvider[UIProcessResolver, _] = {
92+
val resolver = processResolver(processingTypes)
93+
mapProcessingTypeDataProvider(processingTypes.map(pt => (pt, resolver)): _*)
94+
}
95+
96+
def scenarioParametersService(
97+
processingTypes: List[String] = List(Streaming.stringify)
98+
): ScenarioParametersService = {
99+
val scenarioParameters = ScenarioParametersWithEngineSetupErrors(
100+
ScenarioParameters(
101+
ProcessingMode.UnboundedStream,
102+
TestCategory.Category1.stringify,
103+
EngineSetupName("Flink")
104+
),
105+
List.empty
97106
)
107+
val combinations = processingTypes.map(pt => (pt, scenarioParameters)).toMap
98108
ScenarioParametersService.createUnsafe(combinations)
99109
}
100110

101-
val scenarioParametersServiceProvider: ProcessingTypeDataProvider[_, ScenarioParametersService] =
102-
TestProcessingTypeDataProviderFactory.create(Map.empty, scenarioParametersService)
111+
def scenarioParametersServiceProvider(
112+
processingTypes: List[String] = List(Streaming.stringify)
113+
): ProcessingTypeDataProvider[_, ScenarioParametersService] =
114+
TestProcessingTypeDataProviderFactory.create(Map.empty, scenarioParametersService(processingTypes))
103115

104116
// It should be defined as method, because when it's defined as val then there is bug in IDEA at DefinitionPreparerSpec - it returns null
105-
def prepareSampleFragmentRepository: StubFragmentRepository = new StubFragmentRepository(
106-
Map(
107-
Streaming.stringify -> List(ProcessTestData.sampleFragment)
117+
def prepareSampleFragmentRepository(
118+
processingTypes: List[String] = List(Streaming.stringify)
119+
): StubFragmentRepository =
120+
new StubFragmentRepository(
121+
processingTypes.map(pt => (pt, List(ProcessTestData.sampleFragment))).toMap
108122
)
109-
)
110123

111-
def sampleResolver = new FragmentResolver(prepareSampleFragmentRepository)
124+
def sampleResolver(processingTypes: List[String] = List(Streaming.stringify)) =
125+
new FragmentResolver(prepareSampleFragmentRepository(processingTypes))
112126

113-
val scenarioResolver = new ScenarioResolver(sampleResolver, Streaming.stringify)
127+
def scenarioResolver(processingType: String = Streaming.stringify) =
128+
new ScenarioResolver(sampleResolver(processingType :: Nil), processingType)
114129

115-
def scenarioResolverByProcessingType: ProcessingTypeDataProvider[ScenarioResolver, _] =
116-
mapProcessingTypeDataProvider(
117-
Streaming.stringify -> scenarioResolver
118-
)
130+
def scenarioResolverByProcessingType(
131+
processingTypes: List[String] = List(Streaming.stringify)
132+
): ProcessingTypeDataProvider[ScenarioResolver, _] = {
133+
mapProcessingTypeDataProvider(processingTypes.map(pt => (pt, scenarioResolver(pt))): _*)
134+
}
119135

120136
def additionalComponentConfigsByProcessingType
121137
: ProcessingTypeDataProvider[Map[DesignerWideComponentId, ComponentAdditionalConfig], _] =
@@ -160,24 +176,32 @@ object TestFactory {
160176
new DBFetchingProcessRepository[DB](dbRef, newActionProcessRepository(dbRef), newScenarioLabelsRepository(dbRef))
161177
with DbioRepository
162178

163-
def newWriteProcessRepository(dbRef: DbRef, clock: Clock, modelVersions: Option[Int] = Some(1)) =
179+
def newWriteProcessRepository(
180+
dbRef: DbRef,
181+
clock: Clock,
182+
processingTypes: List[String] = List(Streaming.stringify),
183+
modelVersions: Option[Int] = Some(1)
184+
) =
164185
new DBProcessRepository(
165186
dbRef,
166187
clock,
167188
newScenarioActivityRepository(dbRef, clock),
168189
newScenarioLabelsRepository(dbRef),
169-
mapProcessingTypeDataProvider(modelVersions.map(Streaming.stringify -> _).toList: _*),
190+
mapProcessingTypeDataProvider(modelVersions.map(mv => processingTypes.map(pt => (pt, mv))).toList.flatten: _*),
170191
)
171192

172193
def newDummyWriteProcessRepository(): DBProcessRepository =
173194
newWriteProcessRepository(dummyDbRef, Clock.systemUTC())
174195

175-
def newScenarioGraphVersionService(dbRef: DbRef) = new ScenarioGraphVersionService(
176-
newScenarioGraphVersionRepository(dbRef),
177-
mapProcessingTypeDataProvider(Streaming.stringify -> processValidator),
178-
scenarioResolverByProcessingType,
179-
newDBIOActionRunner(dbRef)
180-
)
196+
def newScenarioGraphVersionService(dbRef: DbRef, processingTypes: List[String] = List(Streaming.stringify)) = {
197+
val validator = processValidator(processingTypes)
198+
new ScenarioGraphVersionService(
199+
newScenarioGraphVersionRepository(dbRef),
200+
mapProcessingTypeDataProvider(processingTypes.map(pt => (pt, validator)): _*),
201+
scenarioResolverByProcessingType(processingTypes),
202+
newDBIOActionRunner(dbRef)
203+
)
204+
}
181205

182206
def newScenarioGraphVersionRepository(dbRef: DbRef) = new ScenarioGraphVersionRepository(dbRef)
183207

@@ -197,17 +221,17 @@ object TestFactory {
197221
def asAdmin(route: RouteWithUser): Route =
198222
route.securedRouteWithErrorHandling(adminUser())
199223

200-
val newProcessPreparer: NewProcessPreparer =
224+
def newProcessPreparer(processingType: String = Streaming.stringify): NewProcessPreparer =
201225
new NewProcessPreparer(
202226
ProcessTestData.streamingTypeSpecificInitialData,
203227
FlinkStreamingPropertiesConfig.properties,
204-
new ScenarioPropertiesConfigFinalizer(TestAdditionalUIConfigProvider, Streaming.stringify)
228+
new ScenarioPropertiesConfigFinalizer(TestAdditionalUIConfigProvider, processingType)
205229
)
206230

207-
val newProcessPreparerByProcessingType: ProcessingTypeDataProvider[NewProcessPreparer, _] =
208-
mapProcessingTypeDataProvider(
209-
Streaming.stringify -> newProcessPreparer
210-
)
231+
def newProcessPreparerByProcessingType(
232+
processingTypes: List[String] = List(Streaming.stringify)
233+
): ProcessingTypeDataProvider[NewProcessPreparer, _] =
234+
mapProcessingTypeDataProvider(processingTypes.map(pt => (pt, newProcessPreparer(pt))): _*)
211235

212236
def withPermissions(route: RouteWithUser, permissions: Permission.Permission*): Route =
213237
route.securedRouteWithErrorHandling(user(permissions = permissions))

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesExportImportResourcesSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class ProcessesExportImportResourcesSpec
4747
futureFetchingScenarioRepository,
4848
processService,
4949
scenarioActivityRepository,
50-
processResolverByProcessingType,
50+
processResolverByProcessingType(),
5151
dbioRunner,
5252
)
5353

designer/server/src/test/scala/pl/touk/nussknacker/ui/definition/component/DefaultComponentServiceSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -919,9 +919,9 @@ class DefaultComponentServiceSpec
919919
new DBProcessService(
920920
scenarioStatusProvider = mock[ScenarioStatusProvider],
921921
scenarioStatusPresenter = mock[ScenarioStatusPresenter],
922-
newProcessPreparers = TestFactory.newProcessPreparerByProcessingType,
922+
newProcessPreparers = TestFactory.newProcessPreparerByProcessingType(),
923923
scenarioParametersServiceProvider = scenarioParametersServiceProvider,
924-
processResolverByProcessingType = TestFactory.processResolverByProcessingType,
924+
processResolverByProcessingType = TestFactory.processResolverByProcessingType(),
925925
dbioRunner = TestFactory.newDummyDBIOActionRunner(),
926926
fetchingProcessRepository = MockFetchingProcessRepository.withProcessesDetails(processes),
927927
scenarioActionRepository = TestFactory.newDummyActionRepository(),

designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,8 @@ class NotificationServiceTest
314314
)
315315
val deploymentService = new DeploymentService(
316316
managerDispatcher,
317-
TestFactory.processValidatorByProcessingType,
318-
TestFactory.scenarioResolverByProcessingType,
317+
TestFactory.processValidatorByProcessingType(),
318+
TestFactory.scenarioResolverByProcessingType(),
319319
actionService,
320320
TestFactory.additionalComponentConfigsByProcessingType,
321321
new LimitsService(

designer/server/src/test/scala/pl/touk/nussknacker/ui/process/DBProcessServiceSpec.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,9 @@ class DBProcessServiceSpec extends AnyFlatSpec with Matchers with PatientScalaFu
207207
new DBProcessService(
208208
scenarioStatusProvider = mock[ScenarioStatusProvider],
209209
scenarioStatusPresenter = mock[ScenarioStatusPresenter],
210-
newProcessPreparers = TestFactory.newProcessPreparerByProcessingType,
211-
scenarioParametersServiceProvider = TestFactory.scenarioParametersServiceProvider,
212-
processResolverByProcessingType = TestFactory.processResolverByProcessingType,
210+
newProcessPreparers = TestFactory.newProcessPreparerByProcessingType(),
211+
scenarioParametersServiceProvider = TestFactory.scenarioParametersServiceProvider(),
212+
processResolverByProcessingType = TestFactory.processResolverByProcessingType(),
213213
dbioRunner = TestFactory.newDummyDBIOActionRunner(),
214214
fetchingProcessRepository = MockFetchingProcessRepository.withProcessesDetails(processes),
215215
scenarioActionRepository = TestFactory.newDummyActionRepository(),

designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/TestDeploymentServiceFactory.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class TestDeploymentServiceFactory(dbRef: DbRef) {
101101
deploymentManager,
102102
EngineSetupName("mock"),
103103
JobsRecoverySettings.noRecovery,
104-
TestFactory.scenarioResolver
104+
TestFactory.scenarioResolver()
105105
)
106106
)
107107
)
@@ -117,7 +117,7 @@ class TestDeploymentServiceFactory(dbRef: DbRef) {
117117
TestFactory.mapProcessingTypeDataProvider(
118118
Streaming.stringify -> ProcessTestData.testProcessValidator(validator = ProcessValidator.default(modelData))
119119
),
120-
TestFactory.scenarioResolverByProcessingType,
120+
TestFactory.scenarioResolverByProcessingType(),
121121
actionService,
122122
additionalComponentConfigsByProcessingType,
123123
new LimitsService(

designer/server/src/test/scala/pl/touk/nussknacker/ui/process/migrate/HttpRemoteEnvironmentSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ object HttpRemoteEnvironmentSpec {
136136
mapProcessingTypeDataProvider(
137137
"streaming" -> new ProcessModelMigrator(new TestMigrations(1, 2))
138138
),
139-
mapProcessingTypeDataProvider("streaming" -> flinkProcessValidator)
139+
mapProcessingTypeDataProvider("streaming" -> flinkProcessValidator("streaming" :: Nil))
140140
)
141141

142142
private val impersonationSupport = new ImpersonationSupported {

designer/server/src/test/scala/pl/touk/nussknacker/ui/process/migrate/StandardRemoteEnvironmentSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ object StandardRemoteEnvironmentSpec {
164164
mapProcessingTypeDataProvider(
165165
"streaming" -> new ProcessModelMigrator(new TestMigrations(1, 2))
166166
),
167-
mapProcessingTypeDataProvider("streaming" -> flinkProcessValidator)
167+
mapProcessingTypeDataProvider("streaming" -> flinkProcessValidator("streaming" :: Nil))
168168
)
169169

170170
protected def fetchRemoteMigrationScenarioDescriptionVersion: FutureE[Int] = ???

0 commit comments

Comments
 (0)