@@ -91,42 +91,47 @@ class DeploymentService(
91
91
command = command,
92
92
actionName = ScenarioActionName .Deploy
93
93
) { case (ctx, actionFinalizer) =>
94
+ implicit class FinalizerExt [T ](val future : Future [T ]) {
95
+ def whenThrowRemoveInvalidAction (): Future [T ] = {
96
+ future.transformWith {
97
+ case Success (result) => Future .successful(result)
98
+ case Failure (ex) => actionFinalizer.removeInvalidAction().transform(_ => Failure (ex))
99
+ }
100
+ }
101
+ }
102
+
94
103
for {
95
104
dmCommand <- prepareDMRunDeploymentCommand(
96
105
ctx.latestScenarioDetails,
97
106
ctx.actionId,
98
107
// TODO: We should validate node deployment data - e.g. if sql expression is a correct sql expression,
99
108
// references to existing fields and uses correct types. We should also protect from sql injection attacks
100
109
command
101
- )
102
- _ <- validateScenario(ctx.latestScenarioDetails)
110
+ ).whenThrowRemoveInvalidAction()
111
+ _ <- validateScenario(ctx.latestScenarioDetails).whenThrowRemoveInvalidAction()
103
112
actionResult <- checkActiveScenariosLimits(ctx.latestScenarioDetails, dmCommand.updateStrategy) {
104
113
IO .fromFuture {
105
114
IO {
106
- validateUsingDeploymentManager(ctx.latestScenarioDetails, dmCommand)
107
- .transformWith {
108
- case Failure (ex) =>
109
- actionFinalizer.removeInvalidAction().transform(_ => Failure (ex))
110
- case Success (_) =>
111
- // we notify of deployment finish/fail only if initial validation succeeded - this step is done asynchronously
112
- Future {
113
- actionFinalizer.handleResult {
114
- dispatcher
115
- .deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
116
- .processCommand(dmCommand)
117
- }
118
- }
115
+ for {
116
+ _ <- validateUsingDeploymentManager(ctx.latestScenarioDetails, dmCommand)
117
+ .whenThrowRemoveInvalidAction()
118
+ } yield {
119
+ // we notify of deployment finish/fail only if initial validation succeeded - this step is done asynchronously
120
+ actionFinalizer.handleResult {
121
+ dispatcher
122
+ .deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
123
+ .processCommand(dmCommand)
119
124
}
125
+ }
120
126
}
121
127
}
122
128
}
123
- result <- actionResult match {
124
- case Right (result) =>
125
- Future .successful(result)
126
- case Left (error : MaxActiveScenariosCountExceededError ) =>
127
- actionFinalizer.removeInvalidAction().transform(_ => Failure (error))
128
- }
129
- } yield result
129
+ .map {
130
+ case Right (result) => result
131
+ case Left (error : MaxActiveScenariosCountExceededError ) =>
132
+ Future .failed(error).whenThrowRemoveInvalidAction()
133
+ }
134
+ } yield actionResult
130
135
}
131
136
}
132
137
0 commit comments