Skip to content

Commit 18d7c88

Browse files
committed
SyncIO
1 parent 1732ed1 commit 18d7c88

File tree

3 files changed

+12
-10
lines changed

3 files changed

+12
-10
lines changed

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterFactory.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package pl.touk.nussknacker.engine.flink.minicluster
22

3-
import cats.effect.IO
3+
import cats.Applicative
4+
import cats.effect.{Sync, SyncIO}
45
import cats.effect.kernel.Resource
5-
import cats.effect.unsafe.implicits.global
66
import com.typesafe.scalalogging.LazyLogging
77
import org.apache.flink.configuration._
88
import org.apache.flink.core.fs.FileSystem
@@ -16,6 +16,7 @@ import pl.touk.nussknacker.engine.util.ThreadUtils
1616
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
1717

1818
import java.net.URLClassLoader
19+
import scala.language.higherKinds
1920

2021
object FlinkMiniClusterFactory extends LazyLogging {
2122

@@ -109,19 +110,19 @@ class FlinkMiniClusterWithServices(
109110
) extends AutoCloseable {
110111

111112
def withDetachedStreamExecutionEnvironment[T](action: StreamExecutionEnvironment => T): T = {
112-
createDetachedStreamExecutionEnvironment.use(env => IO.pure(action(env))).unsafeRunSync()
113+
createDetachedStreamExecutionEnvironment[SyncIO].use(env => SyncIO.pure(action(env))).unsafeRunSync()
113114
}
114115

115-
def createDetachedStreamExecutionEnvironment: Resource[IO, StreamExecutionEnvironment] = {
116-
Resource.fromAutoCloseable(IO.pure(streamExecutionEnvironmentFactory(false)))
116+
def createDetachedStreamExecutionEnvironment[F[_]: Sync]: Resource[F, StreamExecutionEnvironment] = {
117+
Resource.fromAutoCloseable(Applicative[F].pure(streamExecutionEnvironmentFactory(false)))
117118
}
118119

119120
def withAttachedStreamExecutionEnvironment[T](action: StreamExecutionEnvironment => T): T = {
120-
createAttachedStreamExecutionEnvironment.use(env => IO.pure(action(env))).unsafeRunSync()
121+
createAttachedStreamExecutionEnvironment[SyncIO].use(env => SyncIO.pure(action(env))).unsafeRunSync()
121122
}
122123

123-
def createAttachedStreamExecutionEnvironment: Resource[IO, StreamExecutionEnvironment] = {
124-
Resource.fromAutoCloseable(IO.pure(streamExecutionEnvironmentFactory(true)))
124+
def createAttachedStreamExecutionEnvironment[F[_]: Sync]: Resource[F, StreamExecutionEnvironment] = {
125+
Resource.fromAutoCloseable(Applicative[F].pure(streamExecutionEnvironmentFactory(true)))
125126
}
126127

127128
override def close(): Unit = miniCluster.close()

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioStateVerifier.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class FlinkMiniClusterScenarioStateVerifier(
5757
env
5858
)
5959
val scenarioName = processVersion.processName
60-
miniClusterWithServices.createDetachedStreamExecutionEnvironment
60+
miniClusterWithServices
61+
.createDetachedStreamExecutionEnvironment[IO]
6162
.use { env =>
6263
logger.info(s"Starting to verify $scenarioName")
6364
(for {

engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class FlinkMiniClusterScenarioTestRunner(
6060
)
6161
}
6262
(for {
63-
env <- miniClusterWithServices.createDetachedStreamExecutionEnvironment
63+
env <- miniClusterWithServices.createDetachedStreamExecutionEnvironment[IO]
6464
collectingListener <- ResultsCollectingListenerHolder.registerTestEngineListener
6565
} yield (env, collectingListener))
6666
.use { case (env, collectingListener) =>

0 commit comments

Comments
 (0)