Skip to content

Commit dea20c4

Browse files
committed
Add open flow sync refer to ovn-k8s (#6117)
* add openflow sync refer to ovn-k8s Signed-off-by: clyi <clyi@alauda.io>
1 parent 5448893 commit dea20c4

File tree

5 files changed

+265
-122
lines changed

5 files changed

+265
-122
lines changed

pkg/daemon/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
706706
}
707707
}
708708

709+
// Start OpenFlow sync loop
710+
go c.runFlowSync(stopCh)
711+
709712
<-stopCh
710713
klog.Info("Shutting down workers")
711714
}

pkg/daemon/controller_linux.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"reflect"
1111
"slices"
1212
"strings"
13+
"sync"
1314
"syscall"
1415

1516
ovsutil "github.com/digitalocean/go-openvswitch/ovs"
@@ -49,6 +50,10 @@ type ControllerRuntime struct {
4950

5051
nmSyncer *networkManagerSyncer
5152
ovsClient *ovsutil.Client
53+
54+
flowCache map[string]map[string][]string // key: bridgeName -> flowKey -> flow rules
55+
flowCacheMutex sync.RWMutex
56+
flowChan chan struct{} // channel to trigger immediate flow sync
5257
}
5358

5459
type LbServiceRules struct {
@@ -103,6 +108,10 @@ func (c *Controller) initRuntime() error {
103108
c.k8sipsets = k8sipset.New(c.k8sExec)
104109
c.ovsClient = ovsutil.New()
105110

111+
// Initialize OpenFlow flow cache (ovn-kubernetes style)
112+
c.flowCache = make(map[string]map[string][]string)
113+
c.flowChan = make(chan struct{}, 1)
114+
106115
if c.protocol == kubeovnv1.ProtocolIPv4 || c.protocol == kubeovnv1.ProtocolDual {
107116
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
108117
if err != nil {
@@ -493,18 +502,17 @@ func (c *Controller) reconcileServices(event *serviceEvent) error {
493502
if len(lbServiceRulesToAdd) > 0 {
494503
for _, rule := range lbServiceRulesToAdd {
495504
klog.Infof("Adding LB service rule: %+v", rule)
496-
if err := ovs.AddOrUpdateUnderlaySubnetSvcLocalOpenFlow(c.ovsClient, rule.BridgeName, rule.IP, rule.Protocol, rule.DstMac, rule.UnderlayNic, rule.Port); err != nil {
497-
klog.Errorf("failed to add or update underlay subnet svc local openflow: %v", err)
505+
if err := c.AddOrUpdateUnderlaySubnetSvcLocalFlowCache(rule.IP, rule.Port, rule.Protocol, rule.DstMac, rule.UnderlayNic, rule.BridgeName); err != nil {
506+
klog.Errorf("failed to update underlay subnet svc local openflow cache: %v", err)
507+
return err
498508
}
499509
}
500510
}
501511

502512
if len(lbServiceRulesToDel) > 0 {
503513
for _, rule := range lbServiceRulesToDel {
504514
klog.Infof("Delete LB service rule: %+v", rule)
505-
if err := ovs.DeleteUnderlaySubnetSvcLocalOpenFlow(c.ovsClient, rule.BridgeName, rule.IP, rule.Protocol, rule.UnderlayNic, rule.Port); err != nil {
506-
klog.Errorf("failed to delete underlay subnet svc local openflow: %v", err)
507-
}
515+
c.deleteUnderlaySubnetSvcLocalFlowCache(rule.BridgeName, rule.IP, rule.Port, rule.Protocol)
508516
}
509517
}
510518

pkg/daemon/flow_rules_linux.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package daemon
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
8+
"k8s.io/klog/v2"
9+
10+
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
11+
"github.com/kubeovn/kube-ovn/pkg/util"
12+
)
13+
14+
const flowKindUnderlayService = "usvc"
15+
16+
func (c *Controller) AddOrUpdateUnderlaySubnetSvcLocalFlowCache(serviceIP string, port uint16, protocol, dstMac, underlayNic, bridgeName string) error {
17+
inPort, err := c.getPortID(bridgeName, underlayNic)
18+
if err != nil {
19+
return err
20+
}
21+
22+
outPort, err := c.getPortID(bridgeName, "patch-localnet.")
23+
if err != nil {
24+
return err
25+
}
26+
27+
isIPv6 := util.CheckProtocol(serviceIP) == kubeovnv1.ProtocolIPv6
28+
protoStr := ""
29+
switch strings.ToUpper(protocol) {
30+
case "TCP":
31+
protoStr = "tcp"
32+
if isIPv6 {
33+
protoStr = "tcp6"
34+
}
35+
case "UDP":
36+
protoStr = "udp"
37+
if isIPv6 {
38+
protoStr = "udp6"
39+
}
40+
default:
41+
return fmt.Errorf("unsupported protocol %s", protocol)
42+
}
43+
44+
cookie := fmt.Sprintf("0x%x", util.UnderlaySvcLocalOpenFlowCookieV4)
45+
nwDst := "nw_dst"
46+
if isIPv6 {
47+
cookie = fmt.Sprintf("0x%x", util.UnderlaySvcLocalOpenFlowCookieV6)
48+
nwDst = "ipv6_dst"
49+
}
50+
51+
flow := fmt.Sprintf("cookie=%s,priority=%d,in_port=%d,%s,%s=%s,tp_dst=%d "+
52+
"actions=mod_dl_dst:%s,output:%d",
53+
cookie, util.UnderlaySvcLocalOpenFlowPriority, inPort, protoStr, nwDst, serviceIP, port, dstMac, outPort)
54+
55+
key := buildFlowKey(flowKindUnderlayService, serviceIP, port, protocol, "")
56+
c.setFlowCache(c.flowCache, bridgeName, key, []string{flow})
57+
58+
klog.V(5).Infof("updated underlay flow cache for service %s", key)
59+
c.requestFlowSync()
60+
return nil
61+
}
62+
63+
func (c *Controller) deleteUnderlaySubnetSvcLocalFlowCache(bridgeName, serviceIP string, port uint16, protocol string) {
64+
key := buildFlowKey(flowKindUnderlayService, serviceIP, port, protocol, "")
65+
66+
c.deleteFlowCache(c.flowCache, bridgeName, key)
67+
68+
klog.V(5).Infof("deleted underlay flow cache for service %s", key)
69+
c.requestFlowSync()
70+
}
71+
72+
func buildFlowKey(kind, ip string, port uint16, protocol, extra string) string {
73+
if extra == "" {
74+
return fmt.Sprintf("%s-%s-%s-%d", kind, ip, protocol, port)
75+
}
76+
return fmt.Sprintf("%s-%s-%s-%d-%s", kind, ip, protocol, port, extra)
77+
}
78+
79+
func (c *Controller) getPortID(bridgeName, portName string) (int, error) {
80+
if c.ovsClient == nil {
81+
return 0, errors.New("ovs client not initialized")
82+
}
83+
84+
portInfo, err := c.ovsClient.OpenFlow.DumpPort(bridgeName, portName)
85+
if err != nil {
86+
return 0, fmt.Errorf("failed to dump port %s on bridge %s: %w", portName, bridgeName, err)
87+
}
88+
return portInfo.PortID, nil
89+
}

pkg/daemon/flow_sync_linux.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package daemon
2+
3+
import (
4+
"time"
5+
6+
openvswitch "github.com/digitalocean/go-openvswitch/ovs"
7+
"k8s.io/apimachinery/pkg/util/sets"
8+
"k8s.io/klog/v2"
9+
10+
"github.com/kubeovn/kube-ovn/pkg/ovs"
11+
"github.com/kubeovn/kube-ovn/pkg/util"
12+
)
13+
14+
const flowSyncPeriod = 15 * time.Second
15+
16+
var managedFlowCookieSet = sets.New[uint64](
17+
util.UnderlaySvcLocalOpenFlowCookieV4,
18+
util.UnderlaySvcLocalOpenFlowCookieV6,
19+
)
20+
21+
func (c *Controller) requestFlowSync() {
22+
if c.flowChan == nil {
23+
util.LogFatalAndExit(nil, "flowChan is not initialized")
24+
}
25+
26+
select {
27+
case c.flowChan <- struct{}{}:
28+
klog.V(5).Info("OpenFlow sync requested")
29+
default:
30+
klog.V(5).Info("OpenFlow sync already requested")
31+
}
32+
}
33+
34+
func (c *Controller) syncFlows() {
35+
if c.ovsClient == nil {
36+
util.LogFatalAndExit(nil, "ovsClient is not initialized")
37+
}
38+
39+
flowCacheByBridge := c.storeFlowCache()
40+
41+
bridges, err := ovs.Bridges()
42+
if err != nil {
43+
klog.Errorf("failed to list bridges: %v", err)
44+
return
45+
}
46+
47+
for _, bridgeName := range bridges {
48+
existing, err := ovs.DumpFlows(c.ovsClient, bridgeName)
49+
if err != nil {
50+
klog.Errorf("failed to dump flows for bridge %s: %v", bridgeName, err)
51+
continue
52+
}
53+
54+
preserved := filterUnmanagedFlows(existing)
55+
cachedFlows := flowCacheByBridge[bridgeName]
56+
finalFlows := append(preserved, cachedFlows...)
57+
58+
if err := ovs.ReplaceFlows(bridgeName, finalFlows); err != nil {
59+
klog.Errorf("failed to replace flows for bridge %s: %v", bridgeName, err)
60+
continue
61+
}
62+
if len(cachedFlows) == 0 {
63+
klog.V(5).Infof("no cached flows for bridge %s", bridgeName)
64+
continue
65+
}
66+
klog.V(3).Infof("synced %d cached flows on bridge %s", len(cachedFlows), bridgeName)
67+
}
68+
}
69+
70+
func (c *Controller) storeFlowCache() map[string][]string {
71+
snapshot := make(map[string][]string)
72+
73+
c.flowCacheMutex.RLock()
74+
defer c.flowCacheMutex.RUnlock()
75+
76+
for bridgeName, entries := range c.flowCache {
77+
for _, flows := range entries {
78+
snapshot[bridgeName] = append(snapshot[bridgeName], flows...)
79+
}
80+
}
81+
82+
return snapshot
83+
}
84+
85+
func filterUnmanagedFlows(flows []string) []string {
86+
filtered := make([]string, 0, len(flows))
87+
for _, flow := range flows {
88+
if isManagedFlow(flow) {
89+
continue
90+
}
91+
filtered = append(filtered, flow)
92+
}
93+
return filtered
94+
}
95+
96+
func isManagedFlow(flow string) bool {
97+
var f openvswitch.Flow
98+
if err := f.UnmarshalText([]byte(flow)); err != nil {
99+
return false
100+
}
101+
return managedFlowCookieSet.Has(f.Cookie)
102+
}
103+
104+
func (c *Controller) runFlowSync(stopCh <-chan struct{}) {
105+
klog.Info("Starting OpenFlow sync loop")
106+
107+
ticker := time.NewTicker(flowSyncPeriod)
108+
defer ticker.Stop()
109+
for {
110+
select {
111+
case <-ticker.C:
112+
c.syncFlows()
113+
case <-c.flowChan:
114+
klog.V(5).Info("Immediate OpenFlow sync triggered")
115+
c.syncFlows()
116+
ticker.Reset(flowSyncPeriod)
117+
case <-stopCh:
118+
klog.Info("Stopping OpenFlow sync loop")
119+
return
120+
}
121+
}
122+
}
123+
124+
func (c *Controller) setFlowCache(cache map[string]map[string][]string, bridgeName, key string, flows []string) {
125+
c.flowCacheMutex.Lock()
126+
defer c.flowCacheMutex.Unlock()
127+
128+
if cache[bridgeName] == nil {
129+
cache[bridgeName] = make(map[string][]string)
130+
}
131+
cache[bridgeName][key] = flows
132+
}
133+
134+
func (c *Controller) deleteFlowCache(cache map[string]map[string][]string, bridgeName, key string) {
135+
c.flowCacheMutex.Lock()
136+
defer c.flowCacheMutex.Unlock()
137+
delete(cache[bridgeName], key)
138+
}

0 commit comments

Comments
 (0)