Skip to content

Commit 25cb765

Browse files
timl3136claude
andcommitted
feat: Add cron_overlap_policy support to StartWorkflowOptions
Add support for configuring cron overlap behavior when starting workflows: - Add cron_overlap_policy field to StartWorkflowOptions - Accept both protobuf enum values and string literals ("SKIPPED", "BUFFER_ONE") - Add helper function _resolve_cron_overlap_policy for string-to-enum conversion - Add comprehensive tests for enum, string, and error cases Co-Authored-By: Claude (claude-opus-4-5) <noreply@anthropic.com> Signed-off-by: Tim Li <ltim@uber.com>
1 parent 6cff798 commit 25cb765

File tree

2 files changed

+143
-2
lines changed

2 files changed

+143
-2
lines changed

cadence/client.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import socket
33
import uuid
44
from datetime import timedelta
5-
from typing import TypedDict, Unpack, Any, cast, Union
5+
from typing import TypedDict, Unpack, Any, cast, Union, Literal
66

77
from grpc import ChannelCredentials, Compression
88
from google.protobuf.duration_pb2 import Duration
@@ -22,11 +22,36 @@
2222
SignalWithStartWorkflowExecutionResponse,
2323
)
2424
from cadence.api.v1.common_pb2 import WorkflowType, WorkflowExecution
25+
from cadence.api.v1 import workflow_pb2
2526
from cadence.api.v1.tasklist_pb2 import TaskList
2627
from cadence.data_converter import DataConverter, DefaultDataConverter
2728
from cadence.metrics import MetricsEmitter, NoOpMetricsEmitter
2829
from cadence.workflow import WorkflowDefinition
2930

31+
# Type alias for cron overlap policy - accepts protobuf enum or string
32+
CronOverlapPolicyType = (
33+
workflow_pb2.CronOverlapPolicy | Literal["SKIPPED", "BUFFER_ONE"]
34+
)
35+
36+
# Mapping from string to protobuf enum value
37+
_CRON_OVERLAP_POLICY_MAP: dict[str, int] = {
38+
"SKIPPED": workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED,
39+
"BUFFER_ONE": workflow_pb2.CRON_OVERLAP_POLICY_BUFFER_ONE,
40+
}
41+
42+
43+
def _resolve_cron_overlap_policy(policy: CronOverlapPolicyType) -> int:
44+
"""Convert string or enum to protobuf enum value."""
45+
if isinstance(policy, str):
46+
policy_upper = policy.upper()
47+
if policy_upper not in _CRON_OVERLAP_POLICY_MAP:
48+
raise ValueError(
49+
f"Invalid cron_overlap_policy: '{policy}'. "
50+
"Expected 'SKIPPED' or 'BUFFER_ONE'"
51+
)
52+
return _CRON_OVERLAP_POLICY_MAP[policy_upper]
53+
return int(policy)
54+
3055

3156
class StartWorkflowOptions(TypedDict, total=False):
3257
"""Options for starting a workflow execution."""
@@ -36,6 +61,7 @@ class StartWorkflowOptions(TypedDict, total=False):
3661
workflow_id: str
3762
task_start_to_close_timeout: timedelta
3863
cron_schedule: str
64+
cron_overlap_policy: CronOverlapPolicyType
3965

4066

4167
def _validate_and_apply_defaults(options: StartWorkflowOptions) -> StartWorkflowOptions:
@@ -185,6 +211,10 @@ def _build_start_workflow_request(
185211
request.input.CopyFrom(input_payload)
186212
if options.get("cron_schedule"):
187213
request.cron_schedule = options["cron_schedule"]
214+
if options.get("cron_overlap_policy") is not None:
215+
request.cron_overlap_policy = _resolve_cron_overlap_policy(
216+
options["cron_overlap_policy"]
217+
)
188218

189219
return request
190220

tests/cadence/test_client_workflow.py

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88
StartWorkflowExecutionRequest,
99
StartWorkflowExecutionResponse,
1010
)
11-
from cadence.client import Client, StartWorkflowOptions, _validate_and_apply_defaults
11+
from cadence.client import (
12+
Client,
13+
StartWorkflowOptions,
14+
_validate_and_apply_defaults,
15+
_resolve_cron_overlap_policy,
16+
)
17+
from cadence.api.v1 import workflow_pb2
1218
from cadence.data_converter import DefaultDataConverter
1319
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1420

@@ -241,6 +247,111 @@ async def test_build_request_with_cron_schedule(self, mock_client):
241247

242248
assert request.cron_schedule == "0 * * * *"
243249

