Skip to content

Commit bdbb7f6

Browse files
authored
Small fixes for throughput stress with python worker (#233)
## What was changed Two small fixes to allow `throughput_stress` to run on the Python worker: - Provide a default `BackoffCoefficient` for the `DefaultLocalActivity` action. The Go SDK provides default values for local activity retry policy fields (if missing), this is not the case in other SDKs. As such, we provide a default backoff coefficient when we create the action - Pass search attributes through to child workflows on the python worker. Child workflows on the python worker were not including the throughput stress search attribute, causing the visibility count check to fail (and consequently, the scenario). ## Why? We want the python worker to be able to run the throughput stress scenario successfully 2. How was this tested: Previously: ``` go run ./cmd run-scenario-with-worker \ --scenario throughput_stress \ --language python \ --iterations 10 \ --option internal-iterations=10 ``` failed. With these fixes it succeeds.
1 parent a87934a commit bdbb7f6

4 files changed

Lines changed: 21 additions & 5 deletions

File tree

cmd/dev/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ var (
1919
"dotnet", "go", "java", "node", "protoc", "python", "rust",
2020
}
2121
toolDependencies = map[string][]string{
22-
"python": {"python", "uv", "poe"},
22+
"python": {"python3", "uv", "poe"},
2323
"typescript": {"node"},
2424
"protoc": {"protoc-gen-go"},
2525
}
2626
toolVersionCommands = map[string][]string{
2727
"go": {"go", "version"},
2828
"java": {"java", "-version"},
29-
"python": {"python", "--version"},
29+
"python3": {"python3", "--version"},
3030
"node": {"node", "--version"},
3131
"dotnet": {"dotnet", "--version"},
3232
"cargo": {"cargo", "--version"},

loadgen/kitchensink/helpers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,9 @@ func DefaultLocalActivity(activity *ExecuteActivityAction) *Action {
155155
IsLocal: &emptypb.Empty{},
156156
}
157157
activity.RetryPolicy = &common.RetryPolicy{
158-
InitialInterval: durationpb.New(10 * time.Millisecond),
159-
MaximumAttempts: 10,
158+
InitialInterval: durationpb.New(10 * time.Millisecond),
159+
MaximumAttempts: 10,
160+
BackoffCoefficient: 2.0,
160161
}
161162
return &Action{
162163
Variant: &Action_ExecActivity{

scenarios/throughput_stress.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
287287

288288
var sb strings.Builder
289289
sb.WriteString("[Scenario completion summary] ")
290+
sb.WriteString(fmt.Sprintf("Run ID: %s, ", info.RunID))
290291
sb.WriteString(fmt.Sprintf("Total iterations completed: %d, ", completedIterations))
291292
sb.WriteString(fmt.Sprintf("Total child workflows: %d (%d per iteration), ", completedChildWorkflows, t.config.InternalIterations))
292293
sb.WriteString(fmt.Sprintf("Total continue-as-new workflows: %d (%d per iteration), ", continueAsNewWorkflows, continueAsNewPerIter))

workers/python/kitchen_sink.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
SearchAttributeKey,
1515
SearchAttributeUpdate,
1616
)
17+
from temporalio.converter import DefaultPayloadConverter
1718
from temporalio.workflow import ActivityHandle, ChildWorkflowHandle
1819

1920
from protos.kitchen_sink_pb2 import (
@@ -129,9 +130,15 @@ async def handle_action(self, action: Action) -> Optional[Payload]:
129130
child_action = action.exec_child_workflow
130131
child = child_action.workflow_type or "kitchenSink"
131132
args = [RawValue(i) for i in child_action.input]
133+
132134
await handle_awaitable_choice(
133135
workflow.start_child_workflow(
134-
child, id=child_action.workflow_id, args=args
136+
child,
137+
id=child_action.workflow_id,
138+
args=args,
139+
search_attributes=decode_search_attrs(
140+
child_action.search_attributes, DefaultPayloadConverter()
141+
),
135142
),
136143
child_action.awaitable_choice,
137144
after_started_fn=wait_task_complete,
@@ -317,3 +324,10 @@ def convert_act_cancel_type(
317324
return temporalio.workflow.ActivityCancellationType.ABANDON
318325
else:
319326
raise NotImplementedError("Unknown cancellation type " + str(ctype))
327+
328+
329+
def decode_search_attrs(msg_map, converter):
330+
return {
331+
k: v if isinstance(v := converter.from_payload(p), list) else [v]
332+
for k, p in msg_map.items()
333+
}

0 commit comments

Comments
 (0)