Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public enum TestCases {
RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"),
CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"),
ORCA_PER_RPC("report backend metrics per query"),
ORCA_OOB("report backend metrics out-of-band");
ORCA_OOB("report backend metrics out-of-band"),
MCS("max concurrent streaming");

private final String description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package io.grpc.testing.integration;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.testing.integration.TestCases.MCS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
Expand All @@ -38,12 +40,16 @@
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.InternalManagedChannelBuilder;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.LongUpDownCounterMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricInstrument;
import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.TlsChannelCredentials;
import io.grpc.alts.AltsChannelCredentials;
Expand All @@ -52,6 +58,7 @@
import io.grpc.auth.MoreCallCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.JsonParser;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.netty.InsecureFromHttp1ChannelCredentials;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -65,15 +72,20 @@
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -563,6 +575,11 @@ private void runTest(TestCases testCase) throws Exception {
tester.testOrcaOob();
break;
}

case MCS: {
tester.testMcs();
break;
}

default:
throw new IllegalArgumentException("Unknown test case: " + testCase);
Expand Down Expand Up @@ -596,9 +613,12 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor(
}

private class Tester extends AbstractInteropTest {
private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink();

@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
boolean useGeneric = false;
boolean useSubchannelMetricsSink = testCase.equals(MCS.toString());
boolean useGeneric = testCase.equals(MCS.toString())? true : false;
ChannelCredentials channelCredentials;
if (customCredentialsType != null) {
useGeneric = true; // Retain old behavior; avoids erroring if incompatible
Expand Down Expand Up @@ -658,13 +678,26 @@ protected ManagedChannelBuilder<?> createChannelBuilder() {
if (serverHostOverride != null) {
channelBuilder.overrideAuthority(serverHostOverride);
}
if (serviceConfig != null) {
if (testCase.equals(MCS.toString())) {
channelBuilder.disableServiceConfigLookUp();
try {
@SuppressWarnings("unchecked")
Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse(
"{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}");
channelBuilder.defaultServiceConfig(serviceConfigMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (serviceConfig != null) {
channelBuilder.disableServiceConfigLookUp();
channelBuilder.defaultServiceConfig(serviceConfig);
}
if (addMdInterceptor != null) {
channelBuilder.intercept(addMdInterceptor);
}
if (useSubchannelMetricsSink) {
InternalManagedChannelBuilder.addMetricSink(channelBuilder, fakeMetricsSink);
}
return channelBuilder;
}
if (!useOkHttp) {
Expand Down Expand Up @@ -979,31 +1012,16 @@ public void testOrcaOob() throws Exception {
.build();

final int retryLimit = 5;
BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
final Object lastItem = new Object();
StreamingOutputCallResponseObserver streamingOutputCallResponseObserver =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver =
asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {

@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}

@Override
public void onError(Throwable t) {
queue.add(t);
}

@Override
public void onCompleted() {
queue.add(lastItem);
}
});
asyncStub.fullDuplexCall(streamingOutputCallResponseObserver);

streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
assertThat(streamingOutputCallResponseObserver.take())
.isInstanceOf(StreamingOutputCallResponse.class);
int i = 0;
for (; i < retryLimit; i++) {
Thread.sleep(1000);
Expand All @@ -1016,7 +1034,7 @@ public void onCompleted() {
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer2)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue();

for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
Expand All @@ -1026,8 +1044,6 @@ public void onCompleted() {
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onCompleted();
assertThat(queue.take()).isSameInstanceAs(lastItem);
}

@Override
Expand All @@ -1054,6 +1070,59 @@ protected ServerBuilder<?> getHandshakerServerBuilder() {
protected int operationTimeoutMillis() {
return 15000;
}

class StreamingOutputCallResponseObserver implements StreamObserver<StreamingOutputCallResponse> {
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean isCompleted = true;

@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}

@Override
public void onError(Throwable t) {
queue.add(t);
}

@Override
public void onCompleted() {
isCompleted = true;
}

Object take() throws InterruptedException {
return queue.take();
}
}

public void testMcs() throws Exception {
StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver1 =
asyncStub.fullDuplexCall(responseObserver1);
streamObserver1.onNext(StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class);

StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver2 =
asyncStub.fullDuplexCall(responseObserver2);
streamObserver2.onNext(StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(responseObserver2.take()).isInstanceOf(StreamingOutputCallResponse.class);

assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(1);

// The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
// connection to be created in the same subchannel and not get queued.
StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver3 =
asyncStub.fullDuplexCall(responseObserver3);
streamObserver3.onNext(StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(responseObserver3.take()).isInstanceOf(StreamingOutputCallResponse.class);

assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(2);
}
}

private static String validTestCasesHelpText() {
Expand All @@ -1067,4 +1136,39 @@ private static String validTestCasesHelpText() {
}
return builder.toString();
}

static class FakeMetricsSink implements MetricSink {
private volatile long openConnectionCount;

@Override
public Map<String, Boolean> getEnabledMetrics() {
return null;
}

@Override
public Set<String> getOptionalLabels() {
return null;
}

@Override
public int getMeasuresSize() {
return 0;
}

@Override
public void updateMeasures(List<MetricInstrument> instruments) {}

@Override
public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
if (metricInstrument.getName().equals("grpc.subchannel.open_connections")) {
openConnectionCount = value;
}
}

synchronized long getOpenConnectionCount() {
return openConnectionCount;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void run() {
private int port = 8080;
private boolean useTls = true;
private boolean useAlts = false;
private boolean useMcs = false;

private ScheduledExecutorService executor;
private Server server;
Expand Down Expand Up @@ -118,6 +119,9 @@ void parseArgs(String[] args) {
usage = true;
break;
}
} else if ("use_mcs".equals(key)) {
useMcs = Boolean.parseBoolean(value);
addressType = Util.AddressType.IPV4; // To use NettyServerBuilder
} else {
System.err.println("Unknown argument: " + key);
usage = true;
Expand Down Expand Up @@ -186,6 +190,9 @@ void start() throws Exception {
if (v4Address != null && !v4Address.equals(localV4Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
if (useMcs) {
((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void testCaseNamesShouldMapToEnums() {
"cancel_after_first_response",
"timeout_on_sleeping_server",
"orca_per_rpc",
"orca_oob"
"orca_oob",
"mcs",
};

// additional test cases
Expand Down
Loading