diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 2d16065254a..9e7d31c55b0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -59,7 +59,8 @@ public enum TestCases { RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), ORCA_PER_RPC("report backend metrics per query"), - ORCA_OOB("report backend metrics out-of-band"); + ORCA_OOB("report backend metrics out-of-band"), + MCS("max concurrent streaming"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 125d876b705..b375ee7e4bf 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -17,10 +17,12 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.testing.integration.TestCases.MCS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; @@ -38,12 +40,16 @@ import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; +import io.grpc.InternalManagedChannelBuilder; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricInstrument; +import io.grpc.MetricSink; import io.grpc.ServerBuilder; import io.grpc.TlsChannelCredentials; import io.grpc.alts.AltsChannelCredentials; @@ -52,6 +58,7 @@ import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.GrpcUtil; import io.grpc.internal.JsonParser; +import io.grpc.internal.testing.StreamRecorder; import io.grpc.netty.InsecureFromHttp1ChannelCredentials; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder; @@ -65,15 +72,20 @@ import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; +import io.grpc.testing.integration.Messages.StreamingInputCallRequest; +import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.TestOrcaReport; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -563,6 +575,11 @@ private void runTest(TestCases testCase) throws Exception { tester.testOrcaOob(); break; } + + case MCS: { + tester.testMcs(); + break; + } default: throw new IllegalArgumentException("Unknown test case: " + testCase); @@ -596,9 +613,12 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor( } private class Tester extends AbstractInteropTest { + private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink(); + @Override protected ManagedChannelBuilder createChannelBuilder() { - boolean useGeneric = false; + boolean useSubchannelMetricsSink = testCase.equals(MCS.toString()); + boolean useGeneric = testCase.equals(MCS.toString())? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -658,13 +678,26 @@ protected ManagedChannelBuilder createChannelBuilder() { if (serverHostOverride != null) { channelBuilder.overrideAuthority(serverHostOverride); } - if (serviceConfig != null) { + if (testCase.equals(MCS.toString())) { + channelBuilder.disableServiceConfigLookUp(); + try { + @SuppressWarnings("unchecked") + Map serviceConfigMap = (Map) JsonParser.parse( + "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); + channelBuilder.defaultServiceConfig(serviceConfigMap); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (serviceConfig != null) { channelBuilder.disableServiceConfigLookUp(); channelBuilder.defaultServiceConfig(serviceConfig); } if (addMdInterceptor != null) { channelBuilder.intercept(addMdInterceptor); } + if (useSubchannelMetricsSink) { + InternalManagedChannelBuilder.addMetricSink(channelBuilder, fakeMetricsSink); + } return channelBuilder; } if (!useOkHttp) { @@ -979,31 +1012,16 @@ public void testOrcaOob() throws Exception { .build(); final int retryLimit = 5; - BlockingQueue queue = new LinkedBlockingQueue<>(); - final Object lastItem = new Object(); + StreamingOutputCallResponseObserver streamingOutputCallResponseObserver = + new StreamingOutputCallResponseObserver(); StreamObserver streamObserver = - asyncStub.fullDuplexCall(new StreamObserver() { - - @Override - public void onNext(StreamingOutputCallResponse value) { - queue.add(value); - } - - @Override - public void onError(Throwable t) { - queue.add(t); - } - - @Override - public void onCompleted() { - queue.add(lastItem); - } - }); + asyncStub.fullDuplexCall(streamingOutputCallResponseObserver); streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + assertThat(streamingOutputCallResponseObserver.take()) + .isInstanceOf(StreamingOutputCallResponse.class); int i = 0; for (; i < retryLimit; i++) { Thread.sleep(1000); @@ -1016,7 +1034,7 @@ public void onCompleted() { streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer2) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue(); for (i = 0; i < retryLimit; i++) { Thread.sleep(1000); @@ -1026,8 +1044,6 @@ public void onCompleted() { } } assertThat(i).isLessThan(retryLimit); - streamObserver.onCompleted(); - assertThat(queue.take()).isSameInstanceAs(lastItem); } @Override @@ -1054,6 +1070,59 @@ protected ServerBuilder getHandshakerServerBuilder() { protected int operationTimeoutMillis() { return 15000; } + + class StreamingOutputCallResponseObserver implements StreamObserver { + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private volatile boolean isCompleted = true; + + @Override + public void onNext(StreamingOutputCallResponse value) { + queue.add(value); + } + + @Override + public void onError(Throwable t) { + queue.add(t); + } + + @Override + public void onCompleted() { + isCompleted = true; + } + + Object take() throws InterruptedException { + return queue.take(); + } + } + + public void testMcs() throws Exception { + StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver1 = + asyncStub.fullDuplexCall(responseObserver1); + streamObserver1.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class); + + StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver2 = + asyncStub.fullDuplexCall(responseObserver2); + streamObserver2.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver2.take()).isInstanceOf(StreamingOutputCallResponse.class); + + assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(1); + + // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new + // connection to be created in the same subchannel and not get queued. + StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver3 = + asyncStub.fullDuplexCall(responseObserver3); + streamObserver3.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver3.take()).isInstanceOf(StreamingOutputCallResponse.class); + + assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(2); + } } private static String validTestCasesHelpText() { @@ -1067,4 +1136,39 @@ private static String validTestCasesHelpText() { } return builder.toString(); } + + static class FakeMetricsSink implements MetricSink { + private volatile long openConnectionCount; + + @Override + public Map getEnabledMetrics() { + return null; + } + + @Override + public Set getOptionalLabels() { + return null; + } + + @Override + public int getMeasuresSize() { + return 0; + } + + @Override + public void updateMeasures(List instruments) {} + + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + if (metricInstrument.getName().equals("grpc.subchannel.open_connections")) { + openConnectionCount = value; + } + } + + synchronized long getOpenConnectionCount() { + return openConnectionCount; + } + } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index fc4cdf9178f..cf995e4f8d4 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,6 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; + private boolean useMcs = false; private ScheduledExecutorService executor; private Server server; @@ -118,6 +119,9 @@ void parseArgs(String[] args) { usage = true; break; } + } else if ("use_mcs".equals(key)) { + useMcs = Boolean.parseBoolean(value); + addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); usage = true; @@ -186,6 +190,9 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } + if (useMcs) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2); + } break; case IPV6: List v6Addresses = Util.getV6Addresses(port); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index ab32d584e7c..4f6ea8e7931 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -67,7 +67,8 @@ public void testCaseNamesShouldMapToEnums() { "cancel_after_first_response", "timeout_on_sleeping_server", "orca_per_rpc", - "orca_oob" + "orca_oob", + "mcs", }; // additional test cases