Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
307d8f5
replace delegate based server impl with call handler
marcoferrer Dec 31, 2019
4c4078b
fix codeblock for bind service funspec
marcoferrer Dec 31, 2019
23de00c
refactor server call server streaming call handler impl to flow
marcoferrer Dec 31, 2019
e523216
refactor client call client streaming impl
marcoferrer Jan 1, 2020
6b2d2f2
refactor server bidi call handler
marcoferrer Jan 12, 2020
a34a0c5
wait for call completion before asserting in server bidi test
marcoferrer Jan 12, 2020
0b5af23
only check call before signaling ready channel
marcoferrer Jan 12, 2020
02bf110
increase timeout for high volume bidi call test
marcoferrer Jan 12, 2020
979174d
remove usage of unconfined dispatcher in outbound message actor
marcoferrer Jan 12, 2020
27e77d7
wait for call to close before asserting
marcoferrer Jan 12, 2020
07d5193
check exception type in cancellation handler
marcoferrer Jan 12, 2020
ae9777e
address deadlock in bidi integration test
marcoferrer Jan 12, 2020
7a6fad1
fix ready observer ready handler to always signal
marcoferrer Jan 12, 2020
5e35e7c
inline requests for messages from the call stream
marcoferrer Jan 12, 2020
7e3ae1f
refactor exception and status assertions
marcoferrer Jan 13, 2020
092c190
update bidi integration test and introduce suspendForever util
marcoferrer Jan 13, 2020
fb4e1df
simplify server streaming atomic invoke test
marcoferrer Jan 13, 2020
fe7b87a
remove usage of runBlocking from client streaming tests
marcoferrer Jan 13, 2020
30c4e64
remove redundant await ready in server streaming call handlers
marcoferrer Jan 13, 2020
b232741
debug ci test
marcoferrer Jan 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kroto-plus-coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ protobuf {
}
}

test {
jacoco {
// These deprecated extensions are no longer covered by tests since
// they're not referenced in generated sources anymore. They will be
// removed in the future but remain as to provide backwards compatibility
// with existing generated sources.
excludes += ['com/github/marcoferrer/krotoplus/coroutines/server/ServerCallsKt.class']
}
}

jacoco {
toolVersion = "0.8.5"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.github.marcoferrer.krotoplus.coroutines.call

import com.github.marcoferrer.krotoplus.coroutines.CALL_OPTION_COROUTINE_CONTEXT
import com.github.marcoferrer.krotoplus.coroutines.asContextElement
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.MethodDescriptor
import io.grpc.Status
Expand Down Expand Up @@ -88,6 +90,16 @@ internal fun Throwable.toRpcException(): Throwable =
internal fun MethodDescriptor<*, *>.getCoroutineName(): CoroutineName =
CoroutineName(fullMethodName)

internal fun newRpcScope(
callOptions: CallOptions,
methodDescriptor: MethodDescriptor<*, *>,
grpcContext: io.grpc.Context = io.grpc.Context.current()
): CoroutineScope = newRpcScope(
callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT),
methodDescriptor,
grpcContext
)

