Skip to content

Commit 8eeb76e

Browse files
authored
Merge pull request #1481 from nono/fix-panic-on-shutdown
Fix a panic on shutdown with the mem broker
2 parents aa503c8 + ddc64ab commit 8eeb76e

File tree

1 file changed

+27
-6
lines changed

1 file changed

+27
-6
lines changed

pkg/jobs/mem_broker.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type (
1717
memQueue struct {
1818
MaxCapacity int
1919
Jobs chan *Job
20+
closed chan struct{}
2021

2122
list *list.List
2223
run bool
@@ -29,15 +30,15 @@ type (
2930
workers []*Worker
3031
workersTypes []string
3132
running uint32
32-
closed chan struct{}
3333
}
3434
)
3535

3636
// newMemQueue creates and a new in-memory queue.
3737
func newMemQueue(workerType string) *memQueue {
3838
return &memQueue{
39-
list: list.New(),
40-
Jobs: make(chan *Job),
39+
list: list.New(),
40+
Jobs: make(chan *Job),
41+
closed: make(chan struct{}),
4142
}
4243
}
4344

@@ -57,17 +58,31 @@ func (q *memQueue) send() {
5758
for {
5859
q.jmu.Lock()
5960
e := q.list.Front()
60-
if e == nil {
61+
if e == nil || !q.run {
6162
q.run = false
6263
q.jmu.Unlock()
6364
return
6465
}
6566
q.list.Remove(e)
6667
q.jmu.Unlock()
67-
q.Jobs <- e.Value.(*Job)
68+
select {
69+
case <-q.closed:
70+
return
71+
case q.Jobs <- e.Value.(*Job):
72+
}
6873
}
6974
}
7075

76+
func (q *memQueue) close() {
77+
q.jmu.Lock()
78+
defer q.jmu.Unlock()
79+
if !q.run {
80+
return
81+
}
82+
q.run = false
83+
go func() { q.closed <- struct{}{} }()
84+
}
85+
7186
// Len returns the length of the queue
7287
func (q *memQueue) Len() int {
7388
q.jmu.RLock()
@@ -82,7 +97,6 @@ func (q *memQueue) Len() int {
8297
func NewMemBroker() Broker {
8398
return &memBroker{
8499
queues: make(map[string]*memQueue),
85-
closed: make(chan struct{}),
86100
}
87101
}
88102

@@ -126,7 +140,13 @@ func (b *memBroker) ShutdownWorkers(ctx context.Context) error {
126140
if len(b.workers) == 0 {
127141
return nil
128142
}
143+
129144
fmt.Print(" shutting down in-memory broker...")
145+
146+
for _, q := range b.queues {
147+
q.close()
148+
}
149+
130150
errs := make(chan error)
131151
for _, w := range b.workers {
132152
go func(w *Worker) { errs <- w.Shutdown(ctx) }(w)
@@ -137,6 +157,7 @@ func (b *memBroker) ShutdownWorkers(ctx context.Context) error {
137157
errm = multierror.Append(errm, err)
138158
}
139159
}
160+
140161
if errm != nil {
141162
fmt.Println("failed:", errm)
142163
} else {

0 commit comments

Comments
 (0)