Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
`event_store_version: ?, ` +
`branch_token: ?, ` +
`cron_schedule: ?, ` +
`cron_overlap_policy: ?, ` +
`expiration_seconds: ?, ` +
`search_attributes: ?, ` +
`memo: ?, ` +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.Inte
activeClusterSelectionPolicy = v.([]byte)
case "active_cluster_selection_policy_encoding":
activeClusterSelectionPolicyEncoding = constants.EncodingType(v.(string))
case "cron_overlap_policy":
info.CronOverlapPolicy = types.CronOverlapPolicy(int32(v.(int)))
}
}
info.CompletionEvent = persistence.NewDataBlob(completionEventData, completionEventEncoding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ func updateWorkflowExecution(
persistence.EventStoreVersion,
execution.BranchToken,
execution.CronSchedule,
int32(execution.CronOverlapPolicy),
int32(execution.ExpirationInterval.Seconds()),
execution.SearchAttributes,
execution.Memo,
Expand Down Expand Up @@ -1498,6 +1499,7 @@ func createWorkflowExecution(
persistence.EventStoreVersion,
execution.BranchToken,
execution.CronSchedule,
int32(execution.CronOverlapPolicy),
int32(execution.ExpirationInterval.Seconds()),
execution.SearchAttributes,
execution.Memo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2582,6 +2582,7 @@ func TestUpdateWorkflowExecution(t *testing.T) {
DecisionStartedID: 3,
CompletionEvent: &persistence.DataBlob{},
AutoResetPoints: &persistence.DataBlob{},
CronOverlapPolicy: 0,
},
PreviousNextEventIDCondition: common.Int64Ptr(10),
VersionHistories: &persistence.DataBlob{},
Expand All @@ -2600,7 +2601,7 @@ func TestUpdateWorkflowExecution(t *testing.T) {
`cancel_requested: false, cancel_request_id: , sticky_task_list: , sticky_schedule_to_start_timeout: 0,client_library_version: , ` +
`client_feature_version: , client_impl: , auto_reset_points: [], auto_reset_points_encoding: , attempt: 0, has_retry_policy: false, ` +
`init_interval: 0, backoff_coefficient: 0, max_interval: 0, expiration_time: 0001-01-01T00:00:00Z, max_attempts: 0, ` +
`non_retriable_errors: [], event_store_version: 2, branch_token: [], cron_schedule: , expiration_seconds: 0, search_attributes: map[], ` +
`non_retriable_errors: [], event_store_version: 2, branch_token: [], cron_schedule: , cron_overlap_policy: 0, expiration_seconds: 0, search_attributes: map[], ` +
`memo: map[], partition_config: map[], active_cluster_selection_policy: [], active_cluster_selection_policy_encoding: ` +
`}, next_event_id = 0 , version_histories = [] , version_histories_encoding = , checksum = {version: 0, flavor: 0, value: [] }, workflow_last_write_version = 0 , workflow_state = 0 , last_updated_time = 2025-01-06T15:00:00Z ` +
`WHERE ` +
Expand Down Expand Up @@ -2666,6 +2667,7 @@ func TestCreateWorkflowExecution(t *testing.T) {
Encoding: constants.EncodingTypeThriftRW,
Data: []byte("thrift-encoded-active-cluster-selection-policy-data"),
},
CronOverlapPolicy: types.CronOverlapPolicyBufferOne,
},
PreviousNextEventIDCondition: common.Int64Ptr(10),
VersionHistories: &persistence.DataBlob{},
Expand All @@ -2684,7 +2686,7 @@ func TestCreateWorkflowExecution(t *testing.T) {
`cancel_requested: false, cancel_request_id: , sticky_task_list: , sticky_schedule_to_start_timeout: 0,client_library_version: , client_feature_version: , ` +
`client_impl: , auto_reset_points: [], auto_reset_points_encoding: , attempt: 0, has_retry_policy: false, init_interval: 0, ` +
`backoff_coefficient: 0, max_interval: 0, expiration_time: 0001-01-01T00:00:00Z, max_attempts: 0, non_retriable_errors: [], ` +
`event_store_version: 2, branch_token: [], cron_schedule: , expiration_seconds: 0, search_attributes: map[], memo: map[], partition_config: map[], ` +
`event_store_version: 2, branch_token: [], cron_schedule: , cron_overlap_policy: 1, expiration_seconds: 0, search_attributes: map[], memo: map[], partition_config: map[], ` +
`active_cluster_selection_policy: [116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 97 99 116 105 118 101 45 99 108 117 115 116 101 114 45 115 101 108 101 99 116 105 111 110 45 112 111 108 105 99 121 45 100 97 116 97], active_cluster_selection_policy_encoding: thriftrw` +
`}, 0, 946684800000, -10, [], , {version: 0, flavor: 0, value: [] }, 0, 0, 2025-01-06T15:00:00Z) IF NOT EXISTS `,
},
Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/service_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ var (
Control: Control,
RetryPolicy: &RetryPolicy,
CronSchedule: CronSchedule,
CronOverlapPolicy: &CronOverlapPolicy,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
Expand Down
7 changes: 7 additions & 0 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ const (
FlagSearchAttribute = "search_attr"
FlagNumReadPartitions = "num_read_partitions"
FlagNumWritePartitions = "num_write_partitions"
FlagCronOverlapPolicy = "cron_overlap_policy"

FlagClustersUsage = "Clusters (example: --clusters clusterA,clusterB or --cl clusterA --cl clusterB)"
)
Expand Down Expand Up @@ -340,6 +341,12 @@ func getFlagsForStart() []cli.Flag {
"\t│ │ │ │ │ \n" +
"\t* * * * *",
},
&cli.IntFlag{
Name: FlagCronOverlapPolicy,
Aliases: []string{"cop"},
Usage: "Optional cron overlap policy for the workflow when a cron run overlaps with next scheduled run. " +
"Available options: 0: Skip running if cron run overlaps, 1: Start new run immediately if previous run overlaps and completes",
},
&cli.IntFlag{
Name: FlagWorkflowIDReusePolicy,
Aliases: []string{"wrp"},
Expand Down
5 changes: 5 additions & 0 deletions tools/cli/workflow_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ func constructStartWorkflowRequest(c *cli.Context) (*types.StartWorkflowExecutio
startRequest.FirstRunAtTimeStamp = common.Int64Ptr(t.UnixNano())
}

if c.IsSet(FlagCronOverlapPolicy) {
startRequest.CronOverlapPolicy = types.CronOverlapPolicy(c.Int(FlagCronOverlapPolicy)).Ptr()
}

headerFields, err := processHeader(c)
if err != nil {
return nil, fmt.Errorf("error when process header: %w", err)
Expand Down Expand Up @@ -822,6 +826,7 @@ func constructSignalWithStartWorkflowRequest(c *cli.Context) (*types.SignalWithS
WorkflowIDReusePolicy: startRequest.WorkflowIDReusePolicy,
RetryPolicy: startRequest.RetryPolicy,
CronSchedule: startRequest.CronSchedule,
CronOverlapPolicy: startRequest.CronOverlapPolicy,
Memo: startRequest.Memo,
SearchAttributes: startRequest.SearchAttributes,
Header: startRequest.Header,
Expand Down
4 changes: 3 additions & 1 deletion tools/cli/workflow_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestConstructStartWorkflowRequest(t *testing.T) {
set.Int(DelayStartSeconds, 5, DelayStartSeconds)
set.Int(JitterStartSeconds, 2, JitterStartSeconds)
set.String("first_run_at_time", "2024-07-24T12:00:00Z", "first-run-at-time")
set.Int("cron_overlap_policy", 0, "cron_overlap_policy")

c := cli.NewContext(nil, set, nil)
// inject context with span
Expand All @@ -92,7 +93,7 @@ func TestConstructStartWorkflowRequest(t *testing.T) {
assert.NoError(t, c.Set(DelayStartSeconds, "5"))
assert.NoError(t, c.Set(JitterStartSeconds, "2"))
assert.NoError(t, c.Set("first_run_at_time", "2024-07-24T12:00:00Z"))

assert.NoError(t, c.Set("cron_overlap_policy", "0"))
request, err := constructStartWorkflowRequest(c)
assert.NoError(t, err)
assert.NotNil(t, request)
Expand All @@ -111,6 +112,7 @@ func TestConstructStartWorkflowRequest(t *testing.T) {
firstRunAt, err := time.Parse(time.RFC3339, "2024-07-24T12:00:00Z")
assert.NoError(t, err)
assert.Equal(t, firstRunAt.UnixNano(), *request.FirstRunAtTimeStamp)
assert.Equal(t, types.CronOverlapPolicySkipped, *request.CronOverlapPolicy)
}

func Test_PrintAutoResetPoints(t *testing.T) {
Expand Down