Skip to content

Commit d55d302

Browse files
authored
Various fixes for TS worker's kitchen sink/throughput stress workflow (#235)
## What was changed Various fixes for TS worker's kitchen sink/throughput stress workflow - set correct activity timeout values (no 0 values) - add search attributes to child workflows so visibility check passes - fix pass through payload converter to simply pass through payloads (was breaking on its `fromPayload` path because no encoding was set, and it did not keep what encoding the payload was prior to encoding. But we don't need to encode at all 😃) ## Why? Allows usage of TS worker when running `throughput_stress`. To be used for future smoke/stress testing. 2. How was this tested: Able to run: ``` go run ./cmd run-scenario-with-worker \ --scenario throughput_stress \ --language typescript \ --iterations 10 \ --option internal-iterations=10 ``` which failed before 3. Any docs updates needed? No
1 parent bdbb7f6 commit d55d302

5 files changed

Lines changed: 38 additions & 55 deletions

File tree

scenarios/throughput_stress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ func (t *tpsExecutor) createChildWorkflowAction(run *loadgen.Run, childID int) *
507507
WorkflowId: fmt.Sprintf("%s/child-%d", run.DefaultStartWorkflowOptions().ID, childID),
508508
SearchAttributes: map[string]*common.Payload{
509509
ThroughputStressScenarioIdSearchAttribute: &common.Payload{
510-
Metadata: map[string][]byte{"encoding": []byte("json/plain")},
510+
Metadata: map[string][]byte{"encoding": []byte("json/plain"), "type": []byte("Keyword")},
511511
Data: []byte(fmt.Sprintf("%q", t.config.ScenarioRunID)), // quoted to be valid JSON string
512512
},
513513
},

workers/python/kitchen_sink.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66

77
import temporalio.workflow
88
from temporalio import exceptions, workflow
9-
from temporalio.api.common.v1 import Payload
9+
from temporalio.api.common.v1 import Payload, SearchAttributes
1010
from temporalio.common import (
1111
Priority,
1212
RawValue,
1313
RetryPolicy,
1414
SearchAttributeKey,
1515
SearchAttributeUpdate,
1616
)
17-
from temporalio.converter import DefaultPayloadConverter
1817
from temporalio.workflow import ActivityHandle, ChildWorkflowHandle
1918