250+
@pytest.mark.asyncio
251+
async def test_build_request_with_cron_overlap_policy_enum(self, mock_client):
252+
"""Test building request with cron_overlap_policy as enum."""
253+
client = Client(domain="test-domain", target="localhost:7933")
254+
255+
options = StartWorkflowOptions(
256+
task_list="test-task-list",
257+
execution_start_to_close_timeout=timedelta(minutes=10),
258+
task_start_to_close_timeout=timedelta(seconds=30),
259+
cron_schedule="0 * * * *",
260+
cron_overlap_policy=workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED,
261+
)
262+
263+
request = client._build_start_workflow_request("TestWorkflow", (), options)
264+
265+
assert request.cron_overlap_policy == workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED
266+
267+
@pytest.mark.asyncio
268+
async def test_build_request_with_cron_overlap_policy_string_skipped(
269+
self, mock_client
270+
):
271+
"""Test building request with cron_overlap_policy as string 'SKIPPED'."""
272+
client = Client(domain="test-domain", target="localhost:7933")
273+
274+
options = StartWorkflowOptions(
275+
task_list="test-task-list",
276+
execution_start_to_close_timeout=timedelta(minutes=10),
277+
task_start_to_close_timeout=timedelta(seconds=30),
278+
cron_schedule="0 * * * *",
279+
cron_overlap_policy="SKIPPED",
280+
)
281+
282+
request = client._build_start_workflow_request("TestWorkflow", (), options)
283+
284+
assert request.cron_overlap_policy == workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED
285+
286+
@pytest.mark.asyncio
287+
async def test_build_request_with_cron_overlap_policy_string_buffer_one(
288+
self, mock_client
289+
):
290+
"""Test building request with cron_overlap_policy as string 'BUFFER_ONE'."""
291+
client = Client(domain="test-domain", target="localhost:7933")
292+
293+
options = StartWorkflowOptions(
294+
task_list="test-task-list",
295+
execution_start_to_close_timeout=timedelta(minutes=10),
296+
task_start_to_close_timeout=timedelta(seconds=30),
297+
cron_schedule="0 * * * *",
298+
cron_overlap_policy="BUFFER_ONE",
299+
)
300+
301+
request = client._build_start_workflow_request("TestWorkflow", (), options)
302+
303+
assert (
304+
request.cron_overlap_policy == workflow_pb2.CRON_OVERLAP_POLICY_BUFFER_ONE
305+
)
306+
307+
@pytest.mark.asyncio
308+
async def test_build_request_with_cron_overlap_policy_string_lowercase(
309+
self, mock_client
310+
):
311+
"""Test building request with cron_overlap_policy as lowercase string."""
312+
client = Client(domain="test-domain", target="localhost:7933")
313+
314+
options = StartWorkflowOptions(
315+
task_list="test-task-list",
316+
execution_start_to_close_timeout=timedelta(minutes=10),
317+
task_start_to_close_timeout=timedelta(seconds=30),
318+
cron_schedule="0 * * * *",
319+
cron_overlap_policy="skipped",
320+
)
321+
322+
request = client._build_start_workflow_request("TestWorkflow", (), options)
323+
324+
assert request.cron_overlap_policy == workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED
325+
326+
327+
class TestResolveCronOverlapPolicy:
328+
"""Test _resolve_cron_overlap_policy helper function."""
329+
330+
def test_resolve_enum_value(self):
331+
"""Test resolving protobuf enum value."""
332+
result = _resolve_cron_overlap_policy(workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED)
333+
assert result == workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED
334+
335+
def test_resolve_string_skipped(self):
336+
"""Test resolving string 'SKIPPED'."""
337+
result = _resolve_cron_overlap_policy("SKIPPED")
338+
assert result == workflow_pb2.CRON_OVERLAP_POLICY_SKIPPED
339+
340+
def test_resolve_string_buffer_one(self):
341+
"""Test resolving string 'BUFFER_ONE'."""
342+
result = _resolve_cron_overlap_policy("BUFFER_ONE")
343+
assert result == workflow_pb2.CRON_OVERLAP_POLICY_BUFFER_ONE
344+
345+
def test_resolve_string_lowercase(self):
346+
"""Test resolving lowercase string."""
347+
result = _resolve_cron_overlap_policy("buffer_one")
348+
assert result == workflow_pb2.CRON_OVERLAP_POLICY_BUFFER_ONE
349+
350+
def test_resolve_invalid_string_raises_error(self):
351+
"""Test that invalid string raises ValueError."""
352+
with pytest.raises(ValueError, match="Invalid cron_overlap_policy"):
353+
_resolve_cron_overlap_policy("INVALID_POLICY")
354+
244355

245356
class TestClientStartWorkflow:
246357
"""Test Client.start_workflow method."""

0 commit comments

Comments
 (0)