Skip to content

Commit 1d295e3

Browse files
authored
add gRPC channel to client options (#731)
1 parent 89eaf91 commit 1d295e3

File tree

2 files changed

+133
-64
lines changed

2 files changed

+133
-64
lines changed

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

+30-11
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
6767
private static final String CLIENT_IMPL_HEADER_VALUE = "uber-java";
6868

6969
private final ManagedChannel channel;
70+
private final boolean shutdownChannel;
7071
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
7172
private final DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub;
7273
private final DomainAPIGrpc.DomainAPIFutureStub domainFutureStub;
@@ -80,12 +81,17 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
8081
private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub;
8182

8283
GrpcServiceStubs(ClientOptions options) {
83-
this.channel =
84-
ManagedChannelBuilder.forAddress(options.getHost(), options.getPort())
85-
.defaultLoadBalancingPolicy("round_robin")
86-
.usePlaintext()
87-
.build();
88-
84+
if (options.getGRPCChannel() != null) {
85+
this.channel = options.getGRPCChannel();
86+
shutdownChannel = false;
87+
} else {
88+
this.channel =
89+
ManagedChannelBuilder.forAddress(options.getHost(), options.getPort())
90+
.defaultLoadBalancingPolicy("round_robin")
91+
.usePlaintext()
92+
.build();
93+
shutdownChannel = true;
94+
}
8995
ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options);
9096
ClientInterceptor tracingInterceptor = newTracingInterceptor();
9197
Metadata headers = new Metadata();
@@ -201,28 +207,41 @@ public WorkflowAPIFutureStub workflowFutureStub() {
201207
@Override
202208
public void shutdown() {
203209
shutdownRequested.set(true);
204-
channel.shutdown();
210+
if (shutdownChannel) {
211+
channel.shutdown();
212+
}
205213
}
206214

207215
@Override
208216
public void shutdownNow() {
209217
shutdownRequested.set(true);
210-
channel.shutdownNow();
218+
if (shutdownChannel) {
219+
channel.shutdownNow();
220+
}
211221
}
212222

213223
@Override
214224
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
215-
return channel.awaitTermination(timeout, unit);
225+
if (shutdownChannel) {
226+
return channel.awaitTermination(timeout, unit);
227+
}
228+
return true;
216229
}
217230

218231
@Override
219232
public boolean isShutdown() {
220-
return channel.isShutdown();
233+
if (shutdownChannel) {
234+
return channel.isShutdown();
235+
}
236+
return shutdownRequested.get();
221237
}
222238

223239
@Override
224240
public boolean isTerminated() {
225-
return channel.isTerminated();
241+
if (shutdownChannel) {
242+
return channel.isTerminated();
243+
}
244+
return shutdownRequested.get();
226245
}
227246

228247
private static class GrpcDeadlineInterceptor implements ClientInterceptor {

src/main/java/com/uber/cadence/serviceclient/ClientOptions.java

+103-53
Original file line numberDiff line numberDiff line change
@@ -23,79 +23,94 @@
2323
import com.uber.cadence.internal.metrics.NoopScope;
2424
import com.uber.cadence.serviceclient.auth.IAuthorizationProvider;
2525
import com.uber.m3.tally.Scope;
26+
import io.grpc.ManagedChannel;
2627
import java.util.Map;
2728

2829
public class ClientOptions {
30+
2931
private static final int DEFAULT_LOCAL_CADENCE_SERVER_PORT = 7933;
3032

3133
private static final String LOCALHOST = "127.0.0.1";
3234

33-
/** Default RPC timeout used for all non long poll calls. */
35+
/**
36+
* Default RPC timeout used for all non long poll calls.
37+
*/
3438
private static final long DEFAULT_RPC_TIMEOUT_MILLIS = 3 * 1000;
35-
/** Default RPC timeout used for all long poll calls. */
39+
/**
40+
* Default RPC timeout used for all long poll calls.
41+
*/
3642
private static final long DEFAULT_POLL_RPC_TIMEOUT_MILLIS = 30 * 1000;
3743

38-
/** Default RPC timeout for QueryWorkflow */
44+
/**
45+
* Default RPC timeout for QueryWorkflow
46+
*/
3947
private static final long DEFAULT_QUERY_RPC_TIMEOUT_MILLIS = 10 * 1000;
4048

41-
/** Default RPC timeout for ListArchivedWorkflow */
49+
/**
50+
* Default RPC timeout for ListArchivedWorkflow
51+
*/
4252
private static final long DEFAULT_LIST_ARCHIVED_WORKFLOW_TIMEOUT_MILLIS = 180 * 1000;
4353

4454
private static final String DEFAULT_CLIENT_APP_NAME = "cadence-client";
4555

46-
/** Name of the Cadence service front end as required by TChannel. */
56+
/**
57+
* Name of the Cadence service front end as required by TChannel.
58+
*/
4759
private static final String DEFAULT_SERVICE_NAME = "cadence-frontend";
60+
private static final ClientOptions DEFAULT_INSTANCE;
61+
62+
static {
63+
DEFAULT_INSTANCE = new Builder().build();
64+
}
4865

4966
private final String host;
5067
private final int port;
51-
52-
/** The tChannel timeout in milliseconds */
68+
private final ManagedChannel gRPCChannel;
69+
/**
70+
* The tChannel timeout in milliseconds
71+
*/
5372
private final long rpcTimeoutMillis;
54-
55-
/** The tChannel timeout for long poll calls in milliseconds */
73+
/**
74+
* The tChannel timeout for long poll calls in milliseconds
75+
*/
5676
private final long rpcLongPollTimeoutMillis;
57-
58-
/** The tChannel timeout for query workflow call in milliseconds */
77+
/**
78+
* The tChannel timeout for query workflow call in milliseconds
79+
*/
5980
private final long rpcQueryTimeoutMillis;
60-
61-
/** The tChannel timeout for list archived workflow call in milliseconds */
81+
/**
82+
* The tChannel timeout for list archived workflow call in milliseconds
83+
*/
6284
private final long rpcListArchivedWorkflowTimeoutMillis;
63-
64-
/** TChannel service name that the Cadence service was started with. */
85+
/**
86+
* TChannel service name that the Cadence service was started with.
87+
*/
6588
private final String serviceName;
66-
67-
/** Name of the service using the cadence-client. */
89+
/**
90+
* Name of the service using the cadence-client.
91+
*/
6892
private final String clientAppName;
69-
70-
/** Client for metrics reporting. */
93+
/**
94+
* Client for metrics reporting.
95+
*/
7196
private final Scope metricsScope;
72-
73-
/** Optional TChannel transport headers */
97+
/**
98+
* Optional TChannel transport headers
99+
*/
74100
private final Map<String, String> transportHeaders;
75-
76-
/** Optional TChannel headers */
101+
/**
102+
* Optional TChannel headers
103+
*/
77104
private final Map<String, String> headers;
78-
79-
/** Optional authorization provider */
105+
/**
106+
* Optional authorization provider
107+
*/
80108
private final IAuthorizationProvider authProvider;
81-
82-
private static final ClientOptions DEFAULT_INSTANCE;
83-
84-
/** Optional Feature flags to turn on/off some Cadence features */
109+
/**
110+
* Optional Feature flags to turn on/off some Cadence features
111+
*/
85112
private final FeatureFlags featureFlags;
86113

87-
static {
88-
DEFAULT_INSTANCE = new Builder().build();
89-
}
90-
91-
public static ClientOptions defaultInstance() {
92-
return DEFAULT_INSTANCE;
93-
}
94-
95-
public static Builder newBuilder() {
96-
return new Builder();
97-
}
98-
99114
private ClientOptions(Builder builder) {
100115
if (Strings.isNullOrEmpty(builder.host)) {
101116
host =
@@ -105,8 +120,8 @@ private ClientOptions(Builder builder) {
105120
} else {
106121
host = builder.host;
107122
}
108-
109123
this.port = builder.port;
124+
this.gRPCChannel = builder.gRPCChannel;
110125
this.rpcTimeoutMillis = builder.rpcTimeoutMillis;
111126
if (builder.clientAppName == null) {
112127
this.clientAppName = DEFAULT_CLIENT_APP_NAME;
@@ -141,6 +156,14 @@ private ClientOptions(Builder builder) {
141156
this.authProvider = builder.authProvider;
142157
}
143158

159+
public static ClientOptions defaultInstance() {
160+
return DEFAULT_INSTANCE;
161+
}
162+
163+
public static Builder newBuilder() {
164+
return new Builder();
165+
}
166+
144167
public String getHost() {
145168
return host;
146169
}
@@ -149,27 +172,41 @@ public int getPort() {
149172
return port;
150173
}
151174

152-
/** @return Returns the rpc timeout value in millis. */
175+
public ManagedChannel getGRPCChannel() {
176+
return gRPCChannel;
177+
}
178+
179+
/**
180+
* @return Returns the rpc timeout value in millis.
181+
*/
153182
public long getRpcTimeoutMillis() {
154183
return rpcTimeoutMillis;
155184
}
156185

157-
/** @return Returns the rpc timout for long poll requests in millis. */
186+
/**
187+
* @return Returns the rpc timout for long poll requests in millis.
188+
*/
158189
public long getRpcLongPollTimeoutMillis() {
159190
return rpcLongPollTimeoutMillis;
160191
}
161192

162-
/** @return Returns the rpc timout for query workflow requests in millis. */
193+
/**
194+
* @return Returns the rpc timout for query workflow requests in millis.
195+
*/
163196
public long getRpcQueryTimeoutMillis() {
164197
return rpcQueryTimeoutMillis;
165198
}
166199

167-
/** @return Returns the rpc timout for list archived workflow requests in millis. */
200+
/**
201+
* @return Returns the rpc timout for list archived workflow requests in millis.
202+
*/
168203
public long getRpcListArchivedWorkflowTimeoutMillis() {
169204
return rpcListArchivedWorkflowTimeoutMillis;
170205
}
171206

172-
/** Returns the client application name. */
207+
/**
208+
* Returns the client application name.
209+
*/
173210
public String getClientAppName() {
174211
return this.clientAppName;
175212
}
@@ -204,8 +241,10 @@ public FeatureFlags getFeatureFlags() {
204241
* @author venkat
205242
*/
206243
public static class Builder {
244+
207245
private String host;
208246
private int port = DEFAULT_LOCAL_CADENCE_SERVER_PORT;
247+
private ManagedChannel gRPCChannel;
209248
private String clientAppName = DEFAULT_CLIENT_APP_NAME;
210249
private long rpcTimeoutMillis = DEFAULT_RPC_TIMEOUT_MILLIS;
211250
private long rpcLongPollTimeoutMillis = DEFAULT_POLL_RPC_TIMEOUT_MILLIS;
@@ -219,7 +258,8 @@ public static class Builder {
219258
private IAuthorizationProvider authProvider;
220259
private FeatureFlags featureFlags;
221260

222-
private Builder() {}
261+
private Builder() {
262+
}
223263

224264
public Builder setHost(String host) {
225265
this.host = host;
@@ -236,6 +276,14 @@ public Builder setPort(int port) {
236276
return this;
237277
}
238278

279+
/**
280+
* Sets gRPC channel to use. Exclusive with host and port.
281+
*/
282+
public Builder setGRPCChannel(ManagedChannel gRPCChannel) {
283+
this.gRPCChannel = gRPCChannel;
284+
return this;
285+
}
286+
239287
/**
240288
* Sets the rpc timeout value for non query and non long poll calls. Default is 1000.
241289
*
@@ -278,6 +326,13 @@ public Builder setListArchivedWorkflowRpcTimeout(long timeoutMillis) {
278326
return this;
279327
}
280328

329+
/**
330+
* Returns the feature flags defined in ClientOptions
331+
*/
332+
public FeatureFlags getFeatureFlags() {
333+
return this.featureFlags;
334+
}
335+
281336
/**
282337
* Sets the feature flags to turn on/off some Cadence features By default, all features under
283338
* FeatureFlags are turned off.
@@ -289,11 +344,6 @@ public Builder setFeatureFlags(FeatureFlags featureFlags) {
289344
return this;
290345
}
291346

292-
/** Returns the feature flags defined in ClientOptions */
293-
public FeatureFlags getFeatureFlags() {
294-
return this.featureFlags;
295-
}
296-
297347
/**
298348
* Sets the client application name.
299349
*

0 commit comments

Comments
 (0)