Skip to content

Commit b04dbc4

Browse files
committed
Merge branch 'minicluster-SyncIO-attached' into flink-sql-validations-using-env
2 parents 607a57c + 960ff62 commit b04dbc4

File tree

3 files changed

+34
-17
lines changed

3 files changed

+34
-17
lines changed

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterFactory.scala

+31-15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package pl.touk.nussknacker.engine.flink.minicluster
22

3-
import cats.effect.IO
3+
import cats.Applicative
4+
import cats.effect.{Sync, SyncIO}
45
import cats.effect.kernel.Resource
5-
import cats.effect.unsafe.implicits.global
66
import com.typesafe.scalalogging.LazyLogging
77
import org.apache.flink.configuration._
88
import org.apache.flink.core.fs.FileSystem
@@ -16,6 +16,7 @@ import pl.touk.nussknacker.engine.util.ThreadUtils
1616
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
1717

1818
import java.net.URLClassLoader
19+
import scala.language.higherKinds
1920

2021
object FlinkMiniClusterFactory extends LazyLogging {
2122

@@ -78,15 +79,7 @@ object FlinkMiniClusterFactory extends LazyLogging {
7879
miniCluster.start()
7980
}
8081

81-
def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = {
82-
FlinkMiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
83-
miniCluster,
84-
modelClassLoader,
85-
attached
86-
)
87-
}
88-
89-
new FlinkMiniClusterWithServices(miniCluster, createStreamExecutionEnv)
82+
new FlinkMiniClusterWithServices(miniCluster, modelClassLoader)
9083
}
9184

9285
private def createMiniCluster(configuration: Configuration) = {
@@ -105,17 +98,40 @@ object FlinkMiniClusterFactory extends LazyLogging {
10598

10699
class FlinkMiniClusterWithServices(
107100
val miniCluster: MiniCluster,
108-
streamExecutionEnvironmentFactory: Boolean => StreamExecutionEnvironment
101+
modelClassLoader: URLClassLoader
109102
) extends AutoCloseable {
110103

111104
def withDetachedStreamExecutionEnvironment[T](action: StreamExecutionEnvironment => T): T = {
112-
createDetachedStreamExecutionEnvironment.use(env => IO(action(env))).unsafeRunSync()
105+
// We use SyncIO, because passed actions sometimes uses ThreadLocal and we don't want to change the Thread which run this action
106+
createDetachedStreamExecutionEnvironment[SyncIO].use(env => SyncIO.pure(action(env))).unsafeRunSync()
107+
}
108+
109+
def createDetachedStreamExecutionEnvironment[F[_]: Sync]: Resource[F, StreamExecutionEnvironment] = {
110+
Resource.fromAutoCloseable(Applicative[F].pure(streamExecutionEnvironmentFactory(false)))
113111
}
114112

115-
def createDetachedStreamExecutionEnvironment: Resource[IO, StreamExecutionEnvironment] = {
116-
Resource.fromAutoCloseable(IO(streamExecutionEnvironmentFactory(false)))
113+
// This method is used only by external project. It should be used with caution because action on StreamExecutionEnvironment
114+
// can be blocking and it can cause thread pool starvation or deadlock. As an alternative, we recommend to use withDetachedStreamExecutionEnvironment
115+
// combined with MiniClusterJobStatusCheckingOps
116+
def withAttachedStreamExecutionEnvironment[T](action: StreamExecutionEnvironment => T): T = {
117+
// We use SyncIO, because passed actions sometimes uses ThreadLocal and we don't want to change the Thread which run this action
118+
createAttachedStreamExecutionEnvironment[SyncIO].use(env => SyncIO.pure(action(env))).unsafeRunSync()
117119
}
118120

121+
// This method is used only by external project. It should be used with caution because action on StreamExecutionEnvironment
122+
// can be blocking and it can cause thread pool starvation or deadlock. As an alternative, we recommend to use createDetachedStreamExecutionEnvironment
123+
// combined with MiniClusterJobStatusCheckingOps
124+
def createAttachedStreamExecutionEnvironment[F[_]: Sync]: Resource[F, StreamExecutionEnvironment] = {
125+
Resource.fromAutoCloseable(Applicative[F].pure(streamExecutionEnvironmentFactory(true)))
126+
}
127+
128+
private def streamExecutionEnvironmentFactory(attached: Boolean): StreamExecutionEnvironment =
129+
FlinkMiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
130+
miniCluster,
131+
modelClassLoader,
132+
attached
133+
)
134+
119135
override def close(): Unit = miniCluster.close()
120136

121137
}

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioStateVerifier.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class FlinkMiniClusterScenarioStateVerifier(
5757
env
5858
)
5959
val scenarioName = processVersion.processName
60-
miniClusterWithServices.createDetachedStreamExecutionEnvironment
60+
miniClusterWithServices
61+
.createDetachedStreamExecutionEnvironment[IO]
6162
.use { env =>
6263
logger.info(s"Starting to verify $scenarioName")
6364
(for {

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunner.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class FlinkMiniClusterScenarioTestRunner(
6060
)
6161
}
6262
(for {
63-
env <- miniClusterWithServices.createDetachedStreamExecutionEnvironment
63+
env <- miniClusterWithServices.createDetachedStreamExecutionEnvironment[IO]
6464
collectingListener <- ResultsCollectingListenerHolder.registerTestEngineListener
6565
} yield (env, collectingListener))
6666
.use { case (env, collectingListener) =>

0 commit comments

Comments
 (0)