diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/android/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/android/arrow-fx-coroutines.api index 669e1fea5a9..c6759ddc904 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/android/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/android/arrow-fx-coroutines.api @@ -214,6 +214,12 @@ public final class arrow/fx/coroutines/Race3Kt { public static synthetic fun raceN$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public final class arrow/fx/coroutines/RaceUntilKt { + public static final fun raceUntil (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun raceUntil (Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun raceUntil$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + public abstract interface annotation class arrow/fx/coroutines/ResourceDSL : java/lang/annotation/Annotation { } diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api index 09e4fdb83d8..1699519efaf 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api @@ -231,7 +231,9 @@ final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/ final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/bracketCase(crossinline kotlin.coroutines/SuspendFunction0<#A>, kotlin.coroutines/SuspendFunction1<#A, #B>, crossinline kotlin.coroutines/SuspendFunction2<#A, arrow.fx.coroutines/ExitCase, kotlin/Unit>): #B // arrow.fx.coroutines/bracketCase|bracketCase(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction1<0:0,0:1>;kotlin.coroutines.SuspendFunction2<0:0,arrow.fx.coroutines.ExitCase,kotlin.Unit>){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/raceN(crossinline kotlin.coroutines/SuspendFunction1, crossinline kotlin.coroutines/SuspendFunction1): arrow.core/Either<#A, #B> // arrow.fx.coroutines/raceN|raceN(kotlin.coroutines.SuspendFunction1;kotlin.coroutines.SuspendFunction1){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/raceN(kotlin.coroutines/CoroutineContext = ..., crossinline kotlin.coroutines/SuspendFunction1, crossinline kotlin.coroutines/SuspendFunction1): arrow.core/Either<#A, #B> // arrow.fx.coroutines/raceN|raceN(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1;kotlin.coroutines.SuspendFunction1){0§;1§}[0] +final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/raceUntil(crossinline kotlin/Function1<#A, kotlin/Boolean>, kotlin/Array>...): #A // arrow.fx.coroutines/raceUntil|raceUntil(kotlin.Function1<0:0,kotlin.Boolean>;kotlin.Array>...){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/guarantee(kotlin.coroutines/SuspendFunction0<#A>, crossinline kotlin.coroutines/SuspendFunction0): #A // arrow.fx.coroutines/guarantee|guarantee(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction0){0§}[0] final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/guaranteeCase(kotlin.coroutines/SuspendFunction0<#A>, crossinline kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines/guaranteeCase|guaranteeCase(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction1){0§}[0] final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/onCancel(kotlin.coroutines/SuspendFunction0<#A>, crossinline kotlin.coroutines/SuspendFunction0): #A // arrow.fx.coroutines/onCancel|onCancel(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction0){0§}[0] +final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/raceUntil(kotlin.coroutines/CoroutineContext = ..., crossinline kotlin/Function1<#A, kotlin/Boolean>, kotlin/Array>...): #A // arrow.fx.coroutines/raceUntil|raceUntil(kotlin.coroutines.CoroutineContext;kotlin.Function1<0:0,kotlin.Boolean>;kotlin.Array>...){0§}[0] final suspend inline fun <#A: kotlin/Any?> arrow.fx.coroutines/resourceScope(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines/resourceScope|resourceScope(kotlin.coroutines.SuspendFunction1){0§}[0] diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/jvm/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/jvm/arrow-fx-coroutines.api index 669e1fea5a9..c6759ddc904 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/jvm/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/jvm/arrow-fx-coroutines.api @@ -214,6 +214,12 @@ public final class arrow/fx/coroutines/Race3Kt { public static synthetic fun raceN$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public final class arrow/fx/coroutines/RaceUntilKt { + public static final fun raceUntil (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun raceUntil (Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun raceUntil$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;[Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + public abstract interface annotation class arrow/fx/coroutines/ResourceDSL : java/lang/annotation/Annotation { } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Race3.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Race3.kt index 7ab2d336670..bfbef36a9b3 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Race3.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Race3.kt @@ -102,7 +102,7 @@ public suspend inline fun raceN( } @PublishedApi -internal suspend fun cancelAndCompose(first: Deferred<*>, second: Deferred<*>): Unit { +internal suspend fun cancelAndCompose(first: Deferred<*>, second: Deferred<*>) { val e1 = try { first.cancelAndJoin() null diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/RaceUntil.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/RaceUntil.kt new file mode 100644 index 00000000000..95885220b67 --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/RaceUntil.kt @@ -0,0 +1,41 @@ +package arrow.fx.coroutines + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +// Based on https://medium.com/@sam-cooper/custom-kotlin-coroutine-races-77161141b2ac + +/** + * Races the participants [tasks] in parallel on the [Dispatchers.Default], + * until one of them returns a value which satisfies [condition]. + * The winner of the race cancels the other participants. + * Cancelling the operation cancels all participants. + */ +public suspend inline fun raceUntil( + crossinline condition: (A) -> Boolean, + vararg tasks: suspend CoroutineScope.() -> A +): A = raceUntil(Dispatchers.Default, condition, *tasks) + +/** + * Races the participants [tasks] in parallel on the provided [ctx], + * until one of them returns a value which satisfies [condition]. + * The winner of the race cancels the other participants. + * Cancelling the operation cancels all participants. + */ +public suspend inline fun raceUntil( + ctx: CoroutineContext = EmptyCoroutineContext, + crossinline condition: (A) -> Boolean, + vararg tasks: suspend CoroutineScope.() -> A +): A { + return coroutineScope { + channelFlow { + tasks.forEach { launch(ctx) { send(it()) } } + }.first { condition(it) } + } +} diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt index 69d74fb5002..b31821058fe 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt @@ -263,7 +263,7 @@ public fun fixedRate( } else { val ticks: Long = ((now - lastAwakeAt).inWholeMilliseconds - 1) / period.inWholeMilliseconds when { - ticks < 0L -> Unit + ticks < 0L -> { } ticks == 0L || dampen -> emit(Unit) else -> repeat(ticks.toInt()) { emit(Unit) } }