From fb52bbf17a6e983980879e0dcefa16ff27e5a1e6 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 19 Jan 2026 06:40:18 +0000 Subject: [PATCH 1/7] Save changes. --- .../main/java/io/grpc/testing/integration/TestCases.java | 3 ++- .../io/grpc/testing/integration/TestServiceClient.java | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 2d16065254a..9e7d31c55b0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -59,7 +59,8 @@ public enum TestCases { RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), ORCA_PER_RPC("report backend metrics per query"), - ORCA_OOB("report backend metrics out-of-band"); + ORCA_OOB("report backend metrics out-of-band"), + MCS("max concurrent streaming"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 125d876b705..aa700e43494 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -563,6 +563,11 @@ private void runTest(TestCases testCase) throws Exception { tester.testOrcaOob(); break; } + + case MCS: { + tester.testMcs(); + break; + } default: throw new IllegalArgumentException("Unknown test case: " + testCase); @@ -1054,6 +1059,9 @@ protected ServerBuilder getHandshakerServerBuilder() { protected int operationTimeoutMillis() { return 15000; } + + public void testMcs() { + } } private static String validTestCasesHelpText() { From f518d5efaf011eb8c57bff1dcf703c6d7d017b41 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 20 Jan 2026 06:23:27 +0000 Subject: [PATCH 2/7] Save changes. --- bom/build.gradle | 38 ---------- build.gradle | 5 ++ interop-testing/build.gradle | 5 ++ .../integration/TestServiceClient.java | 76 ++++++++++++++----- .../integration/TestServiceServer.java | 7 ++ 5 files changed, 73 insertions(+), 58 deletions(-) delete mode 100644 bom/build.gradle diff --git a/bom/build.gradle b/bom/build.gradle deleted file mode 100644 index f7f3918372f..00000000000 --- a/bom/build.gradle +++ /dev/null @@ -1,38 +0,0 @@ -plugins { - id 'java-platform' - id "maven-publish" -} - -description = 'gRPC: BOM' - -gradle.projectsEvaluated { - def projectsToInclude = rootProject.subprojects.findAll { - return it.name != 'grpc-compiler' - && it.plugins.hasPlugin('java') - && it.plugins.hasPlugin('maven-publish') - && it.tasks.findByName('publishMavenPublicationToMavenRepository')?.enabled - } - dependencies { - constraints { - projectsToInclude.each { api it } - } - } -} - -publishing { - publications { - maven(MavenPublication) { - from components.javaPlatform - pom.withXml { - def dependencies = asNode().dependencyManagement.dependencies.last() - // add protoc gen (produced by grpc-compiler with different artifact name) - // not sure how to express "pom" in gradle, kept in XML - def dependencyNode = dependencies.appendNode('dependency') - dependencyNode.appendNode('groupId', project.group) - dependencyNode.appendNode('artifactId', 'protoc-gen-grpc-java') - dependencyNode.appendNode('version', project.version) - dependencyNode.appendNode('type', 'pom') - } - } - } -} diff --git a/build.gradle b/build.gradle index 91690c1e3c3..01e09a5c920 100644 --- a/build.gradle +++ b/build.gradle @@ -19,6 +19,11 @@ subprojects { apply plugin: "com.google.osdetector" apply plugin: "net.ltgt.errorprone" + apply plugin: "java" + test { + testLogging.showStandardStreams = true + systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" + } group = "io.grpc" version = "1.79.0-SNAPSHOT" // CURRENT_GRPC_VERSION diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 5160759460c..7be6dac279b 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -8,6 +8,11 @@ plugins { } description = "gRPC: Integration Testing" +apply plugin: "java-library" +test { + testLogging.showStandardStreams = true + systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" +} dependencies { implementation project(path: ':grpc-alts', configuration: 'shadow'), diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index aa700e43494..2412b8cc52d 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.testing.integration.TestCases.MCS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -30,28 +31,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; import com.google.protobuf.ByteString; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ChannelCredentials; -import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors; -import io.grpc.Grpc; -import io.grpc.InsecureChannelCredentials; -import io.grpc.InsecureServerCredentials; -import io.grpc.LoadBalancerProvider; -import io.grpc.LoadBalancerRegistry; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.ServerBuilder; -import io.grpc.TlsChannelCredentials; +import io.grpc.*; import io.grpc.alts.AltsChannelCredentials; import io.grpc.alts.ComputeEngineChannelCredentials; import io.grpc.alts.GoogleDefaultChannelCredentials; import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.GrpcUtil; import io.grpc.internal.JsonParser; +import io.grpc.internal.testing.StreamRecorder; import io.grpc.netty.InsecureFromHttp1ChannelCredentials; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder; @@ -65,6 +52,8 @@ import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; +import io.grpc.testing.integration.Messages.StreamingInputCallRequest; +import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.TestOrcaReport; @@ -72,8 +61,7 @@ import java.io.FileInputStream; import java.io.InputStream; import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Map; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -601,9 +589,12 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor( } private class Tester extends AbstractInteropTest { + private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink(); + @Override protected ManagedChannelBuilder createChannelBuilder() { - boolean useGeneric = false; + boolean useSubchannelMetricsSink = testCase.equals(MCS.toString()); + boolean useGeneric = useSubchannelMetricsSink? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -670,6 +661,9 @@ protected ManagedChannelBuilder createChannelBuilder() { if (addMdInterceptor != null) { channelBuilder.intercept(addMdInterceptor); } + if (useSubchannelMetricsSink) { + InternalManagedChannelBuilder.addMetricSink(channelBuilder, fakeMetricsSink); + } return channelBuilder; } if (!useOkHttp) { @@ -1060,7 +1054,19 @@ protected int operationTimeoutMillis() { return 15000; } - public void testMcs() { + public void testMcs() throws Exception { + final StreamingInputCallRequest request = StreamingInputCallRequest.newBuilder() + .setPayload(Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[27182]))) + .build(); + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = + asyncStub.streamingInputCall(responseObserver); + requestObserver.onNext(request); + + // assertThat(fakeMetricsSink.longUpDownCounterMetricInstrumentValues.get("grpc.subchannel.open_connections")).isEqualTo(1); + requestObserver.onCompleted(); + responseObserver.awaitCompletion(); } } @@ -1075,4 +1081,34 @@ private static String validTestCasesHelpText() { } return builder.toString(); } + + static class FakeMetricsSink implements MetricSink { + Map longUpDownCounterMetricInstrumentValues = new HashMap<>(); + @Override + public Map getEnabledMetrics() { + return null; + } + + @Override + public Set getOptionalLabels() { + return null; + } + + @Override + public int getMeasuresSize() { + return 0; + } + + @Override + public void updateMeasures(List instruments) { + System.out.println("updateMeasures"); + } + + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + longUpDownCounterMetricInstrumentValues.put(metricInstrument, value); + } + } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index fc4cdf9178f..511e264410c 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,6 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; + private int mcsLimit = 0; private ScheduledExecutorService executor; private Server server; @@ -118,6 +119,9 @@ void parseArgs(String[] args) { usage = true; break; } + } else if ("mcs_limit".equals(key)) { + mcsLimit = Integer.parseInt(value); + addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); usage = true; @@ -186,6 +190,9 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } + if (mcsLimit > 0) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcsLimit); + } break; case IPV6: List v6Addresses = Util.getV6Addresses(port); From 3874631bc39e347e98995c7341a749c5e26673e8 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 20 Jan 2026 12:15:52 +0000 Subject: [PATCH 3/7] Save changes. --- .../integration/TestServiceClient.java | 52 +++++++++++++++---- .../integration/TestServiceServer.java | 10 ++-- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 2412b8cc52d..b97a8f8f911 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -30,6 +30,8 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import io.grpc.*; import io.grpc.alts.AltsChannelCredentials; @@ -1059,17 +1061,51 @@ public void testMcs() throws Exception { .setPayload(Payload.newBuilder() .setBody(ByteString.copyFrom(new byte[27182]))) .build(); - StreamRecorder responseObserver = StreamRecorder.create(); - StreamObserver requestObserver = - asyncStub.streamingInputCall(responseObserver); - requestObserver.onNext(request); + StreamRecorder responseObserver1 = StreamRecorder.create(); + StreamObserver requestObserver1 = + asyncStub.streamingInputCall(responseObserver1); + requestObserver1.onNext(request); + StreamRecorder responseObserver2 = StreamRecorder.create(); + StreamObserver requestObserver2 = + asyncStub.streamingInputCall(responseObserver2); + requestObserver2.onNext(request); // assertThat(fakeMetricsSink.longUpDownCounterMetricInstrumentValues.get("grpc.subchannel.open_connections")).isEqualTo(1); - requestObserver.onCompleted(); - responseObserver.awaitCompletion(); + requestObserver2.onCompleted(); + responseObserver2.awaitCompletion(); + requestObserver1.onCompleted(); + + responseObserver1.awaitCompletion(); + } } + /* + public static ListenableFuture performTaskAsync() { + // Create a SettableFuture. This is the "handle" we control externally. + final SettableFuture future = SettableFuture.create(); + + // Submit the actual work to the executor service + executorService.submit(() -> { + try { + System.out.println("Worker thread: Task starting..."); + // Simulate some work that takes time + TimeUnit.SECONDS.sleep(2); + System.out.println("Worker thread: Task finished."); + + // When the work is done, set the future's value to true + future.set(true); + + } catch (InterruptedException e) { + // If something goes wrong, set the future to an exception + future.setException(e); + } + }); + + // Return the future immediately + return future; + } +*/ private static String validTestCasesHelpText() { StringBuilder builder = new StringBuilder(); for (TestCases testCase : TestCases.values()) { @@ -1100,9 +1136,7 @@ public int getMeasuresSize() { } @Override - public void updateMeasures(List instruments) { - System.out.println("updateMeasures"); - } + public void updateMeasures(List instruments) {} @Override public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index 511e264410c..5845b1c387f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,7 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; - private int mcsLimit = 0; + private int mcs = 0; private ScheduledExecutorService executor; private Server server; @@ -119,8 +119,8 @@ void parseArgs(String[] args) { usage = true; break; } - } else if ("mcs_limit".equals(key)) { - mcsLimit = Integer.parseInt(value); + } else if ("mcs".equals(key)) { + mcs = Integer.parseInt(value); addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); @@ -190,8 +190,8 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } - if (mcsLimit > 0) { - ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcsLimit); + if (mcs > 0) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcs); } break; case IPV6: From 0bad82f1555fec97a24032cac82911129e3e7501 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:11:27 +0000 Subject: [PATCH 4/7] Save changes. --- .../integration/TestServiceClient.java | 172 ++++++++++-------- .../integration/TestServiceServer.java | 10 +- 2 files changed, 104 insertions(+), 78 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index b97a8f8f911..b375ee7e4bf 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; @@ -30,10 +31,27 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; -import io.grpc.*; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ChannelCredentials; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.InternalManagedChannelBuilder; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MetricInstrument; +import io.grpc.MetricSink; +import io.grpc.ServerBuilder; +import io.grpc.TlsChannelCredentials; import io.grpc.alts.AltsChannelCredentials; import io.grpc.alts.ComputeEngineChannelCredentials; import io.grpc.alts.GoogleDefaultChannelCredentials; @@ -61,9 +79,13 @@ import io.grpc.testing.integration.Messages.TestOrcaReport; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -596,7 +618,7 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { boolean useSubchannelMetricsSink = testCase.equals(MCS.toString()); - boolean useGeneric = useSubchannelMetricsSink? true : false; + boolean useGeneric = testCase.equals(MCS.toString())? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -656,7 +678,17 @@ protected ManagedChannelBuilder createChannelBuilder() { if (serverHostOverride != null) { channelBuilder.overrideAuthority(serverHostOverride); } - if (serviceConfig != null) { + if (testCase.equals(MCS.toString())) { + channelBuilder.disableServiceConfigLookUp(); + try { + @SuppressWarnings("unchecked") + Map serviceConfigMap = (Map) JsonParser.parse( + "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); + channelBuilder.defaultServiceConfig(serviceConfigMap); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (serviceConfig != null) { channelBuilder.disableServiceConfigLookUp(); channelBuilder.defaultServiceConfig(serviceConfig); } @@ -980,31 +1012,16 @@ public void testOrcaOob() throws Exception { .build(); final int retryLimit = 5; - BlockingQueue queue = new LinkedBlockingQueue<>(); - final Object lastItem = new Object(); + StreamingOutputCallResponseObserver streamingOutputCallResponseObserver = + new StreamingOutputCallResponseObserver(); StreamObserver streamObserver = - asyncStub.fullDuplexCall(new StreamObserver() { - - @Override - public void onNext(StreamingOutputCallResponse value) { - queue.add(value); - } - - @Override - public void onError(Throwable t) { - queue.add(t); - } - - @Override - public void onCompleted() { - queue.add(lastItem); - } - }); + asyncStub.fullDuplexCall(streamingOutputCallResponseObserver); streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + assertThat(streamingOutputCallResponseObserver.take()) + .isInstanceOf(StreamingOutputCallResponse.class); int i = 0; for (; i < retryLimit; i++) { Thread.sleep(1000); @@ -1017,7 +1034,7 @@ public void onCompleted() { streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer2) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue(); for (i = 0; i < retryLimit; i++) { Thread.sleep(1000); @@ -1027,8 +1044,6 @@ public void onCompleted() { } } assertThat(i).isLessThan(retryLimit); - streamObserver.onCompleted(); - assertThat(queue.take()).isSameInstanceAs(lastItem); } @Override @@ -1056,56 +1071,60 @@ protected int operationTimeoutMillis() { return 15000; } - public void testMcs() throws Exception { - final StreamingInputCallRequest request = StreamingInputCallRequest.newBuilder() - .setPayload(Payload.newBuilder() - .setBody(ByteString.copyFrom(new byte[27182]))) - .build(); - StreamRecorder responseObserver1 = StreamRecorder.create(); - StreamObserver requestObserver1 = - asyncStub.streamingInputCall(responseObserver1); - requestObserver1.onNext(request); - StreamRecorder responseObserver2 = StreamRecorder.create(); - StreamObserver requestObserver2 = - asyncStub.streamingInputCall(responseObserver2); - requestObserver2.onNext(request); - - // assertThat(fakeMetricsSink.longUpDownCounterMetricInstrumentValues.get("grpc.subchannel.open_connections")).isEqualTo(1); - requestObserver2.onCompleted(); - responseObserver2.awaitCompletion(); - requestObserver1.onCompleted(); - - responseObserver1.awaitCompletion(); + class StreamingOutputCallResponseObserver implements StreamObserver { + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private volatile boolean isCompleted = true; + + @Override + public void onNext(StreamingOutputCallResponse value) { + queue.add(value); + } + + @Override + public void onError(Throwable t) { + queue.add(t); + } + + @Override + public void onCompleted() { + isCompleted = true; + } + Object take() throws InterruptedException { + return queue.take(); + } } - } - /* - public static ListenableFuture performTaskAsync() { - // Create a SettableFuture. This is the "handle" we control externally. - final SettableFuture future = SettableFuture.create(); + public void testMcs() throws Exception { + StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver1 = + asyncStub.fullDuplexCall(responseObserver1); + streamObserver1.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class); - // Submit the actual work to the executor service - executorService.submit(() -> { - try { - System.out.println("Worker thread: Task starting..."); - // Simulate some work that takes time - TimeUnit.SECONDS.sleep(2); - System.out.println("Worker thread: Task finished."); + StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver2 = + asyncStub.fullDuplexCall(responseObserver2); + streamObserver2.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver2.take()).isInstanceOf(StreamingOutputCallResponse.class); - // When the work is done, set the future's value to true - future.set(true); + assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(1); - } catch (InterruptedException e) { - // If something goes wrong, set the future to an exception - future.setException(e); - } - }); + // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new + // connection to be created in the same subchannel and not get queued. + StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver3 = + asyncStub.fullDuplexCall(responseObserver3); + streamObserver3.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver3.take()).isInstanceOf(StreamingOutputCallResponse.class); - // Return the future immediately - return future; + assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(2); + } } -*/ + private static String validTestCasesHelpText() { StringBuilder builder = new StringBuilder(); for (TestCases testCase : TestCases.values()) { @@ -1119,7 +1138,8 @@ private static String validTestCasesHelpText() { } static class FakeMetricsSink implements MetricSink { - Map longUpDownCounterMetricInstrumentValues = new HashMap<>(); + private volatile long openConnectionCount; + @Override public Map getEnabledMetrics() { return null; @@ -1142,7 +1162,13 @@ public void updateMeasures(List instruments) {} public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, List requiredLabelValues, List optionalLabelValues) { - longUpDownCounterMetricInstrumentValues.put(metricInstrument, value); + if (metricInstrument.getName().equals("grpc.subchannel.open_connections")) { + openConnectionCount = value; + } + } + + synchronized long getOpenConnectionCount() { + return openConnectionCount; } } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index 5845b1c387f..cf995e4f8d4 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,7 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; - private int mcs = 0; + private boolean useMcs = false; private ScheduledExecutorService executor; private Server server; @@ -119,8 +119,8 @@ void parseArgs(String[] args) { usage = true; break; } - } else if ("mcs".equals(key)) { - mcs = Integer.parseInt(value); + } else if ("use_mcs".equals(key)) { + useMcs = Boolean.parseBoolean(value); addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); @@ -190,8 +190,8 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } - if (mcs > 0) { - ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcs); + if (useMcs) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2); } break; case IPV6: From 51b38e27726eeb8b8f75fabbee17bd8b3370e5c1 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:19:19 +0000 Subject: [PATCH 5/7] Fix enum test. --- .../test/java/io/grpc/testing/integration/TestCasesTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index ab32d584e7c..4f6ea8e7931 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -67,7 +67,8 @@ public void testCaseNamesShouldMapToEnums() { "cancel_after_first_response", "timeout_on_sleeping_server", "orca_per_rpc", - "orca_oob" + "orca_oob", + "mcs", }; // additional test cases From 4da06a418cfabfcee9980f5c59e5597dd489ef2b Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:37:36 +0000 Subject: [PATCH 6/7] Revert temp changes. --- bom/build.gradle | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 bom/build.gradle diff --git a/bom/build.gradle b/bom/build.gradle new file mode 100644 index 00000000000..f7f3918372f --- /dev/null +++ b/bom/build.gradle @@ -0,0 +1,38 @@ +plugins { + id 'java-platform' + id "maven-publish" +} + +description = 'gRPC: BOM' + +gradle.projectsEvaluated { + def projectsToInclude = rootProject.subprojects.findAll { + return it.name != 'grpc-compiler' + && it.plugins.hasPlugin('java') + && it.plugins.hasPlugin('maven-publish') + && it.tasks.findByName('publishMavenPublicationToMavenRepository')?.enabled + } + dependencies { + constraints { + projectsToInclude.each { api it } + } + } +} + +publishing { + publications { + maven(MavenPublication) { + from components.javaPlatform + pom.withXml { + def dependencies = asNode().dependencyManagement.dependencies.last() + // add protoc gen (produced by grpc-compiler with different artifact name) + // not sure how to express "pom" in gradle, kept in XML + def dependencyNode = dependencies.appendNode('dependency') + dependencyNode.appendNode('groupId', project.group) + dependencyNode.appendNode('artifactId', 'protoc-gen-grpc-java') + dependencyNode.appendNode('version', project.version) + dependencyNode.appendNode('type', 'pom') + } + } + } +} From df70a2721e437203760a5cfaea9ea0dcfff1baad Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:39:45 +0000 Subject: [PATCH 7/7] Revert temp changes. --- build.gradle | 5 ----- interop-testing/build.gradle | 5 ----- 2 files changed, 10 deletions(-) diff --git a/build.gradle b/build.gradle index 01e09a5c920..91690c1e3c3 100644 --- a/build.gradle +++ b/build.gradle @@ -19,11 +19,6 @@ subprojects { apply plugin: "com.google.osdetector" apply plugin: "net.ltgt.errorprone" - apply plugin: "java" - test { - testLogging.showStandardStreams = true - systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" - } group = "io.grpc" version = "1.79.0-SNAPSHOT" // CURRENT_GRPC_VERSION diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 7be6dac279b..5160759460c 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -8,11 +8,6 @@ plugins { } description = "gRPC: Integration Testing" -apply plugin: "java-library" -test { - testLogging.showStandardStreams = true - systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" -} dependencies { implementation project(path: ':grpc-alts', configuration: 'shadow'),