1717 memQueue struct {
1818 MaxCapacity int
1919 Jobs chan * Job
20+ closed chan struct {}
2021
2122 list * list.List
2223 run bool
@@ -29,15 +30,15 @@ type (
2930 workers []* Worker
3031 workersTypes []string
3132 running uint32
32- closed chan struct {}
3333 }
3434)
3535
3636// newMemQueue creates and a new in-memory queue.
3737func newMemQueue (workerType string ) * memQueue {
3838 return & memQueue {
39- list : list .New (),
40- Jobs : make (chan * Job ),
39+ list : list .New (),
40+ Jobs : make (chan * Job ),
41+ closed : make (chan struct {}),
4142 }
4243}
4344
@@ -57,17 +58,31 @@ func (q *memQueue) send() {
5758 for {
5859 q .jmu .Lock ()
5960 e := q .list .Front ()
60- if e == nil {
61+ if e == nil || ! q . run {
6162 q .run = false
6263 q .jmu .Unlock ()
6364 return
6465 }
6566 q .list .Remove (e )
6667 q .jmu .Unlock ()
67- q .Jobs <- e .Value .(* Job )
68+ select {
69+ case <- q .closed :
70+ return
71+ case q .Jobs <- e .Value .(* Job ):
72+ }
6873 }
6974}
7075
76+ func (q * memQueue ) close () {
77+ q .jmu .Lock ()
78+ defer q .jmu .Unlock ()
79+ if ! q .run {
80+ return
81+ }
82+ q .run = false
83+ go func () { q .closed <- struct {}{} }()
84+ }
85+
7186// Len returns the length of the queue
7287func (q * memQueue ) Len () int {
7388 q .jmu .RLock ()
@@ -82,7 +97,6 @@ func (q *memQueue) Len() int {
8297func NewMemBroker () Broker {
8398 return & memBroker {
8499 queues : make (map [string ]* memQueue ),
85- closed : make (chan struct {}),
86100 }
87101}
88102
@@ -126,7 +140,13 @@ func (b *memBroker) ShutdownWorkers(ctx context.Context) error {
126140 if len (b .workers ) == 0 {
127141 return nil
128142 }
143+
129144 fmt .Print (" shutting down in-memory broker..." )
145+
146+ for _ , q := range b .queues {
147+ q .close ()
148+ }
149+
130150 errs := make (chan error )
131151 for _ , w := range b .workers {
132152 go func (w * Worker ) { errs <- w .Shutdown (ctx ) }(w )
@@ -137,6 +157,7 @@ func (b *memBroker) ShutdownWorkers(ctx context.Context) error {
137157 errm = multierror .Append (errm , err )
138158 }
139159 }
160+
140161 if errm != nil {
141162 fmt .Println ("failed:" , errm )
142163 } else {
0 commit comments