66 "fmt"
77 "io"
88 "net"
9+ "strconv"
910 "strings"
1011 "sync"
1112 "time"
@@ -14,21 +15,21 @@ import (
1415)
1516
const (
	// FLWCommand is the four-letter-word command written to every keeper
	// connection. The "mntr" reply is parsed as one "key<TAB>value" pair per
	// line.
	FLWCommand = "mntr"

	// ModeLeader is the server state reported by the node that leads the
	// cluster.
	ModeLeader = "leader"
	// ModeFollower is the server state reported by a non-leader cluster
	// member.
	ModeFollower = "follower"
	// ModeStandalone is the server state reported by a single-node
	// deployment.
	ModeStandalone = "standalone"
)
2324
2425var (
25- ClusterModes = []string {ModeLeader , ModeFollower }
26- StandaloneModes = []string {ModeStandalone }
26+ ClusterModes = []string {ModeLeader , ModeFollower }
2727)
2828
// ServerStatus holds parsed fields of the "mntr" command response.
type ServerStatus struct {
	// ServerState is the value of the "zk_server_state" key; it is compared
	// against the Mode* constants (e.g. ModeLeader).
	ServerState string
	// Followers is the integer value of the "zk_followers" key. It is only
	// populated when ServerState is ModeLeader and is zero otherwise.
	Followers int
}
3334
3435type Connections map [string ]net.Conn
@@ -59,54 +60,68 @@ func getConnections(ctx context.Context, log util.Logger, hostnamesByID map[stri
5960 return result , connErrs
6061}
6162
62- func queryPod (ctx context.Context , log util.Logger , conn net.Conn ) (ServerState , error ) {
63+ func queryPod (ctx context.Context , log util.Logger , conn net.Conn ) (ServerStatus , error ) {
6364 log .Debug (fmt .Sprintf ("querying keeper pod: %s" , conn .RemoteAddr ().String ()))
6465 if dl , ok := ctx .Deadline (); ok {
6566 if err := conn .SetDeadline (dl ); err != nil {
66- return ServerState {}, fmt .Errorf ("set deadline: %w" , err )
67+ return ServerStatus {}, fmt .Errorf ("set deadline: %w" , err )
6768 }
6869 }
6970
7071 n , err := io .WriteString (conn , FLWCommand )
7172 if err != nil {
72- return ServerState {}, fmt .Errorf ("write command: %w" , err )
73+ return ServerStatus {}, fmt .Errorf ("write command: %w" , err )
7374 }
7475 if n != len (FLWCommand ) {
75- return ServerState {}, fmt .Errorf ("can't write the whole string to socket expected: %d; actual: %d" , len (FLWCommand ), n )
76+ return ServerStatus {}, fmt .Errorf ("can't write the whole string to socket expected: %d; actual: %d" , len (FLWCommand ), n )
7677 }
7778
7879 reader := bufio .NewReader (conn )
7980 data , err := io .ReadAll (reader )
8081 if err != nil {
81- return ServerState {}, fmt .Errorf ("got error while reading from socket: %w" , err )
82+ return ServerStatus {}, fmt .Errorf ("got error while reading from socket: %w" , err )
8283 }
8384
8485 statMap := map [string ]string {}
8586 for i , stat := range strings .Split (string (data ), "\n " ) {
8687 if len (stat ) == 0 {
8788 continue
8889 }
89- parts := strings .Split (stat , ": " )
90+ parts := strings .Split (stat , "\t " )
9091 if len (parts ) != 2 {
91- return ServerState {}, fmt .Errorf ("failed to parse response line %d: %q" , i , stat )
92+ return ServerStatus {}, fmt .Errorf ("failed to parse response line %d: %q" , i , stat )
9293 }
9394
9495 statMap [parts [0 ]] = parts [1 ]
9596 }
9697
97- result := ServerState {Mode : statMap ["Mode" ]}
98- if result .Mode == "" {
99- return ServerState {}, fmt .Errorf ("response missing required field 'Mode': %q" , string (data ))
98+ result := ServerStatus {
99+ ServerState : statMap ["zk_server_state" ],
100+ }
101+ if result .ServerState == "" {
102+ return ServerStatus {}, fmt .Errorf ("response missing required field 'Mode': %q" , string (data ))
103+ }
104+
105+ if result .ServerState == ModeLeader {
106+ if followers , ok := statMap ["zk_followers" ]; ok {
107+ result .Followers , err = strconv .Atoi (followers )
108+ if err != nil {
109+ return ServerStatus {}, fmt .Errorf ("failed to parse field 'zk_followers': %w" , err )
110+ }
111+ } else {
112+ log .Warn ("'zk_followers' is missing in keeper response" )
113+ return ServerStatus {}, fmt .Errorf ("response missing required field 'Followers': %q" , string (data ))
114+ }
100115 }
101116
102117 return result , nil
103118}
104119
105- func queryAllPods (ctx context.Context , log util.Logger , connections Connections ) map [string ]ServerState {
120+ func queryAllPods (ctx context.Context , log util.Logger , connections Connections ) map [string ]ServerStatus {
106121 // Contains the hostName along with parsed response from it
107122 type NamedFourLetterCommandResponse struct {
108123 id string
109- state ServerState
124+ state ServerStatus
110125 }
111126
112127 // Buffered channels will block only if buffer is full
@@ -119,7 +134,7 @@ func queryAllPods(ctx context.Context, log util.Logger, connections Connections)
119134 wg .Add (1 )
120135 go func () {
121136 defer wg .Done ()
122- resp , err := queryPod (ctx , log , conn )
137+ resp , err := queryPod (ctx , log . With ( "replica_id" , id ) , conn )
123138 if err != nil {
124139 fails <- err
125140 } else {
@@ -128,7 +143,7 @@ func queryAllPods(ctx context.Context, log util.Logger, connections Connections)
128143 }()
129144 }
130145
131- result := map [string ]ServerState {}
146+ result := map [string ]ServerStatus {}
132147 errs := []error {}
133148
134149 // Wait for all goroutines to return error or result
@@ -152,7 +167,7 @@ func queryAllPods(ctx context.Context, log util.Logger, connections Connections)
152167 return result
153168}
154169
155- func getServersStates (ctx context.Context , log util.Logger , hostnamesByID map [string ]string ) map [string ]ServerState {
170+ func getServersStates (ctx context.Context , log util.Logger , hostnamesByID map [string ]string ) map [string ]ServerStatus {
156171 connections , errs := getConnections (ctx , log , hostnamesByID )
157172 for _ , err := range errs {
158173 log .Info ("error getting keeper connection" , "error" , err )
0 commit comments