Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/devtools/mobileharness/infra/container/proto/test_engine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ message TestEngineLocator {
GrpcLocator grpc_locator = 5;
bool enable_grpc = 6;

DualConduitLocator dual_conduit_locator = 7;
bool enable_dual_conduit = 8;

message StubbyLocator {
// Optional. A test engine may report no or more than one IPs. If so, client
// should use the IP of the lab server (which is detected by MH master)
Expand Down Expand Up @@ -63,6 +66,13 @@ message TestEngineLocator {
// Since lab 4.270.0.
string host_ip = 4;
}

message DualConduitLocator {
// Required.
//
// An xDS address like xds:///myserver.hostname.dcon.
string xds_address = 1;
}
}

enum TestEngineStatus {
Expand Down
3 changes: 3 additions & 0 deletions src/java/com/google/devtools/mobileharness/infra/lab/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ java_library(
"//src/java/com/google/devtools/mobileharness/shared/util/auto:auto_value",
"//src/java/com/google/devtools/mobileharness/shared/util/base",
"//src/java/com/google/devtools/mobileharness/shared/util/base:proto_text_format",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/dualconduit/client",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/dualconduit/proxy",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/dualconduit/proxy:server_type",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/filetransfer/cloud/rpc/service:cloud_file_transfer_service_grpc_impl",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/filetransfer/cloud/rpc/service:cloud_file_transfer_service_impl",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/filetransfer/common:tagged_file_handler",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
import com.google.devtools.mobileharness.shared.labinfo.LocalLabInfoProvider;
import com.google.devtools.mobileharness.shared.util.base.ProtoTextFormat;
import com.google.devtools.mobileharness.shared.util.base.StrUtil;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.client.DualConduitClient;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proxy.ReverseProxiedServer;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proxy.ServerType;
import com.google.devtools.mobileharness.shared.util.comm.filetransfer.cloud.rpc.service.CloudFileTransferServiceGrpcImpl;
import com.google.devtools.mobileharness.shared.util.comm.filetransfer.cloud.rpc.service.CloudFileTransferServiceImpl;
import com.google.devtools.mobileharness.shared.util.comm.filetransfer.common.TaggedFileHandler;
Expand Down Expand Up @@ -115,6 +118,7 @@
import com.google.wireless.qa.mobileharness.shared.util.DeviceUtil;
import com.google.wireless.qa.mobileharness.shared.util.NetUtil;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.io.IOException;
Expand Down Expand Up @@ -153,6 +157,8 @@ public class LabServer {
private final boolean enableStubbyRpcServer;
private final int rpcPort;

private volatile Server localGrpcServer;

@Inject
LabServer(
ProxyTestManager testManager,
Expand Down Expand Up @@ -373,7 +379,25 @@ ListenableFuture<LocalDeviceManager> provideLocalDeviceManager() {
for (BindableService service : localGrpcServices) {
localGrpcServerBuilder.addService(service);
}
localGrpcServerBuilder.build().start();
Server server = localGrpcServerBuilder.build();
if (Flags.dconDialerAddress.get() != null
&& !Flags.dconDialerAddress.getNonNull().isEmpty()) {
String instanceId = netUtil.getParentHostname();
String localHostname = Flags.localHostname.getNonNull();

DualConduitClient client = new DualConduitClient(Flags.dconDialerAddress.getNonNull());

localGrpcServer =
new ReverseProxiedServer(
server,
client,
ServerType.LAB_SERVER.name().toLowerCase(),
instanceId,
localHostname);
} else {
localGrpcServer = server;
}
localGrpcServer.start();

// NOTE: Only for debug/test purpose.
if (Flags.debugRandomExit.getNonNull()) {
Expand Down Expand Up @@ -421,6 +445,10 @@ public ListenableFuture<TestServices> getStartingFuture() {
public void onShutdown() {
logger.atInfo().log("Lab server is shutting down.");

if (localGrpcServer != null) {
localGrpcServer.shutdown();
}

SystemUtil.setProcessIsShuttingDown();
// Shuts down the server and all threads here.
if (mainThreadPool != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

load("@rules_java//java:defs.bzl", "java_library")

package(
default_applicable_licenses = ["//:license"],
default_visibility = [
"//:deviceinfra_all_pkg",
],
)

java_library(
name = "client",
srcs = ["DualConduitClient.java"],
deps = [
"//src/devtools/mobileharness/shared/util/comm/dualconduit/proto:dual_conduit_java_proto",
"//src/devtools/mobileharness/shared/util/comm/dualconduit/proto:dual_conduit_service_java_grpc",
"@grpc-java//core",
"@maven//:com_google_code_findbugs_jsr305",
"@maven//:com_google_guava_guava",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.devtools.mobileharness.shared.util.comm.dualconduit.client;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.EstablishConduitRequest;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.EstablishConduitRequest.ConduitType;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.EstablishConduitRequest.Protocol;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.EstablishConduitResponse;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.TeardownConduitRequest;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.TeardownConduitResponse;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitServiceGrpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import javax.annotation.Nullable;

/** Client for DualConduitService. */
public class DualConduitClient {

private final DualConduitServiceGrpc.DualConduitServiceBlockingStub blockingStub;
private final DualConduitServiceGrpc.DualConduitServiceFutureStub futureStub;
@Nullable private final ManagedChannel managedChannel;

public DualConduitClient(Channel channel) {
this.blockingStub = DualConduitServiceGrpc.newBlockingStub(channel);
this.futureStub = DualConduitServiceGrpc.newFutureStub(channel);
this.managedChannel = null;
}

public DualConduitClient(String target) {
this.managedChannel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();
this.blockingStub = DualConduitServiceGrpc.newBlockingStub(managedChannel);
this.futureStub = DualConduitServiceGrpc.newFutureStub(managedChannel);
}

/** Shuts down the managed channel if it was created by this client. */
public void shutdown() {
if (managedChannel != null) {
managedChannel.shutdown();
}
}

/** Establishes a gRPC reverse conduit. */
public EstablishConduitResponse establishReverseGrpcConduit(
String serverName, String instanceId, String destinationEndpoint) {
EstablishConduitRequest request =
EstablishConduitRequest.newBuilder()
.setType(ConduitType.CONDUIT_TYPE_REVERSE)
.setProtocol(Protocol.PROTOCOL_GRPC)
.setAutoReconnect(true)
.setServerName(serverName)
.setInstanceId(instanceId)
.setDestinationEndpoint(destinationEndpoint)
.build();
return blockingStub.establishConduit(request);
}

/** Establishes a gRPC reverse conduit asynchronously. */
public ListenableFuture<EstablishConduitResponse> establishReverseGrpcConduitAsync(
String serverName, String instanceId, String destinationEndpoint) {
EstablishConduitRequest request =
EstablishConduitRequest.newBuilder()
.setType(ConduitType.CONDUIT_TYPE_REVERSE)
.setProtocol(Protocol.PROTOCOL_GRPC)
.setAutoReconnect(true)
.setServerName(serverName)
.setInstanceId(instanceId)
.setDestinationEndpoint(destinationEndpoint)
.build();
return futureStub.establishConduit(request);
}

/** Tears down a conduit. */
public TeardownConduitResponse teardownConduit(String conduitId) {
TeardownConduitRequest request =
TeardownConduitRequest.newBuilder().setConduitId(conduitId).build();
return blockingStub.teardownConduit(request);
}

/** Tears down a conduit asynchronously. */
public ListenableFuture<TeardownConduitResponse> teardownConduitAsync(String conduitId) {
TeardownConduitRequest request =
TeardownConduitRequest.newBuilder().setConduitId(conduitId).build();
return futureStub.teardownConduit(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

load("@rules_java//java:defs.bzl", "java_library")

package(
default_applicable_licenses = ["//:license"],
default_visibility = [
"//:deviceinfra_all_pkg",
],
)

java_library(
name = "proxy",
srcs = ["ReverseProxiedServer.java"],
deps = [
"//src/devtools/mobileharness/shared/util/comm/dualconduit/proto:dual_conduit_java_proto",
"//src/java/com/google/devtools/mobileharness/shared/util/comm/dualconduit/client",
"//src/java/com/google/devtools/mobileharness/shared/util/logging:google_logger",
"@grpc-java//core",
],
)

java_library(
name = "server_type",
srcs = ["ServerType.java"],
deps = [],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.devtools.mobileharness.shared.util.comm.dualconduit.proxy;

import com.google.common.flogger.FluentLogger;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.client.DualConduitClient;
import com.google.devtools.mobileharness.shared.util.comm.dualconduit.proto.DualConduitProto.EstablishConduitResponse;
import io.grpc.Server;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* A decorator for {@link io.grpc.Server} that establishes a DualConduit after starting and tears it
* down before shutting down.
*/
public class ReverseProxiedServer extends Server {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();

private final Server delegate;
private final DualConduitClient client;
private final String serverName;
private final String instanceId;
private final String localHostname;
private String conduitId;

public ReverseProxiedServer(
Server delegate,
DualConduitClient client,
String serverName,
String instanceId,
String localHostname) {
this.delegate = delegate;
this.client = client;
this.serverName = serverName;
this.instanceId = instanceId;
this.localHostname = localHostname;
}

@Override
public Server start() throws IOException {
delegate.start();
logger.atInfo().log("Delegate server started, establishing conduit...");
try {
String destinationEndpoint = localHostname + ":" + delegate.getPort();
EstablishConduitResponse response =
client.establishReverseGrpcConduit(serverName, instanceId, destinationEndpoint);
this.conduitId = response.getConduitId();
logger.atInfo().log("Conduit established, conduitId: %s", conduitId);
} catch (RuntimeException e) {
delegate.shutdown();
throw new IOException("Failed to establish conduit", e);
}
return this;
}

@Override
public Server shutdown() {
logger.atInfo().log("Shutting down ReverseProxiedServer...");
if (conduitId != null) {
try {
logger.atInfo().log("Tearing down conduit %s", conduitId);
var unused = client.teardownConduit(conduitId);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Failed to teardown conduit %s", conduitId);
}
}
client.shutdown();
delegate.shutdown();
return this;
}

@Override
public Server shutdownNow() {
logger.atInfo().log("Shutting down ReverseProxiedServer now...");
if (conduitId != null) {
try {
logger.atInfo().log("Tearing down conduit %s", conduitId);
var unused = client.teardownConduit(conduitId);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Failed to teardown conduit %s", conduitId);
}
}
client.shutdown();
delegate.shutdownNow();
return this;
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public void awaitTermination() throws InterruptedException {
delegate.awaitTermination();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
}
Loading
Loading