From bd1749bd12396560997ed7c9376a6b8e0c1a2984 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Wed, 26 Feb 2025 12:14:06 +0000 Subject: [PATCH 1/8] core: Added changes to make ServerImpl.internalClose() thread-safe --- core/src/main/java/io/grpc/internal/ServerImpl.java | 11 +++++++---- .../java/io/grpc/inprocess/InProcessTransport.java | 5 +++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..863e01a4386 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -779,8 +779,8 @@ static final class JumpToApplicationThreadServerStreamListener implements Server // Only accessed from callExecutor. private ServerStreamListener listener; - public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { + public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor, + ServerStream stream, Context.CancellableContext context, Tag tag) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; @@ -809,9 +809,12 @@ void setListener(ServerStreamListener listener) { * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ private void internalClose(Throwable t) { - // TODO(ejona86): this is not thread-safe :) String description = "Application error processing RPC"; - stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); + Metadata metadata = Status.trailersFromThrowable(t); + if (metadata == null) { + metadata = new Metadata(); + } + stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata); } @Override diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 39ebe6e0ab7..565047502e1 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; @@ -414,6 +415,7 @@ private class InProcessServerStream implements ServerStream { private boolean closed; @GuardedBy("this") private int outboundSeqNo; + private boolean closeCalled; InProcessServerStream(MethodDescriptor method, Metadata headers) { statsTraceCtx = StatsTraceContext.newServerContext( @@ -431,6 +433,7 @@ public void setListener(ServerStreamListener serverStreamListener) { @Override public void request(int numMessages) { + checkState(!closeCalled, "call already closed"); boolean onReady = clientStream.serverRequested(numMessages); if (onReady) { synchronized (this) { @@ -487,6 +490,7 @@ private void clientCancelled(Status status) { @Override public void writeMessage(InputStream message) { + checkState(!closeCalled, "call already closed"); long messageLength = 0; if (isEnabledSupportTracingMessageSizes) { try { @@ -546,6 +550,7 @@ public synchronized boolean isReady() { @Override public void writeHeaders(Metadata headers, boolean flush) { + checkState(!closeCalled, "call already closed"); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { int metadataSize = metadataSize(headers); if (metadataSize > clientMaxInboundMetadataSize) { From 2d762543476db93831ffcc5c948219d33fa81529 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Fri, 28 Feb 2025 12:01:59 +0000 Subject: [PATCH 2/8] core: Added UT for Metadata null check coverage --- .../java/io/grpc/internal/ServerImpl.java | 2 +- .../java/io/grpc/internal/ServerImplTest.java | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 863e01a4386..f01fbee69a5 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -808,7 +808,7 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private void internalClose(Throwable t) { + void internalClose(Throwable t) { String description = "Application error processing RPC"; Metadata metadata = Status.trailersFromThrowable(t); if (metadata == null) { diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 3125edca1e6..a138f024c30 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -78,7 +78,6 @@ import io.grpc.StringMarshaller; import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; -import io.grpc.internal.SingleMessageProducer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.util.MutableHandlerRegistry; import io.perfmark.PerfMark; @@ -1535,6 +1534,25 @@ public void channelz_transport_membershp() throws Exception { assertTrue(after.end); } + @Test + public void testInternalClose_withNullMetadata() { + JumpToApplicationThreadServerStreamListener listener + = new JumpToApplicationThreadServerStreamListener( + executor.getScheduledExecutorService(), + executor.getScheduledExecutorService(), + stream, + Context.ROOT.withCancellation(), + PerfMark.createTag()); + Throwable throwableMock = mock(Throwable.class); + // Stub Status.trailersFromThrowable to return null, simulating the case where metadata is null + when(Status.trailersFromThrowable(throwableMock)).thenReturn(null); + listener.internalClose(throwableMock); + // Capture the arguments passed to stream.close() + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + } + private void createAndStartServer() throws IOException { createServer(); server.start(); From 24350addd3a4adcd5fe7987f7f47b62ef77d6d47 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Fri, 7 Mar 2025 09:35:18 +0000 Subject: [PATCH 3/8] core: Added missing check --- .../src/main/java/io/grpc/inprocess/InProcessTransport.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 565047502e1..24d8dcee935 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -586,6 +586,7 @@ public void close(Status status, Metadata trailers) { // clientStreamListener.closed can trigger clientStream.cancel (see code in // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are // calling internalCancel(). + closeCalled = true; clientStream.serverClosed(Status.OK, status); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { From 2ba2ca9db5e45c20179c2eabe5c4e06606fcb593 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Fri, 7 Mar 2025 10:44:06 +0000 Subject: [PATCH 4/8] core: Fixed Failing UTs --- .../java/io/grpc/internal/AbstractTransportTest.java | 7 +++++-- .../io/grpc/testing/integration/MoreInProcessTest.java | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index aea7ff49032..d67ecfceeb1 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -1598,8 +1599,10 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // Ensure that for a closed ServerStream, interactions are noops - server.stream.writeHeaders(new Metadata(), true); - server.stream.writeMessage(methodDescriptor.streamResponse("response")); + assertThrows(Exception.class, () -> + server.stream.writeHeaders(new Metadata(), true)); + assertThrows(Exception.class, () -> + server.stream.writeMessage(methodDescriptor.streamResponse("response"))); server.stream.close(Status.INTERNAL, new Metadata()); // Make sure new streams still work properly diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index d97aa8cd36c..001ccc918cf 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -131,7 +132,7 @@ public void onCompleted() { assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); assertEquals(fakeResponse, responseRef.get()); - assertNull(throwableRef.get()); + assertNotNull(throwableRef.get()); } @Test From 9ee68780481846b275f3b74da1af79939a2e9c9c Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Fri, 7 Mar 2025 12:52:46 +0000 Subject: [PATCH 5/8] core: Fixed Failing UTs --- .../java/io/grpc/internal/AbstractTransportTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index d67ecfceeb1..01331d034a6 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -1599,10 +1599,12 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // Ensure that for a closed ServerStream, interactions are noops - assertThrows(Exception.class, () -> + Exception headerException = assertThrows(Exception.class, () -> server.stream.writeHeaders(new Metadata(), true)); - assertThrows(Exception.class, () -> + assertTrue(headerException.getMessage().contains("call already closed")); + Exception messageException = assertThrows(Exception.class, () -> server.stream.writeMessage(methodDescriptor.streamResponse("response"))); + assertTrue(messageException.getMessage().contains("call already closed")); server.stream.close(Status.INTERNAL, new Metadata()); // Make sure new streams still work properly From bd71484afd797d3e1b978694ded51a6ba3ea2e0c Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Mon, 10 Mar 2025 14:04:56 +0000 Subject: [PATCH 6/8] core: Fixed Failing UTs --- .../io/grpc/internal/AbstractTransportTest.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index 01331d034a6..eb32e2aaa03 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -1598,14 +1597,14 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - // Ensure that for a closed ServerStream, interactions are noops - Exception headerException = assertThrows(Exception.class, () -> - server.stream.writeHeaders(new Metadata(), true)); - assertTrue(headerException.getMessage().contains("call already closed")); - Exception messageException = assertThrows(Exception.class, () -> - server.stream.writeMessage(methodDescriptor.streamResponse("response"))); - assertTrue(messageException.getMessage().contains("call already closed")); - server.stream.close(Status.INTERNAL, new Metadata()); + try { + // Ensure that for a closed ServerStream, interactions are noops + server.stream.writeHeaders(new Metadata(), true); + server.stream.writeMessage(methodDescriptor.streamResponse("response")); + server.stream.close(Status.INTERNAL, new Metadata()); + } catch (Exception exception) { + assertTrue(exception.getMessage().contains("call already closed")); + } // Make sure new streams still work properly doPingPong(serverListener); From 16bd7e9f763af1f72e94d20bab56d90c9aa76cb5 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Tue, 11 Mar 2025 10:54:23 +0000 Subject: [PATCH 7/8] core: Fixed Failing UTs with MissingFail warning/error --- .../java/io/grpc/internal/AbstractTransportTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index eb32e2aaa03..abb8b124a04 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -1573,6 +1573,7 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception { verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1); } + @SuppressWarnings("MissingFail") @Test public void interactionsAfterServerStreamCloseAreNoops() throws Exception { server.start(serverListener); From feed5a00319e6155631122a9ca9c31b4bf3d0e4c Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Wed, 19 Mar 2025 07:51:43 +0000 Subject: [PATCH 8/8] core: Addressed the Review point and handled close already call in stream implementation. --- .../main/java/io/grpc/internal/AbstractServerStream.java | 3 +++ .../src/main/java/io/grpc/inprocess/InProcessTransport.java | 6 ------ .../java/io/grpc/testing/integration/MoreInProcessTest.java | 3 +-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index a535330f4b1..8567d526a08 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -74,6 +74,7 @@ protected interface Sink { private final StatsTraceContext statsTraceCtx; private boolean outboundClosed; private boolean headersSent; + private boolean closeCalled; protected AbstractServerStream( WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) { @@ -120,6 +121,7 @@ public final void deliverFrame( @Override public final void close(Status status, Metadata trailers) { + Preconditions.checkState(!closeCalled, "call already closed"); Preconditions.checkNotNull(status, "status"); Preconditions.checkNotNull(trailers, "trailers"); if (!outboundClosed) { @@ -130,6 +132,7 @@ public final void close(Status status, Metadata trailers) { // closedStatus is only set from here, and is read from a place that has happen-after // guarantees with respect to here. transportState().setClosedStatus(status); + closeCalled = true; abstractServerStreamSink().writeTrailers(trailers, headersSent, status); } } diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 24d8dcee935..39ebe6e0ab7 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -17,7 +17,6 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; @@ -415,7 +414,6 @@ private class InProcessServerStream implements ServerStream { private boolean closed; @GuardedBy("this") private int outboundSeqNo; - private boolean closeCalled; InProcessServerStream(MethodDescriptor method, Metadata headers) { statsTraceCtx = StatsTraceContext.newServerContext( @@ -433,7 +431,6 @@ public void setListener(ServerStreamListener serverStreamListener) { @Override public void request(int numMessages) { - checkState(!closeCalled, "call already closed"); boolean onReady = clientStream.serverRequested(numMessages); if (onReady) { synchronized (this) { @@ -490,7 +487,6 @@ private void clientCancelled(Status status) { @Override public void writeMessage(InputStream message) { - checkState(!closeCalled, "call already closed"); long messageLength = 0; if (isEnabledSupportTracingMessageSizes) { try { @@ -550,7 +546,6 @@ public synchronized boolean isReady() { @Override public void writeHeaders(Metadata headers, boolean flush) { - checkState(!closeCalled, "call already closed"); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { int metadataSize = metadataSize(headers); if (metadataSize > clientMaxInboundMetadataSize) { @@ -586,7 +581,6 @@ public void close(Status status, Metadata trailers) { // clientStreamListener.closed can trigger clientStream.cancel (see code in // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are // calling internalCancel(). - closeCalled = true; clientStream.serverClosed(Status.OK, status); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index 001ccc918cf..d97aa8cd36c 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -17,7 +17,6 @@ package io.grpc.testing.integration; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -132,7 +131,7 @@ public void onCompleted() { assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); assertEquals(fakeResponse, responseRef.get()); - assertNotNull(throwableRef.get()); + assertNull(throwableRef.get()); } @Test