Skip to content

Commit 884d60d

Browse files
committed
Fix potential goroutine leak.
Race conditions in cleanup goroutines could cause them to fail. As they are created every time a queue becomes non-empty, a single queue being worked can cause multiple goroutines to spawn for cleanup, and they could all get quite unlucky and never be able to perform their cleanup (particularly if a task gets left in that queue for a while after triggering multiple cleanup routines). Taken across a large number of queues, this can be an issue and leak memory unnecessarily. The implementation now recognizes that the presence of the "dependents" semaphore makes synchronous cleanup a safe procedure, simplifying logic and interactions between goroutines.
1 parent 0f27f5b commit 884d60d

3 files changed

Lines changed: 47 additions & 47 deletions

File tree

backend/eqmem/eqmem.go

Lines changed: 31 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -830,59 +830,34 @@ func (m *EQMem) locksForQueues(qs []string) []*qLock {
830830
return locks
831831
}
832832

833-
// Get a single lock for a queue, creating it if it doesn't exist. If newly
834-
// created, a cleanup goroutine is also launched in the background to handle
835-
// empty queues. Dependents are incremented here.
836-
func (m *EQMem) lockForQueueUnsafe(q string) *qLock {
837-
ql := m.locksSuperUnsafe[q]
838-
ts := m.queues[q]
839-
840-
if (ts == nil) != (ql == nil) {
833+
// Get a single lock for a queue, creating it if it doesn't exist. Dependents
834+
// are incremented here.
835+
func (m *EQMem) lockForQueueUnsafe(q string) (ql *qLock) {
836+
// Always increment dependents, whether we exit early from finding a lock,
837+
// or late from creating a new queue.
838+
defer func() {
839+
ql.dependents++
840+
}()
841+
842+
ql = m.locksSuperUnsafe[q]
843+
844+
if ts := m.queues[q]; (ts == nil) != (ql == nil) {
841845
log.Fatalf("Queue tasks and lock structures out of step for queue %q: ts=%v, ql=%v", q, ts, ql)
842846
}
843847

844848
if ql != nil {
845-
ql.dependents++
846849
return ql
847850
}
848851

849-
if ts == nil || ql == nil {
850-
ts = newTaskQueue(q)
851-
m.queues[q] = ts
852-
853-
ql = &qLock{
854-
queue: q,
855-
heap: newClaimHeap(),
856-
tasks: ts,
857-
dependents: 1, // someone asked for it, mark it up.
858-
}
859-
m.locksSuperUnsafe[q] = ql
860-
861-
// Since we're creating a new queue lock, we create its goroutine to
862-
// clean it up when its dependents have gone to zero. We use only the global lock
863-
// here to avoid nested locks (global and local). This means that the
864-
// decrement must also be done while holding *only* the global lock.
865-
go func() {
866-
for {
867-
time.Sleep(5 * time.Second)
868-
done := func() bool {
869-
defer un(lock(m))
870-
// Empty queue and nobody leaning on the lock, delete everything.
871-
// Then indicate that we're finished so the goroutine can
872-
// exit for this queue.
873-
if m.queues[q].Len() == 0 && m.locksSuperUnsafe[q].dependents == 0 {
874-
delete(m.queues, q)
875-
delete(m.locksSuperUnsafe, q)
876-
return true
877-
}
878-
return false
879-
}()
880-
if done {
881-
return
882-
}
883-
}
884-
}()
852+
ts := newTaskQueue(q)
853+
m.queues[q] = ts
854+
855+
ql = &qLock{
856+
queue: q,
857+
heap: newClaimHeap(),
858+
tasks: ts,
885859
}
860+
m.locksSuperUnsafe[q] = ql
886861

887862
return ql
888863
}
@@ -908,10 +883,19 @@ func (m *EQMem) lockQueues(qs []string) ([]*qLock, func()) {
908883
qls[i].Unlock()
909884
}
910885
// Now that we're unlocked, take the global lock again and reduce
911-
// dependents by 1, in reverse order.
886+
// dependents by 1, in reverse order, then try to clean up if
887+
// dependents go to zero anywhere with empty queues. If it fails, it
888+
simply exits; something else needed the lock to stay alive between
889+
// lock acquisitions, so cleanup will occur later.
912890
defer un(lock(m))
913891
for i := len(qls) - 1; i >= 0; i-- {
914-
qls[i].dependents--
892+
ql := qls[i]
893+
ql.dependents--
894+
895+
if ts := m.queues[ql.queue]; ql.dependents == 0 && ts.Len() == 0 {
896+
delete(m.queues, ql.queue)
897+
delete(m.locksSuperUnsafe, ql.queue)
898+
}
915899
}
916900
}
917901
}

backend/eqmem/taskqueue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func (s *taskQueue) Update(id uuid.UUID, f func(*entroq.Task) *entroq.Task) erro
6767

6868
// Len returns the current size of this task store.
6969
func (s *taskQueue) Len() int {
70+
if s == nil {
71+
return 0
72+
}
7073
defer un(lock(s))
7174
return s.size
7275
}

qsvc/qtest/qtest.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,19 @@ func SimpleChange(ctx context.Context, t *testing.T, client *entroq.EntroQ, qPre
109109
func SimpleWorker(ctx context.Context, t *testing.T, client *entroq.EntroQ, qPrefix string) {
110110
queue := path.Join(qPrefix, "simple_worker")
111111

112+
attempts := 30
113+
if testing.Short() {
114+
attempts = 5
115+
}
116+
for i := 0; i < attempts; i++ {
117+
q := fmt.Sprintf("%s_%d", queue, i)
118+
simpleWorkerOnce(ctx, t, client, q)
119+
}
120+
}
121+
122+
func simpleWorkerOnce(ctx context.Context, t *testing.T, client *entroq.EntroQ, queue string) {
123+
t.Helper()
124+
112125
const numTasks = 10
113126

114127
showQueue := func() {

0 commit comments

Comments
 (0)