Skip to content

Commit 4c81d44

Browse files
authored
Merge pull request #6 from cozystack/fix/cilium-ipip-overlay
fix(encapsulation): route Cilium IPIP traffic through VxLAN overlay
2 parents b727cfc + 9c65a24 commit 4c81d44

File tree

11 files changed

+163
-86
lines changed

11 files changed

+163
-86
lines changed

pkg/encapsulation/cilium.go

Lines changed: 70 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,95 +17,106 @@ package encapsulation
1717
import (
1818
"fmt"
1919
"net"
20-
"sync"
21-
22-
"github.com/vishvananda/netlink"
2320

21+
"github.com/cozystack/kilo/pkg/iproute"
2422
"github.com/cozystack/kilo/pkg/iptables"
2523
)
2624

27-
const ciliumDeviceName = "cilium_host"
25+
const ciliumHostIface = "cilium_host"
2826

2927
type cilium struct {
3028
iface int
3129
strategy Strategy
32-
ch chan netlink.LinkUpdate
33-
done chan struct{}
34-
// mu guards updates to the iface field.
35-
mu sync.Mutex
3630
}
3731

38-
// NewCilium returns an encapsulator that uses Cilium.
32+
// NewCilium returns an encapsulator that uses IPIP tunnels
33+
// routed through Cilium's VxLAN overlay.
3934
func NewCilium(strategy Strategy) Encapsulator {
40-
return &cilium{
41-
ch: make(chan netlink.LinkUpdate),
42-
done: make(chan struct{}),
43-
strategy: strategy,
44-
}
35+
return &cilium{strategy: strategy}
4536
}
4637

