Skip to content

Preview/nu 2009 #7978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .run/RunEnvForLocalDesigner.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
</component>
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.touk.nussknacker.engine.api.deployment

import cats.effect.{IO, Resource, SyncIO}
import cats.effect.{Resource, SyncIO}
import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.definition.EngineScenarioCompilationDependencies
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
Expand Down Expand Up @@ -35,7 +35,6 @@ trait DeploymentManager extends AutoCloseable {

protected final def notImplemented: Future[Nothing] =
Future.failed(new NotImplementedError())

}

trait ManagerSpecificScenarioActivitiesStoredByManager { self: DeploymentManager =>
Expand Down Expand Up @@ -90,3 +89,8 @@ trait SchedulingSupported extends SchedulingSupport {
}

case object NoSchedulingSupport extends SchedulingSupport

class ActiveScenariosLimitExceeded(activeScenariosCount: Int)
extends IllegalArgumentException(
s"Cannot deploy scenario. Active scenarios count is $activeScenariosCount"
)
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class DeploymentApiHttpService(
ScenarioGraphValidationError(errors)
case DeploymentService.DeployValidationError(message) =>
DeployValidationError(message)
case DeploymentService.ActiveScenariosLimitExceededError(limit) =>
DeployValidationError(
s"The limit of active scenarios has been reached. You can have a maximum of $limit active scenarios."
)
}
case ActivityService.CommentValidationError(message) => CommentValidationError(message)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import pl.touk.nussknacker.engine.api.component.{AdditionalUIConfigProvider, Des
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.engine.classloader.{DeploymentManagersClassLoader, DeploymentManagersClassLoaderFactory}
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.processCounts.CountsReporter
import pl.touk.nussknacker.ui.api.{AuthorizeProcess, ScenarioStatusPresenter}
Expand Down Expand Up @@ -172,7 +171,7 @@ object DomainServices extends LazyLogging {
new EngineSideDeploymentStatusesProvider(
dmDispatcher,
alreadyLoadedConfig.scenarioStateTimeout
)(actorSystem)
)
processRepository = DBFetchingProcessRepository.create(dbRef, actionRepository, scenarioLabelsRepository)
scenarioStatusProvider = new ScenarioStatusProvider(
deploymentsStatusesProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package pl.touk.nussknacker.ui.process.deployment

import cats.data.Validated
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.component.{
ComponentAdditionalConfig,
DesignerWideComponentId,
NodesDeploymentData
}
import pl.touk.nussknacker.engine.ProcessingTypeConfig.ActiveScenariosLimit
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor
import pl.touk.nussknacker.ui.process.deployment.DeploymentService.ActiveScenariosLimitExceededError
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps
import pl.touk.nussknacker.ui.process.deployment.scenariostatus.ScenarioStatusProvider
import pl.touk.nussknacker.ui.process.exception.DeployingInvalidScenarioError
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository._
Expand All @@ -33,6 +32,8 @@ class DeploymentService(
Map[DesignerWideComponentId, ComponentAdditionalConfig],
_
],
scenarioStatusProvider: ScenarioStatusProvider,
activeScenariosLimitProvider: ProcessingTypeDataProvider[Option[ActiveScenariosLimit], _],
)(implicit ec: ExecutionContext)
extends LazyLogging {

Expand Down Expand Up @@ -83,7 +84,6 @@ class DeploymentService(

private def runDeployment(command: RunDeploymentCommand): Future[Future[Option[ExternalDeploymentId]]] = {
import command.commonData._

actionService
.actionProcessorForLatestVersion[CanonicalProcess]
.processActionWithCustomFinalization[RunDeploymentCommand, Future[Option[ExternalDeploymentId]]](
Expand Down Expand Up @@ -136,7 +136,9 @@ class DeploymentService(
Future.failed(DeployingInvalidScenarioError(validationResult.errors))
case _ => Future.successful(())
}
// 2. deployment managers specific checks
// 2. limits check
_ <- checkActiveScenariosLimits(processDetails.processingType, user)
// 3. deployment managers specific checks
// TODO: scenario was already resolved during validation - use it here
_ <- dispatcher
.deploymentManagerUnsafe(processDetails.processingType)
Expand All @@ -151,6 +153,22 @@ class DeploymentService(
} yield ()
}

private def checkActiveScenariosLimits(processingType: ProcessingType, deployer: LoggedUser) = {
implicit val loggedUser: LoggedUser = deployer
activeScenariosLimitProvider.forProcessingType(processingType).flatten match {
case Some(ActiveScenariosLimit(activeScenariosLimit)) =>
scenarioStatusProvider
.getActiveScenariosCountFor(processingType)
.map { activeScenariosCount =>
if (activeScenariosCount >= activeScenariosLimit) {
throw ActiveScenariosLimitExceededError(activeScenariosLimit)
}
}
case None =>
Future.unit
}
}

private def prepareDMRunDeploymentCommand(
processDetails: ScenarioWithDetailsEntity[CanonicalProcess],
actionId: ProcessActionId,
Expand Down Expand Up @@ -192,3 +210,12 @@ class DeploymentService(
}

}

object DeploymentService {

final case class ActiveScenariosLimitExceededError(activeScenariosLimit: Int)
extends IllegalArgumentException(
s"The limit of active scenarios has been reached. You can have a maximum of $activeScenariosLimit active scenarios."
)

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package pl.touk.nussknacker.ui.process.deployment.deploymentstatus

import org.apache.pekko.actor.ActorSystem
import cats.effect.IO
import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, DeploymentStatusDetails, WithDataFreshnessStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessingType, ProcessName}
import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.repository.ScenarioIdData
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.FutureUtils.FutureOps

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
Expand All @@ -22,11 +22,10 @@ object DeploymentManagerReliableStatusesWrapper {
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy,
actorSystem: ActorSystem
executionContextWithIORuntime: ExecutionContextWithIORuntime
): Future[Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[DeploymentStatusDetails]]]] = {
import actorSystem._
val deploymentStatusesOptFuture
: Future[Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[DeploymentStatusDetails]]]] =
import executionContextWithIORuntime.ioRuntime
val deploymentStatusesOpt = IO.fromFuture(IO {
dmDispatcher
.deploymentManager(scenarioIdData.processingType)
.map(
Expand All @@ -37,13 +36,14 @@ object DeploymentManagerReliableStatusesWrapper {
.getOrElse(
Future.successful(Left(ProcessingTypeIsNotConfigured(scenarioIdData.name, scenarioIdData.processingType)))
)
})

timeoutOpt
.map { timeout =>
deploymentStatusesOptFuture
.withTimeout(timeout, timeoutResult = Left(GetDeploymentsStatusTimeout(scenarioIdData.name)))
deploymentStatusesOpt.timeoutTo(timeout, IO.delay(Left(GetDeploymentsStatusTimeout(scenarioIdData.name))))
}
.getOrElse(deploymentStatusesOptFuture)
.getOrElse(deploymentStatusesOpt)
.unsafeToFuture()
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package pl.touk.nussknacker.ui.process.deployment.deploymentstatus

import com.typesafe.scalalogging.LazyLogging
import org.apache.pekko.actor.ActorSystem
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
import pl.touk.nussknacker.engine.api.process.{ProcessingType, ProcessName}
import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentManagerReliableStatusesWrapper.Ops
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.security.api.LoggedUser

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

Expand All @@ -30,11 +30,9 @@ class EngineSideDeploymentStatusesProvider(
dispatcher: DeploymentManagerDispatcher,
scenarioStateTimeout: Option[FiniteDuration]
)(
implicit system: ActorSystem
implicit executionContextWithIORuntime: ExecutionContextWithIORuntime
) extends LazyLogging {

private implicit val ec: ExecutionContext = system.dispatcher

// DeploymentManager's may support fetching state of all scenarios at once
// State is prefetched only when:
// - DM has capability DeploymentsStatusesQueryForAllScenariosSupport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ class ScenarioStatusProvider(
)
}

def getActiveScenariosCountFor(processingType: ProcessingType)(implicit user: LoggedUser): Future[Int] = {
implicit val freshnessStatus: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
deploymentStatusesProvider
.getBulkQueriedDeploymentStatusesForSupportedManagers(processingType :: Nil)
.map { statuses =>
statuses.getAllDeploymentStatuses
.count { status =>
status.status == SimpleStateStatus.DuringDeploy ||
status.status == SimpleStateStatus.Running ||
status.status == SimpleStateStatus.Restarting
}
}
}

private def getNonFragmentScenarioStatus[ScenarioShape, F[_]: Traverse](
actionsInProgress: Map[ProcessId, Set[ScenarioActionName]],
prefetchedDeploymentStatuses: BulkQueriedDeploymentStatuses,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.Applicative
import cats.data.{EitherT, NonEmptyList}
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.ProcessingTypeConfig.ActiveScenariosLimit
import pl.touk.nussknacker.engine.api.{ProcessVersion => RuntimeVersionData}
import pl.touk.nussknacker.engine.api.component.{
ComponentAdditionalConfig,
Expand All @@ -26,6 +27,7 @@ import pl.touk.nussknacker.ui.db.entity.ProcessVersionEntityData
import pl.touk.nussknacker.ui.process.ScenarioMetadata
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps
import pl.touk.nussknacker.ui.process.deployment.scenariostatus.ScenarioStatusProvider
import pl.touk.nussknacker.ui.process.newdeployment.DeploymentEntityFactory.{DeploymentEntityData, WithModifiedAt}
import pl.touk.nussknacker.ui.process.newdeployment.DeploymentService._
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
Expand Down Expand Up @@ -55,6 +57,8 @@ class DeploymentService(
Map[DesignerWideComponentId, ComponentAdditionalConfig],
_
],
scenarioStatusProvider: ScenarioStatusProvider,
activeScenariosLimitProvider: ProcessingTypeDataProvider[Option[ActiveScenariosLimit], _],
)(implicit ec: ExecutionContext)
extends LazyLogging {

Expand All @@ -78,12 +82,8 @@ class DeploymentService(
category = scenarioMetadata.processCategory,
permission = Permission.Deploy
)
_ <- EitherT.cond[Future](!scenarioMetadata.isFragment, (), DeploymentOfFragmentError)
_ <- EitherT.cond[Future](!scenarioMetadata.isArchived, (), DeploymentOfArchivedScenarioError)
scenarioGraphVersion <- EitherT(
scenarioGraphVersionService.getValidResolvedLatestScenarioGraphVersion(scenarioMetadata, command.user)
).leftMap[RunDeploymentError](error => ScenarioGraphValidationError(error.errors))
_ <- validateUsingDeploymentManager(scenarioMetadata, scenarioGraphVersion, command.user)
scenarioGraphVersion <- getScenarioGraphVersion(scenarioMetadata, command.user)
_ <- validateDeployment(scenarioMetadata, scenarioGraphVersion, command.user)
// We keep deployments metrics (used by counts mechanism) keyed by scenario name.
// Because of that we can't run more than one deployment for scenario in a time.
// TODO: We should key metrics by deployment id and remove this limitation
Expand All @@ -92,6 +92,25 @@ class DeploymentService(
_ <- runDeploymentUsingDeploymentManagerAsync(scenarioMetadata, scenarioGraphVersion, command)
} yield DeploymentForeignKeys(scenarioMetadata.id, scenarioGraphVersion.id)).value

private def validateDeployment(
scenarioMetadata: ScenarioMetadata,
scenarioGraphVersion: ProcessVersionEntityData,
deployer: LoggedUser
) = {
for {
_ <- EitherT.cond[Future](!scenarioMetadata.isFragment, (), DeploymentOfFragmentError)
_ <- EitherT.cond[Future](!scenarioMetadata.isArchived, (), DeploymentOfArchivedScenarioError)
_ <- checkActiveScenariosLimits(scenarioMetadata, deployer)
_ <- runDeploymentManagerSpecificValidations(scenarioMetadata, scenarioGraphVersion, deployer)
} yield ()
}

private def getScenarioGraphVersion(scenarioMetadata: ScenarioMetadata, deployer: LoggedUser) = {
EitherT(
scenarioGraphVersionService.getValidResolvedLatestScenarioGraphVersion(scenarioMetadata, deployer)
).leftMap[RunDeploymentError](error => ScenarioGraphValidationError(error.errors))
}

private def getScenarioMetadata(
command: RunDeploymentCommand
): EitherT[Future, RunDeploymentError, ScenarioMetadata] =
Expand All @@ -107,9 +126,7 @@ class DeploymentService(
EitherT(dbioRunner.runInSerializableTransactionWithRetry((for {
nonFinishedDeployments <- getConcurrentlyPerformedDeploymentsForScenario(scenarioMetadata)
_ <- checkNoConcurrentDeploymentsForScenario(nonFinishedDeployments, scenarioMetadata.name)
_ = {
logger.debug(s"Saving deployment: ${command.id}")
}
_ = logger.debug(s"Saving deployment: ${command.id}")
_ <- saveDeployment(command, scenarioMetadata)
} yield ()).value))
}
Expand Down Expand Up @@ -157,7 +174,30 @@ class DeploymentService(
).leftMap(e => ConflictingDeploymentIdError(e.id))
}

private def validateUsingDeploymentManager(
private def checkActiveScenariosLimits(
scenarioMetadata: ScenarioMetadata,
deployer: LoggedUser
): EitherT[Future, RunDeploymentError, Unit] = {
implicit val loggedUser: LoggedUser = deployer
activeScenariosLimitProvider.forProcessingType(scenarioMetadata.processingType).flatten match {
case Some(ActiveScenariosLimit(activeScenariosLimit)) =>
EitherT {
scenarioStatusProvider
.getActiveScenariosCountFor(scenarioMetadata.processingType)
.map { activeScenariosCount =>
Either.cond(
test = activeScenariosCount < activeScenariosLimit,
right = (),
left = ActiveScenariosLimitExceededError(activeScenariosLimit)
)
}
}
case None =>
EitherT.pure(())
}
}

private def runDeploymentManagerSpecificValidations(
scenarioMetadata: ScenarioMetadata,
scenarioGraphVersion: ProcessVersionEntityData,
user: LoggedUser
Expand Down Expand Up @@ -312,6 +352,8 @@ object DeploymentService {

case object DeploymentOfArchivedScenarioError extends RunDeploymentError

final case class ActiveScenariosLimitExceededError(activeScenariosLimit: Int) extends RunDeploymentError

final case class DeploymentNotFoundError(id: DeploymentId) extends GetDeploymentStatusError

case object NoPermissionError extends RunDeploymentError with GetDeploymentStatusError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.ui.process.processingtype

import cats.effect.SyncIO
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.ProcessingTypeConfig.ActiveScenariosLimit
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.definition.EngineScenarioCompilationDependencies
Expand All @@ -22,6 +23,7 @@ final class ProcessingTypeData private (
// to fully split deployment managers from model
val deploymentData: DeploymentData,
val category: String,
val activeScenariosLimit: Option[ActiveScenariosLimit]
) {

// TODO: We should allow to have >1 processing mode configured inside one model and return a List here
Expand Down Expand Up @@ -49,6 +51,7 @@ object ProcessingTypeData {
modelData: ModelData,
deploymentData: DeploymentData,
category: String,
activeScenariosLimit: Option[ActiveScenariosLimit],
componentDefinitionExtractionMode: ComponentDefinitionExtractionMode
): ProcessingTypeData = {
val designerModelData =
Expand All @@ -62,7 +65,8 @@ object ProcessingTypeData {
processingType,
designerModelData,
deploymentData,
category
category,
activeScenariosLimit
)
}

Expand Down
Loading
Loading