-
Notifications
You must be signed in to change notification settings - Fork 526
ovsdb: exit clustered ovsdb server if multiple raft leaders found #6065
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -8,15 +8,19 @@ import ( | |||||
| "net" | ||||||
| "os" | ||||||
| "os/exec" | ||||||
| "slices" | ||||||
| "strconv" | ||||||
| "strings" | ||||||
| "syscall" | ||||||
| "time" | ||||||
|
|
||||||
| "github.com/ovn-kubernetes/libovsdb/ovsdb" | ||||||
| "github.com/ovn-kubernetes/libovsdb/ovsdb/serverdb" | ||||||
| "github.com/spf13/pflag" | ||||||
| discoveryv1 "k8s.io/api/discovery/v1" | ||||||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| "k8s.io/apimachinery/pkg/labels" | ||||||
| "k8s.io/apimachinery/pkg/util/intstr" | ||||||
| "k8s.io/client-go/kubernetes" | ||||||
| "k8s.io/client-go/rest" | ||||||
| "k8s.io/client-go/tools/clientcmd" | ||||||
|
|
@@ -46,21 +50,26 @@ var labelSelector = labels.Set{discoveryv1.LabelServiceName: OvnNorthdServiceNam | |||||
|
|
||||||
| // Configuration is the controller conf | ||||||
| type Configuration struct { | ||||||
| KubeConfigFile string | ||||||
| KubeClient kubernetes.Interface | ||||||
| ProbeInterval int | ||||||
| EnableCompact bool | ||||||
| ISICDBServer bool | ||||||
| KubeConfigFile string | ||||||
| KubeClient kubernetes.Interface | ||||||
| ProbeInterval int | ||||||
| EnableCompact bool | ||||||
| ISICDBServer bool | ||||||
| localAddress string | ||||||
| remoteAddresses []string | ||||||
| } | ||||||
|
|
||||||
| // ParseFlags parses cmd args then init kubeclient and conf | ||||||
| // TODO: validate configuration | ||||||
| func ParseFlags() (*Configuration, error) { | ||||||
| podIP := os.Getenv("POD_IP") | ||||||
| var ( | ||||||
| argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.") | ||||||
| argProbeInterval = pflag.Int("probeInterval", DefaultProbeInterval, "interval of probing leader in seconds") | ||||||
| argEnableCompact = pflag.Bool("enableCompact", true, "is enable compact") | ||||||
| argIsICDBServer = pflag.Bool("isICDBServer", false, "is ic db server") | ||||||
| localAddress = pflag.String("localAddress", podIP, "local ovsdb server address") | ||||||
| remoteAddresses = pflag.StringSlice("remoteAddresses", nil, "remote ovsdb server addresses") | ||||||
| ) | ||||||
|
|
||||||
| klogFlags := flag.NewFlagSet("klog", flag.ContinueOnError) | ||||||
|
|
@@ -88,11 +97,14 @@ func ParseFlags() (*Configuration, error) { | |||||
| } | ||||||
|
|
||||||
| config := &Configuration{ | ||||||
| KubeConfigFile: *argKubeConfigFile, | ||||||
| ProbeInterval: *argProbeInterval, | ||||||
| EnableCompact: *argEnableCompact, | ||||||
| ISICDBServer: *argIsICDBServer, | ||||||
| KubeConfigFile: *argKubeConfigFile, | ||||||
| ProbeInterval: *argProbeInterval, | ||||||
| EnableCompact: *argEnableCompact, | ||||||
| ISICDBServer: *argIsICDBServer, | ||||||
| localAddress: *localAddress, | ||||||
| remoteAddresses: slices.DeleteFunc(*remoteAddresses, func(s string) bool { return s == *localAddress }), | ||||||
| } | ||||||
|
|
||||||
| return config, nil | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -115,12 +127,10 @@ func KubeClientInit(cfg *Configuration) error { | |||||
| klog.Errorf("init kubernetes cfg failed %v", err) | ||||||
| return err | ||||||
| } | ||||||
| kubeClient, err := kubernetes.NewForConfig(kubeCfg) | ||||||
| if err != nil { | ||||||
| if cfg.KubeClient, err = kubernetes.NewForConfig(kubeCfg); err != nil { | ||||||
| klog.Errorf("init kubernetes client failed %v", err) | ||||||
| return err | ||||||
| } | ||||||
| cfg.KubeClient = kubeClient | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -154,36 +164,57 @@ func checkOvnIsAlive() bool { | |||||
| return true | ||||||
| } | ||||||
|
|
||||||
| func isDBLeader(dbName string, port int) bool { | ||||||
| addr := net.JoinHostPort(os.Getenv("POD_IP"), strconv.Itoa(port)) | ||||||
| query := fmt.Sprintf(`["_Server",{"table":"Database","where":[["name","==","%s"]],"columns":["leader"],"op":"select"}]`, dbName) | ||||||
|
|
||||||
| var cmd []string | ||||||
| if os.Getenv(EnvSSL) == "false" { | ||||||
| cmd = []string{"query", "tcp:" + addr, query} | ||||||
| } else { | ||||||
| cmd = []string{ | ||||||
| "-p", "/var/run/tls/key", | ||||||
| "-c", "/var/run/tls/cert", | ||||||
| "-C", "/var/run/tls/cacert", | ||||||
| "query", "ssl:" + addr, query, | ||||||
| } | ||||||
| // isDBLeader checks whether the ovn db at address is leader for the given database | ||||||
| func isDBLeader(address, database string) bool { | ||||||
| var dbAddr string | ||||||
| switch database { | ||||||
| case ovnnb.DatabaseName: | ||||||
| dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6641)) | ||||||
| case ovnsb.DatabaseName: | ||||||
| dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6642)) | ||||||
| case "OVN_IC_Northbound": | ||||||
| dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6645)) | ||||||
| case "OVN_IC_Southbound": | ||||||
| dbAddr = ovs.OvsdbServerAddress(address, intstr.FromInt32(6646)) | ||||||
|
Comment on lines
+175
to
+178
Contributor
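There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The database names `OVN_IC_Northbound` and `OVN_IC_Southbound` are hard-coded as string literals here and again in `doOvnLeaderCheck`. Consider defining them as shared constants. For example, in a constants file: const (
// ...
OvnICNB = "OVN_IC_Northbound"
OvnICSB = "OVN_IC_Southbound"
)
Then you could use `OvnICNB` and `OvnICSB` in place of the string literals. |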
||||||
| default: | ||||||
| klog.Errorf("isDBLeader: unsupported database %s", database) | ||||||
| return false | ||||||
| } | ||||||
|
|
||||||
| output, err := exec.Command("ovsdb-client", cmd...).CombinedOutput() // #nosec G204 | ||||||
| result, err := ovs.Query(dbAddr, serverdb.DatabaseName, 1, ovsdb.Operation{ | ||||||
|
Contributor
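There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The timeout for `ovs.Query` is hard-coded as `1`. Consider defining it as a named constant so that its meaning (seconds) is clear and it is easy to adjust. For example, you could add a constant: const (
// ...
DBLeaderCheckTimeout = 3
)
And use it in the function call.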
Suggested change
|
||||||
| Op: ovsdb.OperationSelect, | ||||||
| Table: serverdb.DatabaseTable, | ||||||
| Where: []ovsdb.Condition{{ | ||||||
| Column: "name", | ||||||
| Function: ovsdb.ConditionEqual, | ||||||
| Value: database, | ||||||
| }}, | ||||||
| Columns: []string{"leader"}, | ||||||
| }) | ||||||
| if err != nil { | ||||||
| klog.Errorf("failed to execute cmd %q: err=%v, msg=%v", strings.Join(cmd, " "), err, string(output)) | ||||||
| klog.Errorf("failed to query leader info from ovsdb-server %s for database %s: %v", address, database, err) | ||||||
| return false | ||||||
| } | ||||||
| if len(result) != 1 { | ||||||
| klog.Errorf("unexpected number of results when querying leader info from ovsdb-server %s for database %s: %d", address, database, len(result)) | ||||||
| return false | ||||||
| } | ||||||
| if len(result[0].Rows) == 0 { | ||||||
| klog.Errorf("no rows returned when querying leader info from ovsdb-server %s for database %s", address, database) | ||||||
| return false | ||||||
| } | ||||||
| if len(result[0].Rows) != 1 { | ||||||
| klog.Errorf("unexpected number of rows when querying leader info from ovsdb-server %s for database %s: %d", address, database, len(result[0].Rows)) | ||||||
| return false | ||||||
| } | ||||||
|
|
||||||
| result := strings.TrimSpace(string(output)) | ||||||
| if len(result) == 0 { | ||||||
| klog.Errorf("cmd %q no output", strings.Join(cmd, " ")) | ||||||
| leader, ok := result[0].Rows[0]["leader"].(bool) | ||||||
| if !ok { | ||||||
| klog.Errorf("unexpected data format for leader info from ovsdb-server %s for database %s: %v", address, database, result[0].Rows[0]["leader"]) | ||||||
| return false | ||||||
| } | ||||||
|
|
||||||
| klog.V(5).Infof("cmd %q output: %s", strings.Join(cmd, " "), string(output)) | ||||||
| return strings.Contains(result, "true") | ||||||
| return leader | ||||||
| } | ||||||
|
|
||||||
| func checkNorthdActive() bool { | ||||||
|
|
@@ -305,13 +336,12 @@ func checkNorthdEpAlive(cfg *Configuration, namespace, service string) bool { | |||||
| } | ||||||
|
|
||||||
| func compactOvnDatabase(db string) { | ||||||
| command := []string{ | ||||||
| args := []string{ | ||||||
| "-t", | ||||||
| fmt.Sprintf("/var/run/ovn/ovn%s_db.ctl", db), | ||||||
| "ovsdb-server/compact", | ||||||
| } | ||||||
|
|
||||||
| output, err := exec.Command("ovn-appctl", command...).CombinedOutput() // #nosec G204 | ||||||
| output, err := exec.Command("ovn-appctl", args...).CombinedOutput() // #nosec G204 | ||||||
| if err != nil { | ||||||
| if !strings.Contains(string(output), "not storing a duplicate snapshot") { | ||||||
| klog.Errorf("failed to compact ovn%s database: %s", db, string(output)) | ||||||
|
|
@@ -338,11 +368,13 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { | |||||
| } | ||||||
|
|
||||||
| if !cfg.ISICDBServer { | ||||||
| sbLeader := isDBLeader(ovnsb.DatabaseName, 6642) | ||||||
| nbLeader := isDBLeader(cfg.localAddress, ovnnb.DatabaseName) | ||||||
| sbLeader := isDBLeader(cfg.localAddress, ovnsb.DatabaseName) | ||||||
| northdActive := checkNorthdActive() | ||||||
| patch := util.KVPatch{ | ||||||
| "ovn-nb-leader": strconv.FormatBool(isDBLeader(ovnnb.DatabaseName, 6641)), | ||||||
| "ovn-nb-leader": strconv.FormatBool(nbLeader), | ||||||
| "ovn-sb-leader": strconv.FormatBool(sbLeader), | ||||||
| "ovn-northd-leader": strconv.FormatBool(checkNorthdActive()), | ||||||
| "ovn-northd-leader": strconv.FormatBool(northdActive), | ||||||
| } | ||||||
| if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil { | ||||||
| klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err) | ||||||
|
|
@@ -355,15 +387,25 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| for addr := range slices.Values(cfg.remoteAddresses) { | ||||||
|
Contributor
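There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The function `slices.Values` is not needed here; a slice can be ranged over directly, e.g. `for _, addr := range cfg.remoteAddresses`.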
Suggested change
|
||||||
| if nbLeader && isDBLeader(addr, ovnnb.DatabaseName) { | ||||||
| klog.Fatalf("found another ovn-nb leader at %s, exiting process to restart", addr) | ||||||
| } | ||||||
| if sbLeader && isDBLeader(addr, ovnsb.DatabaseName) { | ||||||
| klog.Fatalf("found another ovn-sb leader at %s, exiting process to restart", addr) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if cfg.EnableCompact { | ||||||
| compactOvnDatabase("nb") | ||||||
| compactOvnDatabase("sb") | ||||||
| } | ||||||
| } else { | ||||||
| icNbLeader := isDBLeader("OVN_IC_Northbound", 6645) | ||||||
| icNbLeader := isDBLeader(cfg.localAddress, "OVN_IC_Northbound") | ||||||
| icSbLeader := isDBLeader(cfg.localAddress, "OVN_IC_Southbound") | ||||||
| patch := util.KVPatch{ | ||||||
| "ovn-ic-nb-leader": strconv.FormatBool(icNbLeader), | ||||||
| "ovn-ic-sb-leader": strconv.FormatBool(isDBLeader("OVN_IC_Southbound", 6646)), | ||||||
| "ovn-ic-sb-leader": strconv.FormatBool(icSbLeader), | ||||||
| } | ||||||
| if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil { | ||||||
| klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err) | ||||||
|
|
@@ -376,6 +418,15 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { | |||||
| return | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| for addr := range slices.Values(cfg.remoteAddresses) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| if icNbLeader && isDBLeader(addr, "OVN_IC_Northbound") { | ||||||
| klog.Fatalf("found another ovn-ic-nb leader at %s, exiting process to restart", addr) | ||||||
| } | ||||||
| if icSbLeader && isDBLeader(addr, "OVN_IC_Southbound") { | ||||||
| klog.Fatalf("found another ovn-ic-sb leader at %s, exiting process to restart", addr) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| package ovs | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "net" | ||
| "os" | ||
| "os/exec" | ||
| "slices" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/ovn-kubernetes/libovsdb/ovsdb" | ||
| "k8s.io/apimachinery/pkg/util/intstr" | ||
| ) | ||
|
|
||
| // OvsdbServerAddress constructs the ovsdb-server address based on the given host and port. | ||
| // It uses "ssl" scheme if the ENABLE_SSL environment variable is set to "true", otherwise "tcp" | ||
| // (any other value, including an unset variable, selects "tcp"). | ||
| // The host and port are combined with net.JoinHostPort, so an IPv6 literal host is wrapped in brackets. | ||
| // | ||
| // For example: | ||
| // | ||
| // OvsdbServerAddress("localhost", intstr.FromInt32(6641)) returns "tcp:localhost:6641" or "ssl:localhost:6641" based on the ENABLE_SSL setting. | ||
| func OvsdbServerAddress(host string, port intstr.IntOrString) string { | ||
| scheme := "tcp" | ||
| if os.Getenv("ENABLE_SSL") == "true" { | ||
| scheme = "ssl" | ||
| } | ||
| return fmt.Sprintf("%s:%s", scheme, net.JoinHostPort(host, port.String())) | ||
| } | ||
|
|
||
| // Query executes an ovsdb-client query command against the given address and database with the provided operations | ||
| // and returns the operation results. | ||
| // For SSL connections, it assumes the certificates are located at /var/run/tls/{key,cert,cacert}. | ||
| // An address is treated as an SSL connection when it starts with "ssl:". | ||
| // The timeout is specified in seconds. | ||
| // For more details, see `ovsdb-client --help`. | ||
| // | ||
| // An error is returned when the transaction arguments cannot be marshalled to JSON, when running | ||
| // ovsdb-client returns an error (the error then carries the arguments and the command's combined output), | ||
| // or when the command output cannot be unmarshalled into []ovsdb.OperationResult. | ||
| // | ||
| // For example: | ||
| // | ||
| // results, err := Query("tcp:[::1]:6641", "OVN_Northbound", 3, ovsdb.Operation{...}) | ||
| // results, err := Query("ssl:[::1]:6641", "OVN_Northbound", 3, ovsdb.Operation{...}) | ||
| func Query(address, database string, timeout int, operations ...ovsdb.Operation) ([]ovsdb.OperationResult, error) { | ||
| transArgs := ovsdb.NewTransactArgs(database, operations...) | ||
| query, err := json.Marshal(transArgs) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to marshal ovsdb transaction args %+v: %w", transArgs, err) | ||
| } | ||
|
|
||
| args := []string{"--timeout", strconv.Itoa(timeout), "query", address, string(query)} | ||
| if strings.HasPrefix(address, "ssl:") { | ||
| // The TLS key/cert/CA options are inserted at index 0, ahead of "--timeout" and the query arguments. | ||
| args = slices.Insert(args, 0, "-p", "/var/run/tls/key", "-c", "/var/run/tls/cert", "-C", "/var/run/tls/cacert") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
||
| // CombinedOutput captures stdout and stderr together, so the bytes decoded as JSON below include | ||
| // anything the command wrote to stderr. NOTE(review): confirm ovsdb-client writes nothing to stderr on success. | ||
| output, err := exec.Command("ovsdb-client", args...).CombinedOutput() // #nosec G204 | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to execute ovsdb-client with args %v: %w\noutput: %s", args, err, string(output)) | ||
| } | ||
|
|
||
| var results []ovsdb.OperationResult | ||
| if err = json.Unmarshal(output, &results); err != nil { | ||
| return nil, fmt.Errorf("failed to unmarshal ovsdb-client output %q: %w", string(output), err) | ||
| } | ||
|
|
||
| return results, nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| package ovs | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "k8s.io/apimachinery/pkg/util/intstr" | ||
| ) | ||
|
|
||
| // TestOvsdbServerAddress checks the address built by OvsdbServerAddress for a hostname, an IPv4 address | ||
| // and IPv6 literals, with ENABLE_SSL set to "false" (expects the "tcp" scheme) and "true" (expects "ssl"). | ||
| func TestOvsdbServerAddress(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| host string | ||
| port intstr.IntOrString | ||
| envValue string | ||
| expected string | ||
| }{ | ||
| { | ||
| name: "tcp scheme", | ||
| host: "localhost", | ||
| port: intstr.FromInt32(6641), | ||
| envValue: "false", | ||
| expected: "tcp:localhost:6641", | ||
| }, | ||
| { | ||
| name: "ssl scheme", | ||
| host: "127.0.0.1", | ||
| port: intstr.FromInt(6642), | ||
| envValue: "true", | ||
| expected: "ssl:127.0.0.1:6642", | ||
| }, | ||
| { | ||
| name: "tcp scheme with ipv6 address", | ||
| host: "::1", | ||
| port: intstr.FromInt(6643), | ||
| envValue: "false", | ||
| expected: "tcp:[::1]:6643", | ||
| }, | ||
| { | ||
| name: "ssl scheme with ipv6 address", | ||
| host: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", | ||
| port: intstr.FromInt(6644), | ||
| envValue: "true", | ||
| expected: "ssl:[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:6644", | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| // ENABLE_SSL is set per subtest through t.Setenv before the function under test is called. | ||
| t.Setenv("ENABLE_SSL", tt.envValue) | ||
| result := OvsdbServerAddress(tt.host, tt.port) | ||
| if result != tt.expected { | ||
| t.Errorf("expected %s, got %s", tt.expected, result) | ||
| } | ||
| }) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new fields `localAddress` and `remoteAddresses` are unexported, which is inconsistent with the other fields in the `Configuration` struct. It's a good practice to export fields of configuration structs for consistency and reusability. Please consider making them `LocalAddress` and `RemoteAddresses` and updating their usage accordingly in `ParseFlags`.