Skip to content

Commit 7bfdb56

Browse files
authored
Non suspend flow (#299)
* KRPC-133 Don't require `suspend` modifier in functions returning `Flow` * gold tests * update compiler tests * Added cancellation tests and GC tests, checked exceptions * apiDump * detekt * uncomment subplugin options * fix timeout * review comments
1 parent d702ecf commit 7bfdb56

File tree

57 files changed

+848
-72
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+848
-72
lines changed

compiler-plugin/compiler-plugin-backend/src/main/core/kotlinx/rpc/codegen/extension/RpcIrContext.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.codegen.extension
@@ -176,6 +176,10 @@ internal class RpcIrContext(
176176
rpcClient.namedFunction("call")
177177
}
178178

179+
val rpcClientCallServerStreaming by lazy {
180+
rpcClient.namedFunction("callServerStreaming")
181+
}
182+
179183
val provideStubContext by lazy {
180184
rpcClient.namedFunction("provideStubContext")
181185
}

compiler-plugin/compiler-plugin-backend/src/main/core/kotlinx/rpc/codegen/extension/RpcStubGenerator.kt

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.codegen.extension
@@ -530,7 +530,7 @@ internal class RpcStubGenerator(
530530
returnType = method.function.returnType
531531
modality = Modality.OPEN
532532

533-
isSuspend = true
533+
isSuspend = method.function.isSuspend
534534
}.apply {
535535
val functionThisReceiver = vsApi {
536536
stubClassThisReceiver.copyToVS(this@apply, origin = IrDeclarationOrigin.DEFINED)
@@ -550,6 +550,20 @@ internal class RpcStubGenerator(
550550
overriddenSymbols = listOf(method.function.symbol)
551551

552552
body = irBuilder(symbol).irBlockBody {
553+
if (method.function.isNonSuspendingWithFlowReturn()) {
554+
+irReturn(
555+
irRpcMethodClientCall(
556+
method = method,
557+
functionThisReceiver = functionThisReceiver,
558+
isMethodObject = isMethodObject,
559+
methodClass = methodClass,
560+
arguments = arguments,
561+
)
562+
)
563+
564+
return@irBlockBody
565+
}
566+
553567
+irReturn(
554568
irCall(
555569
callee = ctx.functions.scopedClientCall,
@@ -742,8 +756,14 @@ internal class RpcStubGenerator(
742756
methodClass: IrClass,
743757
arguments: List<IrValueParameter>,
744758
): IrCall {
759+
val callee = if (method.function.isNonSuspendingWithFlowReturn()) {
760+
ctx.functions.rpcClientCallServerStreaming.symbol
761+
} else {
762+
ctx.functions.rpcClientCall.symbol
763+
}
764+
745765
val call = irCall(
746-
callee = ctx.functions.rpcClientCall.symbol,
766+
callee = callee,
747767
type = method.function.returnType,
748768
typeArgumentsCount = 1,
749769
).apply {
@@ -1218,13 +1238,15 @@ internal class RpcStubGenerator(
12181238
* ),
12191239
* ...
12201240
* ),
1241+
* isNonSuspendFunction = !function.isSuspend,
12211242
* )
12221243
*```
12231244
*
12241245
* Where:
12251246
* - `<callable-name>` - the name of the method (field)
12261247
* - `<callable-data-type>` - a method class for a method and `FieldDataObject` for fields
1227-
* - `<callable-return-type>` - the return type for the method and the field type for a field
1248+
* - `<callable-return-type>` - the return type for the method and the field type for a field.
1249+
* For a non-suspending flow the return type is its element type
12281250
* - `<callable-invokator>` - an invokator, previously generated by [generateInvokators]
12291251
* - `<method-parameter-name-k>` - if a method, its k-th parameter name
12301252
* - `<method-parameter-type-k>` - if a method, its k-th parameter type
@@ -1253,7 +1275,16 @@ internal class RpcStubGenerator(
12531275
putValueArgument(1, irRpcTypeCall(dataType))
12541276

12551277
val returnType = when (callable) {
1256-
is ServiceDeclaration.Method -> callable.function.returnType
1278+
is ServiceDeclaration.Method -> when {
1279+
callable.function.isNonSuspendingWithFlowReturn() -> {
1280+
(callable.function.returnType as IrSimpleType).arguments.single().typeOrFail
1281+
}
1282+
1283+
else -> {
1284+
callable.function.returnType
1285+
}
1286+
}
1287+
12571288
is ServiceDeclaration.FlowField -> callable.property.getterOrFail.returnType
12581289
}
12591290

@@ -1321,9 +1352,14 @@ internal class RpcStubGenerator(
13211352
}
13221353

13231354
putValueArgument(4, arrayOfCall)
1355+
putValueArgument(5, booleanConst(callable is ServiceDeclaration.Method && !callable.function.isSuspend))
13241356
}
13251357
}
13261358

1359+
private fun IrSimpleFunction.isNonSuspendingWithFlowReturn(): Boolean {
1360+
return returnType.classOrNull == ctx.flow && !isSuspend
1361+
}
1362+
13271363
/**
13281364
* Accessor function for the `callableMap` property
13291365
* Defined in `RpcServiceDescriptor`
@@ -1525,7 +1561,7 @@ internal class RpcStubGenerator(
15251561
}
15261562

15271563
/**
1528-
* IR call of the `RpcType(KType, Array<Annotation>)` function
1564+
* IR call of the `RpcType(KType)` function
15291565
*/
15301566
private fun irRpcTypeCall(type: IrType): IrConstructorCallImpl {
15311567
return vsApi {
@@ -1644,6 +1680,13 @@ internal class RpcStubGenerator(
16441680
value = value,
16451681
)
16461682

1683+
private fun booleanConst(value: Boolean) = IrConstImpl.boolean(
1684+
startOffset = UNDEFINED_OFFSET,
1685+
endOffset = UNDEFINED_OFFSET,
1686+
type = ctx.irBuiltIns.booleanType,
1687+
value = value,
1688+
)
1689+
16471690
private fun <T> vsApi(body: VersionSpecificApi.() -> T): T {
16481691
return ctx.versionSpecificApi.body()
16491692
}

compiler-plugin/compiler-plugin-k2/src/main/core/kotlinx/rpc/codegen/StrictMode.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.codegen
@@ -52,8 +52,8 @@ fun CompilerConfiguration.strictModeAggregator(): StrictModeAggregator {
5252
stateFlow = get(StrictModeConfigurationKeys.STATE_FLOW, StrictMode.WARNING),
5353
sharedFlow = get(StrictModeConfigurationKeys.SHARED_FLOW, StrictMode.WARNING),
5454
nestedFlow = get(StrictModeConfigurationKeys.NESTED_FLOW, StrictMode.WARNING),
55-
streamScopedFunctions = get(StrictModeConfigurationKeys.STREAM_SCOPED_FUNCTIONS, StrictMode.NONE),
56-
suspendingServerStreaming = get(StrictModeConfigurationKeys.SUSPENDING_SERVER_STREAMING, StrictMode.NONE),
55+
streamScopedFunctions = get(StrictModeConfigurationKeys.STREAM_SCOPED_FUNCTIONS, StrictMode.WARNING),
56+
suspendingServerStreaming = get(StrictModeConfigurationKeys.SUSPENDING_SERVER_STREAMING, StrictMode.WARNING),
5757
notTopLevelServerFlow = get(StrictModeConfigurationKeys.NOT_TOP_LEVEL_SERVER_FLOW, StrictMode.WARNING),
5858
fields = get(StrictModeConfigurationKeys.FIELDS, StrictMode.WARNING),
5959
)

core/api/core.api

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,14 @@ public final class kotlinx/rpc/RpcCall {
3232
public abstract interface class kotlinx/rpc/RpcClient : kotlinx/coroutines/CoroutineScope {
3333
public abstract fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3434
public abstract fun callAsync (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/Deferred;
35+
public abstract fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
3536
public abstract fun provideStubContext (J)Lkotlin/coroutines/CoroutineContext;
3637
}
3738

39+
public final class kotlinx/rpc/RpcClient$DefaultImpls {
40+
public static fun callServerStreaming (Lkotlinx/rpc/RpcClient;Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
41+
}
42+
3843
public abstract interface annotation class kotlinx/rpc/RpcEagerField : java/lang/annotation/Annotation {
3944
}
4045

@@ -59,12 +64,13 @@ public abstract interface annotation class kotlinx/rpc/annotations/Rpc : java/la
5964
}
6065

6166
public final class kotlinx/rpc/descriptor/RpcCallable {
62-
public fun <init> (Ljava/lang/String;Lkotlinx/rpc/descriptor/RpcType;Lkotlinx/rpc/descriptor/RpcType;Lkotlinx/rpc/descriptor/RpcInvokator;[Lkotlinx/rpc/descriptor/RpcParameter;)V
67+
public fun <init> (Ljava/lang/String;Lkotlinx/rpc/descriptor/RpcType;Lkotlinx/rpc/descriptor/RpcType;Lkotlinx/rpc/descriptor/RpcInvokator;[Lkotlinx/rpc/descriptor/RpcParameter;Z)V
6368
public final fun getDataType ()Lkotlinx/rpc/descriptor/RpcType;
6469
public final fun getInvokator ()Lkotlinx/rpc/descriptor/RpcInvokator;
6570
public final fun getName ()Ljava/lang/String;
6671
public final fun getParameters ()[Lkotlinx/rpc/descriptor/RpcParameter;
6772
public final fun getReturnType ()Lkotlinx/rpc/descriptor/RpcType;
73+
public final fun isNonSuspendFunction ()Z
6874
}
6975

7076
public abstract interface class kotlinx/rpc/descriptor/RpcInvokator {

core/src/commonMain/kotlin/kotlinx/rpc/RpcClient.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.rpc
66

77
import kotlinx.coroutines.CoroutineScope
88
import kotlinx.coroutines.Deferred
9+
import kotlinx.coroutines.flow.Flow
910
import kotlin.coroutines.CoroutineContext
1011

1112
@Deprecated("Use RpcClient instead", ReplaceWith("RpcClient"), level = DeprecationLevel.ERROR)
@@ -36,8 +37,25 @@ public interface RpcClient : CoroutineScope {
3637
* that is needed to route it properly to the server.
3738
* @return actual result of the call, for example, data from the server
3839
*/
40+
@Deprecated(
41+
"This method was primarily used for fields in RPC services, which are now deprecated. " +
42+
"See https://kotlin.github.io/kotlinx-rpc/strict-mode.html fields guide for more information"
43+
)
3944
public fun <T> callAsync(serviceScope: CoroutineScope, call: RpcCall): Deferred<T>
4045

46+
/**
47+
* This method is used by generated clients to perform a call to the server
48+
* that returns a streaming flow.
49+
*
50+
* @param T type of the result
51+
* @param call an object that contains all required information about the called method,
52+
* that is needed to route it properly to the server.
53+
* @return the actual result of the call, for example, data from the server
54+
*/
55+
public fun <T> callServerStreaming(call: RpcCall): Flow<T> {
56+
error("Non-suspending server streaming is not supported by this client")
57+
}
58+
4159
/**
4260
* Provides child [CoroutineContext] for a new [RemoteService] service stub.
4361
*

core/src/commonMain/kotlin/kotlinx/rpc/descriptor/RpcServiceDescriptor.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class RpcCallable<@Rpc T : Any>(
6262
public val returnType: RpcType,
6363
public val invokator: RpcInvokator<T>,
6464
public val parameters: Array<RpcParameter>,
65+
public val isNonSuspendFunction: Boolean,
6566
)
6667

6768
@ExperimentalRpcApi

gradle-plugin/src/main/kotlin/kotlinx/rpc/Extensions.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("unused")
@@ -64,16 +64,16 @@ open class RpcStrictModeExtension @Inject constructor(objects: ObjectFactory) {
6464
val nestedFlow: Property<RpcStrictMode> = objects.strictModeProperty()
6565

6666
/**
67-
* WIP: https://youtrack.jetbrains.com/issue/KRPC-133
68-
* Will be enabled later, when an alternative is ready.
67+
* StreamScoped functions are deprecated.
6968
*/
70-
private val streamScopedFunctions: Property<RpcStrictMode> = objects.strictModeProperty(RpcStrictMode.NONE)
69+
val streamScopedFunctions: Property<RpcStrictMode> = objects.strictModeProperty()
7170

7271
/**
73-
* WIP: https://youtrack.jetbrains.com/issue/KRPC-133
74-
* Will be enabled later, when an alternative is ready.
72+
* Suspending functions with server-streaming are deprecated in RPC.
73+
*
74+
* Consider returning a Flow in a non-suspending function.
7575
*/
76-
private val suspendingServerStreaming: Property<RpcStrictMode> = objects.strictModeProperty(RpcStrictMode.NONE)
76+
val suspendingServerStreaming: Property<RpcStrictMode> = objects.strictModeProperty()
7777

7878
/**
7979
* Not top-level flows in the return value are deprecated in RPC for streaming.

gradle-plugin/src/main/kotlin/kotlinx/rpc/compilerPlugins.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("detekt.ClassNaming", "ClassName")
@@ -36,12 +36,11 @@ class CompilerPluginCli : KotlinCompilerPluginSupportPlugin by compilerPlugin({
3636
SubpluginOption("strict-stateFlow", strict.stateFlow.get().toCompilerArg()),
3737
SubpluginOption("strict-sharedFlow", strict.sharedFlow.get().toCompilerArg()),
3838
SubpluginOption("strict-nested-flow", strict.nestedFlow.get().toCompilerArg()),
39-
// WIP: https://youtrack.jetbrains.com/issue/KRPC-133
40-
// SubpluginOption("strict-stream-scope", strict.streamScopedFunctions.get().toCompilerArg()),
41-
// SubpluginOption(
42-
// "strict-suspending-server-streaming",
43-
// strict.suspendingServerStreaming.get().toCompilerArg()
44-
// ),
39+
SubpluginOption("strict-stream-scope", strict.streamScopedFunctions.get().toCompilerArg()),
40+
SubpluginOption(
41+
"strict-suspending-server-streaming",
42+
strict.suspendingServerStreaming.get().toCompilerArg()
43+
),
4544
SubpluginOption("strict-not-top-level-server-flow", strict.notTopLevelServerFlow.get().toCompilerArg()),
4645
SubpluginOption("strict-fields", strict.fields.get().toCompilerArg()),
4746
@OptIn(RpcDangerousApi::class)

krpc/krpc-client/api/krpc-client.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/krpc/inte
22
public fun <init> (Lkotlinx/rpc/krpc/KrpcConfig$Client;Lkotlinx/rpc/krpc/KrpcTransport;)V
33
public final fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
44
public fun callAsync (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/Deferred;
5+
public fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
56
protected final fun getConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
67
public synthetic fun getConfig ()Lkotlinx/rpc/krpc/KrpcConfig;
78
public final fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;

0 commit comments

Comments
 (0)