47-
// CleanUp close done channel
48-
func (f *cilium) CleanUp() error {
49-
close(f.done)
50-
return nil
38+
// CleanUp deletes the addresses on the IPIP tunnel interface and then removes the interface itself.
39+
func (c *cilium) CleanUp() error {
40+
if err := iproute.DeleteAddresses(c.iface); err != nil {
41+
return err
42+
}
43+
return iproute.RemoveInterface(c.iface)
5144
}
5245

5346
// Gw returns the correct gateway IP associated with the given node.
54-
func (f *cilium) Gw(_, _ net.IP, subnet *net.IPNet) net.IP {
47+
// It returns the Cilium internal IP so that the IPIP outer packets are routed
48+
// through Cilium's VxLAN overlay rather than the host network. If ciliumIP is nil, it falls back to the subnet's IP.
49+
func (c *cilium) Gw(_, _, ciliumIP net.IP, subnet *net.IPNet) net.IP {
50+
if ciliumIP != nil {
51+
return ciliumIP
52+
}
5553
return subnet.IP
5654
}
5755

58-
// Index returns the index of the Cilium interface.
59-
func (f *cilium) Index() int {
60-
f.mu.Lock()
61-
defer f.mu.Unlock()
62-
return f.iface
63-
}
64-
65-
// Init finds the Cilium interface index.
66-
func (f *cilium) Init(_ int) error {
67-
if err := netlink.LinkSubscribe(f.ch, f.done); err != nil {
68-
return fmt.Errorf("failed to subscribe to updates to %s: %v", ciliumDeviceName, err)
69-
}
70-
go func() {
71-
var lu netlink.LinkUpdate
72-
for {
73-
select {
74-
case lu = <-f.ch:
75-
if lu.Attrs().Name == ciliumDeviceName {
76-
f.mu.Lock()
77-
f.iface = lu.Attrs().Index
78-
f.mu.Unlock()
79-
}
80-
case <-f.done:
81-
return
82-
}
83-
}
84-
}()
85-
i, err := netlink.LinkByName(ciliumDeviceName)
86-
if _, ok := err.(netlink.LinkNotFoundError); ok {
56+
// LocalIP returns the first IPv4 address of the cilium_host interface, or nil if none can be determined.
57+
// This IP is advertised to other nodes so they can route IPIP outer
58+
// packets through Cilium's overlay.
59+
func (c *cilium) LocalIP() net.IP {
60+
iface, err := net.InterfaceByName(ciliumHostIface)
61+
if err != nil {
8762
return nil
8863
}
64+
addrs, err := iface.Addrs()
8965
if err != nil {
90-
return fmt.Errorf("failed to query for Cilium interface: %v", err)
66+
return nil
67+
}
68+
for _, a := range addrs {
69+
if ipNet, ok := a.(*net.IPNet); ok && ipNet.IP.To4() != nil {
70+
return ipNet.IP
71+
}
9172
}
92-
f.mu.Lock()
93-
f.iface = i.Attrs().Index
94-
f.mu.Unlock()
9573
return nil
9674
}
9775

98-
// Rules is a no-op.
99-
func (f *cilium) Rules(_ []*net.IPNet) iptables.RuleSet {
100-
return iptables.RuleSet{}
76+
// Index returns the index of the IPIP tunnel interface.
77+
func (c *cilium) Index() int {
78+
return c.iface
10179
}
10280

103-
// Set is a no-op.
104-
func (f *cilium) Set(_ *net.IPNet) error {
81+
// Init initializes the IPIP tunnel interface.
82+
func (c *cilium) Init(base int) error {
83+
iface, err := iproute.NewIPIP(base)
84+
if err != nil {
85+
return fmt.Errorf("failed to create tunnel interface: %v", err)
86+
}
87+
if err := iproute.Set(iface, true); err != nil {
88+
return fmt.Errorf("failed to set tunnel interface up: %v", err)
89+
}
90+
c.iface = iface
10591
return nil
10692
}
10793

94+
// Rules returns a set of iptables rules that are necessary
95+
// when traffic between nodes must be encapsulated.
96+
func (c *cilium) Rules(nodes []*net.IPNet) iptables.RuleSet {
97+
rules := iptables.RuleSet{}
98+
proto := ipipProtocolName()
99+
rules.AddToAppend(iptables.NewIPv4Chain("filter", "KILO-IPIP"))
100+
rules.AddToAppend(iptables.NewIPv6Chain("filter", "KILO-IPIP"))
101+
rules.AddToAppend(iptables.NewIPv4Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: jump to IPIP chain", "-j", "KILO-IPIP"))
102+
rules.AddToAppend(iptables.NewIPv6Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: jump to IPIP chain", "-j", "KILO-IPIP"))
103+
for _, n := range nodes {
104+
// Accept encapsulated traffic from peers.
105+
rules.AddToPrepend(iptables.NewRule(iptables.GetProtocol(n.IP), "filter", "KILO-IPIP", "-s", n.String(), "-m", "comment", "--comment", "Kilo: allow IPIP traffic", "-j", "ACCEPT"))
106+
}
107+
// Drop all other IPIP traffic.
108+
rules.AddToAppend(iptables.NewIPv4Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: reject other IPIP traffic", "-j", "DROP"))
109+
rules.AddToAppend(iptables.NewIPv6Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: reject other IPIP traffic", "-j", "DROP"))
110+
111+
return rules
112+
}
113+
114+
// Set sets the IP address of the IPIP tunnel interface.
115+
func (c *cilium) Set(cidr *net.IPNet) error {
116+
return iproute.SetAddress(c.iface, cidr)
117+
}
118+
108119
// Strategy returns the configured strategy for encapsulation.
109-
func (f *cilium) Strategy() Strategy {
110-
return f.strategy
120+
func (c *cilium) Strategy() Strategy {
121+
return c.strategy
111122
}

pkg/encapsulation/encapsulation.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,12 @@ const (
4646
// * clean up any changes applied to the backend.
4747
type Encapsulator interface {
4848
CleanUp() error
49-
Gw(net.IP, net.IP, *net.IPNet) net.IP
49+
Gw(net.IP, net.IP, net.IP, *net.IPNet) net.IP
5050
Index() int
5151
Init(int) error
52+
// LocalIP returns the local overlay IP that should be advertised
53+
// to other nodes. For Cilium, this is the IP of the cilium_host interface. Encapsulators with no such IP return nil.
54+
LocalIP() net.IP
5255
Rules([]*net.IPNet) iptables.RuleSet
5356
Set(*net.IPNet) error
5457
Strategy() Strategy

pkg/encapsulation/flannel.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,15 @@ func (f *flannel) CleanUp() error {
5050
}
5151

5252
// Gw returns the correct gateway IP associated with the given node.
53-
func (f *flannel) Gw(_, _ net.IP, subnet *net.IPNet) net.IP {
53+
func (f *flannel) Gw(_, _, _ net.IP, subnet *net.IPNet) net.IP {
5454
return subnet.IP
5555
}
5656

57+
// LocalIP always returns nil for Flannel.
58+
func (f *flannel) LocalIP() net.IP {
59+
return nil
60+
}
61+
5762
// Index returns the index of the Flannel interface.
5863
func (f *flannel) Index() int {
5964
f.mu.Lock()

pkg/encapsulation/ipip.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,15 @@ func (i *ipip) CleanUp() error {
4141
}
4242

4343
// Gw returns the correct gateway IP associated with the given node.
44-
func (i *ipip) Gw(_, internal net.IP, _ *net.IPNet) net.IP {
44+
func (i *ipip) Gw(_, internal, _ net.IP, _ *net.IPNet) net.IP {
4545
return internal
4646
}
4747

48+
// LocalIP always returns nil for IPIP.
49+
func (i *ipip) LocalIP() net.IP {
50+
return nil
51+
}
52+
4853
// Index returns the index of the IPIP interface.
4954
func (i *ipip) Index() int {
5055
return i.iface

pkg/encapsulation/noop.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@ func (n Noop) CleanUp() error {
2929
}
3030

3131
// Gw will also do nothing.
32-
func (n Noop) Gw(_ net.IP, _ net.IP, _ *net.IPNet) net.IP {
32+
func (n Noop) Gw(_, _, _ net.IP, _ *net.IPNet) net.IP {
33+
return nil
34+
}
35+
36+
// LocalIP will also do nothing.
37+
func (n Noop) LocalIP() net.IP {
3338
return nil
3439
}
3540

pkg/k8s/backend.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,21 @@ import (
4949

5050
const (
5151
// Backend is the name of this mesh backend.
52-
Backend = "kubernetes"
53-
endpointAnnotationKey = "kilo.squat.ai/endpoint"
54-
forceEndpointAnnotationKey = "kilo.squat.ai/force-endpoint"
55-
forceInternalIPAnnotationKey = "kilo.squat.ai/force-internal-ip"
56-
internalIPAnnotationKey = "kilo.squat.ai/internal-ip"
57-
keyAnnotationKey = "kilo.squat.ai/key"
58-
lastSeenAnnotationKey = "kilo.squat.ai/last-seen"
59-
leaderAnnotationKey = "kilo.squat.ai/leader"
60-
locationAnnotationKey = "kilo.squat.ai/location"
61-
persistentKeepaliveKey = "kilo.squat.ai/persistent-keepalive"
62-
wireGuardIPAnnotationKey = "kilo.squat.ai/wireguard-ip"
63-
discoveredEndpointsKey = "kilo.squat.ai/discovered-endpoints"
64-
allowedLocationIPsKey = "kilo.squat.ai/allowed-location-ips"
65-
granularityKey = "kilo.squat.ai/granularity"
52+
Backend = "kubernetes"
53+
endpointAnnotationKey = "kilo.squat.ai/endpoint"
54+
forceEndpointAnnotationKey = "kilo.squat.ai/force-endpoint"
55+
forceInternalIPAnnotationKey = "kilo.squat.ai/force-internal-ip"
56+
internalIPAnnotationKey = "kilo.squat.ai/internal-ip"
57+
keyAnnotationKey = "kilo.squat.ai/key"
58+
lastSeenAnnotationKey = "kilo.squat.ai/last-seen"
59+
leaderAnnotationKey = "kilo.squat.ai/leader"
60+
locationAnnotationKey = "kilo.squat.ai/location"
61+
persistentKeepaliveKey = "kilo.squat.ai/persistent-keepalive"
62+
wireGuardIPAnnotationKey = "kilo.squat.ai/wireguard-ip"
63+
discoveredEndpointsKey = "kilo.squat.ai/discovered-endpoints"
64+
allowedLocationIPsKey = "kilo.squat.ai/allowed-location-ips"
65+
granularityKey = "kilo.squat.ai/granularity"
66+
ciliumInternalIPAnnotationKey = "kilo.squat.ai/cilium-internal-ip"
6667
// RegionLabelKey is the key for the well-known Kubernetes topology region label.
6768
RegionLabelKey = "topology.kubernetes.io/region"
6869
jsonPatchSlash = "~1"
@@ -241,6 +242,11 @@ func (nb *nodeBackend) Set(ctx context.Context, name string, node *mesh.Node) er
241242
n.ObjectMeta.Annotations[discoveredEndpointsKey] = string(discoveredEndpoints)
242243
}
243244
n.ObjectMeta.Annotations[granularityKey] = string(node.Granularity)
245+
if node.CiliumInternalIP != nil {
246+
n.ObjectMeta.Annotations[ciliumInternalIPAnnotationKey] = node.CiliumInternalIP.String()
247+
} else {
248+
n.ObjectMeta.Annotations[ciliumInternalIPAnnotationKey] = ""
249+
}
244250
oldData, err := json.Marshal(old)
245251
if err != nil {
246252
return err
@@ -342,6 +348,12 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
342348
// TODO log some error or warning.
343349
key, _ := wgtypes.ParseKey(node.ObjectMeta.Annotations[keyAnnotationKey])
344350

351+
// Parse the Cilium internal IP if present; a missing, empty, or unparsable annotation leaves it nil.
352+
var ciliumInternalIP net.IP
353+
if cipStr, ok := node.ObjectMeta.Annotations[ciliumInternalIPAnnotationKey]; ok && cipStr != "" {
354+
ciliumInternalIP = net.ParseIP(cipStr)
355+
}
356+
345357
return &mesh.Node{
346358
// Endpoint and InternalIP should only ever fail to parse if the
347359
// remote node's agent has not yet set its IP address;
@@ -352,6 +364,7 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
352364
Endpoint: endpoint,
353365
NoInternalIP: noInternalIP,
354366
InternalIP: internalIP,
367+
CiliumInternalIP: ciliumInternalIP,
355368
Key: key,
356369
LastSeen: lastSeen,
357370
Leader: leader,

pkg/mesh/backend.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,11 @@ const (
5757

5858
// Node represents a node in the network.
5959
type Node struct {
60-
Endpoint *wireguard.Endpoint
61-
Key wgtypes.Key
62-
NoInternalIP bool
63-
InternalIP *net.IPNet
60+
Endpoint *wireguard.Endpoint
61+
Key wgtypes.Key
62+
NoInternalIP bool
63+
InternalIP *net.IPNet
64+
CiliumInternalIP net.IP
6465
// LastSeen is a Unix time for the last time
6566
// the node confirmed it was live.
6667
LastSeen int64

pkg/mesh/mesh.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ func (m *Mesh) handleLocal(ctx context.Context, n *Node) {
412412
Key: m.pub,
413413
NoInternalIP: n.NoInternalIP,
414414
InternalIP: n.InternalIP,
415+
CiliumInternalIP: m.enc.LocalIP(),
415416
LastSeen: time.Now().Unix(),
416417
Leader: n.Leader,
417418
Location: n.Location,
@@ -699,6 +700,7 @@ func nodesAreEqual(a, b *Node) bool {
699700
return a.Key.String() == b.Key.String() &&
700701
ipNetsEqual(a.WireGuardIP, b.WireGuardIP) &&
701702
ipNetsEqual(a.InternalIP, b.InternalIP) &&
703+
a.CiliumInternalIP.Equal(b.CiliumInternalIP) &&
702704
a.Leader == b.Leader &&
703705
a.Location == b.Location &&
704706
a.Name == b.Name &&

pkg/mesh/routes.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
4040
var gw net.IP
4141
for _, segment := range t.segments {
4242
if segment.location == t.location {
43-
gw = enc.Gw(t.updateEndpoint(segment.endpoint, segment.key, &segment.persistentKeepalive).IP(), segment.privateIPs[segment.leader], segment.cidrs[segment.leader])
43+
gw = enc.Gw(t.updateEndpoint(segment.endpoint, segment.key, &segment.persistentKeepalive).IP(), segment.privateIPs[segment.leader], segment.ciliumInternalIPs[segment.leader], segment.cidrs[segment.leader])
4444
break
4545
}
4646
}
@@ -61,10 +61,11 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
6161
if segment.privateIPs[i].Equal(t.privateIP.IP) {
6262
continue
6363
}
64+
nodeGw := enc.Gw(nil, segment.privateIPs[i], segment.ciliumInternalIPs[i], segment.cidrs[i])
6465
routes = append(routes, encapsulateRoute(&netlink.Route{
6566
Dst: segment.cidrs[i],
6667
Flags: int(netlink.FLAG_ONLINK),
67-
Gw: segment.privateIPs[i],
68+
Gw: nodeGw,
6869
LinkIndex: privIface,
6970
Protocol: unix.RTPROT_STATIC,
7071
}, enc.Strategy(), t.privateIP, tunlIface))
@@ -74,7 +75,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
7475
routes = append(routes, &netlink.Route{
7576
Dst: oneAddressCIDR(segment.privateIPs[i]),
7677
Flags: int(netlink.FLAG_ONLINK),
77-
Gw: segment.privateIPs[i],
78+
Gw: nodeGw,
7879
LinkIndex: tunlIface,
7980
Src: t.privateIP.IP,
8081
Protocol: unix.RTPROT_STATIC,
@@ -155,10 +156,11 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
155156
if segment.privateIPs[i].Equal(t.privateIP.IP) {
156157
continue
157158
}
159+
nodeGw := enc.Gw(nil, segment.privateIPs[i], segment.ciliumInternalIPs[i], segment.cidrs[i])
158160
routes = append(routes, encapsulateRoute(&netlink.Route{
159161
Dst: segment.cidrs[i],
160162
Flags: int(netlink.FLAG_ONLINK),
161-
Gw: segment.privateIPs[i],
163+
Gw: nodeGw,
162164
LinkIndex: privIface,
163165
Protocol: unix.RTPROT_STATIC,
164166
}, enc.Strategy(), t.privateIP, tunlIface))
@@ -168,7 +170,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
168170
routes = append(routes, &netlink.Route{
169171
Dst: oneAddressCIDR(segment.privateIPs[i]),
170172
Flags: int(netlink.FLAG_ONLINK),
171-
Gw: segment.privateIPs[i],
173+
Gw: nodeGw,
172174
LinkIndex: tunlIface,
173175
Src: t.privateIP.IP,
174176
Protocol: unix.RTPROT_STATIC,

0 commit comments

Comments
 (0)