diff --git a/dist/images/start-db.sh b/dist/images/start-db.sh index 2399fb71c0b..86534220183 100755 --- a/dist/images/start-db.sh +++ b/dist/images/start-db.sh @@ -534,4 +534,7 @@ ovn-appctl -t /var/run/ovn/ovnnb_db.ctl ovsdb-server/memory-trim-on-compaction o ovn-appctl -t /var/run/ovn/ovnsb_db.ctl ovsdb-server/memory-trim-on-compaction on chmod 600 /etc/ovn/* -/kube-ovn/kube-ovn-leader-checker --probeInterval=${OVN_LEADER_PROBE_INTERVAL} --enableCompact=${ENABLE_COMPACT} +/kube-ovn/kube-ovn-leader-checker \ + --probeInterval=${OVN_LEADER_PROBE_INTERVAL} \ + --enableCompact=${ENABLE_COMPACT} \ + --remoteAddresses="${NODE_IPS}" diff --git a/dist/images/start-ic-db.sh b/dist/images/start-ic-db.sh index 3c8d555debb..0345af1c9c9 100755 --- a/dist/images/start-ic-db.sh +++ b/dist/images/start-ic-db.sh @@ -215,7 +215,10 @@ fi if [[ $ENABLE_OVN_LEADER_CHECK == "true" ]]; then chmod 600 /etc/ovn/* - /kube-ovn/kube-ovn-leader-checker --probeInterval=${OVN_LEADER_PROBE_INTERVAL} --isICDBServer=true + /kube-ovn/kube-ovn-leader-checker \ + --probeInterval=${OVN_LEADER_PROBE_INTERVAL} \ + --remoteAddresses="${NODE_IPS}" \ + --isICDBServer=true else # Compatible with controller deployment methods before kube-ovn 1.11.16 TS_NAME=${TS_NAME:-ts} diff --git a/pkg/ovn_leader_checker/ovn.go b/pkg/ovn_leader_checker/ovn.go index 70d6c5e5289..c2d2d4cd95c 100755 --- a/pkg/ovn_leader_checker/ovn.go +++ b/pkg/ovn_leader_checker/ovn.go @@ -8,15 +8,19 @@ import ( "net" "os" "os/exec" + "slices" "strconv" "strings" "syscall" "time" + "github.com/ovn-kubernetes/libovsdb/ovsdb" + "github.com/ovn-kubernetes/libovsdb/ovsdb/serverdb" "github.com/spf13/pflag" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -46,21 +50,26 @@ var labelSelector = labels.Set{discoveryv1.LabelServiceName: OvnNorthdServiceNam // Configuration is the controller conf type Configuration struct { - KubeConfigFile string - KubeClient kubernetes.Interface - ProbeInterval int - EnableCompact bool - ISICDBServer bool + KubeConfigFile string + KubeClient kubernetes.Interface + ProbeInterval int + EnableCompact bool + ISICDBServer bool + localAddress string + remoteAddresses []string } // ParseFlags parses cmd args then init kubeclient and conf // TODO: validate configuration func ParseFlags() (*Configuration, error) { + podIP := os.Getenv("POD_IP") var ( argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.") argProbeInterval = pflag.Int("probeInterval", DefaultProbeInterval, "interval of probing leader in seconds") argEnableCompact = pflag.Bool("enableCompact", true, "is enable compact") argIsICDBServer = pflag.Bool("isICDBServer", false, "is ic db server") + localAddress = pflag.String("localAddress", podIP, "local ovsdb server address") + remoteAddresses = pflag.StringSlice("remoteAddresses", nil, "remote ovsdb server addresses") ) klogFlags := flag.NewFlagSet("klog", flag.ContinueOnError) @@ -88,11 +97,14 @@ func ParseFlags() (*Configuration, error) { } config := &Configuration{ - KubeConfigFile: *argKubeConfigFile, - ProbeInterval: *argProbeInterval, - EnableCompact: *argEnableCompact, - ISICDBServer: *argIsICDBServer, + KubeConfigFile: *argKubeConfigFile, + ProbeInterval: *argProbeInterval, + EnableCompact: *argEnableCompact, + ISICDBServer: *argIsICDBServer, + localAddress: *localAddress, + remoteAddresses: slices.DeleteFunc(*remoteAddresses, func(s string) bool { return s == *localAddress }), } + return config, nil } @@ -115,12 +127,10 @@ func KubeClientInit(cfg *Configuration) error { klog.Errorf("init kubernetes cfg failed %v", err) return err } - kubeClient, err := kubernetes.NewForConfig(kubeCfg) - if err != nil { + if cfg.KubeClient, err = kubernetes.NewForConfig(kubeCfg); err != nil { klog.Errorf("init kubernetes client failed %v", err) return err } - cfg.KubeClient = kubeClient return nil } @@ -154,36 +164,57 @@ func checkOvnIsAlive() bool { return true } -func isDBLeader(dbName string, port int) bool { - addr := net.JoinHostPort(os.Getenv("POD_IP"), strconv.Itoa(port)) - query := fmt.Sprintf(`["_Server",{"table":"Database","where":[["name","==","%s"]],"columns":["leader"],"op":"select"}]`, dbName) - - var cmd []string - if os.Getenv(EnvSSL) == "false" { - cmd = []string{"query", "tcp:" + addr, query} - } else { - cmd = []string{ - "-p", "/var/run/tls/key", - "-c", "/var/run/tls/cert", - "-C", "/var/run/tls/cacert", - "query", "ssl:" + addr, query, - } +// isDBLeader checks whether the ovn db at address is leader for the given database +func isDBLeader(address, database string) bool { + var dbAddr string + switch database { + case ovnnb.DatabaseName: + dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6641)) + case ovnsb.DatabaseName: + dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6642)) + case "OVN_IC_Northbound": + dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6645)) + case "OVN_IC_Southbound": + dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6646)) + default: + klog.Errorf("isDBLeader: unsupported database %s", database) + return false } - output, err := exec.Command("ovsdb-client", cmd...).CombinedOutput() // #nosec G204 + result, err := ovs.Query(dbAddr, serverdb.DatabaseName, 1, ovsdb.Operation{ + Op: ovsdb.OperationSelect, + Table: serverdb.DatabaseTable, + Where: []ovsdb.Condition{{ + Column: "name", + Function: ovsdb.ConditionEqual, + Value: database, + }}, + Columns: []string{"leader"}, + }) if err != nil { - klog.Errorf("failed to execute cmd %q: err=%v, msg=%v", strings.Join(cmd, " "), err, string(output)) + klog.Errorf("failed to query leader info from ovsdb-server %s for database %s: %v", address, database, err) + return false + } + if len(result) != 1 { + klog.Errorf("unexpected number of results when querying leader info from ovsdb-server %s for database %s: %d", address, database, len(result)) + return false + } + if len(result[0].Rows) == 0 { + klog.Errorf("no rows returned when querying leader info from ovsdb-server %s for database %s", address, database) + return false + } + if len(result[0].Rows) != 1 { + klog.Errorf("unexpected number of rows when querying leader info from ovsdb-server %s for database %s: %d", address, database, len(result[0].Rows)) return false } - result := strings.TrimSpace(string(output)) - if len(result) == 0 { - klog.Errorf("cmd %q no output", strings.Join(cmd, " ")) + leader, ok := result[0].Rows[0]["leader"].(bool) + if !ok { + klog.Errorf("unexpected data format for leader info from ovsdb-server %s for database %s: %v", address, database, result[0].Rows[0]["leader"]) return false } - klog.V(5).Infof("cmd %q output: %s", strings.Join(cmd, " "), string(output)) - return strings.Contains(result, "true") + return leader } func checkNorthdActive() bool { @@ -305,13 +336,12 @@ func checkNorthdEpAlive(cfg *Configuration, namespace, service string) bool { } func compactOvnDatabase(db string) { - command := []string{ + args := []string{ "-t", fmt.Sprintf("/var/run/ovn/ovn%s_db.ctl", db), "ovsdb-server/compact", } - - output, err := exec.Command("ovn-appctl", command...).CombinedOutput() // #nosec G204 + output, err := exec.Command("ovn-appctl", args...).CombinedOutput() // #nosec G204 if err != nil { if !strings.Contains(string(output), "not storing a duplicate snapshot") { klog.Errorf("failed to compact ovn%s database: %s", db, string(output)) @@ -338,11 +368,13 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { } if !cfg.ISICDBServer { - sbLeader := isDBLeader(ovnsb.DatabaseName, 6642) + nbLeader := isDBLeader(cfg.localAddress, ovnnb.DatabaseName) + sbLeader := isDBLeader(cfg.localAddress, ovnsb.DatabaseName) + northdActive := checkNorthdActive() patch := util.KVPatch{ - "ovn-nb-leader": strconv.FormatBool(isDBLeader(ovnnb.DatabaseName, 6641)), + "ovn-nb-leader": strconv.FormatBool(nbLeader), "ovn-sb-leader": strconv.FormatBool(sbLeader), - "ovn-northd-leader": strconv.FormatBool(checkNorthdActive()), + "ovn-northd-leader": strconv.FormatBool(northdActive), } if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil { klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err) @@ -355,15 +387,25 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { } } + for addr := range slices.Values(cfg.remoteAddresses) { + if nbLeader && isDBLeader(addr, ovnnb.DatabaseName) { + klog.Fatalf("found another ovn-nb leader at %s, exiting process to restart", addr) + } + if sbLeader && isDBLeader(addr, ovnsb.DatabaseName) { + klog.Fatalf("found another ovn-sb leader at %s, exiting process to restart", addr) + } + } + if cfg.EnableCompact { compactOvnDatabase("nb") compactOvnDatabase("sb") } } else { - icNbLeader := isDBLeader("OVN_IC_Northbound", 6645) + icNbLeader := isDBLeader(cfg.localAddress, "OVN_IC_Northbound") + icSbLeader := isDBLeader(cfg.localAddress, "OVN_IC_Southbound") patch := util.KVPatch{ "ovn-ic-nb-leader": strconv.FormatBool(icNbLeader), - "ovn-ic-sb-leader": strconv.FormatBool(isDBLeader("OVN_IC_Southbound", 6646)), + "ovn-ic-sb-leader": strconv.FormatBool(icSbLeader), } if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil { klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err) @@ -376,6 +418,15 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { return } } + + for addr := range slices.Values(cfg.remoteAddresses) { + if icNbLeader && isDBLeader(addr, "OVN_IC_Northbound") { + klog.Fatalf("found another ovn-ic-nb leader at %s, exiting process to restart", addr) + } + if icSbLeader && isDBLeader(addr, "OVN_IC_Southbound") { + klog.Fatalf("found another ovn-ic-sb leader at %s, exiting process to restart", addr) + } + } } } diff --git a/pkg/ovs/ovsdb-client.go b/pkg/ovs/ovsdb-client.go new file mode 100644 index 00000000000..0d5ce8725b5 --- /dev/null +++ b/pkg/ovs/ovsdb-client.go @@ -0,0 +1,64 @@ +package ovs + +import ( + "encoding/json" + "fmt" + "net" + "os" + "os/exec" + "slices" + "strconv" + "strings" + + "github.com/ovn-kubernetes/libovsdb/ovsdb" + "k8s.io/apimachinery/pkg/util/intstr" +) + +// OvsdbServerAddress constructs the ovsdb-server address based on the given host and port. +// It uses "ssl" scheme if the ENABLE_SSL environment variable is set to "true", otherwise "tcp". +// +// For example: +// +// OvsdbServerAddress("localhost:6641") returns "tcp:localhost:6641" or "ssl:localhost:6641" based on the ENABLE_SSL setting. +func OvsdbServerAddress(host string, port intstr.IntOrString) string { + scheme := "tcp" + if os.Getenv("ENABLE_SSL") == "true" { + scheme = "ssl" + } + return fmt.Sprintf("%s:%s", scheme, net.JoinHostPort(host, port.String())) +} + +// Query executes an ovsdb-client query command against the given address and database with the provided operations +// and returns the operation results. +// For SSL connections, it assumes the certificates are located at /var/run/tls/{key,cert,cacert}. +// The timeout is specified in seconds. +// For more details, see `ovsdb-client --help`. +// +// For example: +// +// results, err := Query("tcp:[::1]:6641", "OVN_Northbound", 3, ovsdb.Operation{...}) +// results, err := Query("ssl:[::1]:6641", "OVN_Northbound", 3, ovsdb.Operation{...}) +func Query(address, database string, timeout int, operations ...ovsdb.Operation) ([]ovsdb.OperationResult, error) { + transArgs := ovsdb.NewTransactArgs(database, operations...) + query, err := json.Marshal(transArgs) + if err != nil { + return nil, fmt.Errorf("failed to marshal ovsdb transaction args %+v: %w", transArgs, err) + } + + args := []string{"--timeout", strconv.Itoa(timeout), "query", address, string(query)} + if strings.HasPrefix(address, "ssl:") { + args = slices.Insert(args, 0, "-p", "/var/run/tls/key", "-c", "/var/run/tls/cert", "-C", "/var/run/tls/cacert") + } + + output, err := exec.Command("ovsdb-client", args...).CombinedOutput() // #nosec G204 + if err != nil { + return nil, fmt.Errorf("failed to execute ovsdb-client with args %v: %w\noutput: %s", args, err, string(output)) + } + + var results []ovsdb.OperationResult + if err = json.Unmarshal(output, &results); err != nil { + return nil, fmt.Errorf("failed to unmarshal ovsdb-client output %q: %w", string(output), err) + } + + return results, nil +} diff --git a/pkg/ovs/ovsdb-client_test.go b/pkg/ovs/ovsdb-client_test.go new file mode 100644 index 00000000000..b55474cb2eb --- /dev/null +++ b/pkg/ovs/ovsdb-client_test.go @@ -0,0 +1,56 @@ +package ovs + +import ( + "testing" + + "k8s.io/apimachinery/pkg/util/intstr" +) + +func TestOvsdbServerAddress(t *testing.T) { + tests := []struct { + name string + host string + port intstr.IntOrString + envValue string + expected string + }{ + { + name: "tcp scheme", + host: "localhost", + port: intstr.FromInt32(6641), + envValue: "false", + expected: "tcp:localhost:6641", + }, + { + name: "ssl scheme", + host: "127.0.0.1", + port: intstr.FromInt(6642), + envValue: "true", + expected: "ssl:127.0.0.1:6642", + }, + { + name: "tcp scheme with ipv6 address", + host: "::1", + port: intstr.FromInt(6643), + envValue: "false", + expected: "tcp:[::1]:6643", + }, + { + name: "ssl scheme with ipv6 address", + host: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", + port: intstr.FromInt(6644), + envValue: "true", + expected: "ssl:[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:6644", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("ENABLE_SSL", tt.envValue) + result := OvsdbServerAddress(tt.host, tt.port) + if result != tt.expected { + t.Errorf("expected %s, got %s", tt.expected, result) + } + }) + } +} diff --git a/pkg/pinger/ovn.go b/pkg/pinger/ovn.go index 54e2cfd5e68..d46b4f074bb 100644 --- a/pkg/pinger/ovn.go +++ b/pkg/pinger/ovn.go @@ -1,7 +1,6 @@ package pinger import ( - "encoding/json" "fmt" "os" "os/exec" @@ -9,6 +8,7 @@ import ( "strings" "github.com/ovn-kubernetes/libovsdb/ovsdb" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "k8s.io/utils/set" @@ -16,6 +16,14 @@ import ( "github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnsb" ) +var sbServiceAddress string + +func init() { + sbHost := os.Getenv("OVN_SB_SERVICE_HOST") + sbPort := os.Getenv("OVN_SB_SERVICE_PORT") + sbServiceAddress = ovs.OvsdbServerAddress(sbHost, intstr.FromString(sbPort)) +} + func checkOvs(config *Configuration, setMetrics bool) error { for component, err := range getOvsStatus() { if err != nil { @@ -111,43 +119,8 @@ func checkOvsBindings() (set.Set[string], error) { return result, nil } -func ovnSBQuery(database string, operations ...ovsdb.Operation) ([]ovsdb.OperationResult, error) { - transArgs := ovsdb.NewTransactArgs(database, operations...) - query, err := json.Marshal(transArgs) - if err != nil { - return nil, fmt.Errorf("failed to marshal ovsdb transaction args %+v: %w", transArgs, err) - } - - sbHost := os.Getenv("OVN_SB_SERVICE_HOST") - sbPort := os.Getenv("OVN_SB_SERVICE_PORT") - - args := []string{ - "--timeout=10", "query", fmt.Sprintf("tcp:[%s]:%s", sbHost, sbPort), string(query), - } - if os.Getenv("ENABLE_SSL") == "true" { - args = []string{ - "-p", "/var/run/tls/key", - "-c", "/var/run/tls/cert", - "-C", "/var/run/tls/cacert", - "--timeout=10", "query", fmt.Sprintf("ssl:[%s]:%s", sbHost, sbPort), string(query), - } - } - - output, err := exec.Command("ovsdb-client", args...).CombinedOutput() // #nosec G204 - if err != nil { - return nil, fmt.Errorf("failed to execute ovsdb-client with args %v: %w\noutput: %s", args, err, string(output)) - } - - var result []ovsdb.OperationResult - if err = json.Unmarshal(output, &result); err != nil { - return nil, fmt.Errorf("failed to unmarshal ovsdb-client output %q: %w", string(output), err) - } - - return result, nil -} - func getChassis(hostname string) (string, error) { - result, err := ovnSBQuery(ovnsb.DatabaseName, ovsdb.Operation{ + result, err := ovs.Query(sbServiceAddress, ovnsb.DatabaseName, 10, ovsdb.Operation{ Op: ovsdb.OperationSelect, Table: ovnsb.ChassisTable, Where: []ovsdb.Condition{{ @@ -179,7 +152,7 @@ func getChassis(hostname string) (string, error) { } func getLogicalPort(chassisUUID string) (set.Set[string], error) { - result, err := ovnSBQuery(ovnsb.DatabaseName, ovsdb.Operation{ + result, err := ovs.Query(sbServiceAddress, ovnsb.DatabaseName, 10, ovsdb.Operation{ Op: ovsdb.OperationSelect, Table: ovnsb.PortBindingTable, Where: []ovsdb.Condition{{