Skip to content

Commit 27a2cb8

Browse files
authored
Fix test flakes (temporalio#2253)
* fix race * shutdown worker before client for tests * drain polled tasks on shutdown * Add temporary fix to fix CI * Fix more flakes
1 parent 0374f0a commit 27a2cb8

5 files changed

Lines changed: 47 additions & 22 deletions

File tree

internal/internal_task_pollers.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,17 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
315315
case <-doneC:
316316
return result, err
317317
case <-bp.stopC:
318-
<-doneC
318+
// TEMP FIX: Give the server a reasonable window to complete the poll after
319+
// ShutdownWorker. Fall back to cancelling the poll if it takes too
320+
// long, e.g. when the gRPC connection was closed before Stop().
321+
timer := time.NewTimer(5 * time.Second)
322+
defer timer.Stop()
323+
select {
324+
case <-doneC:
325+
case <-timer.C:
326+
cancel()
327+
<-doneC
328+
}
319329
return result, err
320330
}
321331
}
@@ -503,6 +513,9 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr
503513
return nil, nil
504514
}
505515
task := wtp.toWorkflowTask(heartbeatResponse.WorkflowTask)
516+
if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task); err != nil {
517+
return nil, err
518+
}
506519
task.doneCh = doneCh
507520
task.laResultCh = laResultCh
508521
task.laRetryCh = laRetryCh
@@ -532,6 +545,9 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr
532545

533546
// we are getting new workflow task, so reset the workflowTask and continue process the new one
534547
task = wtp.toWorkflowTask(response.WorkflowTask)
548+
if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task); err != nil {
549+
return err
550+
}
535551
}
536552
}
537553

internal/internal_worker.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,12 +1465,13 @@ func (aw *AggregatedWorker) Stop() {
14651465
case <-aw.stopC:
14661466
return
14671467
default:
1468-
close(aw.stopC)
14691468
}
14701469

1471-
// Prevent pollers from re-polling after ShutdownWorker cancels
1472-
// in-flight polls. The poll goroutine can return and re-poll
1473-
// before shutdownWorker() returns to this goroutine.
1470+
// Prevent pollers from re-polling before closing stopC. There is a race
1471+
// between stopC being closed and the ShutdownWorker RPC: a poll can
1472+
// complete naturally (e.g. long-poll timeout) right after stopC fires
1473+
// but before ShutdownWorker is sent, causing the poller to loop and
1474+
// re-poll.
14741475
if !util.IsInterfaceNil(aw.activityWorker) {
14751476
aw.activityWorker.worker.noRepoll.Store(true)
14761477
}
@@ -1481,6 +1482,8 @@ func (aw *AggregatedWorker) Stop() {
14811482
aw.nexusWorker.worker.noRepoll.Store(true)
14821483
}
14831484

1485+
close(aw.stopC)
1486+
14841487
aw.shutdownWorker()
14851488

14861489
// Issue stop through plugins

test/integration_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,13 +269,14 @@ func (ts *IntegrationTestSuite) SetupTest() {
269269
}
270270

271271
func (ts *IntegrationTestSuite) TearDownTest() {
272-
if ts.client != nil {
273-
ts.client.Close()
274-
}
272+
// Stop worker before closing client so ShutdownWorker RPC reaches the server
275273
if !ts.workerStopped {
276274
ts.worker.Stop()
277275
ts.workerStopped = true
278276
}
277+
if ts.client != nil {
278+
ts.client.Close()
279+
}
279280
}
280281

281282
func (ts *IntegrationTestSuite) TestBasic() {

test/worker_heartbeat_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (ts *WorkerHeartbeatTestSuite) SetupSuite() {
5151
Namespace: ts.config.Namespace,
5252
Logger: ilog.NewDefaultLogger(),
5353
WorkerHeartbeatInterval: 1 * time.Second,
54-
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS},
54+
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS, GetSystemInfoTimeout: ctxTimeout},
5555
Identity: "WorkerHeartbeatTest",
5656
})
5757
ts.NoError(err)
@@ -312,7 +312,7 @@ func (ts *WorkerHeartbeatTestSuite) TestWorkerHeartbeatDisabled() {
312312
Namespace: ts.config.Namespace,
313313
Logger: ilog.NewDefaultLogger(),
314314
WorkerHeartbeatInterval: -1,
315-
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS},
315+
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS, GetSystemInfoTimeout: ctxTimeout},
316316
})
317317
ts.NoError(err)
318318
defer clientNoHeartbeat.Close()
@@ -880,7 +880,7 @@ func (ts *WorkerHeartbeatTestSuite) TestWorkerHeartbeatPlugins() {
880880
Namespace: ts.config.Namespace,
881881
Logger: ilog.NewDefaultLogger(),
882882
WorkerHeartbeatInterval: 1 * time.Second,
883-
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS},
883+
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS, GetSystemInfoTimeout: ctxTimeout},
884884
Identity: "PluginTest",
885885
Plugins: []client.Plugin{clientPlugin},
886886
})
@@ -943,7 +943,8 @@ func (ts *WorkerHeartbeatTestSuite) TestWorkerPollCompleteOnShutdown() {
943943
Logger: ilog.NewDefaultLogger(),
944944
WorkerHeartbeatInterval: 1 * time.Second,
945945
ConnectionOptions: client.ConnectionOptions{
946-
TLS: ts.config.TLS,
946+
TLS: ts.config.TLS,
947+
GetSystemInfoTimeout: ctxTimeout,
947948
DialOptions: []grpc.DialOption{
948949
grpc.WithUnaryInterceptor(func(
949950
ctx context.Context,
@@ -1036,7 +1037,7 @@ func (ts *WorkerHeartbeatTestSuite) TestWorkerHeartbeatStorageDrivers() {
10361037
DriverSelector: &roundRobinSelector{drivers: drivers},
10371038
},
10381039
WorkerHeartbeatInterval: 1 * time.Second,
1039-
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS},
1040+
ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS, GetSystemInfoTimeout: ctxTimeout},
10401041
Identity: "StorageDriverTest",
10411042
})
10421043
ts.NoError(err)

test/worker_tuner_test.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package test_test
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/require"
89
"github.com/stretchr/testify/suite"
@@ -48,7 +49,7 @@ func (ts *WorkerTunerTestSuite) TestFixedSizeWorkerTuner() {
4849
})
4950
ts.NoError(err)
5051

