Skip to content

Commit 8c03416

Browse files
authored
Fix regression in Go SDK where Batch Timer interval for heartbeats is overcalculated (#225)
As part of switching from variables representing time in seconds to time.Duration, we forgot to correctly update the line of code that calculates the interval for sending the next heartbeat. We were multiplying by an extra factor of one billion, which either caused an overflow (so that we were constantly sending heartbeats) or resulted in a large number (where we would never send a heartbeat after the first one).
1 parent 542efab commit 8c03416

5 files changed

+68
-6
lines changed

internal/internal_task_handlers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1625,7 +1625,7 @@ func (i *temporalInvoker) Heartbeat(details *commonpb.Payloads, skipBatching boo
16251625
}
16261626

16271627
// We set a deadline at 80% of the timeout.
1628-
duration := time.Duration(0.8*float32(deadlineToTrigger)) * time.Second
1628+
duration := time.Duration(0.8 * float64(deadlineToTrigger))
16291629
i.hbBatchEndTimer = time.NewTimer(duration)
16301630

16311631
go func() {

internal/internal_task_handlers_test.go

+30-5
Original file line numberDiff line numberDiff line change
@@ -1306,19 +1306,44 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_WorkflowTaskHeartbeatFail
13061306
func (t *TaskHandlersTestSuite) TestHeartBeat_NoError() {
13071307
mockCtrl := gomock.NewController(t.T())
13081308
mockService := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl)
1309-
1309+
invocationChannel := make(chan int, 2)
13101310
heartbeatResponse := workflowservice.RecordActivityTaskHeartbeatResponse{CancelRequested: false}
1311-
mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(&heartbeatResponse, nil)
1311+
mockService.EXPECT().
1312+
RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).
1313+
Do(func(_ interface{}, _ interface{}, _ ...interface{}) { invocationChannel <- 1 }).
1314+
Return(&heartbeatResponse, nil).
1315+
Times(2)
13121316

13131317
temporalInvoker := &temporalInvoker{
1314-
identity: "Test_Temporal_Invoker",
1315-
service: mockService,
1316-
taskToken: nil,
1318+
identity: "Test_Temporal_Invoker",
1319+
service: mockService,
1320+
taskToken: nil,
1321+
heartBeatTimeout: time.Second,
13171322
}
13181323

13191324
heartbeatErr := temporalInvoker.Heartbeat(nil, false)
1325+
t.NoError(heartbeatErr)
1326+
1327+
select {
1328+
case <-invocationChannel:
1329+
case <-time.After(3 * time.Second):
1330+
t.Fail("did not get expected 1st call to record heartbeat")
1331+
}
13201332

1333+
heartbeatErr = temporalInvoker.Heartbeat(nil, false)
13211334
t.NoError(heartbeatErr)
1335+
1336+
select {
1337+
case <-invocationChannel:
1338+
t.Fail("got unexpected call to record heartbeat. 2nd call should come via batch timer")
1339+
default:
1340+
}
1341+
1342+
select {
1343+
case <-invocationChannel:
1344+
case <-time.After(3 * time.Second):
1345+
t.Fail("did not get expected 2nd call to record heartbeat via batch timer")
1346+
}
13221347
}
13231348

13241349
func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {

test/activity_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ func (a *Activities) HeartbeatAndSleep(ctx context.Context, seq int, delay time.
8585
return seq, nil
8686
}
8787

88+
func (a *Activities) LongRunningHeartbeat(ctx context.Context, delay time.Duration, recordHeartbeatDelay time.Duration) error {
89+
a.append("longRunningHeartbeat")
90+
endTime := time.Now().Add(delay)
91+
for time.Now().Before(endTime) {
92+
activity.RecordHeartbeat(ctx)
93+
time.Sleep(recordHeartbeatDelay)
94+
}
95+
96+
return nil
97+
}
98+
8899
func (a *Activities) fail(_ context.Context) error {
89100
a.append("fail")
90101
return errFailOnPurpose

test/integration_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ func (ts *IntegrationTestSuite) TestActivityRetryOnHBTimeout() {
193193
ts.EqualValues(expected, ts.activities.invoked())
194194
}
195195

196+
func (ts *IntegrationTestSuite) TestLongRunningActivityWithHB() {
197+
var expected []string
198+
err := ts.executeWorkflow("test-long-running-activity-with-hb", ts.workflows.LongRunningActivityWithHB, &expected)
199+
ts.NoError(err)
200+
ts.EqualValues(expected, ts.activities.invoked())
201+
}
202+
196203
func (ts *IntegrationTestSuite) TestContinueAsNew() {
197204
var result int
198205
err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskQueueName)

test/workflow_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,24 @@ func (w *Workflows) ActivityRetryOnTimeout(ctx workflow.Context, timeoutType enu
137137
return []string{"sleep", "sleep", "sleep"}, nil
138138
}
139139

140+
func (w *Workflows) LongRunningActivityWithHB(ctx workflow.Context) ([]string, error) {
141+
opts := w.defaultActivityOptionsWithRetry()
142+
opts.HeartbeatTimeout = 3 * time.Second
143+
opts.ScheduleToCloseTimeout = time.Second * 12
144+
opts.StartToCloseTimeout = time.Second * 12
145+
opts.RetryPolicy = &internal.RetryPolicy{
146+
MaximumAttempts: 1,
147+
}
148+
ctx = workflow.WithActivityOptions(ctx, opts)
149+
150+
err := workflow.ExecuteActivity(ctx, "LongRunningHeartbeat", 8*time.Second, 300*time.Millisecond).Get(ctx, nil)
151+
if err != nil {
152+
return nil, fmt.Errorf("expected activity to succeed but it failed: %v", err)
153+
}
154+
155+
return []string{"longRunningHeartbeat"}, nil
156+
}
157+
140158
func (w *Workflows) ActivityRetryOnHBTimeout(ctx workflow.Context) ([]string, error) {
141159
opts := w.defaultActivityOptionsWithRetry()
142160
opts.HeartbeatTimeout = time.Second
@@ -718,6 +736,7 @@ func (w *Workflows) register(worker worker.Worker) {
718736
worker.RegisterWorkflow(w.InspectActivityInfo)
719737
worker.RegisterWorkflow(w.InspectLocalActivityInfo)
720738
worker.RegisterWorkflow(w.LargeQueryResultWorkflow)
739+
worker.RegisterWorkflow(w.LongRunningActivityWithHB)
721740
worker.RegisterWorkflow(w.RetryTimeoutStableErrorWorkflow)
722741
worker.RegisterWorkflow(w.SimplestWorkflow)
723742
worker.RegisterWorkflow(w.WorkflowWithLocalActivityCtxPropagation)

0 commit comments

Comments
 (0)