Skip to content

Commit 1b9bd38

Browse files
authored
Bug fixes and api updates (#48)
* reduce visibility of FlowControlledInboundStreamObserver to internal * default server rpc method invocation to CoroutineStart.ATOMIC * revise coroutine context call option builders * add tests for atomic server method invocation * remove message propagation from rpc exception mapper * add tests for new stub context extensions * refactor atomic server tests
1 parent b6b5d94 commit 1b9bd38

File tree

19 files changed

+457
-96
lines changed

19 files changed

+457
-96
lines changed

.travis.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
language: java
22
jdk:
33
- oraclejdk8
4+
dist: trusty
45
sudo: false
56
addons:
67
apt:
@@ -16,7 +17,11 @@ before_cache:
1617
- rm -rf $HOME/.gradle/caches/*/fileHashes/
1718
jobs:
1819
include:
19-
- stage: "Tests"
20-
script: ./gradlew build && cd example-project && ./gradlew test
20+
- name: Gradle Check
21+
install:
22+
- ./gradlew assemble
23+
script:
24+
- ./gradlew check
25+
- cd example-project && ./gradlew test
2126
after_success:
2227
- bash <(curl -s https://codecov.io/bash)

CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
## Version 0.3.1
1+
## Version 0.4.0
22
_\*\*-\*\*\-\*\*_
33
* New: Update to kotlin `1.3.31`
44
* New: Update to kotlin Coroutines `1.2.1`
@@ -9,7 +9,12 @@ _\*\*-\*\*\-\*\*_
99
* Fix: Address bug in parallelization of generator execution
1010

1111
#### Coroutines
12-
* Fix: Disable auto flow control for inbound client and server streams during bidi calls
12+
* New: Default server method execution to `CoroutineStart.ATOMIC`
13+
* New: Introduce abstract stub ext for concatenating coroutine contexts, `AbstactStub.plusContext`.
14+
* Fix: Don't propagate message for `UNKNOWN` exceptions in rpc exception mapper
15+
* Fix: Disable auto flow control for inbound client and server streams during bidi calls
16+
* Fix: Reduce visibility of `FlowControlledInboundStreamObserver` to `internal`
17+
* Deprecated: `AbstractStub.coroutineContext` ext in favor of `AbstractStub.context`
1318

1419

1520
## Version 0.3.0

kroto-plus-coroutines/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dependencies {
3434
testImplementation project(':test-api:grpc')
3535
testImplementation project(':test-api:java')
3636
testImplementation "io.mockk:mockk:${Versions.mockk}"
37+
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.coroutines}"
3738
}
3839

3940
tasks.withType(JavaCompile) {

kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/CoroutineCallOptions.kt

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,33 @@ public val CALL_OPTION_COROUTINE_CONTEXT: CallOptions.Key<CoroutineContext> =
3232
/**
3333
* Get the coroutineContext the receiving stub is using for cooperative cancellation.
3434
*/
35+
@Deprecated("Use extension property context instead", ReplaceWith("context"))
3536
public val <T : AbstractStub<T>> T.coroutineContext: CoroutineContext
3637
get() = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
3738

39+
40+
public val <T : AbstractStub<T>> T.context: CoroutineContext
41+
get() = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
42+
3843
/**
3944
* Returns a new stub with the value of [coroutineContext] attached as a [CallOptions].
4045
* Any rpcs invoked on the resulting stub will use this context to participate in cooperative cancellation.
4146
*/
42-
public fun <T : AbstractStub<T>> T.withCoroutineContext(context: CoroutineContext): T{
43-
val newContext = this.coroutineContext + context
44-
return this.withOption(CALL_OPTION_COROUTINE_CONTEXT, newContext)
45-
}
47+
public fun <T : AbstractStub<T>> T.withCoroutineContext(context: CoroutineContext): T =
48+
withOption(CALL_OPTION_COROUTINE_CONTEXT, context)
4649

50+
public fun <T : AbstractStub<T>> T.plusCoroutineContext(context: CoroutineContext): T =
51+
withOption(CALL_OPTION_COROUTINE_CONTEXT, this.context + context)
4752

4853
/**
4954
* Returns a new stub with the 'coroutineContext' from the current suspension attached as a [CallOptions].
5055
* Any rpcs invoked on the resulting stub will use this context to participate in cooperative cancellation.
5156
*/
52-
public suspend fun <T : AbstractStub<T>> T.withCoroutineContext(): T {
53-
val newContext = this.coroutineContext + kotlin.coroutines.coroutineContext
54-
return this.withOption(CALL_OPTION_COROUTINE_CONTEXT, newContext)
55-
}
57+
public suspend fun <T : AbstractStub<T>> T.withCoroutineContext(): T =
58+
withOption(CALL_OPTION_COROUTINE_CONTEXT, kotlin.coroutines.coroutineContext)
59+
60+
public suspend fun <T : AbstractStub<T>> T.plusCoroutineContext(): T =
61+
withOption(CALL_OPTION_COROUTINE_CONTEXT, context + kotlin.coroutines.coroutineContext)
5662

5763
internal fun CallOptions.withCoroutineContext(coroutineContext: CoroutineContext): CallOptions =
5864
this.withOption(CALL_OPTION_COROUTINE_CONTEXT, coroutineContext)

kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/call/CallExts.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ internal fun Throwable.toRpcException(): Throwable =
7979
statusFromThrowable
8080
}
8181

