Skip to content

Commit 0afa7ee

Browse files
authored
Support Search Attributes on Start workflow
2 parents 1b2a2ce + fbdc476 commit 0afa7ee

File tree

9 files changed

+213
-25
lines changed

9 files changed

+213
-25
lines changed

src/main/java/com/uber/cadence/client/WorkflowOptions.java

+37-6
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public static WorkflowOptions merge(
6464
.setRetryOptions(RetryOptions.merge(methodRetry, o.getRetryOptions()))
6565
.setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class))
6666
.setMemo(o.getMemo())
67+
.setSearchAttributes(o.getSearchAttributes())
6768
.validateBuildWithDefaults();
6869
}
6970

@@ -87,6 +88,8 @@ public static final class Builder {
8788

8889
private Map<String, Object> memo;
8990

91+
private Map<String, Object> searchAttributes;
92+
9093
public Builder() {}
9194

9295
public Builder(WorkflowOptions o) {
@@ -102,6 +105,7 @@ public Builder(WorkflowOptions o) {
102105
this.retryOptions = o.retryOptions;
103106
this.cronSchedule = o.cronSchedule;
104107
this.memo = o.memo;
108+
this.searchAttributes = o.searchAttributes;
105109
}
106110

107111
/**
@@ -186,12 +190,24 @@ public Builder setCronSchedule(String cronSchedule) {
186190
return this;
187191
}
188192

189-
/** Specifies additional non-indexed information in result of list workflow. */
193+
/**
194+
* Specifies additional non-indexed information in result of list workflow. The type of value
195+
* can be any object that are serializable by {@link com.uber.cadence.converter.DataConverter}
196+
*/
190197
public Builder setMemo(Map<String, Object> memo) {
191198
this.memo = memo;
192199
return this;
193200
}
194201

202+
/**
203+
* Specifies additional indexed information in result of list workflow. The type of value should
204+
* be basic type such as: String, Integer, Boolean, Double,LocalDateTime
205+
*/
206+
public Builder setSearchAttributes(Map<String, Object> searchAttributes) {
207+
this.searchAttributes = searchAttributes;
208+
return this;
209+
}
210+
195211
public WorkflowOptions build() {
196212
return new WorkflowOptions(
197213
workflowId,
@@ -202,7 +218,8 @@ public WorkflowOptions build() {
202218
childPolicy,
203219
retryOptions,
204220
cronSchedule,
205-
memo);
221+
memo,
222+
searchAttributes);
206223
}
207224

208225
/**
@@ -248,7 +265,8 @@ public WorkflowOptions validateBuildWithDefaults() {
248265
childPolicy,
249266
retryOptions,
250267
cronSchedule,
251-
memo);
268+
memo,
269+
searchAttributes);
252270
}
253271
}
254272

@@ -270,6 +288,8 @@ public WorkflowOptions validateBuildWithDefaults() {
270288

271289
private Map<String, Object> memo;
272290

291+
private Map<String, Object> searchAttributes;
292+
273293
private WorkflowOptions(
274294
String workflowId,
275295
WorkflowIdReusePolicy workflowIdReusePolicy,
@@ -279,7 +299,8 @@ private WorkflowOptions(
279299
ChildPolicy childPolicy,
280300
RetryOptions retryOptions,
281301
String cronSchedule,
282-
Map<String, Object> memo) {
302+
Map<String, Object> memo,
303+
Map<String, Object> searchAttributes) {
283304
this.workflowId = workflowId;
284305
this.workflowIdReusePolicy = workflowIdReusePolicy;
285306
this.executionStartToCloseTimeout = executionStartToCloseTimeout;
@@ -289,6 +310,7 @@ private WorkflowOptions(
289310
this.retryOptions = retryOptions;
290311
this.cronSchedule = cronSchedule;
291312
this.memo = memo;
313+
this.searchAttributes = searchAttributes;
292314
}
293315

294316
public String getWorkflowId() {
@@ -327,6 +349,10 @@ public Map<String, Object> getMemo() {
327349
return memo;
328350
}
329351

352+
public Map<String, Object> getSearchAttributes() {
353+
return searchAttributes;
354+
}
355+
330356
@Override
331357
public boolean equals(Object o) {
332358
if (this == o) return true;
@@ -340,7 +366,8 @@ public boolean equals(Object o) {
340366
&& childPolicy == that.childPolicy
341367
&& Objects.equals(retryOptions, that.retryOptions)
342368
&& Objects.equals(cronSchedule, that.cronSchedule)
343-
&& Objects.equals(memo, that.memo);
369+
&& Objects.equals(memo, that.memo)
370+
&& Objects.equals(searchAttributes, that.searchAttributes);
344371
}
345372

346373
@Override
@@ -354,7 +381,8 @@ public int hashCode() {
354381
childPolicy,
355382
retryOptions,
356383
cronSchedule,
357-
memo);
384+
memo,
385+
searchAttributes);
358386
}
359387

360388
@Override
@@ -382,6 +410,9 @@ public String toString() {
382410
+ ", memo='"
383411
+ memo
384412
+ '\''
413+
+ ", searchAttributes='"
414+
+ searchAttributes
415+
+ '\''
385416
+ '}';
386417
}
387418
}

src/main/java/com/uber/cadence/internal/common/StartWorkflowExecutionParameters.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public final class StartWorkflowExecutionParameters {
5353

5454
private Map<String, byte[]> memo;
5555

56+
private Map<String, byte[]> searchAttributes;
57+
5658
/**
5759
* Returns the value of the WorkflowId property for this object.
5860
*
@@ -303,6 +305,14 @@ public void setMemo(Map<String, byte[]> memo) {
303305
this.memo = memo;
304306
}
305307

308+
public Map<String, byte[]> getSearchAttributes() {
309+
return searchAttributes;
310+
}
311+
312+
public void setSearchAttributes(Map<String, byte[]> searchAttributes) {
313+
this.searchAttributes = searchAttributes;
314+
}
315+
306316
public StartWorkflowExecutionParameters withRetryParameters(RetryParameters retryParameters) {
307317
this.retryParameters = retryParameters;
308318
return this;
@@ -378,6 +388,9 @@ public String toString() {
378388
+ ", memo='"
379389
+ memo
380390
+ '\''
391+
+ ", searchAttributes='"
392+
+ searchAttributes
393+
+ '\''
381394
+ '}';
382395
}
383396

@@ -396,7 +409,8 @@ public boolean equals(Object o) {
396409
&& workflowIdReusePolicy == that.workflowIdReusePolicy
397410
&& Objects.equals(retryParameters, that.retryParameters)
398411
&& Objects.equals(cronSchedule, that.cronSchedule)
399-
&& Objects.equals(memo, that.memo);
412+
&& Objects.equals(memo, that.memo)
413+
&& Objects.equals(searchAttributes, that.searchAttributes);
400414
}
401415

402416
@Override
@@ -412,7 +426,8 @@ public int hashCode() {
412426
workflowIdReusePolicy,
413427
retryParameters,
414428
cronSchedule,
415-
memo);
429+
memo,
430+
searchAttributes);
416431
result = 31 * result + Arrays.hashCode(input);
417432
return result;
418433
}

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

+16
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.QueryWorkflowResponse;
2424
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
2525
import com.uber.cadence.RetryPolicy;
26+
import com.uber.cadence.SearchAttributes;
2627
import com.uber.cadence.SignalWithStartWorkflowExecutionRequest;
2728
import com.uber.cadence.SignalWorkflowExecutionRequest;
2829
import com.uber.cadence.StartWorkflowExecutionRequest;
@@ -117,6 +118,7 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
117118
request.setCronSchedule(startParameters.getCronSchedule());
118119
}
119120
request.setMemo(toMemoThrift(startParameters.getMemo()));
121+
request.setSearchAttributes(toSearchAttributesThrift(startParameters.getSearchAttributes()));
120122

121123
// if(startParameters.getChildPolicy() != null) {
122124
// request.setChildPolicy(startParameters.getChildPolicy());
@@ -154,6 +156,20 @@ private Memo toMemoThrift(Map<String, byte[]> memo) {
154156
return memoThrift;
155157
}
156158

159+
private SearchAttributes toSearchAttributesThrift(Map<String, byte[]> searchAttributes) {
160+
if (searchAttributes == null || searchAttributes.isEmpty()) {
161+
return null;
162+
}
163+
164+
Map<String, ByteBuffer> fields = new HashMap<>();
165+
for (Map.Entry<String, byte[]> item : searchAttributes.entrySet()) {
166+
fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
167+
}
168+
SearchAttributes searchAttrThrift = new SearchAttributes();
169+
searchAttrThrift.setIndexedFields(fields);
170+
return searchAttrThrift;
171+
}
172+
157173
private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {
158174
return new RetryPolicy()
159175
.setBackoffCoefficient(retryParameters.getBackoffCoefficient())

src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public static WorkflowClient newInstance(String domain) {
6868
* default port (7933).
6969
*
7070
* @param domain domain that worker uses to poll.
71-
* @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for
71+
* @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for
7272
* configuring client.
7373
*/
7474
public static WorkflowClient newInstance(String domain, WorkflowClientOptions options) {
@@ -95,7 +95,7 @@ public static WorkflowClient newInstance(String host, int port, String domain) {
9595
* @param host of the Cadence Service endpoint
9696
* @param port of the Cadence Service endpoint
9797
* @param domain domain that worker uses to poll.
98-
* @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for
98+
* @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for
9999
* configuring client.
100100
*/
101101
public static WorkflowClient newInstance(
@@ -118,7 +118,7 @@ public static WorkflowClient newInstance(IWorkflowService service, String domain
118118
*
119119
* @param service client to the Cadence Service endpoint.
120120
* @param domain domain that worker uses to poll.
121-
* @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for
121+
* @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for
122122
* configuring client.
123123
*/
124124
public static WorkflowClient newInstance(

src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.uber.cadence.client.WorkflowStub;
3434
import com.uber.cadence.converter.DataConverter;
3535
import com.uber.cadence.converter.DataConverterException;
36+
import com.uber.cadence.converter.JsonDataConverter;
3637
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
3738
import com.uber.cadence.internal.common.SignalWithStartWorkflowExecutionParameters;
3839
import com.uber.cadence.internal.common.StartWorkflowExecutionParameters;
@@ -140,23 +141,32 @@ private StartWorkflowExecutionParameters getStartWorkflowExecutionParameters(
140141
p.setInput(dataConverter.toData(args));
141142
p.setWorkflowType(new WorkflowType().setName(workflowType.get()));
142143
p.setMemo(convertMemoFromObjectToBytes(o.getMemo()));
144+
p.setSearchAttributes(convertSearchAttributesFromObjectToBytes(o.getSearchAttributes()));
143145
return p;
144146
}
145147

146-
private Map<String, byte[]> convertMemoFromObjectToBytes(Map<String, Object> memoFromOption) {
147-
if (memoFromOption == null) {
148+
private Map<String, byte[]> convertMapFromObjectToBytes(
149+
Map<String, Object> map, DataConverter dataConverter) {
150+
if (map == null) {
148151
return null;
149152
}
150-
Map<String, byte[]> memo = new HashMap<>();
151-
for (Map.Entry<String, Object> item : memoFromOption.entrySet()) {
153+
Map<String, byte[]> result = new HashMap<>();
154+
for (Map.Entry<String, Object> item : map.entrySet()) {
152155
try {
153-
memo.put(item.getKey(), dataConverter.toData(item.getValue()));
156+
result.put(item.getKey(), dataConverter.toData(item.getValue()));
154157
} catch (DataConverterException e) {
155-
throw new DataConverterException(
156-
"Cannot serialize memo for key " + item.getKey(), e.getCause());
158+
throw new DataConverterException("Cannot serialize key " + item.getKey(), e.getCause());
157159
}
158160
}
159-
return memo;
161+
return result;
162+
}
163+
164+
private Map<String, byte[]> convertMemoFromObjectToBytes(Map<String, Object> map) {
165+
return convertMapFromObjectToBytes(map, dataConverter);
166+
}
167+
168+
private Map<String, byte[]> convertSearchAttributesFromObjectToBytes(Map<String, Object> map) {
169+
return convertMapFromObjectToBytes(map, JsonDataConverter.getInstance());
160170
}
161171

162172
@Override

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

+1
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ private static void startWorkflow(
539539
}
540540
a.setLastCompletionResult(data.lastCompletionResult);
541541
a.setMemo(request.getMemo());
542+
a.setSearchAttributes((request.getSearchAttributes()));
542543
HistoryEvent event =
543544
new HistoryEvent()
544545
.setEventType(EventType.WorkflowExecutionStarted)

0 commit comments

Comments
 (0)