Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ public final class arrow/fx/coroutines/Race3Kt {
public static synthetic fun raceN$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/RaceIfKt {
public static final fun raceNUtil (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun raceNUtil (Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun raceNUtil$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface annotation class arrow/fx/coroutines/ResourceDSL : java/lang/annotation/Annotation {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/bracketCase(crossinline kotlin.coroutines/SuspendFunction0<#A>, kotlin.coroutines/SuspendFunction1<#A, #B>, crossinline kotlin.coroutines/SuspendFunction2<#A, arrow.fx.coroutines/ExitCase, kotlin/Unit>): #B // arrow.fx.coroutines/bracketCase|bracketCase(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction1<0:0,0:1>;kotlin.coroutines.SuspendFunction2<0:0,arrow.fx.coroutines.ExitCase,kotlin.Unit>){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/raceN(crossinline kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #A>, crossinline kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #B>): arrow.core/Either<#A, #B> // arrow.fx.coroutines/raceN|raceN(kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:0>;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:1>){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/raceN(kotlin.coroutines/CoroutineContext = ..., crossinline kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #A>, crossinline kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #B>): arrow.core/Either<#A, #B> // arrow.fx.coroutines/raceN|raceN(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:0>;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:1>){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/raceNUtil(crossinline kotlin/Function1<#A, kotlin/Boolean>, kotlin/Array<out kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #A>>...): #A // arrow.fx.coroutines/raceNUtil|raceNUtil(kotlin.Function1<0:0,kotlin.Boolean>;kotlin.Array<out|kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:0>>...){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/guarantee(kotlin.coroutines/SuspendFunction0<#A>, crossinline kotlin.coroutines/SuspendFunction0<kotlin/Unit>): #A // arrow.fx.coroutines/guarantee|guarantee(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction0<kotlin.Unit>){0§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/guaranteeCase(kotlin.coroutines/SuspendFunction0<#A>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ExitCase, kotlin/Unit>): #A // arrow.fx.coroutines/guaranteeCase|guaranteeCase(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ExitCase,kotlin.Unit>){0§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/onCancel(kotlin.coroutines/SuspendFunction0<#A>, crossinline kotlin.coroutines/SuspendFunction0<kotlin/Unit>): #A // arrow.fx.coroutines/onCancel|onCancel(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction0<kotlin.Unit>){0§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/raceNUtil(kotlin.coroutines/CoroutineContext = ..., crossinline kotlin/Function1<#A, kotlin/Boolean>, kotlin/Array<out kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #A>>...): #A // arrow.fx.coroutines/raceNUtil|raceNUtil(kotlin.coroutines.CoroutineContext;kotlin.Function1<0:0,kotlin.Boolean>;kotlin.Array<out|kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:0>>...){0§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/resourceScope(kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ResourceScope, #A>): #A // arrow.fx.coroutines/resourceScope|resourceScope(kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ResourceScope,0:0>){0§<kotlin.Any?>}[0]
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ public final class arrow/fx/coroutines/Race3Kt {
public static synthetic fun raceN$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/RaceIfKt {
public static final fun raceNUtil (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun raceNUtil (Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun raceNUtil$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface annotation class arrow/fx/coroutines/ResourceDSL : java/lang/annotation/Annotation {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public suspend inline fun <A, B, C> raceN(
}

@PublishedApi
internal suspend fun cancelAndCompose(first: Deferred<*>, second: Deferred<*>): Unit {
internal suspend fun cancelAndCompose(first: Deferred<*>, second: Deferred<*>) {
val e1 = try {
first.cancelAndJoin()
null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package arrow.fx.coroutines

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Based on https://medium.com/@sam-cooper/custom-kotlin-coroutine-races-77161141b2ac

/**
* Races the participants [tasks] in parallel on the [Dispatchers.Default],
* until one of them returns a value which satisfies [condition].
* The winner of the race cancels the other participants.
* Cancelling the operation cancels all participants.
*/
public suspend inline fun <A, B> raceNUtil(
Comment thread
franciscodr marked this conversation as resolved.
Outdated
crossinline condition: (A) -> Boolean,
vararg tasks: suspend CoroutineScope.() -> A
): A = raceNUtil(Dispatchers.Default, condition, *tasks)

/**
* Races the participants [tasks] in parallel on the provided [ctx],
* until one of them returns a value which satisfies [condition].
* The winner of the race cancels the other participants.
* Cancelling the operation cancels all participants.
*/
public suspend inline fun <A> raceNUtil(
ctx: CoroutineContext = EmptyCoroutineContext,
crossinline condition: (A) -> Boolean,
vararg tasks: suspend CoroutineScope.() -> A
): A {
return coroutineScope {
channelFlow {
tasks.forEach { launch(ctx) { send(it()) } }
}.first { condition(it) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public fun fixedRate(
} else {
val ticks: Long = ((now - lastAwakeAt).inWholeMilliseconds - 1) / period.inWholeMilliseconds
when {
ticks < 0L -> Unit
ticks < 0L -> { }
ticks == 0L || dampen -> emit(Unit)
else -> repeat(ticks.toInt()) { emit(Unit) }
}
Expand Down