Skip to content

Commit 836d235

Browse files
committed
rpcsrv: wait WS readers/writers to finish on server shutdown
Close #4248. Ref. #4248 (comment). Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
1 parent a827b71 commit 836d235

1 file changed

Lines changed: 12 additions & 2 deletions

File tree

pkg/services/rpcsrv/server.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ type (
164164
notaryRequestSubs int
165165
mempoolEventSubs int
166166

167+
wsReaders sync.WaitGroup
168+
wsWriters sync.WaitGroup
169+
167170
blockCh chan *block.Block
168171
blockHeaderCh chan *block.Header
169172
executionCh chan *state.AppExecResult
@@ -493,6 +496,11 @@ func (s *Server) Shutdown() {
493496

494497
// Wait for handleSubEvents to finish.
495498
<-s.subEventsToExitCh
499+
500+
// Wait for WS reader/writer routines to finish.
501+
s.wsReaders.Wait()
502+
s.wsWriters.Wait()
503+
496504
_ = s.log.Sync()
497505
}
498506

@@ -537,7 +545,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
537545
s.subsLock.Unlock()
538546
updateWSConnectionsCnt(numOfSubs)
539547
s.log.Info("websocket client connected", zap.String("remoteAddr", remote), zap.Int("numOfSubs", numOfSubs))
540-
go s.handleWsWrites(ws, resChan, subChan)
548+
s.wsWriters.Go(func() { s.handleWsWrites(ws, resChan, subChan) })
541549
s.handleWsReads(ws, resChan, subscr, remote)
542550
return
543551
}
@@ -579,7 +587,7 @@ func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notific
579587
s.subsLock.Unlock()
580588
updateWSConnectionsCnt(numOfSubs)
581589
s.log.Info("local websocket client connected", zap.Int("numOfSubs", numOfSubs))
582-
go s.handleLocalNotifications(ctx, events, subChan, subscr)
590+
s.wsWriters.Go(func() { s.handleLocalNotifications(ctx, events, subChan, subscr) })
583591
return func(req *neorpc.Request) (*neorpc.Response, error) {
584592
return s.handleInternal(req, subscr)
585593
}
@@ -750,6 +758,8 @@ drainloop:
750758
}
751759

752760
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- abstractResult, subscr *subscriber, remoteAddr string) {
761+
s.wsReaders.Add(1)
762+
defer s.wsReaders.Done()
753763
ws.SetReadLimit(s.wsReadLimit)
754764
err := ws.SetReadDeadline(time.Now().Add(wsPongLimit))
755765
ws.SetPongHandler(func(string) error { return ws.SetReadDeadline(time.Now().Add(wsPongLimit)) })

0 commit comments

Comments
 (0)