Skip to content

Commit 960ff62

Browse files
committed
[NU-2045] FlinkMiniClusterWithServices: attached variants of methods creating StreamExecutionEnvironment + SyncIO to fix problem with ThreadLocal + clean up
1 parent 18d7c88 commit 960ff62

File tree

1 file changed

+17
-10
lines changed

1 file changed

+17
-10
lines changed

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterFactory.scala

+17-10
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,7 @@ object FlinkMiniClusterFactory extends LazyLogging {
7979
miniCluster.start()
8080
}
8181

82-
def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = {
83-
FlinkMiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
84-
miniCluster,
85-
modelClassLoader,
86-
attached
87-
)
88-
}
89-
90-
new FlinkMiniClusterWithServices(miniCluster, createStreamExecutionEnv)
82+
new FlinkMiniClusterWithServices(miniCluster, modelClassLoader)
9183
}
9284

9385
private def createMiniCluster(configuration: Configuration) = {
@@ -106,25 +98,40 @@ object FlinkMiniClusterFactory extends LazyLogging {
10698

10799
class FlinkMiniClusterWithServices(
108100
val miniCluster: MiniCluster,
109-
streamExecutionEnvironmentFactory: Boolean => StreamExecutionEnvironment
101+
modelClassLoader: URLClassLoader
110102
) extends AutoCloseable {
111103

112104
def withDetachedStreamExecutionEnvironment[T](action: StreamExecutionEnvironment => T): T = {
105+
// We use SyncIO, because passed actions sometimes uses ThreadLocal and we don't want to change the Thread which run this action
113106
createDetachedStreamExecutionEnvironment[SyncIO].use(env => SyncIO.pure(action(env))).unsafeRunSync()
114107
}
115108

116109
def createDetachedStreamExecutionEnvironment[F[_]: Sync]: Resource[F, StreamExecutionEnvironment] = {
117110
Resource.fromAutoCloseable(Applicative[F].pure(streamExecutionEnvironmentFactory(false)))
118111
}
119112

113+
// This method is used only by external project. It should be used with caution because action on StreamExecutionEnvironment
114+
// can be blocking and it can cause thread pool starvation or deadlock. As an alternative, we recommend to use withDetachedStreamExecutionEnvironment
115+
// combined with MiniClusterJobStatusCheckingOps
120116
def withAttachedStreamExecutionEnvironment[T](action: StreamExecutionEnvironment => T): T = {
117+
// We use SyncIO, because passed actions sometimes uses ThreadLocal and we don't want to change the Thread which run this action
121118
createAttachedStreamExecutionEnvironment[SyncIO].use(env => SyncIO.pure(action(env))).unsafeRunSync()
122119
}
123120

121+
// This method is used only by external project. It should be used with caution because action on StreamExecutionEnvironment
122+
// can be blocking and it can cause thread pool starvation or deadlock. As an alternative, we recommend to use createDetachedStreamExecutionEnvironment
123+
// combined with MiniClusterJobStatusCheckingOps
124124
def createAttachedStreamExecutionEnvironment[F[_]: Sync]: Resource[F, StreamExecutionEnvironment] = {
125125
Resource.fromAutoCloseable(Applicative[F].pure(streamExecutionEnvironmentFactory(true)))
126126
}
127127

128+
private def streamExecutionEnvironmentFactory(attached: Boolean): StreamExecutionEnvironment =
129+
FlinkMiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
130+
miniCluster,
131+
modelClassLoader,
132+
attached
133+
)
134+
128135
override def close(): Unit = miniCluster.close()
129136

130137
}

0 commit comments

Comments
 (0)