2019
from protos.kitchen_sink_pb2 import (
@@ -130,15 +129,14 @@ async def handle_action(self, action: Action) -> Optional[Payload]:
130129
child_action = action.exec_child_workflow
131130
child = child_action.workflow_type or "kitchenSink"
132131
args = [RawValue(i) for i in child_action.input]
133-
132+
proto_sa = SearchAttributes(indexed_fields=child_action.search_attributes)
133+
typed_attrs = temporalio.converter.decode_typed_search_attributes(proto_sa)
134134
await handle_awaitable_choice(
135135
workflow.start_child_workflow(
136136
child,
137137
id=child_action.workflow_id,
138138
args=args,
139-
search_attributes=decode_search_attrs(
140-
child_action.search_attributes, DefaultPayloadConverter()
141-
),
139+
search_attributes=typed_attrs,
142140
),
143141
child_action.awaitable_choice,
144142
after_started_fn=wait_task_complete,
@@ -324,10 +322,3 @@ def convert_act_cancel_type(
324322
return temporalio.workflow.ActivityCancellationType.ABANDON
325323
else:
326324
raise NotImplementedError("Unknown cancellation type " + str(ctype))
327-
328-
329-
def decode_search_attrs(msg_map, converter):
330-
return {
331-
k: v if isinstance(v := converter.from_payload(p), list) else [v]
332-
for k, p in msg_map.items()
333-
}

workers/typescript/src/payload-converter.ts

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,28 @@ import {
99
JsonPayloadConverter,
1010
PayloadConverterWithEncoding,
1111
UndefinedPayloadConverter,
12-
ValueError,
1312
} from '@temporalio/common';
14-
import { decode, encode } from '@temporalio/common/lib/encoding';
1513
import Payload = temporal.api.common.v1.Payload;
1614

15+
// TODO(thomas): can remove this file entirely (and usage of custom payload converter for worker)
16+
// once RawValue.fromPayload(p) is released.
1717
export class PassThroughPayload implements PayloadConverterWithEncoding {
1818
public toPayload(value: any): Payload | undefined {
19-
if (!value || value.metadata === undefined || value.data === undefined) {
19+
if (
20+
!value ||
21+
value.metadata == null ||
22+
value.data == null ||
23+
value?.metadata?.encoding == null
24+
) {
2025
return undefined;
2126
}
22-
let asPayload;
23-
try {
24-
asPayload = Payload.fromObject(value as any);
25-
} catch (e) {
26-
throw new ValueError('PassThroughPayload can only convert Payloads');
27-
}
28-
const asBytes = Payload.encode(asPayload).finish();
29-
return Payload.create({
30-
metadata: {
31-
encoding: encode(this.encodingType),
32-
},
33-
data: asBytes,
34-
});
27+
// If it looks like a Payload, return it as-is
28+
return value as Payload;
3529
}
3630

37-
public fromPayload<T>(content: Payload): T {
38-
if (decode(content.metadata?.encoding) === '__passthrough') {
39-
const innerPayload = Payload.decode(new Uint8Array(content.data));
40-
return payloadConverter.fromPayload<T>(innerPayload);
41-
}
42-
throw new ValueError(
43-
'PassThroughPayload can only decode passthrough Payloads, got ' + JSON.stringify(content)
44-
);
31+
public fromPayload<T>(_: Payload): T {
32+
// This should never be called since we don't modify the encoding
33+
throw new Error('PassThroughPayload.fromPayload should not be called');
4534
}
4635

4736
public get encodingType(): string {

workers/typescript/src/proto_help.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ import { google } from './protos/root';
33
import IDuration = google.protobuf.IDuration;
44
import Long from 'long';
55

6+
export function durationConvertMaybeUndefined(d: IDuration | null | undefined): number | undefined {
7+
if (!d) {
8+
return undefined;
9+
}
10+
return durationConvert(d);
11+
}
612
export function durationConvert(d: IDuration | null | undefined): number {
713
if (!d) {
814
return 0;

workers/typescript/src/workflows/kitchen_sink.ts

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
ApplicationFailure,
55
CancellationScope,
66
ChildWorkflowHandle,
7-
ChildWorkflowOptions,
87
condition,
98
continueAsNew,
109
defineQuery,
@@ -28,7 +27,8 @@ import {
2827
LocalActivityOptions,
2928
SearchAttributes,
3029
} from '@temporalio/common';
31-
import { durationConvert, numify } from '../proto_help';
30+
import { decodeTypedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
31+
import { durationConvert, durationConvertMaybeUndefined, numify } from '../proto_help';
3232
import WorkflowInput = temporal.omes.kitchen_sink.WorkflowInput;
3333
import WorkflowState = temporal.omes.kitchen_sink.WorkflowState;
3434
import Payload = temporal.api.common.v1.Payload;
@@ -139,19 +139,17 @@ export async function kitchenSink(input: WorkflowInput | undefined): Promise<IPa
139139
action.execActivity.awaitableChoice
140140
);
141141
} else if (action.execChildWorkflow) {
142-
const opts: ChildWorkflowOptions = {};
143-
if (action.execChildWorkflow.workflowId) {
144-
opts.workflowId = action.execChildWorkflow.workflowId;
145-
}
146142
const execChild = action.execChildWorkflow;
147-
const childStarter = () => {
148-
return startChild(execChild.workflowType ?? 'kitchenSink', {
149-
args: execChild.input ?? [],
150-
...opts,
151-
});
152-
};
153143
await handleAwaitableChoice(
154-
childStarter,
144+
() => {
145+
return startChild(execChild.workflowType || 'kitchenSink', {
146+
args: execChild.input ?? [],
147+
workflowId: execChild.workflowId ?? undefined,
148+
typedSearchAttributes: decodeTypedSearchAttributes(
149+
action?.execChildWorkflow?.searchAttributes
150+
),
151+
});
152+
},
155153
action.execChildWorkflow.awaitableChoice,
156154
async (task) => {
157155
await task;
@@ -277,11 +275,10 @@ function launchActivity(execActivity: IExecuteActivityAction): Promise<unknown>
277275
actType = 'client';
278276
args.push(execActivity.client);
279277
}
280-
281278
const actArgs: ActivityOptions | LocalActivityOptions = {
282-
scheduleToCloseTimeout: durationConvert(execActivity.scheduleToCloseTimeout),
283-
startToCloseTimeout: durationConvert(execActivity.startToCloseTimeout),
284-
scheduleToStartTimeout: durationConvert(execActivity.scheduleToStartTimeout),
279+
scheduleToCloseTimeout: durationConvertMaybeUndefined(execActivity.scheduleToCloseTimeout),
280+
startToCloseTimeout: durationConvertMaybeUndefined(execActivity.startToCloseTimeout),
281+
scheduleToStartTimeout: durationConvertMaybeUndefined(execActivity.scheduleToStartTimeout),
285282
retry: decompileRetryPolicy(execActivity.retryPolicy),
286283
priority: decodePriority(execActivity.priority),
287284
};

0 commit comments

Comments
 (0)