Skip to content

Commit 6c332ac

Browse files
committed
{Voice,}Gateway: Fixed various race conditions
This commit fixes race conditions in both package voice, package voicegateway and package gateway. Originally, several race conditions exist when both the user's and the pacemaker's goroutines both want to do several things to the websocket connection. For example, the user's goroutine could be writing, and the pacemaker's goroutine could trigger a reconnection. This is racey. This issue is partially fixed by removing the pacer loop from package heart and combining the ticker into the event (pacemaker) loop itself. Technically, a race condition could still be triggered with care, but the API itself never guaranteed any of those. As events are handled using an internal loop into a channel, a race condition will not be triggered just by handling events and writing to the websocket.
1 parent 91ee92e commit 6c332ac

File tree

11 files changed

+375
-287
lines changed

11 files changed

+375
-287
lines changed

gateway/gateway.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -161,40 +161,38 @@ func (g *Gateway) AddIntent(i Intents) {
161161
}
162162

163163
// Close closes the underlying Websocket connection.
164-
func (g *Gateway) Close() error {
164+
func (g *Gateway) Close() (err error) {
165165
wsutil.WSDebug("Trying to close.")
166166

167167
// Check if the WS is already closed:
168-
if g.waitGroup == nil && g.PacerLoop.Stopped() {
168+
if g.PacerLoop.Stopped() {
169169
wsutil.WSDebug("Gateway is already closed.")
170-
171-
g.AfterClose(nil)
172-
return nil
170+
return err
173171
}
174172

173+
// Trigger the close callback on exit.
174+
defer func() { g.AfterClose(err) }()
175+
175176
// If the pacemaker is running:
176177
if !g.PacerLoop.Stopped() {
177178
wsutil.WSDebug("Stopping pacemaker...")
178179

179-
// Stop the pacemaker and the event handler
180+
// Stop the pacemaker and the event handler.
180181
g.PacerLoop.Stop()
181182

182183
wsutil.WSDebug("Stopped pacemaker.")
183184
}
184185

186+
wsutil.WSDebug("Closing the websocket...")
187+
err = g.WS.Close()
188+
185189
wsutil.WSDebug("Waiting for WaitGroup to be done.")
186190

187191
// This should work, since Pacemaker should signal its loop to stop, which
188192
// would also exit our event loop. Both would be 2.
189193
g.waitGroup.Wait()
190194

191-
// Mark g.waitGroup as empty:
192-
g.waitGroup = nil
193-
194195
wsutil.WSDebug("WaitGroup is done. Closing the websocket.")
195-
196-
err := g.WS.Close()
197-
g.AfterClose(err)
198196
return err
199197
}
200198

gateway/op.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,17 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
4646
defer cancel()
4747

4848
// Server requesting a heartbeat.
49-
return g.PacerLoop.Pace(ctx)
49+
if err := g.PacerLoop.Pace(ctx); err != nil {
50+
return wsutil.ErrBrokenConnection(errors.Wrap(err, "failed to pace"))
51+
}
5052

5153
case ReconnectOP:
5254
// Server requests to reconnect, die and retry.
5355
wsutil.WSDebug("ReconnectOP received.")
5456

5557
// Exit with the ReconnectOP error to force the heartbeat event loop to
5658
// reconnect synchronously. Not really a fatal error.
57-
return ErrReconnectRequest
59+
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
5860

5961
case InvalidSessionOP:
6062
// Discord expects us to sleep for no reason
@@ -66,7 +68,7 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
6668
// Invalid session, try and Identify.
6769
if err := g.IdentifyCtx(ctx); err != nil {
6870
// Can't identify, reconnect.
69-
go g.Reconnect()
71+
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
7072
}
7173

7274
return nil

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
1616
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
1717
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
1818
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
19+
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=

internal/heart/heart.go

Lines changed: 82 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package heart
33

44
import (
55
"context"
6-
"sync"
76
"sync/atomic"
87
"time"
98

@@ -36,23 +35,30 @@ type Pacemaker struct {
3635
// Heartrate is the received duration between heartbeats.
3736
Heartrate time.Duration
3837

38+
ticker time.Ticker
39+
Ticks <-chan time.Time
40+
3941
// Time in nanoseconds, guarded by atomic read/writes.
4042
SentBeat AtomicTime
4143
EchoBeat AtomicTime
4244

4345
// Any callback that returns an error will stop the pacer.
44-
Pace func(context.Context) error
45-
46-
stop chan struct{}
47-
once sync.Once
48-
death chan error
46+
Pacer func(context.Context) error
4947
}
5048

51-
func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) *Pacemaker {
52-
return &Pacemaker{
49+
func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) Pacemaker {
50+
p := Pacemaker{
5351
Heartrate: heartrate,
54-
Pace: pacer,
52+
Pacer: pacer,
53+
ticker: *time.NewTicker(heartrate),
5554
}
55+
p.Ticks = p.ticker.C
56+
// Reset states to its old position.
57+
now := time.Now()
58+
p.EchoBeat.Set(now)
59+
p.SentBeat.Set(now)
60+
61+
return p
5662
}
5763

5864
func (p *Pacemaker) Echo() {
@@ -62,14 +68,6 @@ func (p *Pacemaker) Echo() {
6268

6369
// Dead, if true, will have Pace return an ErrDead.
6470
func (p *Pacemaker) Dead() bool {
65-
/* Deprecated
66-
if p.LastBeat[0].IsZero() || p.LastBeat[1].IsZero() {
67-
return false
68-
}
69-
70-
return p.LastBeat[0].Sub(p.LastBeat[1]) > p.Heartrate*2
71-
*/
72-
7371
var (
7472
echo = p.EchoBeat.Get()
7573
sent = p.SentBeat.Get()
@@ -84,75 +82,84 @@ func (p *Pacemaker) Dead() bool {
8482

8583
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
8684
func (p *Pacemaker) Stop() {
87-
Debug("(*Pacemaker).stop is trying sync.Once.")
88-
89-
p.once.Do(func() {
90-
Debug("(*Pacemaker).stop closed the channel.")
91-
close(p.stop)
92-
})
85+
p.ticker.Stop()
9386
}
9487

9588
// pace sends a heartbeat with the appropriate timeout for the context.
96-
func (p *Pacemaker) pace() error {
89+
func (p *Pacemaker) Pace() error {
9790
ctx, cancel := context.WithTimeout(context.Background(), p.Heartrate)
9891
defer cancel()
9992

100-
return p.Pace(ctx)
93+
return p.PaceCtx(ctx)
10194
}
10295

103-
func (p *Pacemaker) start() error {
104-
// Reset states to its old position.
105-
p.EchoBeat.Set(time.Time{})
106-
p.SentBeat.Set(time.Time{})
107-
108-
// Create a new ticker.
109-
tick := time.NewTicker(p.Heartrate)
110-
defer tick.Stop()
111-
112-
// Echo at least once
113-
p.Echo()
114-
115-
for {
116-
if err := p.pace(); err != nil {
117-
return errors.Wrap(err, "failed to pace")
118-
}
119-
120-
// Paced, save:
121-
p.SentBeat.Set(time.Now())
122-
123-
if p.Dead() {
124-
return ErrDead
125-
}
126-
127-
select {
128-
case <-p.stop:
129-
return nil
130-
131-
case <-tick.C:
132-
}
96+
func (p *Pacemaker) PaceCtx(ctx context.Context) error {
97+
if err := p.Pacer(ctx); err != nil {
98+
return err
13399
}
134-
}
135100

136-
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
137-
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
138-
p.death = make(chan error)
139-
p.stop = make(chan struct{})
140-
p.once = sync.Once{}
101+
p.SentBeat.Set(time.Now())
141102

142-
if wg != nil {
143-
wg.Add(1)
103+
if p.Dead() {
104+
return ErrDead
144105
}
145106

146-
go func() {
147-
p.death <- p.start()
148-
// Debug.
149-
Debug("Pacemaker returned.")
150-
151-
// Mark the pacemaker loop as done.
152-
if wg != nil {
153-
wg.Done()
154-
}
155-
}()
156-
157-
return p.death
107+
return nil
158108
}
109+
110+
// func (p *Pacemaker) start() error {
111+
// // Reset states to its old position.
112+
// p.EchoBeat.Set(time.Time{})
113+
// p.SentBeat.Set(time.Time{})
114+
115+
// // Create a new ticker.
116+
// tick := time.NewTicker(p.Heartrate)
117+
// defer tick.Stop()
118+
119+
// // Echo at least once
120+
// p.Echo()
121+
122+
// for {
123+
// if err := p.pace(); err != nil {
124+
// return errors.Wrap(err, "failed to pace")
125+
// }
126+
127+
// // Paced, save:
128+
// p.SentBeat.Set(time.Now())
129+
130+
// if p.Dead() {
131+
// return ErrDead
132+
// }
133+
134+
// select {
135+
// case <-p.stop:
136+
// return nil
137+
138+
// case <-tick.C:
139+
// }
140+
// }
141+
// }
142+
143+
// // StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
144+
// func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
145+
// p.death = make(chan error)
146+
// p.stop = make(chan struct{})
147+
// p.once = sync.Once{}
148+
149+
// if wg != nil {
150+
// wg.Add(1)
151+
// }
152+
153+
// go func() {
154+
// p.death <- p.start()
155+
// // Debug.
156+
// Debug("Pacemaker returned.")
157+
158+
// // Mark the pacemaker loop as done.
159+
// if wg != nil {
160+
// wg.Done()
161+
// }
162+
// }()
163+
164+
// return p.death
165+
// }

0 commit comments

Comments
 (0)