Skip to content

Commit 58e8cd1

Browse files
committed
ovsdb: exit clustered ovsdb server if multiple raft leaders found
Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
1 parent 3f88103 commit 58e8cd1

File tree

6 files changed

+233
-81
lines changed

6 files changed

+233
-81
lines changed

dist/images/start-db.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,4 +534,7 @@ ovn-appctl -t /var/run/ovn/ovnnb_db.ctl ovsdb-server/memory-trim-on-compaction o
534534
ovn-appctl -t /var/run/ovn/ovnsb_db.ctl ovsdb-server/memory-trim-on-compaction on
535535

536536
chmod 600 /etc/ovn/*
537-
/kube-ovn/kube-ovn-leader-checker --probeInterval=${OVN_LEADER_PROBE_INTERVAL} --enableCompact=${ENABLE_COMPACT}
537+
/kube-ovn/kube-ovn-leader-checker \
538+
--probeInterval=${OVN_LEADER_PROBE_INTERVAL} \
539+
--enableCompact=${ENABLE_COMPACT} \
540+
--remoteAddresses="${NODE_IPS}"

dist/images/start-ic-db.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,10 @@ fi
215215

216216
if [[ $ENABLE_OVN_LEADER_CHECK == "true" ]]; then
217217
chmod 600 /etc/ovn/*
218-
/kube-ovn/kube-ovn-leader-checker --probeInterval=${OVN_LEADER_PROBE_INTERVAL} --isICDBServer=true
218+
/kube-ovn/kube-ovn-leader-checker \
219+
--probeInterval=${OVN_LEADER_PROBE_INTERVAL} \
220+
--remoteAddresses="${NODE_IPS}" \
221+
--isICDBServer=true
219222
else
220223
# Compatible with controller deployment methods before kube-ovn 1.11.16
221224
TS_NAME=${TS_NAME:-ts}

pkg/ovn_leader_checker/ovn.go

Lines changed: 93 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@ import (
88
"net"
99
"os"
1010
"os/exec"
11+
"slices"
1112
"strconv"
1213
"strings"
1314
"syscall"
1415
"time"
1516

17+
"github.com/ovn-kubernetes/libovsdb/ovsdb"
18+
"github.com/ovn-kubernetes/libovsdb/ovsdb/serverdb"
1619
"github.com/spf13/pflag"
1720
discoveryv1 "k8s.io/api/discovery/v1"
1821
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1922
"k8s.io/apimachinery/pkg/labels"
23+
"k8s.io/apimachinery/pkg/util/intstr"
2024
"k8s.io/client-go/kubernetes"
2125
"k8s.io/client-go/rest"
2226
"k8s.io/client-go/tools/clientcmd"
@@ -46,21 +50,26 @@ var labelSelector = labels.Set{discoveryv1.LabelServiceName: OvnNorthdServiceNam
4650

4751
// Configuration is the controller conf
4852
type Configuration struct {
49-
KubeConfigFile string
50-
KubeClient kubernetes.Interface
51-
ProbeInterval int
52-
EnableCompact bool
53-
ISICDBServer bool
53+
KubeConfigFile string
54+
KubeClient kubernetes.Interface
55+
ProbeInterval int
56+
EnableCompact bool
57+
ISICDBServer bool
58+
localAddress string
59+
remoteAddresses []string
5460
}
5561

5662
// ParseFlags parses cmd args then init kubeclient and conf
5763
// TODO: validate configuration
5864
func ParseFlags() (*Configuration, error) {
65+
podIP := os.Getenv("POD_IP")
5966
var (
6067
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
6168
argProbeInterval = pflag.Int("probeInterval", DefaultProbeInterval, "interval of probing leader in seconds")
6269
argEnableCompact = pflag.Bool("enableCompact", true, "is enable compact")
6370
argIsICDBServer = pflag.Bool("isICDBServer", false, "is ic db server")
71+
localAddress = pflag.String("localAddress", podIP, "local ovsdb server address")
72+
remoteAddresses = pflag.StringSlice("remoteAddresses", nil, "remote ovsdb server addresses")
6473
)
6574

6675
klogFlags := flag.NewFlagSet("klog", flag.ContinueOnError)
@@ -88,11 +97,14 @@ func ParseFlags() (*Configuration, error) {
8897
}
8998

9099
config := &Configuration{
91-
KubeConfigFile: *argKubeConfigFile,
92-
ProbeInterval: *argProbeInterval,
93-
EnableCompact: *argEnableCompact,
94-
ISICDBServer: *argIsICDBServer,
100+
KubeConfigFile: *argKubeConfigFile,
101+
ProbeInterval: *argProbeInterval,
102+
EnableCompact: *argEnableCompact,
103+
ISICDBServer: *argIsICDBServer,
104+
localAddress: *localAddress,
105+
remoteAddresses: slices.DeleteFunc(*remoteAddresses, func(s string) bool { return s == *localAddress }),
95106
}
107+
96108
return config, nil
97109
}
98110

@@ -115,12 +127,10 @@ func KubeClientInit(cfg *Configuration) error {
115127
klog.Errorf("init kubernetes cfg failed %v", err)
116128
return err
117129
}
118-
kubeClient, err := kubernetes.NewForConfig(kubeCfg)
119-
if err != nil {
130+
if cfg.KubeClient, err = kubernetes.NewForConfig(kubeCfg); err != nil {
120131
klog.Errorf("init kubernetes client failed %v", err)
121132
return err
122133
}
123-
cfg.KubeClient = kubeClient
124134
return nil
125135
}
126136

@@ -154,36 +164,58 @@ func checkOvnIsAlive() bool {
154164
return true
155165
}
156166

157-
func isDBLeader(dbName string, port int) bool {
158-
addr := net.JoinHostPort(os.Getenv("POD_IP"), strconv.Itoa(port))
159-
query := fmt.Sprintf(`["_Server",{"table":"Database","where":[["name","==","%s"]],"columns":["leader"],"op":"select"}]`, dbName)
160-
161-
var cmd []string
162-
if os.Getenv(EnvSSL) == "false" {
163-
cmd = []string{"query", "tcp:" + addr, query}
164-
} else {
165-
cmd = []string{
166-
"-p", "/var/run/tls/key",
167-
"-c", "/var/run/tls/cert",
168-
"-C", "/var/run/tls/cacert",
169-
"query", "ssl:" + addr, query,
170-
}
167+
// isDBLeader checks whether the ovn db is leader
168+
// return values: string: local ovsdb-server address, bool: is leader or not
169+
func isDBLeader(address, database string) bool {
170+
var dbAddr string
171+
switch database {
172+
case ovnnb.DatabaseName:
173+
dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6641))
174+
case ovnsb.DatabaseName:
175+
dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6642))
176+
case "OVN_IC_Northbound":
177+
dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6645))
178+
case "OVN_IC_Southbound":
179+
dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6646))
180+
default:
181+
klog.Errorf("isDBLeader: unsupported database %s", database)
182+
return false
171183
}
172184

173-
output, err := exec.Command("ovsdb-client", cmd...).CombinedOutput() // #nosec G204
185+
result, err := ovs.Query(dbAddr, serverdb.DatabaseName, 1, ovsdb.Operation{
186+
Op: ovsdb.OperationSelect,
187+
Table: serverdb.DatabaseTable,
188+
Where: []ovsdb.Condition{{
189+
Column: "name",
190+
Function: ovsdb.ConditionEqual,
191+
Value: database,
192+
}},
193+
Columns: []string{"leader"},
194+
})
174195
if err != nil {
175-
klog.Errorf("failed to execute cmd %q: err=%v, msg=%v", strings.Join(cmd, " "), err, string(output))
196+
klog.Errorf("failed to query leader info from ovsdb-server %s for database %s: %v", address, database, err)
197+
return false
198+
}
199+
if len(result) != 1 {
200+
klog.Errorf("unexpected number of results when querying leader info from ovsdb-server %s for database %s: %d", address, database, len(result))
201+
return false
202+
}
203+
if len(result[0].Rows) == 0 {
204+
klog.Errorf("no rows returned when querying leader info from ovsdb-server %s for database %s", address, database)
205+
return false
206+
}
207+
if len(result[0].Rows) != 1 {
208+
klog.Errorf("unexpected number of rows when querying leader info from ovsdb-server %s for database %s: %d", address, database, len(result[0].Rows))
176209
return false
177210
}
178211

179-
result := strings.TrimSpace(string(output))
180-
if len(result) == 0 {
181-
klog.Errorf("cmd %q no output", strings.Join(cmd, " "))
212+
leader, ok := result[0].Rows[0]["leader"].(bool)
213+
if !ok {
214+
klog.Errorf("unexpected data format for leader info from ovsdb-server %s for database %s: %v", address, database, result[0].Rows[0]["leader"])
182215
return false
183216
}
184217

185-
klog.V(5).Infof("cmd %q output: %s", strings.Join(cmd, " "), string(output))
186-
return strings.Contains(result, "true")
218+
return leader
187219
}
188220

189221
func checkNorthdActive() bool {
@@ -305,13 +337,12 @@ func checkNorthdEpAlive(cfg *Configuration, namespace, service string) bool {
305337
}
306338

307339
func compactOvnDatabase(db string) {
308-
command := []string{
340+
args := []string{
309341
"-t",
310342
fmt.Sprintf("/var/run/ovn/ovn%s_db.ctl", db),
311343
"ovsdb-server/compact",
312344
}
313-
314-
output, err := exec.Command("ovn-appctl", command...).CombinedOutput() // #nosec G204
345+
output, err := exec.Command("ovn-appctl", args...).CombinedOutput() // #nosec G204
315346
if err != nil {
316347
if !strings.Contains(string(output), "not storing a duplicate snapshot") {
317348
klog.Errorf("failed to compact ovn%s database: %s", db, string(output))
@@ -338,11 +369,13 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) {
338369
}
339370

340371
if !cfg.ISICDBServer {
341-
sbLeader := isDBLeader(ovnsb.DatabaseName, 6642)
372+
nbLeader := isDBLeader(cfg.localAddress, ovnnb.DatabaseName)
373+
sbLeader := isDBLeader(cfg.localAddress, ovnsb.DatabaseName)
374+
northdActive := checkNorthdActive()
342375
patch := util.KVPatch{
343-
"ovn-nb-leader": strconv.FormatBool(isDBLeader(ovnnb.DatabaseName, 6641)),
376+
"ovn-nb-leader": strconv.FormatBool(nbLeader),
344377
"ovn-sb-leader": strconv.FormatBool(sbLeader),
345-
"ovn-northd-leader": strconv.FormatBool(checkNorthdActive()),
378+
"ovn-northd-leader": strconv.FormatBool(northdActive),
346379
}
347380
if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil {
348381
klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err)
@@ -355,15 +388,25 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) {
355388
}
356389
}
357390

391+
for addr := range slices.Values(cfg.remoteAddresses) {
392+
if nbLeader && isDBLeader(addr, ovnnb.DatabaseName) {
393+
klog.Fatalf("found another ovn-nb leader at %s, exiting process to restart", addr)
394+
}
395+
if sbLeader && isDBLeader(addr, ovnsb.DatabaseName) {
396+
klog.Fatalf("found another ovn-sb leader at %s, exiting process to restart", addr)
397+
}
398+
}
399+
358400
if cfg.EnableCompact {
359401
compactOvnDatabase("nb")
360402
compactOvnDatabase("sb")
361403
}
362404
} else {
363-
icNbLeader := isDBLeader("OVN_IC_Northbound", 6645)
405+
icNbLeader := isDBLeader(cfg.localAddress, "OVN_IC_Northbound")
406+
icSbLeader := isDBLeader(cfg.localAddress, "OVN_IC_Southbound")
364407
patch := util.KVPatch{
365408
"ovn-ic-nb-leader": strconv.FormatBool(icNbLeader),
366-
"ovn-ic-sb-leader": strconv.FormatBool(isDBLeader("OVN_IC_Southbound", 6646)),
409+
"ovn-ic-sb-leader": strconv.FormatBool(icSbLeader),
367410
}
368411
if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil {
369412
klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err)
@@ -376,6 +419,15 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) {
376419
return
377420
}
378421
}
422+
423+
for addr := range slices.Values(cfg.remoteAddresses) {
424+
if icNbLeader && isDBLeader(addr, "OVN_IC_Northbound") {
425+
klog.Fatalf("found another ovn-ic-nb leader at %s, exiting process to restart", addr)
426+
}
427+
if icSbLeader && isDBLeader(addr, "OVN_IC_Southbound") {
428+
klog.Fatalf("found another ovn-ic-sb leader at %s, exiting process to restart", addr)
429+
}
430+
}
379431
}
380432
}
381433

pkg/ovs/ovsdb-client.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package ovs
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"net"
7+
"os"
8+
"os/exec"
9+
"slices"
10+
"strconv"
11+
"strings"
12+
13+
"github.com/ovn-kubernetes/libovsdb/ovsdb"
14+
"k8s.io/apimachinery/pkg/util/intstr"
15+
)
16+
17+
// OvsdbServerAddress constructs the ovsdb-server address based on the given host and port.
18+
// It uses "ssl" scheme if the ENABLE_SSL environment variable is set to "true", otherwise "tcp".
19+
//
20+
// For example:
21+
//
22+
// OvsdbServerAddress("localhost:6641") returns "tcp:localhost:6641" or "ssl:localhost:6641" based on the ENABLE_SSL setting.
23+
func OvsdbServerAddress(host string, port intstr.IntOrString) string {
24+
scheme := "tcp"
25+
if os.Getenv("ENABLE_SSL") == "true" {
26+
scheme = "ssl"
27+
}
28+
return fmt.Sprintf("%s:%s", scheme, net.JoinHostPort(host, port.String()))
29+
}
30+
31+
// Query executes an ovsdb-client query command against the given address and database with the provided operations
32+
// and returns the operation results.
33+
// For SSL connections, it assumes the certificates are located at /var/run/tls/{key,cert,cacert}.
34+
// The timeout is specified in seconds.
35+
// For more details, see `ovsdb-client --help`.
36+
//
37+
// For example:
38+
//
39+
// results, err := Query("tcp:[::1]:6641", "OVN_Northbound", 3, ovsdb.Operation{...})
40+
// results, err := Query("ssl:[::1]:6641", "OVN_Northbound", 3, ovsdb.Operation{...})
41+
func Query(address, database string, timeout int, operations ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
42+
transArgs := ovsdb.NewTransactArgs(database, operations...)
43+
query, err := json.Marshal(transArgs)
44+
if err != nil {
45+
return nil, fmt.Errorf("failed to marshal ovsdb transaction args %+v: %w", transArgs, err)
46+
}
47+
48+
args := []string{"--timeout", strconv.Itoa(timeout), "query", address, string(query)}
49+
if strings.HasPrefix(address, "ssl:") {
50+
args = slices.Insert(args, 0, "-p", "/var/run/tls/key", "-c", "/var/run/tls/cert", "-C", "/var/run/tls/cacert")
51+
}
52+
53+
output, err := exec.Command("ovsdb-client", args...).CombinedOutput() // #nosec G204
54+
if err != nil {
55+
return nil, fmt.Errorf("failed to execute ovsdb-client with args %v: %w\noutput: %s", args, err, string(output))
56+
}
57+
58+
var results []ovsdb.OperationResult
59+
if err = json.Unmarshal(output, &results); err != nil {
60+
return nil, fmt.Errorf("failed to unmarshal ovsdb-client output %q: %w", string(output), err)
61+
}
62+
63+
return results, nil
64+
}

pkg/ovs/ovsdb-client_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package ovs
2+
3+
import (
4+
"os"
5+
"testing"
6+
7+
"k8s.io/apimachinery/pkg/util/intstr"
8+
)
9+
10+
func TestOvsdbServerAddress(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
host string
14+
port intstr.IntOrString
15+
envValue string
16+
expected string
17+
}{
18+
{
19+
name: "tcp scheme",
20+
host: "localhost",
21+
port: intstr.FromInt32(6641),
22+
envValue: "false",
23+
expected: "tcp:localhost:6641",
24+
},
25+
{
26+
name: "ssl scheme",
27+
host: "127.0.0.1",
28+
port: intstr.FromInt(6642),
29+
envValue: "true",
30+
expected: "ssl:127.0.0.1:6642",
31+
},
32+
{
33+
name: "tcp scheme with ipv6 address",
34+
host: "::1",
35+
port: intstr.FromInt(6643),
36+
envValue: "false",
37+
expected: "tcp:[::1]:6643",
38+
},
39+
{
40+
name: "ssl scheme with ipv6 address",
41+
host: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
42+
port: intstr.FromInt(6644),
43+
envValue: "true",
44+
expected: "ssl:[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:6644",
45+
},
46+
}
47+
48+
for _, tt := range tests {
49+
t.Run(tt.name, func(t *testing.T) {
50+
os.Setenv("ENABLE_SSL", tt.envValue)
51+
result := OvsdbServerAddress(tt.host, tt.port)
52+
if result != tt.expected {
53+
t.Errorf("expected %s, got %s", tt.expected, result)
54+
}
55+
})
56+
}
57+
}

0 commit comments

Comments
 (0)