Skip to content

Commit 7ad1a22

Browse files
authored
Merge pull request #53 from DrmagicE/bugfix
fix: memory leak with server.willMessage
2 parents b9f60f0 + 0b8e3af commit 7ad1a22

File tree

1 file changed

+30
-7
lines changed

1 file changed

+30
-7
lines changed

server/server.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,11 @@ func (srv *server) registerClient(connect *packets.Connect, connackPpt *packets.
364364
if err == nil {
365365
client.session = sess
366366
if sessionResume {
367+
// If a new Network Connection to this Session is made before the Will Delay Interval has passed,
368+
// the Server MUST NOT send the Will Message [MQTT-3.1.3-9].
369+
if w, ok := srv.willMessage[client.opts.ClientID]; ok {
370+
w.signal(false)
371+
}
367372
if srv.hooks.OnSessionResumed != nil {
368373
srv.hooks.OnSessionResumed(context.Background(), client)
369374
}
@@ -409,6 +414,10 @@ func (srv *server) registerClient(connect *packets.Connect, connackPpt *packets.
409414
err = fmt.Errorf("session terminated fail: %w", err)
410415
zaplog.Error("session terminated fail", zap.Error(err))
411416
}
417+
// Send will message because the previous session is ended.
418+
if w, ok := srv.willMessage[client.opts.ClientID]; ok {
419+
w.signal(true)
420+
}
412421
} else {
413422
qs = srv.queueStore[client.opts.ClientID]
414423
if qs != nil {
@@ -476,8 +485,17 @@ func (srv *server) registerClient(connect *packets.Connect, connackPpt *packets.
476485
}
477486

478487
type willMsg struct {
479-
msg *gmqtt.Message
480-
cancel chan struct{}
488+
msg *gmqtt.Message
489+
// If true, send the msg.
490+
// If false, discard the msg.
491+
send chan bool
492+
}
493+
494+
func (w *willMsg) signal(send bool) {
495+
select {
496+
case w.send<-send:
497+
default:
498+
}
481499
}
482500

483501
func (srv *server) unregisterClient(client *client) {
@@ -506,20 +524,25 @@ func (srv *server) unregisterClient(client *client) {
506524
msg := sess.Will.Copy()
507525
if willDelayInterval != 0 && storeSession {
508526
wm := &willMsg{
509-
msg: msg,
510-
cancel: make(chan struct{}),
527+
msg: msg,
528+
send: make(chan bool,1),
511529
}
512530
srv.willMessage[client.opts.ClientID] = wm
513531
t := time.NewTimer(time.Duration(willDelayInterval) * time.Second)
514532
go func(clientID string) {
533+
var send bool
515534
select {
516-
case <-wm.cancel:
535+
case send = <-wm.send:
517536
t.Stop()
518537
case <-t.C:
519-
srv.mu.Lock()
538+
send = true
539+
}
540+
srv.mu.Lock()
541+
defer srv.mu.Unlock()
542+
if send {
520543
srv.deliverMessageHandler(clientID, msg)
521-
srv.mu.Unlock()
522544
}
545+
delete(srv.willMessage,clientID)
523546
}(client.opts.ClientID)
524547
} else {
525548
srv.deliverMessageHandler(client.opts.ClientID, msg)

0 commit comments

Comments
 (0)