Skip to content

Commit ca1dfb1

Browse files
committed
add openflow sync refer to ovn-kubernetes
Signed-off-by: clyi <clyi@alauda.io>
1 parent dabfbb9 commit ca1dfb1

File tree

6 files changed

+323
-122
lines changed

6 files changed

+323
-122
lines changed

pkg/daemon/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
706706
}
707707
}
708708

709+
// Start OpenFlow sync loop
710+
go c.runFlowSync(stopCh)
711+
709712
<-stopCh
710713
klog.Info("Shutting down workers")
711714
}

pkg/daemon/controller_linux.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"reflect"
1111
"slices"
1212
"strings"
13+
"sync"
1314
"syscall"
1415

1516
ovsutil "github.com/digitalocean/go-openvswitch/ovs"
@@ -49,6 +50,10 @@ type ControllerRuntime struct {
4950

5051
nmSyncer *networkManagerSyncer
5152
ovsClient *ovsutil.Client
53+
54+
flowCache map[string]map[string][]string // key: bridgeName -> flowKey -> flow rules
55+
flowCacheMutex sync.RWMutex
56+
flowChan chan struct{} // channel to trigger immediate flow sync
5257
}
5358

5459
type LbServiceRules struct {
@@ -103,6 +108,10 @@ func (c *Controller) initRuntime() error {
103108
c.k8sipsets = k8sipset.New(c.k8sExec)
104109
c.ovsClient = ovsutil.New()
105110

111+
// Initialize OpenFlow flow cache (ovn-kubernetes style)
112+
c.flowCache = make(map[string]map[string][]string)
113+
c.flowChan = make(chan struct{}, 1)
114+
106115
if c.protocol == kubeovnv1.ProtocolIPv4 || c.protocol == kubeovnv1.ProtocolDual {
107116
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
108117
if err != nil {
@@ -493,18 +502,16 @@ func (c *Controller) reconcileServices(event *serviceEvent) error {
493502
if len(lbServiceRulesToAdd) > 0 {
494503
for _, rule := range lbServiceRulesToAdd {
495504
klog.Infof("Adding LB service rule: %+v", rule)
496-
if err := ovs.AddOrUpdateUnderlaySubnetSvcLocalOpenFlow(c.ovsClient, rule.BridgeName, rule.IP, rule.Protocol, rule.DstMac, rule.UnderlayNic, rule.Port); err != nil {
497-
klog.Errorf("failed to add or update underlay subnet svc local openflow: %v", err)
505+
if err := c.AddOrUpdateUnderlaySubnetSvcLocalFlowCache(rule.IP, rule.Port, rule.Protocol, rule.DstMac, rule.UnderlayNic, rule.BridgeName); err != nil {
506+
klog.Errorf("failed to update underlay subnet svc local openflow cache: %v", err)
498507
}
499508
}
500509
}
501510

502511
if len(lbServiceRulesToDel) > 0 {
503512
for _, rule := range lbServiceRulesToDel {
504513
klog.Infof("Delete LB service rule: %+v", rule)
505-
if err := ovs.DeleteUnderlaySubnetSvcLocalOpenFlow(c.ovsClient, rule.BridgeName, rule.IP, rule.Protocol, rule.UnderlayNic, rule.Port); err != nil {
506-
klog.Errorf("failed to delete underlay subnet svc local openflow: %v", err)
507-
}
514+
c.deleteUnderlaySubnetSvcLocalFlowCache(rule.BridgeName, rule.IP, rule.Port, rule.Protocol)
508515
}
509516
}
510517

pkg/daemon/controller_windows.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ func (c *Controller) initRuntime() error {
2525
return nil
2626
}
2727

28+
func (c *Controller) syncFlows() {}
29+
30+
func (c *Controller) runFlowSync(stopCh <-chan struct{}) {
31+
<-stopCh
32+
}
33+
2834
func (c *Controller) reconcileServices(_ *serviceEvent) error {
2935
return nil
3036
}

pkg/daemon/flow_rules.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package daemon
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"k8s.io/klog/v2"
8+
9+
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
10+
"github.com/kubeovn/kube-ovn/pkg/util"
11+
)
12+
13+
const flowKindUnderlayService = "usvc"
14+
15+
func (c *Controller) AddOrUpdateUnderlaySubnetSvcLocalFlowCache(serviceIP string, port uint16, protocol, dstMac, underlayNic, bridgeName string) error {
16+
inPort, err := c.getPortID(bridgeName, underlayNic)
17+
if err != nil {
18+
return err
19+
}
20+
21+
outPort, err := c.getPortID(bridgeName, "patch-localnet.")
22+
if err != nil {
23+
return err
24+
}
25+
26+
isIPv6 := util.CheckProtocol(serviceIP) == kubeovnv1.ProtocolIPv6
27+
protoStr := ""
28+
switch strings.ToUpper(protocol) {
29+
case "TCP":
30+
protoStr = "tcp"
31+
if isIPv6 {
32+
protoStr = "tcp6"
33+
}
34+
case "UDP":
35+
protoStr = "udp"
36+
if isIPv6 {
37+
protoStr = "udp6"
38+
}
39+
default:
40+
return fmt.Errorf("unsupported protocol %s", protocol)
41+
}
42+
43+
cookie := fmt.Sprintf("0x%x", util.UnderlaySvcLocalOpenFlowCookieV4)
44+
nwDst := "nw_dst"
45+
if isIPv6 {
46+
cookie = fmt.Sprintf("0x%x", util.UnderlaySvcLocalOpenFlowCookieV6)
47+
nwDst = "ipv6_dst"
48+
}
49+
50+
flow := fmt.Sprintf("cookie=%s,priority=%d,in_port=%d,%s,%s=%s,tp_dst=%d "+
51+
"actions=mod_dl_dst:%s,output:%d",
52+
cookie, util.UnderlaySvcLocalOpenFlowPriority, inPort, protoStr, nwDst, serviceIP, port, dstMac, outPort)
53+
54+
key := buildFlowKey(flowKindUnderlayService, serviceIP, port, protocol, "")
55+
c.setFlowCache(c.flowCache, bridgeName, key, []string{flow})
56+
57+
klog.V(5).Infof("updated underlay flow cache for service %s", key)
58+
c.requestFlowSync()
59+
return nil
60+
}
61+
62+
func (c *Controller) deleteUnderlaySubnetSvcLocalFlowCache(bridgeName, serviceIP string, port uint16, protocol string) {
63+
key := buildFlowKey(flowKindUnderlayService, serviceIP, port, protocol, "")
64+
65+
c.deleteFlowCache(c.flowCache, bridgeName, key)
66+
67+
klog.V(5).Infof("deleted underlay flow cache for service %s", key)
68+
c.requestFlowSync()
69+
}
70+
71+
func buildFlowKey(kind, ip string, port uint16, protocol, extra string) string {
72+
if extra == "" {
73+
return fmt.Sprintf("%s-%s-%s-%d", kind, ip, protocol, port)
74+
}
75+
return fmt.Sprintf("%s-%s-%s-%d-%s", kind, ip, protocol, port, extra)
76+
}

pkg/daemon/flow_sync_linux.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package daemon
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
"time"
8+
9+
"k8s.io/klog/v2"
10+
11+
"github.com/kubeovn/kube-ovn/pkg/ovs"
12+
"github.com/kubeovn/kube-ovn/pkg/util"
13+
)
14+
15+
const flowSyncPeriod = 15 * time.Second
16+
17+
var managedFlowCookieSet = map[uint64]struct{}{
18+
util.UnderlaySvcLocalOpenFlowCookieV4: {},
19+
util.UnderlaySvcLocalOpenFlowCookieV6: {},
20+
}
21+
22+
func (c *Controller) requestFlowSync() {
23+
if c.flowChan == nil {
24+
return
25+
}
26+
27+
select {
28+
case c.flowChan <- struct{}{}:
29+
klog.V(5).Info("OpenFlow sync requested")
30+
default:
31+
klog.V(5).Info("OpenFlow sync already requested")
32+
}
33+
}
34+
35+
func (c *Controller) syncFlows() {
36+
if c.ovsClient == nil {
37+
return
38+
}
39+
40+
flowsByBridge := c.storeFlowCache()
41+
42+
bridges, err := ovs.Bridges()
43+
if err != nil {
44+
klog.Errorf("failed to list bridges: %v", err)
45+
return
46+
}
47+
48+
for _, bridgeName := range bridges {
49+
existing, err := ovs.DumpFlows(c.ovsClient, bridgeName)
50+
if err != nil {
51+
klog.Errorf("failed to dump flows for bridge %s: %v", bridgeName, err)
52+
continue
53+
}
54+
55+
preserved := filterUnmanagedFlows(existing)
56+
managed := flowsByBridge[bridgeName]
57+
merged := append(preserved, managed...)
58+
59+
if err := ovs.ReplaceFlows(bridgeName, merged); err != nil {
60+
klog.Errorf("failed to replace flows for bridge %s: %v", bridgeName, err)
61+
continue
62+
}
63+
if len(managed) == 0 {
64+
klog.V(5).Infof("no managed flows for bridge %s", bridgeName)
65+
continue
66+
}
67+
klog.V(3).Infof("synced %d managed flows on bridge %s", len(managed), bridgeName)
68+
}
69+
}
70+
71+
func (c *Controller) storeFlowCache() map[string][]string {
72+
snapshot := make(map[string][]string)
73+
74+
c.flowCacheMutex.RLock()
75+
defer c.flowCacheMutex.RUnlock()
76+
77+
appendFlowCache(snapshot, c.flowCache)
78+
79+
return snapshot
80+
}
81+
82+
func appendFlowCache(dst map[string][]string, src map[string]map[string][]string) {
83+
for bridgeName, entries := range src {
84+
if len(entries) == 0 {
85+
if _, ok := dst[bridgeName]; !ok {
86+
dst[bridgeName] = nil
87+
}
88+
continue
89+
}
90+
for _, flows := range entries {
91+
if len(flows) == 0 {
92+
if _, ok := dst[bridgeName]; !ok {
93+
dst[bridgeName] = nil
94+
}
95+
continue
96+
}
97+
dst[bridgeName] = append(dst[bridgeName], flows...)
98+
}
99+
}
100+
}
101+
102+
func filterUnmanagedFlows(flows []string) []string {
103+
filtered := make([]string, 0, len(flows))
104+
for _, flow := range flows {
105+
if isManagedFlow(flow) {
106+
continue
107+
}
108+
filtered = append(filtered, flow)
109+
}
110+
return filtered
111+
}
112+
113+
func isManagedFlow(flow string) bool {
114+
cookie, ok := extractFlowCookie(flow)
115+
if !ok {
116+
return false
117+
}
118+
_, exists := managedFlowCookieSet[cookie]
119+
return exists
120+
}
121+
122+
func extractFlowCookie(flow string) (uint64, bool) {
123+
idx := strings.Index(flow, "cookie=")
124+
if idx == -1 {
125+
return 0, false
126+
}
127+
cookieField := flow[idx+len("cookie="):]
128+
if comma := strings.Index(cookieField, ","); comma != -1 {
129+
cookieField = cookieField[:comma]
130+
}
131+
if slash := strings.Index(cookieField, "/"); slash != -1 {
132+
cookieField = cookieField[:slash]
133+
}
134+
cookieField = strings.TrimSpace(cookieField)
135+
if cookieField == "" {
136+
return 0, false
137+
}
138+
139+
cookie, err := parseHexUint64(cookieField)
140+
if err != nil {
141+
return 0, false
142+
}
143+
return cookie, true
144+
}
145+
146+
func parseHexUint64(value string) (uint64, error) {
147+
if !strings.HasPrefix(value, "0x") {
148+
value = "0x" + value
149+
}
150+
return strconv.ParseUint(value, 0, 64)
151+
}
152+
153+
func (c *Controller) runFlowSync(stopCh <-chan struct{}) {
154+
klog.Info("Starting OpenFlow sync loop")
155+
156+
ticker := time.NewTicker(flowSyncPeriod)
157+
defer ticker.Stop()
158+
for {
159+
select {
160+
case <-ticker.C:
161+
c.syncFlows()
162+
case <-c.flowChan:
163+
klog.V(5).Info("Immediate OpenFlow sync triggered")
164+
c.syncFlows()
165+
ticker.Reset(flowSyncPeriod)
166+
case <-stopCh:
167+
klog.Info("Stopping OpenFlow sync loop")
168+
return
169+
}
170+
}
171+
}
172+
173+
func (c *Controller) getPortID(bridgeName, portName string) (int, error) {
174+
if c.ovsClient == nil {
175+
return 0, fmt.Errorf("ovs client not initialized")
176+
}
177+
178+
portInfo, err := c.ovsClient.OpenFlow.DumpPort(bridgeName, portName)
179+
if err != nil {
180+
return 0, fmt.Errorf("failed to dump port %s on bridge %s: %w", portName, bridgeName, err)
181+
}
182+
return portInfo.PortID, nil
183+
}
184+
185+
func (c *Controller) setFlowCache(cache map[string]map[string][]string, bridgeName, key string, flows []string) {
186+
c.flowCacheMutex.Lock()
187+
defer c.flowCacheMutex.Unlock()
188+
189+
if cache[bridgeName] == nil {
190+
cache[bridgeName] = make(map[string][]string)
191+
}
192+
cache[bridgeName][key] = flows
193+
}
194+
195+
func (c *Controller) deleteFlowCache(cache map[string]map[string][]string, bridgeName, key string) {
196+
c.flowCacheMutex.Lock()
197+
defer c.flowCacheMutex.Unlock()
198+
199+
entries := cache[bridgeName]
200+
if entries == nil {
201+
return
202+
}
203+
delete(entries, key)
204+
}

0 commit comments

Comments
 (0)