@@ -40,6 +40,7 @@ import (
4040 "go.temporal.io/sdk/temporal"
4141 "go.temporal.io/sdk/workflow"
4242 "go.temporal.io/server/common/dynamicconfig"
43+ "go.temporal.io/server/common/log"
4344 "go.temporal.io/server/common/util"
4445 "go.temporal.io/server/tests/testcore"
4546)
@@ -72,10 +73,11 @@ type internalRulesTestWorkflow struct {
7273 activityFailedCn chan struct {}
7374
7475 testSuite * testcore.FunctionalTestBase
76+ logger log.Logger
7577 ctx context.Context
7678}
7779
78- func newInternalRulesTestWorkflow (ctx context.Context , testSuite * testcore.FunctionalTestBase ) * internalRulesTestWorkflow {
80+ func newInternalRulesTestWorkflow (ctx context.Context , testSuite * testcore.FunctionalTestBase , logger log. Logger ) * internalRulesTestWorkflow {
7981 wf := & internalRulesTestWorkflow {
8082 initialRetryInterval : 1 * time .Second ,
8183 scheduleToCloseTimeout : 30 * time .Minute ,
@@ -84,6 +86,7 @@ func newInternalRulesTestWorkflow(ctx context.Context, testSuite *testcore.Funct
8486 activityFailedCn : make (chan struct {}),
8587 testSuite : testSuite ,
8688 ctx : ctx ,
89+ logger : logger ,
8790 }
8891 wf .activityRetryPolicy = & temporal.RetryPolicy {
8992 InitialInterval : wf .initialRetryInterval ,
@@ -305,7 +308,7 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
305308 ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
306309 defer cancel ()
307310
308- testWorkflow := newInternalRulesTestWorkflow (ctx , & s .FunctionalTestBase )
311+ testWorkflow := newInternalRulesTestWorkflow (ctx , & s .FunctionalTestBase , s . Logger )
309312
310313 s .Worker ().RegisterWorkflow (testWorkflow .WorkflowFuncForRetryActivity )
311314 s .Worker ().RegisterActivity (testWorkflow .ActivityFuncForRetryActivity )
@@ -339,8 +342,9 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
339342 }
340343 }, 5 * time .Second , 200 * time .Millisecond )
341344
342- // Let activity fail
343- err = util .InterruptibleSleep (ctx , 1 * time .Second )
345+ // Let namespace config propagate.
346+ // There is no good way to check if the namespace config has propagated to the history service
347+ err = util .InterruptibleSleep (ctx , 4 * time .Second )
344348 s .NoError (err )
345349
346350 testWorkflow .activityFailedCn <- struct {}{}
@@ -378,6 +382,11 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
378382 assert .Len (s .T (), nsResp .Rules , 0 )
379383 }, 5 * time .Second , 200 * time .Millisecond )
380384
385+ // Let namespace config propagate.
386+ // There is no good way to check if the namespace config has propagated to the history service
387+ err = util .InterruptibleSleep (ctx , 4 * time .Second )
388+ s .NoError (err )
389+
381390 // unpause the activity
382391 _ , err = s .FrontendClient ().UnpauseActivity (ctx , & workflowservice.UnpauseActivityRequest {
383392 Namespace : s .Namespace ().String (),
@@ -397,7 +406,7 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
397406 assert .True (t , description .PendingActivities [0 ].GetActivityType ().GetName () == activityType )
398407 assert .False (t , description .PendingActivities [0 ].GetPaused ())
399408 }
400- assert .Equal (t , int32 (1 ), testWorkflow .startedActivityCount .Load ())
409+ assert .LessOrEqual (t , int32 (1 ), testWorkflow .startedActivityCount .Load ())
401410 }, 5 * time .Second , 200 * time .Millisecond )
402411
403412 // let activity complete
@@ -409,6 +418,140 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
409418 s .NoError (err )
410419}
411420
421+ func (s * ActivityApiRulesClientTestSuite ) TestActivityRulesApi_RetryTask () {
422+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
423+ defer cancel ()
424+
425+ // overall test execution plan:
426+ // 1. start workflow
427+ // 2. wait for activity to start and fail exactly once
428+ // 3. create rule to pause activity
429+ // 4. wait for activity to be paused by rule
430+ // 5. let activity succeed
431+ // 6. Remove the rule so it didn't interfere with the activity
432+ // 7. Make sure there is no rules
433+ // 6. Unpause the activity. this will also trigger the activity
434+ // 7. Wait for activity to be unpaused
435+ // 8. Let activity complete
436+ // 9. Wait for workflow to finish
437+
438+ testRetryTaskWorkflow := newInternalRulesTestWorkflow (ctx , & s .FunctionalTestBase , s .Logger )
439+
440+ // set much longer retry interval to make sure that activity is retried at least once
441+ s .initialRetryInterval = 4 * time .Second
442+ s .activityRetryPolicy .InitialInterval = s .initialRetryInterval
443+
444+ s .Worker ().RegisterWorkflow (testRetryTaskWorkflow .WorkflowFuncForRetryTask )
445+ s .Worker ().RegisterActivity (testRetryTaskWorkflow .ActivityFuncForRetryTask )
446+
447+ // 1. Start workflow
448+ workflowRun := s .createWorkflow (ctx , testRetryTaskWorkflow .WorkflowFuncForRetryTask )
449+
450+ // 2. Wait for activity to start and fail exactly once
451+ s .EventuallyWithT (func (t * assert.CollectT ) {
452+ description , err := s .SdkClient ().DescribeWorkflowExecution (ctx , workflowRun .GetID (), workflowRun .GetRunID ())
453+ assert .NoError (t , err )
454+ if description .GetPendingActivities () != nil {
455+ assert .Len (t , description .PendingActivities , 1 )
456+ }
457+ assert .Equal (t , int32 (1 ), testRetryTaskWorkflow .startedActivityCount .Load ())
458+ }, 2 * time .Second , 200 * time .Millisecond )
459+
460+ // 3. Create rule to pause activity
461+ ruleID := "pause-activity"
462+ activityType := "ActivityFuncForRetryTask"
463+ createRuleRequest := s .createPauseRuleRequest (activityType , ruleID )
464+ createRuleResponse , err := s .FrontendClient ().CreateWorkflowRule (ctx , createRuleRequest )
465+ s .NoError (err )
466+ s .NotNil (createRuleResponse )
467+
468+ // 4. verify that frontend has updated namespaces
469+ s .EventuallyWithT (func (t * assert.CollectT ) {
470+ nsResp , err := s .FrontendClient ().ListWorkflowRules (ctx , & workflowservice.ListWorkflowRulesRequest {
471+ Namespace : s .Namespace ().String (),
472+ })
473+ assert .NoError (s .T (), err )
474+ assert .NotNil (s .T (), nsResp )
475+ assert .NotNil (s .T (), nsResp .Rules )
476+ if nsResp .GetRules () != nil {
477+ assert .Len (s .T (), nsResp .Rules , 1 )
478+ assert .Equal (s .T (), ruleID , nsResp .Rules [0 ].Spec .Id )
479+ }
480+ }, 5 * time .Second , 200 * time .Millisecond )
481+
482+ // Let namespace config propagate.
483+ // There is no good way to check if the namespace config has propagated to the history service
484+ err = util .InterruptibleSleep (ctx , 2 * time .Second )
485+ s .NoError (err )
486+
487+ // 5. wait for activity to be paused by rule. This should happen in the activity retry task
488+ s .EventuallyWithT (func (t * assert.CollectT ) {
489+ description , err := s .SdkClient ().DescribeWorkflowExecution (ctx , workflowRun .GetID (), workflowRun .GetRunID ())
490+ assert .NoError (t , err )
491+ if description .GetPendingActivities () != nil {
492+ assert .Len (t , description .PendingActivities , 1 )
493+ assert .True (t , description .PendingActivities [0 ].GetActivityType ().GetName () == activityType )
494+ assert .True (t , description .PendingActivities [0 ].GetPaused ())
495+ }
496+ assert .Equal (t , int32 (1 ), testRetryTaskWorkflow .startedActivityCount .Load ())
497+ }, 5 * time .Second , 200 * time .Millisecond )
498+
499+ // let activity succeed
500+ testRetryTaskWorkflow .letActivitySucceed .Store (true )
501+
502+ // remove the rule so it didn't interfere with the activity
503+ deleteRuleResponse , err := s .FrontendClient ().DeleteWorkflowRule (ctx , & workflowservice.DeleteWorkflowRuleRequest {
504+ Namespace : s .Namespace ().String (),
505+ RuleId : ruleID ,
506+ })
507+ s .NoError (err )
508+ s .NotNil (deleteRuleResponse )
509+
510+ // make sure there is no rules
511+ s .EventuallyWithT (func (t * assert.CollectT ) {
512+ nsResp , err := s .FrontendClient ().ListWorkflowRules (ctx , & workflowservice.ListWorkflowRulesRequest {
513+ Namespace : s .Namespace ().String (),
514+ })
515+ assert .NoError (s .T (), err )
516+ assert .NotNil (s .T (), nsResp )
517+ assert .Len (s .T (), nsResp .Rules , 0 )
518+ }, 5 * time .Second , 200 * time .Millisecond )
519+
520+ // Let namespace config propagate.
521+ // There is no good way to check if the namespace config has propagated to the history service
522+ err = util .InterruptibleSleep (ctx , 2 * time .Second )
523+ s .NoError (err )
524+
525+ // unpause the activity. this will also trigger the activity
526+ _ , err = s .FrontendClient ().UnpauseActivity (ctx , & workflowservice.UnpauseActivityRequest {
527+ Namespace : s .Namespace ().String (),
528+ Execution : & commonpb.WorkflowExecution {
529+ WorkflowId : workflowRun .GetID (),
530+ },
531+ Activity : & workflowservice.UnpauseActivityRequest_Type {Type : activityType },
532+ })
533+ s .NoError (err )
534+
535+ // wait for activity to be unpaused
536+ s .EventuallyWithT (func (t * assert.CollectT ) {
537+ description , err := s .SdkClient ().DescribeWorkflowExecution (ctx , workflowRun .GetID (), workflowRun .GetRunID ())
538+ assert .NoError (t , err )
539+ if description .GetPendingActivities () != nil {
540+ assert .Len (t , description .PendingActivities , 1 )
541+ assert .True (t , description .PendingActivities [0 ].GetActivityType ().GetName () == activityType )
542+ assert .False (t , description .PendingActivities [0 ].GetPaused ())
543+ }
544+ assert .LessOrEqual (t , int32 (1 ), testRetryTaskWorkflow .startedActivityCount .Load ())
545+ }, 5 * time .Second , 200 * time .Millisecond )
546+
547+ // let activity complete
548+ testRetryTaskWorkflow .activityCompleteCn <- struct {}{}
549+ // wait for workflow to finish
550+ var out string
551+ err = workflowRun .Get (ctx , & out )
552+ s .NoError (err )
553+ }
554+
412555func (s * ActivityApiRulesClientTestSuite ) createPauseRuleRequest (
413556 activityType string , ruleID string ,
414557) * workflowservice.CreateWorkflowRuleRequest {
0 commit comments