diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index f130821e2..1dd38e7d0 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -106,9 +106,8 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa // Also, CLientID is duplicated in command and io-thread. // Also, we shouldn't allow execution/registration incase of invalid commands // like for B.WATCH cmd since it'll err out we shall return and not create subscription - if err == nil { - t.ClientID = _c.ClientID - } + // No error handling after this as we have continued loop above if error found + t.ClientID = _c.ClientID if _c.Meta.IsWatchable { _cWatch := _c @@ -116,7 +115,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa res.Rs.Fingerprint64 = _cWatch.Fingerprint() } - if c.Cmd == "HANDSHAKE" && err == nil { + if c.Cmd == "HANDSHAKE" { t.ClientID = _c.C.Args[0] t.Mode = _c.C.Args[1] } @@ -141,9 +140,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa // TODO: Streamline this because we need ordering of updates // that are being sent to watchers. - if err == nil { - watchManager.NotifyWatchers(_c, shardManager, t) - } + watchManager.NotifyWatchers(_c, shardManager, t) } } diff --git a/internal/server/ironhawk/main.go b/internal/server/ironhawk/main.go index a2df3ef6f..2419fe819 100644 --- a/internal/server/ironhawk/main.go +++ b/internal/server/ironhawk/main.go @@ -170,6 +170,13 @@ func (s *Server) startIOThread(ctx context.Context, wg *sync.WaitGroup, thread * slog.Any("error", err)) } } + if err := thread.Stop(); err != nil { + slog.Debug("failed to stop io-thread", + slog.String("client_id", thread.ClientID), + slog.String("mode", thread.Mode), + slog.Any("error", err), + ) + } } func (s *Server) Shutdown() { diff --git a/internal/server/ironhawk/watch_manager.go b/internal/server/ironhawk/watch_manager.go index 9a89840a8..d83099181 100644 --- a/internal/server/ironhawk/watch_manager.go +++ b/internal/server/ironhawk/watch_manager.go @@ -125,6 +125,9 @@ func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) { delete(w.fpClientMap[fp], t.ClientID) if len(w.fpClientMap[fp]) == 0 { delete(w.fpClientMap, fp) + + // If we have deleted the fingerprint, delete the command from the map + delete(w.fpCmdMap, fp) } } }