Skip to content

Commit bb647ea

Browse files
authored
Update MGM/QDB topology and version loading (#21)
* Update MGM/QDB topology and version loading * Cache MGM and QDB versions per session
1 parent cbc99e8 commit bb647ea

14 files changed

Lines changed: 775 additions & 70 deletions

eos/client_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,16 @@ func TestParseEOSServerVersion(t *testing.T) {
864864
input: "EOS_INSTANCE=eosdev\n",
865865
want: "",
866866
},
867+
{
868+
name: "dash dash version output",
869+
input: "EOS 5.4.2 (2026)\n\nDeveloped by the CERN IT Storage Group\n",
870+
want: "5.4.2",
871+
},
872+
{
873+
name: "dash dash version older output",
874+
input: "EOS 5.4.0 (2020)\n\nDeveloped by the CERN IT storage group\n",
875+
want: "5.4.0",
876+
},
867877
}
868878
for _, tt := range tests {
869879
t.Run(tt.name, func(t *testing.T) {
@@ -1228,6 +1238,116 @@ func TestParseNamespaceAttrsNoEquals(t *testing.T) {
12281238
}
12291239
}
12301240

1241+
func TestParseMonitoringKeyValues(t *testing.T) {
1242+
input := []byte(`
1243+
uid=all gid=all is_master=true master_id=eospilot-ns-02.cern.ch:1094
1244+
uid=all gid=all ns.mgm.local=eospilot-ns-02.cern.ch:1094
1245+
uid=all gid=all ns.mgm.role=leader
1246+
uid=all gid=all ns.mgm.leader=eospilot-ns-02.cern.ch:1094
1247+
uid=all gid=all ns.mgm.followers=eospilot-ns-ip700.cern.ch:1094,eospilot-ns-01.cern.ch:1094
1248+
uid=all gid=all ns.qdb.leader=eospilot-ns-02.cern.ch:7777
1249+
uid=all gid=all ns.qdb.followers=eospilot-ns-01.cern.ch:7777,eospilot-ns-ip700.cern.ch:7777
1250+
`)
1251+
values := parseMonitoringKeyValues(input)
1252+
1253+
if got := values["ns.mgm.leader"]; got != "eospilot-ns-02.cern.ch:1094" {
1254+
t.Fatalf("expected ns.mgm.leader, got %q", got)
1255+
}
1256+
if got := values["ns.qdb.followers"]; got != "eospilot-ns-01.cern.ch:7777,eospilot-ns-ip700.cern.ch:7777" {
1257+
t.Fatalf("expected ns.qdb.followers, got %q", got)
1258+
}
1259+
if got := values["master_id"]; got != "eospilot-ns-02.cern.ch:1094" {
1260+
t.Fatalf("expected master_id, got %q", got)
1261+
}
1262+
}
1263+
1264+
func TestParseMGMsFromNSStatMonitoring(t *testing.T) {
1265+
input := []byte(`
1266+
uid=all gid=all ns.mgm.local=eospilot-ns-02.cern.ch:1094
1267+
uid=all gid=all ns.mgm.role=leader
1268+
uid=all gid=all ns.mgm.leader=eospilot-ns-02.cern.ch:1094
1269+
uid=all gid=all ns.mgm.followers=eospilot-ns-ip700.cern.ch:1094,eospilot-ns-01.cern.ch:1094
1270+
uid=all gid=all ns.qdb.leader=eospilot-ns-02.cern.ch:7777
1271+
uid=all gid=all ns.qdb.followers=eospilot-ns-01.cern.ch:7777,eospilot-ns-ip700.cern.ch:7777
1272+
`)
1273+
1274+
mgms, ok := parseMGMsFromNSStatMonitoring(input)
1275+
if !ok {
1276+
t.Fatal("expected structured ns stat monitoring output to parse")
1277+
}
1278+
if len(mgms) != 3 {
1279+
t.Fatalf("expected 3 combined records, got %d", len(mgms))
1280+
}
1281+
1282+
if mgms[0].Host != "eospilot-ns-02.cern.ch" || mgms[0].Port != 1094 || mgms[0].Role != "leader" || mgms[0].Status != "online" {
1283+
t.Fatalf("unexpected MGM leader record: %+v", mgms[0])
1284+
}
1285+
if mgms[0].QDBHost != "eospilot-ns-02.cern.ch" || mgms[0].QDBPort != 7777 || mgms[0].QDBRole != "leader" || mgms[0].QDBStatus != "online" {
1286+
t.Fatalf("unexpected QDB leader record: %+v", mgms[0])
1287+
}
1288+
if mgms[1].Host != "eospilot-ns-ip700.cern.ch" || mgms[1].Role != "follower" {
1289+
t.Fatalf("unexpected first follower record: %+v", mgms[1])
1290+
}
1291+
if mgms[1].QDBHost != "eospilot-ns-01.cern.ch" || mgms[1].QDBRole != "follower" {
1292+
t.Fatalf("unexpected first QDB follower record: %+v", mgms[1])
1293+
}
1294+
if mgms[2].Host != "eospilot-ns-01.cern.ch" || mgms[2].QDBHost != "eospilot-ns-ip700.cern.ch" {
1295+
t.Fatalf("unexpected combined record ordering: %+v", mgms[2])
1296+
}
1297+
}
1298+
1299+
func TestNodeStatsFromMonitoringValues(t *testing.T) {
1300+
values := parseMonitoringKeyValues([]byte(`
1301+
uid=all gid=all ns.total.files=23502173
1302+
uid=all gid=all ns.total.directories=383968
1303+
uid=all gid=all ns.current.fid=2590610131
1304+
uid=all gid=all ns.current.cid=3465125
1305+
uid=all gid=all ns.memory.virtual=27031158784
1306+
uid=all gid=all ns.memory.resident=13764378624
1307+
uid=all gid=all ns.memory.share=89653248
1308+
uid=all gid=all ns.memory.growth=23214104576
1309+
uid=all gid=all ns.stat.threads=666
1310+
uid=all gid=all ns.fds.all=866
1311+
uid=all gid=all ns.uptime=1523
1312+
`))
1313+
1314+
stats := nodeStatsFromMonitoringValues(values)
1315+
1316+
if stats.FileCount != 23502173 {
1317+
t.Fatalf("expected file count, got %d", stats.FileCount)
1318+
}
1319+
if stats.DirCount != 383968 {
1320+
t.Fatalf("expected dir count, got %d", stats.DirCount)
1321+
}
1322+
if stats.CurrentFID != 2590610131 || stats.CurrentCID != 3465125 {
1323+
t.Fatalf("unexpected current IDs: fid=%d cid=%d", stats.CurrentFID, stats.CurrentCID)
1324+
}
1325+
if stats.MemVirtual != 27031158784 || stats.MemResident != 13764378624 {
1326+
t.Fatalf("unexpected memory stats: virtual=%d resident=%d", stats.MemVirtual, stats.MemResident)
1327+
}
1328+
if stats.MemShared != 89653248 || stats.MemGrowth != 23214104576 {
1329+
t.Fatalf("unexpected shared/growth memory stats: shared=%d growth=%d", stats.MemShared, stats.MemGrowth)
1330+
}
1331+
if stats.ThreadCount != 666 || stats.FileDescs != 866 {
1332+
t.Fatalf("unexpected threads/fds: threads=%d fds=%d", stats.ThreadCount, stats.FileDescs)
1333+
}
1334+
if stats.Uptime.Seconds() != 1523 {
1335+
t.Fatalf("expected uptime 1523s, got %s", stats.Uptime)
1336+
}
1337+
}
1338+
1339+
func TestMGMPortFromMonitoringValues(t *testing.T) {
1340+
values := parseMonitoringKeyValues([]byte(`
1341+
uid=all gid=all master_id=eospilot-ns-02.cern.ch:1094
1342+
`))
1343+
if got := mgmPortFromMonitoringValues(values); got != "1094" {
1344+
t.Fatalf("expected 1094, got %q", got)
1345+
}
1346+
if got := mgmPortFromMonitoringValues(map[string]string{}); got != "1094" {
1347+
t.Fatalf("expected fallback port, got %q", got)
1348+
}
1349+
}
1350+
12311351
// --- entryFromCLI ---
12321352

12331353
func TestEntryFromCLIRootPath(t *testing.T) {

eos/fetch_mgm.go

Lines changed: 111 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package eos
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76
"sort"
87
"strings"
@@ -11,6 +10,19 @@ import (
1110
func (c *Client) MGMs(ctx context.Context) ([]MgmRecord, error) {
1211
_ = ctx
1312

13+
if output, err := c.runCommand("eos", "-b", "ns", "stat", "-m"); err == nil {
14+
values := parseMonitoringKeyValues(output)
15+
if mgms, ok := parseMGMsFromMonitoringValues(values); ok {
16+
return mgms, nil
17+
}
18+
return c.mgmsFromRaftInfo(mgmPortFromMonitoringValues(values))
19+
}
20+
21+
return c.mgmsFromRaftInfo("")
22+
}
23+
24+
func (c *Client) mgmsFromRaftInfo(mgmPort string) ([]MgmRecord, error) {
25+
1426
// Run redis-cli raft-info directly via runCommand.
1527
// The SSH target (if set) is always the MGM or an MGM leader node,
1628
// so we do not need a separate SSH hop.
@@ -33,10 +45,11 @@ func (c *Client) MGMs(ctx context.Context) ([]MgmRecord, error) {
3345
return nil, fmt.Errorf("no MGM cluster info from raft-info")
3446
}
3547

36-
// Fetch the MGM service port from `eos ns stat` via master_id
37-
// (e.g. "eospilot-ns-02.cern.ch:1094"). The raft nodes use the QDB port
38-
// (7777); the actual MGM port must be read from the namespace.
39-
mgmPort := mgmPortFromNsStat(c)
48+
// The raft nodes use the QDB port (7777); the actual MGM port is read from
49+
// the namespace monitoring payload when available.
50+
if mgmPort == "" {
51+
mgmPort = "1094"
52+
}
4053

4154
// Determine leader hostname (strip raft port :7777)
4255
leaderHost := hostOnly(info.Leader)
@@ -92,7 +105,9 @@ func (c *Client) MGMs(ctx context.Context) ([]MgmRecord, error) {
92105
QDBHost: qh,
93106
QDBPort: qp,
94107
Role: role,
108+
QDBRole: role,
95109
Status: status,
110+
QDBStatus: status,
96111
EOSVersion: version,
97112
QDBVersion: version,
98113
})
@@ -112,27 +127,103 @@ func (c *Client) MGMs(ctx context.Context) ([]MgmRecord, error) {
112127
return mgms, nil
113128
}
114129

115-
// mgmPortFromNsStat fetches the MGM service port by reading master_id from
116-
// `eos ns stat`. master_id is of the form "hostname:port" (e.g.
117-
// "eospilot-ns-02.cern.ch:1094"). Falls back to "1094" on any error.
118-
func mgmPortFromNsStat(c *Client) string {
119-
const fallback = "1094"
130+
func parseMGMsFromNSStatMonitoring(output []byte) ([]MgmRecord, bool) {
131+
return parseMGMsFromMonitoringValues(parseMonitoringKeyValues(output))
132+
}
120133

121-
out, err := c.runCommand("eos", "-j", "-b", "ns", "stat")
122-
if err != nil {
123-
return fallback
134+
func parseMGMsFromMonitoringValues(values map[string]string) ([]MgmRecord, bool) {
135+
mgmLeader := strings.TrimSpace(values["ns.mgm.leader"])
136+
qdbLeader := strings.TrimSpace(values["ns.qdb.leader"])
137+
if mgmLeader == "" || qdbLeader == "" {
138+
return nil, false
139+
}
140+
141+
mgmNodes := append([]string{mgmLeader}, splitMonitoringList(values["ns.mgm.followers"])...)
142+
qdbNodes := append([]string{qdbLeader}, splitMonitoringList(values["ns.qdb.followers"])...)
143+
mgmNodes = uniqueEndpoints(mgmNodes)
144+
qdbNodes = uniqueEndpoints(qdbNodes)
145+
146+
count := len(mgmNodes)
147+
if len(qdbNodes) > count {
148+
count = len(qdbNodes)
149+
}
150+
mgms := make([]MgmRecord, 0, count)
151+
for i := 0; i < count; i++ {
152+
var record MgmRecord
153+
if i < len(mgmNodes) {
154+
record.Host, record.Port = splitHostPort(mgmNodes[i])
155+
record.Role = "follower"
156+
if mgmNodes[i] == mgmLeader {
157+
record.Role = "leader"
158+
}
159+
record.Status = "online"
160+
}
161+
if i < len(qdbNodes) {
162+
record.QDBHost, record.QDBPort = splitHostPort(qdbNodes[i])
163+
record.QDBRole = "follower"
164+
if qdbNodes[i] == qdbLeader {
165+
record.QDBRole = "leader"
166+
}
167+
record.QDBStatus = "online"
168+
}
169+
mgms = append(mgms, record)
170+
}
171+
return mgms, true
172+
}
173+
174+
// parseMonitoringKeyValues collects every whitespace-separated key=value token
// in output into a map.
//
// Tokens without "=" or with an empty key are ignored. Only the first "="
// splits a token, so values may themselves contain "=". When a key appears
// more than once, the last occurrence wins. The returned map is never nil.
func parseMonitoringKeyValues(output []byte) map[string]string {
	pairs := make(map[string]string)
	// Newlines are whitespace too, so tokenising the whole payload at once
	// visits the same tokens in the same order as going line by line.
	for _, token := range strings.Fields(string(output)) {
		eq := strings.IndexByte(token, '=')
		if eq <= 0 {
			// eq == -1: no separator; eq == 0: empty key.
			continue
		}
		pairs[token[:eq]] = token[eq+1:]
	}
	return pairs
}
125187

126-
var payload struct {
127-
Result []struct {
128-
Master string `json:"master_id"`
129-
} `json:"result"`
188+
// splitMonitoringList splits a comma-separated monitoring value into its
// trimmed items.
//
// A value that is empty or the literal "none" (after trimming) yields nil.
// Otherwise individual items that are empty or "none" are dropped and the
// remaining items are returned in their original order.
func splitMonitoringList(raw string) []string {
	// absent reports whether a trimmed token stands for "no entry".
	absent := func(token string) bool {
		return token == "" || token == "none"
	}

	trimmed := strings.TrimSpace(raw)
	if absent(trimmed) {
		return nil
	}

	pieces := strings.Split(trimmed, ",")
	items := make([]string, 0, len(pieces))
	for _, piece := range pieces {
		if item := strings.TrimSpace(piece); !absent(item) {
			items = append(items, item)
		}
	}
	return items
}
203+
204+
// uniqueEndpoints returns nodes with surrounding whitespace trimmed, blank
// entries removed, and duplicates dropped. The first occurrence of each
// endpoint keeps its position. The result is always a non-nil slice.
func uniqueEndpoints(nodes []string) []string {
	known := make(map[string]struct{}, len(nodes))
	deduped := make([]string, 0, len(nodes))
	for _, raw := range nodes {
		endpoint := strings.TrimSpace(raw)
		if endpoint == "" {
			continue
		}
		if _, dup := known[endpoint]; !dup {
			known[endpoint] = struct{}{}
			deduped = append(deduped, endpoint)
		}
	}
	return deduped
}
220+
221+
// mgmPortFromMonitoringValues extracts the MGM service port from the
222+
// master_id key in `eos ns stat -m`. Falls back to the default MGM port.
223+
func mgmPortFromMonitoringValues(values map[string]string) string {
224+
const fallback = "1094"
134225

135-
masterID := payload.Result[0].Master // e.g. "eospilot-ns-02.cern.ch:1094"
226+
masterID := strings.TrimSpace(values["master_id"])
136227
if idx := strings.LastIndex(masterID, ":"); idx != -1 {
137228
if port := masterID[idx+1:]; port != "" {
138229
return port

0 commit comments

Comments
 (0)