Skip to content

Commit 712d37f

Browse files
authored
Merge pull request #4263 from kapunga/4254-custom-non-fatal
Create `UnsafeNonFatal` for use in the fiber runtime
2 parents d3a25db + 59d0995 commit 712d37f

File tree

11 files changed

+92
-56
lines changed

11 files changed

+92
-56
lines changed

core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.scalajs.macrotaskexecutor.MacrotaskExecutor
2424
import scala.collection.mutable
2525
import scala.concurrent.ExecutionContextExecutor
2626
import scala.scalajs.{js, LinkingInfo}
27-
import scala.util.control.NonFatal
2827

2928
/**
3029
* An `ExecutionContext` that improves throughput by providing a method to `schedule` fibers to
@@ -65,7 +64,7 @@ private[effect] final class BatchingMacrotaskExecutor(
6564
val fiber = fibers.take()
6665
try fiber.run()
6766
catch {
68-
case t if NonFatal(t) => reportFailure(t)
67+
case t if UnsafeNonFatal(t) => reportFailure(t)
6968
case t: Throwable => IOFiber.onFatalFailure(t)
7069
}
7170
i += 1

core/jvm/src/main/scala/cats/effect/IOApp.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ package cats.effect
1919
import cats.effect.metrics.{CpuStarvationWarningMetrics, JvmCpuStarvationMetrics}
2020
import cats.effect.std.Console
2121
import cats.effect.tracing.TracingConstants._
22+
import cats.effect.unsafe.UnsafeNonFatal
2223
import cats.syntax.all._
2324

2425
import scala.concurrent.{blocking, CancellationException, ExecutionContext}
2526
import scala.concurrent.duration._
26-
import scala.util.control.NonFatal
2727

2828
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch}
2929
import java.util.concurrent.atomic.AtomicInteger
@@ -218,7 +218,7 @@ trait IOApp {
218218
new ExecutionContext {
219219
def reportFailure(t: Throwable): Unit =
220220
t match {
221-
case t if NonFatal(t) =>
221+
case t if UnsafeNonFatal(t) =>
222222
IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)
223223

224224
case t =>
@@ -555,7 +555,7 @@ trait IOApp {
555555
throw e
556556

557557
case t: Throwable =>
558-
if (NonFatal(t)) {
558+
if (UnsafeNonFatal(t)) {
559559
if (isForked) {
560560
t.printStackTrace()
561561
System.exit(1)
@@ -571,7 +571,7 @@ trait IOApp {
571571
try {
572572
r.run()
573573
} catch {
574-
case t if NonFatal(t) =>
574+
case t if UnsafeNonFatal(t) =>
575575
IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)
576576

577577
case t: Throwable =>

core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package cats.effect
1818

19-
import scala.util.control.NonFatal
19+
import cats.effect.unsafe.UnsafeNonFatal
2020

2121
import java.nio.channels.ClosedByInterruptException
2222
import java.util.{concurrent => juc}
@@ -68,7 +68,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
6868
case ex: ClosedByInterruptException => throw ex
6969

7070
// this won't suppress InterruptedException:
71-
case t if NonFatal(t) => Left(t)
71+
case t if UnsafeNonFatal(t) => Left(t)
7272
}
7373

7474
// this is why it has to be a semaphore rather than an atomic boolean

core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala

+2-4
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package unsafe
1919

2020
import cats.effect.unsafe.metrics.PollerMetrics
2121

22-
import scala.util.control.NonFatal
23-
2422
import java.nio.channels.{SelectableChannel, SelectionKey}
2523
import java.nio.channels.spi.{AbstractSelector, SelectorProvider}
2624
import java.util.Iterator
@@ -72,7 +70,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
7270
// reset interest in triggered ops
7371
key.interestOps(key.interestOps() & ~readyOps)
7472
} catch {
75-
case ex if NonFatal(ex) =>
73+
case ex if UnsafeNonFatal(ex) =>
7674
error = ex
7775
readyOps = -1 // interest all waiters
7876
}
@@ -150,7 +148,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
150148

151149
cb(Right(Some(cancel)))
152150
} catch {
153-
case ex if NonFatal(ex) =>
151+
case ex if UnsafeNonFatal(ex) =>
154152
poller.countErroredOperation(ops)
155153
cb(Left(ex))
156154
}

core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import cats.effect.tracing.TracingConstants
3636
import scala.collection.mutable
3737
import scala.concurrent.ExecutionContextExecutor
3838
import scala.concurrent.duration.{Duration, FiniteDuration}
39-
import scala.util.control.NonFatal
4039

4140
import java.time.Instant
4241
import java.time.temporal.ChronoField
@@ -680,7 +679,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
680679
try {
681680
task.run()
682681
} catch {
683-
case ex if NonFatal(ex) =>
682+
case ex if UnsafeNonFatal(ex) =>
684683
reportFailure(ex)
685684
}
686685
}

core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala

+9-10
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import scala.annotation.tailrec
2424
import scala.collection.mutable
2525
import scala.concurrent.{BlockContext, CanAwait}
2626
import scala.concurrent.duration.{Duration, FiniteDuration}
27-
import scala.util.control.NonFatal
2827

2928
import java.lang.Long.MIN_VALUE
3029
import java.util.concurrent.{ArrayBlockingQueue, ThreadLocalRandom}
@@ -376,7 +375,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
376375
pool.notifyParked(rnd)
377376
try fiber.run()
378377
catch {
379-
case t if NonFatal(t) => pool.reportFailure(t)
378+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
380379
case t: Throwable => IOFiber.onFatalFailure(t)
381380
}
382381

@@ -393,7 +392,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
393392
// The dequeued element is a single fiber. Execute it immediately.
394393
try fiber.run()
395394
catch {
396-
case t if NonFatal(t) => pool.reportFailure(t)
395+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
397396
case t: Throwable => IOFiber.onFatalFailure(t)
398397
}
399398

@@ -443,7 +442,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
443442
// Run the stolen fiber.
444443
try fiber.run()
445444
catch {
446-
case t if NonFatal(t) => pool.reportFailure(t)
445+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
447446
case t: Throwable => IOFiber.onFatalFailure(t)
448447
}
449448
}
@@ -495,7 +494,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
495494
pool.notifyParked(rnd)
496495
try fiber.run()
497496
catch {
498-
case t if NonFatal(t) => pool.reportFailure(t)
497+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
499498
case t: Throwable => IOFiber.onFatalFailure(t)
500499
}
501500

@@ -515,7 +514,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
515514
// The dequeued element is a single fiber. Execute it immediately.
516515
try fiber.run()
517516
catch {
518-
case t if NonFatal(t) => pool.reportFailure(t)
517+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
519518
case t: Throwable => IOFiber.onFatalFailure(t)
520519
}
521520

@@ -545,7 +544,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
545544
// Run the stolen fiber.
546545
try fiber.run()
547546
catch {
548-
case t if NonFatal(t) => pool.reportFailure(t)
547+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
549548
case t: Throwable => IOFiber.onFatalFailure(t)
550549
}
551550
}
@@ -813,7 +812,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
813812

814813
try fiber.run()
815814
catch {
816-
case t if NonFatal(t) => pool.reportFailure(t)
815+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
817816
case t: Throwable => IOFiber.onFatalFailure(t)
818817
}
819818
} else if (element.isInstanceOf[Runnable]) {
@@ -827,7 +826,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
827826
// The dequeued element is a single fiber. Execute it immediately.
828827
try fiber.run()
829828
catch {
830-
case t if NonFatal(t) => pool.reportFailure(t)
829+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
831830
case t: Throwable => IOFiber.onFatalFailure(t)
832831
}
833832
}
@@ -862,7 +861,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
862861
// Run the fiber.
863862
try fiber.run()
864863
catch {
865-
case t if NonFatal(t) => pool.reportFailure(t)
864+
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
866865
case t: Throwable => IOFiber.onFatalFailure(t)
867866
}
868867
} else {

core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import scala.scalanative.meta.LinktimeInfo
2626
import scala.scalanative.posix.time._
2727
import scala.scalanative.posix.timeOps._
2828
import scala.scalanative.unsafe._
29-
import scala.util.control.NonFatal
3029

3130
import java.util.{ArrayDeque, PriorityQueue}
3231

@@ -96,7 +95,7 @@ private[effect] final class EventLoopExecutorScheduler[P](
9695
val task = sleepQueue.poll()
9796
try task.runnable.run()
9897
catch {
99-
case t if NonFatal(t) => reportFailure(t)
98+
case t if UnsafeNonFatal(t) => reportFailure(t)
10099
case t: Throwable => IOFiber.onFatalFailure(t)
101100
}
102101
}
@@ -107,7 +106,7 @@ private[effect] final class EventLoopExecutorScheduler[P](
107106
val runnable = executeQueue.poll()
108107
try runnable.run()
109108
catch {
110-
case t if NonFatal(t) => reportFailure(t)
109+
case t if UnsafeNonFatal(t) => reportFailure(t)
111110
case t: Throwable => IOFiber.onFatalFailure(t)
112111
}
113112
i += 1

core/shared/src/main/scala/cats/effect/IO.scala

+4-5
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,14 @@ import cats.effect.std.{
5050
UUIDGen
5151
}
5252
import cats.effect.tracing.{Tracing, TracingEvent}
53-
import cats.effect.unsafe.IORuntime
53+
import cats.effect.unsafe.{IORuntime, UnsafeNonFatal}
5454
import cats.syntax._
5555
import cats.syntax.all._
5656

5757
import scala.annotation.unchecked.uncheckedVariance
5858
import scala.concurrent._
5959
import scala.concurrent.duration._
6060
import scala.util.{Failure, Success, Try}
61-
import scala.util.control.NonFatal
6261

6362
import java.util.UUID
6463
import java.util.concurrent.Executor
@@ -1014,7 +1013,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
10141013
unsafeRunFiber(
10151014
cb(Left(new CancellationException("The fiber was canceled"))),
10161015
t => {
1017-
if (!NonFatal(t)) {
1016+
if (!UnsafeNonFatal(t)) {
10181017
t.printStackTrace()
10191018
}
10201019
cb(Left(t))
@@ -1027,7 +1026,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
10271026
unsafeRunFiber(
10281027
cb(Outcome.canceled),
10291028
t => {
1030-
if (!NonFatal(t)) {
1029+
if (!UnsafeNonFatal(t)) {
10311030
t.printStackTrace()
10321031
}
10331032
cb(Outcome.errored(t))
@@ -1050,7 +1049,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
10501049
val _ = unsafeRunFiber(
10511050
(),
10521051
t => {
1053-
if (NonFatal(t)) {
1052+
if (UnsafeNonFatal(t)) {
10541053
if (runtime.config.reportUnhandledFiberErrors)
10551054
runtime.compute.reportFailure(t)
10561055
} else { t.printStackTrace() }

0 commit comments

Comments
 (0)