internal fun newRpcScope(
coroutineContext: CoroutineContext,
methodDescriptor: MethodDescriptor<*, *>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2019 Kroto+ Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.marcoferrer.krotoplus.coroutines.call

import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.channels.Channel

internal fun CallStreamObserver<*>.newCallReadyObserver(): CallReadyObserver =
CallReadyObserver(this)

internal class CallReadyObserver(
callStreamObserver: CallStreamObserver<*>
) : Runnable {

private val notificationChannel = Channel<READY_TOKEN>(1)

private var hasRan = false

private val callStreamObserver: CallStreamObserver<*> = callStreamObserver
.apply { setOnReadyHandler(this@CallReadyObserver) }

suspend fun isReady(): Boolean {
// Suspend until the call is ready.
// If the call is cancelled before then, an exception
// will be thrown.
awaitReady()
return true
}

suspend fun awaitReady() {
// If our handler hasnt run yet we will want to
// suspend immediately since its early enough that
// calls to `callStreamObserver.isReady` will throw
// and NPE
if(!hasRan)
notificationChannel.receive()
// By the time the on ready handler is invoked, calls
// to `callStreamObserver.isReady` could return false
// Here we will continue to poll notifications until
// the call is ready. For more details reference the
// documentation for `callStreamObserver.setOnReadyHandler()`
while(!callStreamObserver.isReady){
notificationChannel.receive()
}
}

fun cancel(t: Throwable? = null){
notificationChannel.close(t)
}

private fun signalReady() = notificationChannel.offer(READY_TOKEN)

@Deprecated(
message = "This method should not be called directly",
level = DeprecationLevel.HIDDEN)
override fun run() {
if(!hasRan) {
hasRan = true
}
signalReady()
}

companion object{
private object READY_TOKEN
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,32 @@ package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.CALL_OPTION_COROUTINE_CONTEXT
import com.github.marcoferrer.krotoplus.coroutines.call.bindScopeCancellationToCall
import com.github.marcoferrer.krotoplus.coroutines.call.completeSafely
import com.github.marcoferrer.krotoplus.coroutines.call.newRpcScope
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.CallOptions
import io.grpc.MethodDescriptor
import io.grpc.Status
import io.grpc.stub.AbstractStub
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientCalls.asyncBidiStreamingCall
import io.grpc.stub.ClientCalls.asyncClientStreamingCall
import io.grpc.stub.ClientCalls.asyncServerStreamingCall
import io.grpc.stub.ClientCalls.asyncUnaryCall
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

Expand Down Expand Up @@ -121,41 +120,27 @@ public fun <ReqT, RespT> clientCallServerStreaming(
callOptions: CallOptions = CallOptions.DEFAULT
): ReceiveChannel<RespT> {

val observerAdapter = ResponseObserverChannelAdapter<ReqT, RespT>()
val responseObserver = ServerStreamingResponseObserver<ReqT, RespT>()
val rpcScope = newRpcScope(callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT), method)
val responseFlow = callbackFlow<RespT> flow@ {
observerAdapter.scope = this
val responseFlow = callbackFlow<RespT> flow@{
responseObserver.responseProducerScope = this

val call = grpcChannel
.newCall(method, callOptions.withCoroutineContext(coroutineContext))
.beforeCancellation { message, cause ->
observerAdapter.beforeCallCancellation(message, cause)
responseObserver.beforeCallCancellation(message, cause)
}

val job = coroutineContext[Job]!!

// Start the RPC Call
asyncServerStreamingCall<ReqT, RespT>(call, request, observerAdapter)

// If our parent job is cancelled before we can
// start the call then we need to propagate the
// cancellation to the underlying call
job.invokeOnCompletion { error ->
// Our job can be cancelled after completion due to the inner machinery
// of kotlinx.coroutines.flow.Channels.kt.emitAll(). Its final operation
// after receiving a close is a call to channel.cancelConsumed(cause).
// Even if it doesnt encounter an exception it will cancel with null.
// We will only invoke cancel on the call
if(job.isCancelled && observerAdapter.isActive){
call.cancel(MESSAGE_CLIENT_CANCELLED_CALL, error)
}
}
asyncServerStreamingCall<ReqT, RespT>(call, request, responseObserver)

bindScopeCompletionToCall(responseObserver)

suspendCancellableCoroutine<Unit> { cont ->
// Here we need to handle not only parent job cancellation
// but calls to `channel.cancel(...)` as well.
cont.invokeOnCancellation { error ->
if (observerAdapter.isActive) {
if (responseObserver.isActive) {
call.cancel(MESSAGE_CLIENT_CANCELLED_CALL, error)
}
}
Expand All @@ -168,12 +153,12 @@ public fun <ReqT, RespT> clientCallServerStreaming(
}

// Use buffer UNLIMITED so that we dont drop any inbound messages
return flow { emitAll(responseFlow.buffer(Channel.UNLIMITED)) }
.onEach {
if(observerAdapter.isActive){
observerAdapter.callStreamObserver.request(1)
}
return flow {
responseFlow.buffer(Channel.UNLIMITED).collect{ message ->
emit(message)
responseObserver.callStreamObserver.request(1)
}
}
// We use buffer RENDEZVOUS on the outer flow so that our
// `onEach` operator is only invoked each time a message is
// collected instead of each time a message is received from
Expand All @@ -188,22 +173,26 @@ public fun <ReqT, RespT, T : AbstractStub<T>> T.clientCallBidiStreaming(
): ClientBidiCallChannel<ReqT, RespT> =
clientCallBidiStreaming(method, channel, callOptions)



public fun <ReqT, RespT> clientCallBidiStreaming(
method: MethodDescriptor<ReqT, RespT>,
channel: io.grpc.Channel,
callOptions: CallOptions = CallOptions.DEFAULT
): ClientBidiCallChannel<ReqT, RespT> {

val initialContext = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
with(newRpcScope(initialContext, method)) {
val rpcScope = newRpcScope(callOptions, method)
val responseObserver = BidiStreamingResponseObserver<ReqT, RespT>(rpcScope)

val call = channel.newCall(method, callOptions.withCoroutineContext(coroutineContext))
val callChannel = ClientBidiCallChannelImpl<ReqT, RespT>(coroutineContext)
asyncBidiStreamingCall<ReqT, RespT>(call, callChannel)
bindScopeCancellationToCall(call)
val call = channel
.newCall(method, callOptions.withCoroutineContext(rpcScope.coroutineContext))
.beforeCancellation { message, cause ->
responseObserver.beforeCallCancellation(message, cause)
}

return callChannel
}
asyncBidiStreamingCall<ReqT, RespT>(call, responseObserver)

return responseObserver.asClientBidiCallChannel()
}

public fun <ReqT, RespT, T : AbstractStub<T>> T.clientCallClientStreaming(
Expand All @@ -216,14 +205,62 @@ public fun <ReqT, RespT> clientCallClientStreaming(
channel: io.grpc.Channel,
callOptions: CallOptions = CallOptions.DEFAULT
): ClientStreamingCallChannel<ReqT, RespT> {
val initialContext = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
with(newRpcScope(initialContext, method)) {
val call = channel.newCall(method, callOptions.withCoroutineContext(coroutineContext))
val callChannel = ClientStreamingCallChannelImpl<ReqT, RespT>(coroutineContext)
asyncClientStreamingCall<ReqT, RespT>(call, callChannel)
bindScopeCancellationToCall(call)

return callChannel
val rpcScope = newRpcScope(callOptions, method)
val response = CompletableDeferred<RespT>(parent = rpcScope.coroutineContext[Job])
val requestChannel = rpcScope.actor<ReqT>(capacity = Channel.RENDEZVOUS) {
val responseObserver = ClientStreamingResponseObserver(
[email protected], response
)

val call = channel
.newCall(method, callOptions.withCoroutineContext(coroutineContext))
.beforeCancellation { message, cause ->
responseObserver.beforeCallCancellation(message, cause)
}

val requestObserver = asyncClientStreamingCall<ReqT, RespT>(call, responseObserver)

bindScopeCompletionToCall(responseObserver)

var error: Throwable? = null
try {
val iter = [email protected]()
while(responseObserver.isReady() && iter.hasNext()){
requestObserver.onNext(iter.next())
}
} catch (e: Throwable) {
error = e
} finally {
if(responseObserver.isActive) {
requestObserver.completeSafely(error, convertError = false)
}
}
}

return object : ClientStreamingCallChannel<ReqT, RespT>, SendChannel<ReqT> by requestChannel {
override val requestChannel: SendChannel<ReqT>
get() = requestChannel
override val response: Deferred<RespT>
get() = response
}
}

internal fun CoroutineScope.bindScopeCompletionToCall(
observer: StatefulClientResponseObserver<*, *>
){
val job = coroutineContext[Job]!!
// If our parent job is cancelled before we can
// start the call then we need to propagate the
// cancellation to the underlying call
job.invokeOnCompletion { error ->
// Our job can be cancelled after completion due to the inner machinery
// of kotlinx.coroutines.flow.Channels.kt.emitAll(). Its final operation
// after receiving a close is a call to channel.cancelConsumed(cause).
// Even if it doesnt encounter an exception it will cancel with null.
// We will only invoke cancel on the call
if (job.isCancelled && observer.isActive) {
observer.callStreamObserver.cancel(MESSAGE_CLIENT_CANCELLED_CALL, error)
}
}
}
Loading