82-
status
83-
.withDescription(this.message)
84-
.asRuntimeException(Status.trailersFromThrowable(this))
82+
status.asRuntimeException(Status.trailersFromThrowable(this))
8583
}
8684
}
8785

kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/call/FlowControlledInboundStreamObserver.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import kotlinx.coroutines.launch
2525
import java.util.concurrent.atomic.AtomicBoolean
2626
import java.util.concurrent.atomic.AtomicInteger
2727

28-
interface FlowControlledInboundStreamObserver<T> : StreamObserver<T>, CoroutineScope {
28+
internal interface FlowControlledInboundStreamObserver<T> : StreamObserver<T>, CoroutineScope {
2929

3030
val inboundChannel: Channel<T>
3131

kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/server/ServerCalls.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import io.grpc.StatusRuntimeException
2828
import io.grpc.stub.ServerCallStreamObserver
2929
import io.grpc.stub.StreamObserver
3030
import kotlinx.coroutines.CoroutineScope
31+
import kotlinx.coroutines.CoroutineStart
3132
import kotlinx.coroutines.ExperimentalCoroutinesApi
3233
import kotlinx.coroutines.Job
3334
import kotlinx.coroutines.cancel
@@ -45,7 +46,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallUnary(
4546
) {
4647
with(newRpcScope(initialContext, methodDescriptor)) rpcScope@ {
4748
bindToClientCancellation(responseObserver as ServerCallStreamObserver<*>)
48-
launch {
49+
launch(start = CoroutineStart.ATOMIC) {
4950
try{
5051
responseObserver.onNext(block())
5152
responseObserver.onCompleted()
@@ -66,7 +67,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
6667
with(newRpcScope(initialContext, methodDescriptor)) {
6768
bindToClientCancellation(serverCallObserver)
6869
applyOutboundFlowControl(serverCallObserver,responseChannel)
69-
launch {
70+
launch(start = CoroutineStart.ATOMIC) {
7071
try{
7172
block(responseChannel)
7273
responseChannel.close()
@@ -112,7 +113,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallClientStreaming(
112113
}
113114
)
114115

115-
launch {
116+
launch(start = CoroutineStart.ATOMIC) {
116117
try {
117118
responseObserver.onNext(block(requestChannel))
118119
responseObserver.onCompleted()
@@ -162,7 +163,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(
162163
}
163164
)
164165

165-
launch {
166+
launch(start = CoroutineStart.ATOMIC) {
166167
serverCallObserver.request(1)
167168
try {
168169
block(requestChannel, responseChannel)

kroto-plus-coroutines/src/test/kotlin/com/github/marcoferrer/krotoplus/coroutines/CallOptionsTest.kt

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ package com.github.marcoferrer.krotoplus.coroutines
1919
import io.grpc.CallOptions
2020
import io.grpc.Channel
2121
import io.mockk.mockk
22+
import kotlinx.coroutines.CoroutineName
2223
import kotlinx.coroutines.Dispatchers
2324
import kotlinx.coroutines.runBlocking
25+
import kotlin.coroutines.ContinuationInterceptor
2426
import kotlin.coroutines.EmptyCoroutineContext
2527
import kotlin.test.Test
2628
import kotlin.test.assertEquals
@@ -29,43 +31,87 @@ import kotlin.test.assertNotEquals
2931
class CallOptionsTest {
3032

3133
@Test
32-
fun `Test coroutine context call option defaults to EmptyCoroutineContext`(){
34+
fun `Coroutine context call option defaults to EmptyCoroutineContext`(){
3335
assertEquals(EmptyCoroutineContext, CallOptions.DEFAULT.getOption(CALL_OPTION_COROUTINE_CONTEXT))
3436
}
3537

3638
@Test
37-
fun `Test stub coroutineContext is populated via call option value`(){
39+
fun `Stub coroutineContext is populated via call option value`(){
3840
val channel = mockk<Channel>()
3941
val stub = TestStub(channel)
4042
assertEquals(EmptyCoroutineContext, stub.coroutineContext)
43+
assertEquals(EmptyCoroutineContext, stub.context)
4144
}
4245

4346
@Test
44-
fun `Test attaching coroutineContext to stub explicitly`(){
47+
fun `Attaching coroutineContext to stub explicitly`(){
4548
val channel = mockk<Channel>()
4649
val stub = TestStub(channel)
4750
assertEquals(EmptyCoroutineContext, stub.coroutineContext)
4851
assertEquals(
4952
Dispatchers.Default,
5053
stub.withCoroutineContext(Dispatchers.Default).coroutineContext
5154
)
55+
56+
assertEquals(EmptyCoroutineContext, stub.context)
57+
assertEquals(
58+
Dispatchers.Default,
59+
stub.withCoroutineContext(Dispatchers.Default).context
60+
)
5261
}
5362

5463
@Test
55-
fun `Test attaching coroutineContext to stub implicitly`(){
64+
fun `Attaching coroutineContext to stub implicitly`(){
5665
val channel = mockk<Channel>()
5766
val stub = TestStub(channel)
5867
assertEquals(EmptyCoroutineContext, stub.coroutineContext)
68+
assertEquals(EmptyCoroutineContext, stub.context)
5969

6070
runBlocking {
6171
val newStub = stub.withCoroutineContext()
72+
6273
assertEquals(coroutineContext, newStub.coroutineContext)
74+
assertEquals(coroutineContext, newStub.context)
75+
6376
assertNotEquals(stub.coroutineContext, newStub.coroutineContext)
77+
assertNotEquals(stub.context, newStub.context)
6478
}
6579
}
6680

6781
@Test
68-
fun `Test attaching coroutineContext to call options explicitly`(){
82+
fun `Merging scope context with stub context implicitly`(){
83+
val channel = mockk<Channel>()
84+
85+
val coroutineName = CoroutineName("testing")
86+
val stub = TestStub(channel)
87+
.withCoroutineContext(coroutineName)
88+
89+
runBlocking(Dispatchers.IO) {
90+
val newStub = stub.plusCoroutineContext()
91+
92+
assertEquals(coroutineName.name,newStub.context[CoroutineName]?.name)
93+
assertEquals(Dispatchers.IO,newStub.context[ContinuationInterceptor])
94+
assertNotEquals(stub.context, newStub.context)
95+
}
96+
}
97+
98+
@Test
99+
fun `Merging context with stub context explicitly`(){
100+
val channel = mockk<Channel>()
101+
102+
val coroutineName = CoroutineName("testing")
103+
val stub = TestStub(channel)
104+
.withCoroutineContext(coroutineName)
105+
106+
val newStub = stub.plusCoroutineContext(Dispatchers.IO)
107+
108+
assertEquals(coroutineName.name,newStub.context[CoroutineName]?.name)
109+
assertEquals(Dispatchers.IO,newStub.context[ContinuationInterceptor])
110+
assertNotEquals(stub.context, newStub.context)
111+
}
112+
113+
@Test
114+
fun `Attaching coroutineContext to call options explicitly`(){
69115
val callOptions = CallOptions.DEFAULT.withCoroutineContext(Dispatchers.Default)
70116
assertEquals(
71117
Dispatchers.Default,
@@ -74,7 +120,7 @@ class CallOptionsTest {
74120
}
75121

76122
@Test
77-
fun `Test attaching coroutineContext to call options implicitly`() = runBlocking {
123+
fun `Attaching coroutineContext to call options implicitly`() = runBlocking {
78124
val callOptions = CallOptions.DEFAULT.withCoroutineContext()
79125
assertEquals(
80126
coroutineContext,

kroto-plus-coroutines/src/test/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientCallBidiStreamingTests.kt

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,30 @@ package com.github.marcoferrer.krotoplus.coroutines.client
1919

2020
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
2121
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
22-
import com.github.marcoferrer.krotoplus.coroutines.utils.matchStatus
2322
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
24-
import io.grpc.*
23+
import io.grpc.CallOptions
24+
import io.grpc.ClientCall
25+
import io.grpc.Status
2526
import io.grpc.examples.helloworld.GreeterGrpc
2627
import io.grpc.examples.helloworld.HelloReply
2728
import io.grpc.examples.helloworld.HelloRequest
2829
import io.grpc.stub.StreamObserver
2930
import io.grpc.testing.GrpcServerRule
30-
import io.mockk.*
31-
import kotlinx.coroutines.*
31+
import io.mockk.every
32+
import io.mockk.spyk
33+
import io.mockk.verify
34+
import kotlinx.coroutines.CancellationException
35+
import kotlinx.coroutines.CoroutineStart
36+
import kotlinx.coroutines.Dispatchers
37+
import kotlinx.coroutines.Job
38+
import kotlinx.coroutines.cancel
3239
import kotlinx.coroutines.channels.ReceiveChannel
3340
import kotlinx.coroutines.channels.SendChannel
3441
import kotlinx.coroutines.channels.map
3542
import kotlinx.coroutines.channels.toList
43+
import kotlinx.coroutines.delay
44+
import kotlinx.coroutines.launch
45+
import kotlinx.coroutines.runBlocking
3646
import org.junit.Rule
3747
import org.junit.Test
3848
import kotlin.test.BeforeTest

kroto-plus-coroutines/src/test/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientCallClientStreamingTests.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,26 @@ package com.github.marcoferrer.krotoplus.coroutines.client
2020
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
2121
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
2222
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
23-
import io.grpc.*
23+
import io.grpc.CallOptions
24+
import io.grpc.ClientCall
25+
import io.grpc.Status
2426
import io.grpc.examples.helloworld.GreeterGrpc
2527
import io.grpc.examples.helloworld.HelloReply
2628
import io.grpc.examples.helloworld.HelloRequest
2729
import io.grpc.stub.StreamObserver
2830
import io.grpc.testing.GrpcServerRule
29-
import io.mockk.*
30-
import kotlinx.coroutines.*
31+
import io.mockk.every
32+
import io.mockk.spyk
33+
import io.mockk.verify
34+
import kotlinx.coroutines.CancellationException
35+
import kotlinx.coroutines.CoroutineStart
36+
import kotlinx.coroutines.Dispatchers
37+
import kotlinx.coroutines.Job
38+
import kotlinx.coroutines.cancel
3139
import kotlinx.coroutines.channels.SendChannel
40+
import kotlinx.coroutines.delay
41+
import kotlinx.coroutines.launch
42+
import kotlinx.coroutines.runBlocking
3243
import org.junit.Rule
3344
import org.junit.Test
3445
import kotlin.test.BeforeTest

0 commit comments

Comments
 (0)