File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -17,16 +17,22 @@ const (
1717// this handles bursts of expiring keys efficiently.
1818// The cleanup interval is derived from ServerConfig.Hz and is re-read on
1919// each tick, so CONFIG SET hz takes effect without a restart.
20- func StartActiveExpiry () {
20+ // The goroutine exits when stop is closed.
21+ func StartActiveExpiry (stop <- chan struct {}) {
2122 go func () {
2223 for {
2324 // A fresh ticker is created on every iteration so that changes to hz
2425 // via CONFIG SET take effect on the next cycle without a restart.
2526 // Reusing a single ticker would lock in the interval set at startup.
2627 interval := ServerConfig .CleanupInterval ()
2728 ticker := time .NewTicker (interval )
28- <- ticker .C
29- ticker .Stop ()
29+ select {
30+ case <- stop :
31+ ticker .Stop ()
32+ return
33+ case <- ticker .C :
34+ ticker .Stop ()
35+ }
3036
3137 if ! ServerConfig .ActiveExpireEnabled {
3238 continue
Original file line number Diff line number Diff line change 77 "fmt"
88 "net"
99 "os"
10+ "os/signal"
11+ "sync"
12+ "syscall"
1013)
1114
1215func main () {
@@ -22,14 +25,36 @@ func main() {
2225 fmt .Printf ("Failed to bind to port %d\n " , * port )
2326 os .Exit (1 )
2427 }
25- defer l .Close ()
26- db .StartActiveExpiry ()
28+
29+ stop := make (chan struct {})
30+ db .StartActiveExpiry (stop )
31+
32+ quit := make (chan os.Signal , 1 )
33+ signal .Notify (quit , syscall .SIGINT , syscall .SIGTERM )
34+ go func () {
35+ <- quit
36+ fmt .Println ("Shutting down..." )
37+ close (stop )
38+ l .Close ()
39+ }()
40+
41+ var wg sync.WaitGroup
2742 for {
2843 conn , err := l .Accept ()
2944 if err != nil {
30- fmt .Println ("Error accepting connection: " , err .Error ())
31- continue
45+ // l.Close() during shutdown causes Accept to return an error.
46+ // Check if we're shutting down to avoid a spurious log line.
47+ select {
48+ case <- stop :
49+ default :
50+ fmt .Println ("Error accepting connection: " , err .Error ())
51+ }
52+ break
3253 }
33- go resp .HandleConn (conn )
54+ wg .Add (1 )
55+ go resp .HandleConn (conn , & wg )
3456 }
57+
58+ wg .Wait ()
59+ fmt .Println ("Shutdown complete." )
3560}
Original file line number Diff line number Diff line change @@ -4,12 +4,18 @@ import (
44 "bufio"
55 "io"
66 "net"
7+ "sync"
78)
89
910// HandleConn reads RESP commands from the connection and writes replies.
1011// It uses a bufio.Reader to read one complete RESP message at a time,
1112// so commands split across multiple TCP packets are handled correctly.
12- func HandleConn (conn net.Conn ) {
13+ // wg must be held by the caller before invoking HandleConn; it is released
14+ // when the connection is fully done so the server can wait for all
15+ // in-flight connections during graceful shutdown.
16+ func HandleConn (conn net.Conn , wg * sync.WaitGroup ) {
17+ defer wg .Done ()
18+ defer conn .Close ()
1319 reader := bufio .NewReader (conn )
1420 for {
1521 msg , err := readMessage (reader )
You can’t perform that action at this time.
0 commit comments