From 6f0fa0171be3798190e3208daed1cede52254f63 Mon Sep 17 00:00:00 2001
From: Raphael Simon
Date: Sat, 25 May 2024 18:26:53 -0700
Subject: [PATCH] Ensure node.Close and node.RemoveWorker properly stop local
jobs
Stop local jobs before they get requeued. Previously, Worker.requeueJobs
requeued jobs without stopping them first, so Node.Close and
Node.RemoveWorker could leave the local handler running jobs that had
already been reassigned to other workers. Worker.stopJob no longer takes
the worker lock itself; its callers (handleEvents and requeueJobs) hold
it, which lets requeueJobs stop and requeue each job atomically. The
node tests now dispatch a job and assert that it is stopped on close and
on worker removal, and scripts/test accepts a --force flag that passes
--count=1 to go test to bypass cached results.
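
For reviewers, a minimal, self-contained sketch of the new locking
contract follows. The types below are simplified stand-ins for
illustration only, not the real pool package API (no Redis, no
streaming, no handler):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // Job and Worker are hypothetical stand-ins for the pool types.
    type Job struct{ Key string }

    type Worker struct {
    	lock sync.Mutex
    	jobs map[string]*Job
    }

    // stopJob mirrors the new contract: the caller must hold w.lock.
    func (w *Worker) stopJob(key string) error {
    	if _, ok := w.jobs[key]; !ok {
    		return fmt.Errorf("job %s not found", key)
    	}
    	delete(w.jobs, key) // the real method also calls the handler's Stop
    	return nil
    }

    // requeueJobs holds the lock across the whole stop-then-requeue loop
    // so no event handler can observe a job that has been requeued but is
    // still running locally.
    func (w *Worker) requeueJobs() {
    	w.lock.Lock()
    	defer w.lock.Unlock()
    	for _, job := range w.jobs {
    		if err := w.stopJob(job.Key); err != nil {
    			fmt.Printf("failed to stop job %q: %v\n", job.Key, err)
    		}
    		fmt.Printf("requeued %q\n", job.Key) // stand-in for poolStream.Add
    	}
    }

    func main() {
    	w := &Worker{jobs: map[string]*Job{"job1": {Key: "job1"}}}
    	w.requeueJobs()
    }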
---
pool/README.md | 7 +++----
pool/node.go | 4 ++--
pool/node_test.go | 27 ++++++++++++++++++---------
pool/scheduler.go | 2 +-
pool/worker.go | 17 +++++++++--------
scripts/test | 12 +++++++++---
6 files changed, 42 insertions(+), 27 deletions(-)
diff --git a/pool/README.md b/pool/README.md
index 885f1be..3716bfb 100644
--- a/pool/README.md
+++ b/pool/README.md
@@ -20,7 +20,6 @@ Pulse uses the [Jump Consistent Hash](https://arxiv.org/abs/1406.2294) algorithm
to assign jobs to workers which provides a good balance between load balancing
and worker assignment stability.
-
```mermaid
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
@@ -216,12 +215,12 @@ flowchart TD
end
pr --1. DispatchJob--> no
no --2. Add Job--> js
- js -.3. Job.-> ps
+ js --3. Job--> ps
ps --4. Add Job--> ws
- ws -.5. Job.-> r
+ ws --5. Job--> r
r --6. Start Job--> u
r --7. Add Ack--> rs
- rs -.7. Ack.-> nr
+ rs --7. Ack--> nr
nr --8. Ack Add Job Event--> js
classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
diff --git a/pool/node.go b/pool/node.go
index fad9bbd..2004990 100644
--- a/pool/node.go
+++ b/pool/node.go
@@ -526,7 +526,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
}
// returnDispatchStatus returns the start job result to the caller.
-func (node *Node) returnDispatchStatus(ctx context.Context, ev *streaming.Event) {
+func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
node.lock.Lock()
defer node.lock.Unlock()
@@ -738,7 +738,7 @@ func (node *Node) deleteWorker(ctx context.Context, id string) error {
// workerStream retrieves the stream for a worker. It caches the result in the
// workerStreams map. Caller is responsible for locking.
-func (node *Node) workerStream(ctx context.Context, id string) (*streaming.Stream, error) {
+func (node *Node) workerStream(_ context.Context, id string) (*streaming.Stream, error) {
stream, ok := node.workerStreams[id]
if !ok {
s, err := streaming.NewStream(workerStreamName(id), node.rdb, soptions.WithStreamLogger(node.logger))
diff --git a/pool/node_test.go b/pool/node_test.go
index ccc2605..24a8877 100644
--- a/pool/node_test.go
+++ b/pool/node_test.go
@@ -76,9 +76,13 @@ func TestRemoveWorkerThenShutdown(t *testing.T) {
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
node = newTestNode(t, ctx, rdb, testName)
worker = newTestWorker(t, ctx, node)
+ handler = worker.handler.(*mockHandler)
)
defer cleanup(t, rdb, true, testName)
+ assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
+ assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
assert.NoError(t, node.RemoveWorker(ctx, worker))
+ assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
assert.NoError(t, node.Shutdown(ctx))
}
@@ -88,9 +92,14 @@ func TestClose(t *testing.T) {
testName = strings.Replace(t.Name(), "/", "_", -1)
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
node = newTestNode(t, ctx, rdb, testName)
+ worker = newTestWorker(t, ctx, node)
+ handler = worker.handler.(*mockHandler)
)
defer cleanup(t, rdb, false, testName)
+ assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
+ assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
assert.NoError(t, node.Close(ctx))
+ assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
}
func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name string) *Node {
@@ -106,11 +115,11 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri
func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
t.Helper()
- wm := &workerMock{jobs: make(map[string]*Job)}
- wm.startFunc = func(job *Job) error { wm.jobs[job.Key] = job; return nil }
- wm.stopFunc = func(key string) error { delete(wm.jobs, key); return nil }
- wm.notifyFunc = func(payload []byte) error { return nil }
- worker, err := node.AddWorker(ctx, wm)
+ handler := &mockHandler{jobs: make(map[string]*Job)}
+ handler.startFunc = func(job *Job) error { handler.jobs[job.Key] = job; return nil }
+ handler.stopFunc = func(key string) error { delete(handler.jobs, key); return nil }
+ handler.notifyFunc = func(payload []byte) error { return nil }
+ worker, err := node.AddWorker(ctx, handler)
require.NoError(t, err)
return worker
}
@@ -150,16 +159,16 @@ func cleanup(t *testing.T, rdb *redis.Client, checkClean bool, testName string)
assert.NoError(t, rdb.FlushDB(ctx).Err())
}
-type workerMock struct {
+type mockHandler struct {
startFunc func(job *Job) error
stopFunc func(key string) error
notifyFunc func(payload []byte) error
jobs map[string]*Job
}
-func (w *workerMock) Start(job *Job) error { return w.startFunc(job) }
-func (w *workerMock) Stop(key string) error { return w.stopFunc(key) }
-func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) }
+func (w *mockHandler) Start(job *Job) error { return w.startFunc(job) }
+func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) }
+func (w *mockHandler) Notify(p []byte) error { return w.notifyFunc(p) }
// buffer is a goroutine safe bytes.Buffer
type buffer struct {
diff --git a/pool/scheduler.go b/pool/scheduler.go
index 8617fa5..6ff192c 100644
--- a/pool/scheduler.go
+++ b/pool/scheduler.go
@@ -170,7 +170,7 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
}
// handleStop handles the scheduler stop signal.
-func (sched *scheduler) handleStop(ctx context.Context) {
+func (sched *scheduler) handleStop(_ context.Context) {
ch := sched.jobMap.Subscribe()
for ev := range ch {
if ev == rmap.EventReset {
diff --git a/pool/worker.go b/pool/worker.go
index 1c956e1..cab52ea 100644
--- a/pool/worker.go
+++ b/pool/worker.go
@@ -163,7 +163,9 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) {
case evStartJob:
err = w.startJob(ctx, unmarshalJob(payload))
case evStopJob:
+ w.lock.Lock()
err = w.stopJob(ctx, unmarshalJobKey(payload))
+ w.lock.Unlock()
case evNotify:
key, payload := unmarshalNotification(payload)
err = w.notify(ctx, key, payload)
@@ -229,7 +231,7 @@ func (w *Worker) stopAndWait(ctx context.Context) {
}
// startJob starts a job.
-func (w *Worker) startJob(ctx context.Context, job *Job) error {
+func (w *Worker) startJob(_ context.Context, job *Job) error {
w.lock.Lock()
defer w.lock.Unlock()
if w.stopped {
@@ -246,12 +248,8 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error {
}
// stopJob stops a job.
-func (w *Worker) stopJob(ctx context.Context, key string) error {
- w.lock.Lock()
- defer w.lock.Unlock()
- if w.stopped {
- return nil
- }
+// worker.lock must be held when calling this method.
+func (w *Worker) stopJob(_ context.Context, key string) error {
if _, ok := w.jobs[key]; !ok {
return fmt.Errorf("job %s not found", key)
}
@@ -264,7 +262,7 @@ func (w *Worker) stopJob(ctx context.Context, key string) error {
}
// notify notifies the worker with the given payload.
-func (w *Worker) notify(ctx context.Context, key string, payload []byte) error {
+func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
w.lock.Lock()
defer w.lock.Unlock()
if w.stopped {
@@ -335,6 +333,9 @@ func (w *Worker) requeueJobs(ctx context.Context) {
w.lock.Lock()
defer w.lock.Unlock()
for _, job := range w.jobs {
+ if err := w.stopJob(ctx, job.Key); err != nil {
+ w.logger.Error(fmt.Errorf("failed to stop job %q: %w", job.Key, err))
+ }
if _, err := w.Node.poolStream.Add(ctx, evStartJob, marshalJob(job)); err != nil {
w.logger.Error(fmt.Errorf("failed to requeue job %q: %w", job.Key, err))
}
diff --git a/scripts/test b/scripts/test
index 6bf64fa..2b271c9 100755
--- a/scripts/test
+++ b/scripts/test
@@ -9,9 +9,15 @@ pushd ${GIT_ROOT}
staticcheck ./...
+# If --force is passed, add --count=1 to the go test command
+if [[ "$1" == "--force" ]]; then
+ shift
+ OPTIONS="--count=1"
+fi
+
# Run tests one package at a time to avoid Redis race conditions
-go test -race goa.design/pulse/rmap/...
-go test -race goa.design/pulse/streaming/...
-go test -race goa.design/pulse/pool/...
+go test -race goa.design/pulse/rmap/... $OPTIONS
+go test -race goa.design/pulse/streaming/... $OPTIONS
+go test -race goa.design/pulse/pool/... $OPTIONS
popd