51-
ts.runTheWorkflow(worker.Options{Tuner: tuner}, ctx)
52+
ts.runTheWorkflow(worker.Options{Tuner: tuner}, ctx, 0)
5253
}
5354

5455
func (ts *WorkerTunerTestSuite) TestCompositeWorkerTuner() {
@@ -75,7 +76,7 @@ func (ts *WorkerTunerTestSuite) TestCompositeWorkerTuner() {
7576
WorkflowSlotSupplier: wfSS, ActivitySlotSupplier: actSS, LocalActivitySlotSupplier: laCss})
7677
ts.NoError(err)
7778

78-
ts.runTheWorkflow(worker.Options{Tuner: tuner}, ctx)
79+
ts.runTheWorkflow(worker.Options{Tuner: tuner}, ctx, 0)
7980
}
8081

8182
func (ts *WorkerTunerTestSuite) TestPollerBehaviorAutoscalingScaler() {
@@ -89,7 +90,7 @@ func (ts *WorkerTunerTestSuite) TestPollerBehaviorAutoscalingScaler() {
8990
ActivityTaskPollerBehavior: worker.NewPollerBehaviorAutoscaling(
9091
worker.PollerBehaviorAutoscalingOptions{},
9192
),
92-
}, ctx)
93+
}, ctx, 0)
9394
}
9495

9596
func (ts *WorkerTunerTestSuite) TestPollerBehaviorSimpleMaximumScaler() {
@@ -103,11 +104,11 @@ func (ts *WorkerTunerTestSuite) TestPollerBehaviorSimpleMaximumScaler() {
103104
ActivityTaskPollerBehavior: worker.NewPollerBehaviorSimpleMaximum(
104105
worker.PollerBehaviorSimpleMaximumOptions{},
105106
),
106-
}, ctx)
107+
}, ctx, 0)
107108
}
108109

109110
func (ts *WorkerTunerTestSuite) TestResourceBasedSmallSlots() {
110-
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
111+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
111112
defer cancel()
112113

113114
wfSS, err := worker.NewFixedSizeSlotSupplier(10)
@@ -132,19 +133,22 @@ func (ts *WorkerTunerTestSuite) TestResourceBasedSmallSlots() {
132133

133134
// The bug this is verifying was triggered by a race, so run this a bunch to verify it's not hit
134135
for i := 0; i < 10; i++ {
135-
ts.runTheWorkflow(worker.Options{Tuner: tuner}, ctx)
136+
ts.runTheWorkflow(worker.Options{Tuner: tuner}, ctx, 2*time.Minute)
136137
}
137138
}
138139

139-
func (ts *WorkerTunerTestSuite) runTheWorkflow(workerOptions worker.Options, ctx context.Context) {
140+
func (ts *WorkerTunerTestSuite) runTheWorkflow(workerOptions worker.Options, ctx context.Context, wfTimeout time.Duration) {
140141
myWorker := worker.New(ts.client, ts.taskQueueName, workerOptions)
141142
ts.workflows.register(myWorker)
142143
ts.activities.register(myWorker)
143144
ts.NoError(myWorker.Start())
144145
defer myWorker.Stop()
145146

146-
handle, err := ts.client.ExecuteWorkflow(ctx,
147-
ts.startWorkflowOptions(ts.T().Name()),
147+
opts := ts.startWorkflowOptions(ts.T().Name())
148+
if wfTimeout > 0 {
149+
opts.WorkflowExecutionTimeout = wfTimeout
150+
}
151+
handle, err := ts.client.ExecuteWorkflow(ctx, opts,
148152
ts.workflows.RunsLocalAndNonlocalActsWithRetries, 5, 2)
149153
ts.NoError(err)
150154
ts.NoError(handle.Get(ctx, nil))

0 commit comments

Comments
 (0)