Skip to content

Commit cb5d6ec

Browse files
authored
update idls to use refresh tasks API (#668)
* add refresh tasks API to client
1 parent b72242c commit cb5d6ec

File tree

14 files changed

+146
-44
lines changed

14 files changed

+146
-44
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ env:
77

88
before_install:
99
- pushd /tmp
10-
- wget https://www-us.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz
10+
- wget https://archive.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz
1111
- tar xfz thrift-0.9.3.tar.gz
1212
- cd thrift-0.9.3
1313
- ./configure --enable-libs=no --enable-tests=no --enable-tutorial=no --with-cpp=no --with-c_glib=no --with-java=yes --with-ruby=no --with-erlang=no --with-go=no --with-nodejs=no --with-python=no

build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ uploadArchives {
266266
name 'Maxim Fateev'
267267
268268
}
269+
developer {
270+
id 'mkol'
271+
name 'Max K'
272+
273+
}
269274
developer {
270275
id 'meiliang'
271276
name 'Liang Mei'

docker/buildkite/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RUN apk add git libstdc++
1313

1414
# Compile source
1515
RUN set -ex ;\
16-
wget https://downloads.apache.org/thrift/${APACHE_THRIFT_VERSION}/thrift-${APACHE_THRIFT_VERSION}.tar.gz && \
16+
wget https://archive.apache.org/dist/thrift/${APACHE_THRIFT_VERSION}/thrift-${APACHE_THRIFT_VERSION}.tar.gz && \
1717
tar -xvf thrift-${APACHE_THRIFT_VERSION}.tar.gz && \
1818
rm thrift-${APACHE_THRIFT_VERSION}.tar.gz && \
1919
cd thrift-${APACHE_THRIFT_VERSION}/ && \

src/main/idls

Submodule idls updated from d9811d5 to 3318277

src/main/java/com/uber/cadence/client/WorkflowClient.java

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.client;
1919

20+
import com.uber.cadence.RefreshWorkflowTasksRequest;
2021
import com.uber.cadence.WorkflowExecution;
2122
import com.uber.cadence.activity.Activity;
2223
import com.uber.cadence.internal.sync.WorkflowClientInternal;
@@ -34,6 +35,7 @@
3435
import com.uber.cadence.workflow.WorkflowMethod;
3536
import java.util.Optional;
3637
import java.util.concurrent.CompletableFuture;
38+
import org.apache.thrift.TException;
3739

3840
/**
3941
* Client to the Cadence service used to start and query workflows by external processes. Also it
@@ -237,6 +239,14 @@ WorkflowStub newUntypedWorkflowStub(
237239
*/
238240
WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch);
239241

242+
/**
243+
* Refreshes all the tasks of a given workflow.
244+
*
245+
* @param refreshWorkflowTasksRequest that contains WorkflowID and RunID of the started workflow.
246+
*/
247+
void refreshWorkflowTasks(RefreshWorkflowTasksRequest refreshWorkflowTasksRequest)
248+
throws TException;
249+
240250
/**
241251
* Executes zero argument workflow with void return type
242252
*

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+11
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,13 @@ public ListTaskListPartitionsResponse ListTaskListPartitions(
554554
return impl.ListTaskListPartitions(request);
555555
}
556556

557+
@Override
558+
public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest request)
559+
throws BadRequestError, DomainNotActiveError, ServiceBusyError, EntityNotExistsError,
560+
TException {
561+
impl.RefreshWorkflowTasks(request);
562+
}
563+
557564
@Override
558565
public void RegisterDomain(
559566
RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler)
@@ -843,6 +850,10 @@ public void ListTaskListPartitions(
843850
ListTaskListPartitionsRequest request, AsyncMethodCallback resultHandler)
844851
throws TException {}
845852

853+
@Override
854+
public void RefreshWorkflowTasks(
855+
RefreshWorkflowTasksRequest request, AsyncMethodCallback resultHandler) throws TException {}
856+
846857
@Override
847858
public void RegisterDomain(RegisterDomainRequest registerRequest)
848859
throws BadRequestError, InternalServiceError, DomainAlreadyExistsError, TException {

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

+13
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.uber.cadence.DescribeWorkflowExecutionRequest;
3131
import com.uber.cadence.DescribeWorkflowExecutionResponse;
3232
import com.uber.cadence.DomainAlreadyExistsError;
33+
import com.uber.cadence.DomainNotActiveError;
3334
import com.uber.cadence.EntityNotExistsError;
3435
import com.uber.cadence.GetSearchAttributesResponse;
3536
import com.uber.cadence.GetTaskListsByDomainRequest;
@@ -61,6 +62,7 @@
6162
import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest;
6263
import com.uber.cadence.RecordActivityTaskHeartbeatRequest;
6364
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
65+
import com.uber.cadence.RefreshWorkflowTasksRequest;
6466
import com.uber.cadence.RegisterDomainRequest;
6567
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
6668
import com.uber.cadence.ResetStickyTaskListRequest;
@@ -424,6 +426,13 @@ public ListTaskListPartitionsResponse ListTaskListPartitions(
424426
return impl.ListTaskListPartitions(request);
425427
}
426428

429+
@Override
430+
public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest request)
431+
throws BadRequestError, DomainNotActiveError, ServiceBusyError, EntityNotExistsError,
432+
TException {
433+
impl.RefreshWorkflowTasks(request);
434+
}
435+
427436
@Override
428437
public void RegisterDomain(
429438
RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler)
@@ -718,6 +727,10 @@ public void ListTaskListPartitions(
718727
ListTaskListPartitionsRequest request, AsyncMethodCallback resultHandler)
719728
throws TException {}
720729

730+
@Override
731+
public void RefreshWorkflowTasks(
732+
RefreshWorkflowTasksRequest request, AsyncMethodCallback resultHandler) throws TException {}
733+
721734
@Override
722735
public void RegisterDomain(RegisterDomainRequest registerRequest)
723736
throws BadRequestError, InternalServiceError, DomainAlreadyExistsError, TException {

src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java

+8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.base.Strings;
2121
import com.google.common.reflect.TypeToken;
22+
import com.uber.cadence.RefreshWorkflowTasksRequest;
2223
import com.uber.cadence.WorkflowExecution;
2324
import com.uber.cadence.client.ActivityCompletionClient;
2425
import com.uber.cadence.client.BatchRequest;
@@ -42,6 +43,7 @@
4243
import java.util.Objects;
4344
import java.util.Optional;
4445
import java.util.concurrent.CompletableFuture;
46+
import org.apache.thrift.TException;
4547

4648
public final class WorkflowClientInternal implements WorkflowClient {
4749

@@ -205,6 +207,12 @@ public WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch) {
205207
return ((SignalWithStartBatchRequest) signalWithStartBatch).invoke();
206208
}
207209

210+
@Override
211+
public void refreshWorkflowTasks(RefreshWorkflowTasksRequest refreshWorkflowTasksRequest)
212+
throws TException {
213+
workflowService.RefreshWorkflowTasks(refreshWorkflowTasksRequest);
214+
}
215+
208216
public static WorkflowExecution start(Functions.Proc workflow) {
209217
WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START);
210218
try {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

+12
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest;
6262
import com.uber.cadence.RecordActivityTaskHeartbeatRequest;
6363
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
64+
import com.uber.cadence.RefreshWorkflowTasksRequest;
6465
import com.uber.cadence.RegisterDomainRequest;
6566
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
6667
import com.uber.cadence.ResetStickyTaskListRequest;
@@ -773,6 +774,13 @@ public ListTaskListPartitionsResponse ListTaskListPartitions(
773774
throw new UnsupportedOperationException("not implemented");
774775
}
775776

777+
@Override
778+
public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest request)
779+
throws BadRequestError, DomainNotActiveError, ServiceBusyError, EntityNotExistsError,
780+
TException {
781+
throw new UnsupportedOperationException("not implemented");
782+
}
783+
776784
@Override
777785
public void RegisterDomain(
778786
RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler) throws TException {
@@ -1086,6 +1094,10 @@ public void GetClusterInfo(AsyncMethodCallback resultHandler) throws TException
10861094
public void ListTaskListPartitions(
10871095
ListTaskListPartitionsRequest request, AsyncMethodCallback resultHandler) throws TException {}
10881096

1097+
@Override
1098+
public void RefreshWorkflowTasks(
1099+
RefreshWorkflowTasksRequest request, AsyncMethodCallback resultHandler) throws TException {}
1100+
10891101
private <R> R requireNotNull(String fieldName, R value) throws BadRequestError {
10901102
if (value == null) {
10911103
throw new BadRequestError("Missing requried field \"" + fieldName + "\".");

src/main/java/com/uber/cadence/internal/worker/PollerOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ public Builder setPollThreadNamePrefix(String pollThreadNamePrefix) {
137137
}
138138

139139
/**
140-
* The poller will check task executor's remaining capacity before polling tasks.
141-
* This is to prevent task to get started but not being able to execute in time.
140+
* The poller will check task executor's remaining capacity before polling tasks. This is to
141+
* prevent task to get started but not being able to execute in time.
142142
*/
143143
public Builder setPollOnlyIfExecutorHasCapacity(boolean pollOnlyIfExecutorHasCapacity) {
144144
this.pollOnlyIfExecutorHasCapacity = pollOnlyIfExecutorHasCapacity;

src/main/java/com/uber/cadence/internal/worker/TaskExecutor.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@
1919

2020
interface TaskExecutor<T> {
2121
void process(T task);
22+
2223
boolean hasCapacity();
2324
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

+61-18
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package com.uber.cadence.serviceclient;
1919

20+
import static com.uber.cadence.internal.metrics.MetricsTagValue.REQUEST_TYPE_LONG_POLL;
21+
import static com.uber.cadence.internal.metrics.MetricsTagValue.REQUEST_TYPE_NORMAL;
22+
2023
import com.google.common.collect.ImmutableMap;
2124
import com.google.gson.Gson;
2225
import com.google.gson.GsonBuilder;
@@ -65,6 +68,7 @@
6568
import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest;
6669
import com.uber.cadence.RecordActivityTaskHeartbeatRequest;
6770
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
71+
import com.uber.cadence.RefreshWorkflowTasksRequest;
6872
import com.uber.cadence.RegisterDomainRequest;
6973
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
7074
import com.uber.cadence.ResetStickyTaskListRequest;
@@ -110,12 +114,6 @@
110114
import com.uber.tchannel.messages.ThriftRequest;
111115
import com.uber.tchannel.messages.ThriftResponse;
112116
import com.uber.tchannel.messages.generated.Meta;
113-
import org.apache.thrift.TException;
114-
import org.apache.thrift.async.AsyncMethodCallback;
115-
import org.apache.thrift.transport.TTransportException;
116-
import org.slf4j.Logger;
117-
import org.slf4j.LoggerFactory;
118-
119117
import java.net.InetAddress;
120118
import java.net.InetSocketAddress;
121119
import java.net.UnknownHostException;
@@ -126,9 +124,11 @@
126124
import java.util.UUID;
127125
import java.util.concurrent.CompletableFuture;
128126
import java.util.concurrent.ExecutionException;
129-
130-
import static com.uber.cadence.internal.metrics.MetricsTagValue.REQUEST_TYPE_LONG_POLL;
131-
import static com.uber.cadence.internal.metrics.MetricsTagValue.REQUEST_TYPE_NORMAL;
127+
import org.apache.thrift.TException;
128+
import org.apache.thrift.async.AsyncMethodCallback;
129+
import org.apache.thrift.transport.TTransportException;
130+
import org.slf4j.Logger;
131+
import org.slf4j.LoggerFactory;
132132

133133
public class WorkflowServiceTChannel implements IWorkflowService {
134134
private static final Logger log = LoggerFactory.getLogger(WorkflowServiceTChannel.class);
@@ -363,7 +363,8 @@ private <T> T measureRemoteCall(String scopeName, RemoteCall<T> call) throws TEx
363363
return measureRemoteCallWithTags(scopeName, call, null);
364364
}
365365

366-
private <T> T measureRemoteCallWithTags(String scopeName, RemoteCall<T> call, Map<String, String> tags) throws TException {
366+
private <T> T measureRemoteCallWithTags(
367+
String scopeName, RemoteCall<T> call, Map<String, String> tags) throws TException {
367368
Scope scope = options.getMetricsScope().subScope(scopeName);
368369
if (tags != null) {
369370
scope = scope.tagged(tags);
@@ -677,21 +678,27 @@ private StartWorkflowExecutionResponse startWorkflowExecution(
677678
@Override
678679
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
679680
GetWorkflowExecutionHistoryRequest request, Long timeoutInMillis) throws TException {
680-
Map<String, String> tags = ImmutableMap.of(MetricsTag.REQUEST_TYPE, request.isWaitForNewEvent() ? REQUEST_TYPE_LONG_POLL : REQUEST_TYPE_NORMAL);
681+
Map<String, String> tags =
682+
ImmutableMap.of(
683+
MetricsTag.REQUEST_TYPE,
684+
request.isWaitForNewEvent() ? REQUEST_TYPE_LONG_POLL : REQUEST_TYPE_NORMAL);
681685
return measureRemoteCallWithTags(
682-
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY,
683-
() -> getWorkflowExecutionHistory(request, timeoutInMillis),
684-
tags);
686+
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY,
687+
() -> getWorkflowExecutionHistory(request, timeoutInMillis),
688+
tags);
685689
}
686690

687691
@Override
688692
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
689693
GetWorkflowExecutionHistoryRequest request) throws TException {
690-
Map<String, String> tags = ImmutableMap.of(MetricsTag.REQUEST_TYPE, request.isWaitForNewEvent() ? REQUEST_TYPE_LONG_POLL : REQUEST_TYPE_NORMAL);
694+
Map<String, String> tags =
695+
ImmutableMap.of(
696+
MetricsTag.REQUEST_TYPE,
697+
request.isWaitForNewEvent() ? REQUEST_TYPE_LONG_POLL : REQUEST_TYPE_NORMAL);
691698
return measureRemoteCallWithTags(
692-
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY,
693-
() -> getWorkflowExecutionHistory(request, null),
694-
tags);
699+
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY,
700+
() -> getWorkflowExecutionHistory(request, null),
701+
tags);
695702
}
696703

697704
private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
@@ -2156,6 +2163,38 @@ public ListTaskListPartitionsResponse ListTaskListPartitions(
21562163
ServiceMethod.LIST_TASK_LIST_PARTITIONS, () -> listTaskListPartitions(request));
21572164
}
21582165

2166+
@Override
2167+
public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest refreshWorkflowTasks)
2168+
throws BadRequestError, DomainNotActiveError, ServiceBusyError, EntityNotExistsError,
2169+
TException {
2170+
ThriftResponse<WorkflowService.RefreshWorkflowTasks_result> response = null;
2171+
try {
2172+
ThriftRequest<WorkflowService.RefreshWorkflowTasks_args> request =
2173+
buildThriftRequest(
2174+
"RefreshWorkflowTasks",
2175+
new WorkflowService.RefreshWorkflowTasks_args(refreshWorkflowTasks));
2176+
response = doRemoteCall(request);
2177+
WorkflowService.RefreshWorkflowTasks_result result =
2178+
response.getBody(WorkflowService.RefreshWorkflowTasks_result.class);
2179+
if (result.isSetBadRequestError()) {
2180+
throw result.getBadRequestError();
2181+
}
2182+
if (result.isSetDomainNotActiveError()) {
2183+
throw result.getDomainNotActiveError();
2184+
}
2185+
if (result.isSetServiceBusyError()) {
2186+
throw result.getServiceBusyError();
2187+
}
2188+
if (result.isSetEntityNotExistError()) {
2189+
throw result.getEntityNotExistError();
2190+
}
2191+
} finally {
2192+
if (response != null) {
2193+
response.release();
2194+
}
2195+
}
2196+
}
2197+
21592198
private ListTaskListPartitionsResponse listTaskListPartitions(
21602199
ListTaskListPartitionsRequest listRequest) throws TException {
21612200
ThriftResponse<WorkflowService.ListTaskListPartitions_result> response = null;
@@ -2643,6 +2682,10 @@ public void GetClusterInfo(AsyncMethodCallback resultHandler) throws TException
26432682
public void ListTaskListPartitions(
26442683
ListTaskListPartitionsRequest request, AsyncMethodCallback resultHandler) throws TException {}
26452684

2685+
@Override
2686+
public void RefreshWorkflowTasks(
2687+
RefreshWorkflowTasksRequest request, AsyncMethodCallback resultHandler) throws TException {}
2688+
26462689
@Override
26472690
public void RegisterDomain(
26482691
RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler) throws TException {

0 commit comments

Comments
 (0)