@@ -65,7 +65,15 @@ internal class LimitedDispatcher(
65
65
// `runningWorkers` when they observed an empty queue.
66
66
if (! tryAllocateWorker()) return
67
67
val task = obtainTaskOrDeallocateWorker() ? : return
68
- startWorker(Worker (task))
68
+ try {
69
+ startWorker(Worker (task))
70
+ } catch (e: Throwable ) {
71
+ // If we failed to start a worker, we should deallocate the worker slot
72
+ synchronized(workerAllocationLock) {
73
+ runningWorkers.decrementAndGet()
74
+ }
75
+ throw e
76
+ }
69
77
}
70
78
71
79
/* *
@@ -107,21 +115,29 @@ internal class LimitedDispatcher(
107
115
*/
108
116
private inner class Worker (private var currentTask : Runnable ) : Runnable {
109
117
override fun run () {
110
- var fairnessCounter = 0
111
- while (true ) {
112
- try {
113
- currentTask.run ()
114
- } catch (e: Throwable ) {
115
- handleCoroutineException(EmptyCoroutineContext , e)
118
+ try {
119
+ var fairnessCounter = 0
120
+ while (true ) {
121
+ try {
122
+ currentTask.run ()
123
+ } catch (e: Throwable ) {
124
+ handleCoroutineException(EmptyCoroutineContext , e)
125
+ }
126
+ currentTask = obtainTaskOrDeallocateWorker() ? : return
127
+ // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
128
+ if (++ fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this @LimitedDispatcher)) {
129
+ // Do "yield" to let other views execute their runnable as well
130
+ // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
131
+ dispatcher.safeDispatch(this @LimitedDispatcher, this )
132
+ return
133
+ }
116
134
}
117
- currentTask = obtainTaskOrDeallocateWorker() ? : return
118
- // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
119
- if (++ fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this @LimitedDispatcher)) {
120
- // Do "yield" to let other views execute their runnable as well
121
- // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
122
- dispatcher.safeDispatch(this @LimitedDispatcher, this )
123
- return
135
+ } catch (e: Throwable ) {
136
+ // If the worker failed, we should deallocate its slot
137
+ synchronized(workerAllocationLock) {
138
+ runningWorkers.decrementAndGet()
124
139
}
140
+ throw e
125
141
}
126
142
}
127
143
}
@@ -132,4 +148,4 @@ internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive pa
132
148
internal fun CoroutineDispatcher.namedOrThis (name : String? ): CoroutineDispatcher {
133
149
if (name != null ) return NamedDispatcher (this , name)
134
150
return this
135
- }
151
+ }
0 commit comments