Skip to content

Commit 27e77d7

Browse files
committed
wait for call to close before asserting
1 parent 979174d commit 27e77d7

File tree

3 files changed

+21
-22
lines changed

3 files changed

+21
-22
lines changed

kroto-plus-coroutines/src/test/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientCallBidiStreamingTests.kt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package com.github.marcoferrer.krotoplus.coroutines.client
2020
import com.github.marcoferrer.krotoplus.coroutines.RpcCallTest
2121
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
2222
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithCancellation
23-
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
2423
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus2
2524
import com.github.marcoferrer.krotoplus.coroutines.utils.matchStatus
2625
import com.github.marcoferrer.krotoplus.coroutines.utils.matchThrowable
@@ -136,6 +135,8 @@ class ClientCallBidiStreamingTests :
136135
responseChannel.consumeAsFlow().map { it.message }.toList()
137136
}
138137

138+
callState.blockUntilClosed()
139+
139140
assertEquals(3,result.size)
140141
result.forEachIndexed { index, message ->
141142
assertEquals("Req:#$index/Resp:#$index",message)
@@ -180,6 +181,8 @@ class ClientCallBidiStreamingTests :
180181
}
181182
}
182183

184+
callState.blockUntilClosed()
185+
183186
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
184187
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
185188
}
@@ -222,6 +225,8 @@ class ClientCallBidiStreamingTests :
222225
}
223226
}
224227

228+
callState.blockUntilCancellation()
229+
225230
verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
226231
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
227232
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
@@ -272,6 +277,8 @@ class ClientCallBidiStreamingTests :
272277
}
273278
}
274279

280+
callState.blockUntilCancellation()
281+
275282
verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
276283
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
277284
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
@@ -286,7 +293,7 @@ class ClientCallBidiStreamingTests :
286293
lateinit var requestChannel: SendChannel<HelloRequest>
287294
lateinit var responseChannel: ReceiveChannel<HelloReply>
288295
assertFailsWith(IllegalStateException::class, "cancel") {
289-
runBlocking {
296+
runBlocking(Dispatchers.Default) {
290297
val callChannel = stub
291298
.withCoroutineContext()
292299
.clientCallBidiStreaming(methodDescriptor)
@@ -317,6 +324,8 @@ class ClientCallBidiStreamingTests :
317324
}
318325
}
319326

327+
callState.blockUntilCancellation()
328+
320329
verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
321330
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
322331
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
@@ -345,18 +354,15 @@ class ClientCallBidiStreamingTests :
345354
requestChannel.close(expectedException)
346355
}
347356

348-
//TODO: Cleanup
349-
// cause = Status.CANCELLED
350-
// .withDescription("Cancelled by client with StreamObserver.onError()")
351-
// .withCause(expectedException)
352-
// .asRuntimeException()
353357
assertFailsWithStatus2(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
354358
responseChannel.consumeAsFlow()
355359
.map { it.message }
356360
.collect { result.add(it) }
357361
}
358362
}
359363

364+
callState.blockUntilCancellation()
365+
360366
assert(result.isNotEmpty())
361367
result.forEachIndexed { index, message ->
362368
assertEquals("Req:#$index/Resp:#$index",message)
@@ -386,11 +392,6 @@ class ClientCallBidiStreamingTests :
386392
result.add(responseChannel.receive().message)
387393
requestChannel.close(expectedException)
388394

389-
//TODO clean up
390-
// cause = Status.CANCELLED
391-
// .withDescription("Cancelled by client with StreamObserver.onError()")
392-
// .withCause(expectedException)
393-
// .asRuntimeException()
394395
assertFailsWithStatus2(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
395396
responseChannel.consumeAsFlow()
396397
.collect { result.add(it.message) }
@@ -437,6 +438,7 @@ class ClientCallBidiStreamingTests :
437438
responseChannel.cancel()
438439
}
439440

441+
callState.blockUntilCancellation()
440442

441443
verify(exactly = 1) { rpcSpy.call.cancel(MESSAGE_CLIENT_CANCELLED_CALL,matchStatus(Status.CANCELLED)) }
442444
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }

kroto-plus-coroutines/src/test/kotlin/com/github/marcoferrer/krotoplus/coroutines/integration/BidiStreamingTests.kt

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ class BidiStreamingTests : RpcCallTest<HelloRequest, HelloReply>(GreeterCoroutin
111111
}
112112
})
113113
lateinit var reqChanSpy: SendChannel<HelloRequest>
114-
runTest(5000L) {
115-
val stub = GreeterCoroutineGrpc.newStub(nonDirectGrpcServerRule.channel)
114+
runTest(30_000) {
115+
val stub = GreeterCoroutineGrpc.newStub(grpcServerRule.channel)
116116
.withInterceptors(callState)
117117
.withCoroutineContext()
118118

@@ -131,11 +131,6 @@ class BidiStreamingTests : RpcCallTest<HelloRequest, HelloReply>(GreeterCoroutin
131131
reqJob.join()
132132
}
133133

134-
runBlocking { serverJob.join() }
135-
136-
// 1 - Server requests and receives
137-
// 2 - Message is loaded into outbound buffer
138-
// 3 - Suspending invocation awaiting next onReady
139134
coVerify(exactly = 3) { reqChanSpy.send(any()) }
140135
assert(reqChanSpy.isClosedForSend) { "Request channel should be closed after response channel is closed" }
141136
}
@@ -162,7 +157,7 @@ class BidiStreamingTests : RpcCallTest<HelloRequest, HelloReply>(GreeterCoroutin
162157

163158
val numMessages = 500000
164159
val receivedCount = AtomicInteger()
165-
runTest(timeout = 60_000) {
160+
runTest(timeout = 60_000 * 2) {
166161
val req = HelloRequest.newBuilder()
167162
.setName("test").build()
168163

kroto-plus-coroutines/src/test/kotlin/com/github/marcoferrer/krotoplus/coroutines/integration/ClientStreamingBackPressureTests.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,15 @@ class ClientStreamingBackPressureTests :
201201
}
202202

203203
callState {
204-
// blockUntilCancellation()
205204
client.closed.assertBlocking { "Client must be closed" }
206205
}
207206

208207
verify(exactly = 1) { rpcSpy.call.cancel(expectedCancelMessage, matchThrowable(expectedException)) }
209208
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
210-
assertExEquals(expectedException, response.getCompletionExceptionOrNull()?.cause)
209+
assertExEquals(
210+
expectedException,
211+
response.getCompletionExceptionOrNull()?.cause ?:response.getCompletionExceptionOrNull()
212+
)
211213
assert(response.isCancelled) { "Response should not be cancelled" }
212214
assert(runBlocking {serverJob.await() }.isCancelled){ "Server job should be cancelled" }
213215
}

0 commit comments

Comments
 (0)