Skip to content

Commit d4cbbd6

Browse files
committed
[CORE-10981] Move bandwidth code to pkg/bandwidth
Move bandwidth functionality to pkg/bandwidth, make it an importable module. Import it and use it in plugins/meta/bandwidth/main.go. Add a qdiscType arg to CreateEgressQdisc() so that both "ingress" and "clsact" qdiscs may be used ("ingress" is the default used by the bandwidth plugin). All of this has the objective of enabling Calico to apply the netlink qdisc configuration natively by importing this module.
1 parent dbad2e0 commit d4cbbd6

File tree

3 files changed

+207
-173
lines changed

3 files changed

+207
-173
lines changed

pkg/bandwidth/bandwidth.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright 2025 CNI authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bandwidth
16+
17+
import (
18+
"fmt"
19+
"math"
20+
21+
"github.com/containernetworking/cni/pkg/types"
22+
current "github.com/containernetworking/cni/pkg/types/100"
23+
"github.com/containernetworking/plugins/pkg/ip"
24+
"github.com/containernetworking/plugins/pkg/ns"
25+
"github.com/containernetworking/plugins/pkg/utils"
26+
"github.com/vishvananda/netlink"
27+
)
28+
29+
const (
30+
MaxIfbDeviceLength = 15
31+
IfbDevicePrefix = "bwp"
32+
)
33+
34+
// BandwidthEntry corresponds to a single entry in the bandwidth argument,
35+
// see CONVENTIONS.md
36+
type BandwidthEntry struct {
37+
IngressRate uint64 `json:"ingressRate"` // Bandwidth rate in bps for traffic through container. 0 for no limit. If ingressRate is set, ingressBurst must also be set
38+
IngressBurst uint64 `json:"ingressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If ingressBurst is set, ingressRate must also be set
39+
40+
EgressRate uint64 `json:"egressRate"` // Bandwidth rate in bps for traffic through container. 0 for no limit. If egressRate is set, egressBurst must also be set
41+
EgressBurst uint64 `json:"egressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If egressBurst is set, egressRate must also be set
42+
}
43+
44+
func (bw *BandwidthEntry) IsZero() bool {
45+
return bw.IngressBurst == 0 && bw.IngressRate == 0 && bw.EgressBurst == 0 && bw.EgressRate == 0
46+
}
47+
48+
type PluginConf struct {
49+
types.NetConf
50+
51+
RuntimeConfig struct {
52+
Bandwidth *BandwidthEntry `json:"bandwidth,omitempty"`
53+
} `json:"runtimeConfig,omitempty"`
54+
55+
*BandwidthEntry
56+
}
57+
58+
func SafeQdiscList(link netlink.Link) ([]netlink.Qdisc, error) {
59+
qdiscs, err := netlink.QdiscList(link)
60+
if err != nil {
61+
return nil, err
62+
}
63+
result := []netlink.Qdisc{}
64+
for _, qdisc := range qdiscs {
65+
// filter out pfifo_fast qdiscs because
66+
// older kernels don't return them
67+
_, pfifo := qdisc.(*netlink.PfifoFast)
68+
if !pfifo {
69+
result = append(result, qdisc)
70+
}
71+
}
72+
return result, nil
73+
}
74+
75+
func ValidateRateAndBurst(rate, burst uint64) error {
76+
switch {
77+
case burst == 0 && rate != 0:
78+
return fmt.Errorf("if rate is set, burst must also be set")
79+
case rate == 0 && burst != 0:
80+
return fmt.Errorf("if burst is set, rate must also be set")
81+
case burst/8 >= math.MaxUint32:
82+
return fmt.Errorf("burst cannot be more than 4GB")
83+
}
84+
85+
return nil
86+
}
87+
88+
func GetBandwidth(conf *PluginConf) *BandwidthEntry {
89+
if conf.BandwidthEntry == nil && conf.RuntimeConfig.Bandwidth != nil {
90+
return conf.RuntimeConfig.Bandwidth
91+
}
92+
return conf.BandwidthEntry
93+
}
94+
95+
func GetIfbDeviceName(networkName string, containerID string) string {
96+
return utils.MustFormatHashWithPrefix(MaxIfbDeviceLength, IfbDevicePrefix, networkName+containerID)
97+
}
98+
99+
func GetMTU(deviceName string) (int, error) {
100+
link, err := netlink.LinkByName(deviceName)
101+
if err != nil {
102+
return -1, err
103+
}
104+
105+
return link.Attrs().MTU, nil
106+
}
107+
108+
// get the veth peer of container interface in host namespace
109+
func GetHostInterface(interfaces []*current.Interface, containerIfName string, netns ns.NetNS) (*current.Interface, error) {
110+
if len(interfaces) == 0 {
111+
return nil, fmt.Errorf("no interfaces provided")
112+
}
113+
114+
// get veth peer index of container interface
115+
var peerIndex int
116+
var err error
117+
_ = netns.Do(func(_ ns.NetNS) error {
118+
_, peerIndex, err = ip.GetVethPeerIfindex(containerIfName)
119+
return nil
120+
})
121+
if peerIndex <= 0 {
122+
return nil, fmt.Errorf("container interface %s has no veth peer: %v", containerIfName, err)
123+
}
124+
125+
// find host interface by index
126+
link, err := netlink.LinkByIndex(peerIndex)
127+
if err != nil {
128+
return nil, fmt.Errorf("veth peer with index %d is not in host ns", peerIndex)
129+
}
130+
for _, iface := range interfaces {
131+
if iface.Sandbox == "" && iface.Name == link.Attrs().Name {
132+
return iface, nil
133+
}
134+
}
135+
136+
return nil, fmt.Errorf("no veth peer of container interface found in host ns")
137+
}
Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package main
15+
package bandwidth
1616

1717
import (
1818
"fmt"
@@ -24,7 +24,7 @@ import (
2424
"github.com/vishvananda/netlink"
2525
)
2626

27-
const latencyInMillis = 25
27+
const LatencyInMillis = 25
2828

2929
func CreateIfb(ifbDeviceName string, mtu int) error {
3030
err := netlink.LinkAdd(&netlink.Ifb{
@@ -58,7 +58,7 @@ func CreateIngressQdisc(rateInBits, burstInBits uint64, hostDeviceName string) e
5858
return createTBF(rateInBits, burstInBits, hostDevice.Attrs().Index)
5959
}
6060

61-
func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
61+
func CreateEgressQdisc(qdiscType string, rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
6262
ifbDevice, err := netlink.LinkByName(ifbDeviceName)
6363
if err != nil {
6464
return fmt.Errorf("get ifb device: %s", err)
@@ -68,41 +68,52 @@ func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, if
6868
return fmt.Errorf("get host device: %s", err)
6969
}
7070

71-
// add qdisc ingress on host device
72-
ingress := &netlink.Ingress{
73-
QdiscAttrs: netlink.QdiscAttrs{
74-
LinkIndex: hostDevice.Attrs().Index,
75-
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
76-
Parent: netlink.HANDLE_INGRESS,
77-
},
71+
var qdisc netlink.Qdisc
72+
var filterHandle uint32
73+
switch qdiscType {
74+
case "ingress":
75+
// add ingress qdisc on host device
76+
qdisc = &netlink.Ingress{
77+
QdiscAttrs: netlink.QdiscAttrs{
78+
LinkIndex: hostDevice.Attrs().Index,
79+
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
80+
Parent: netlink.HANDLE_INGRESS,
81+
},
82+
}
83+
filterHandle = qdisc.Attrs().Handle
84+
case "clsact":
85+
// add clsact qdisc on host device
86+
qdisc = &netlink.Clsact{
87+
QdiscAttrs: netlink.QdiscAttrs{
88+
LinkIndex: hostDevice.Attrs().Index,
89+
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
90+
Parent: netlink.HANDLE_CLSACT,
91+
},
92+
}
93+
filterHandle = netlink.HANDLE_MIN_EGRESS
94+
default:
95+
return fmt.Errorf("unknown qdisc type: %s", qdiscType)
7896
}
7997

80-
err = netlink.QdiscAdd(ingress)
98+
err = netlink.QdiscAdd(qdisc)
8199
if err != nil {
82-
return fmt.Errorf("create ingress qdisc: %s", err)
100+
return fmt.Errorf("create %s qdisc: %s", qdisc.Type(), err)
83101
}
84102

85-
// add filter on host device to mirror traffic to ifb device
86103
filter := &netlink.U32{
87104
FilterAttrs: netlink.FilterAttrs{
88105
LinkIndex: hostDevice.Attrs().Index,
89-
Parent: ingress.QdiscAttrs.Handle,
106+
Parent: filterHandle,
90107
Priority: 1,
91108
Protocol: syscall.ETH_P_ALL,
92109
},
93110
ClassId: netlink.MakeHandle(1, 1),
94111
RedirIndex: ifbDevice.Attrs().Index,
95-
Actions: []netlink.Action{
96-
&netlink.MirredAction{
97-
ActionAttrs: netlink.ActionAttrs{},
98-
MirredAction: netlink.TCA_EGRESS_REDIR,
99-
Ifindex: ifbDevice.Attrs().Index,
100-
},
101-
},
112+
Actions: []netlink.Action{netlink.NewMirredAction(ifbDevice.Attrs().Index)},
102113
}
103114
err = netlink.FilterAdd(filter)
104115
if err != nil {
105-
return fmt.Errorf("add filter: %s", err)
116+
return fmt.Errorf("add egress filter: %s", err)
106117
}
107118

108119
// throttle traffic on ifb device
@@ -126,9 +137,9 @@ func createTBF(rateInBits, burstInBits uint64, linkIndex int) error {
126137
}
127138
rateInBytes := rateInBits / 8
128139
burstInBytes := burstInBits / 8
129-
bufferInBytes := buffer(uint64(rateInBytes), uint32(burstInBytes))
130-
latency := latencyInUsec(latencyInMillis)
131-
limitInBytes := limit(uint64(rateInBytes), latency, uint32(burstInBytes))
140+
bufferInBytes := Buffer(rateInBytes, uint32(burstInBytes))
141+
latency := LatencyInUsec(LatencyInMillis)
142+
limitInBytes := Limit(rateInBytes, latency, uint32(burstInBytes))
132143

133144
qdisc := &netlink.Tbf{
134145
QdiscAttrs: netlink.QdiscAttrs{
@@ -155,14 +166,14 @@ func time2Tick(time uint32) uint32 {
155166
return uint32(float64(time) * float64(netlink.TickInUsec()))
156167
}
157168

158-
func buffer(rate uint64, burst uint32) uint32 {
169+
func Buffer(rate uint64, burst uint32) uint32 {
159170
return time2Tick(uint32(float64(burst) * float64(netlink.TIME_UNITS_PER_SEC) / float64(rate)))
160171
}
161172

162-
func limit(rate uint64, latency float64, buffer uint32) uint32 {
173+
func Limit(rate uint64, latency float64, buffer uint32) uint32 {
163174
return uint32(float64(rate)*latency/float64(netlink.TIME_UNITS_PER_SEC)) + buffer
164175
}
165176

166-
func latencyInUsec(latencyInMillis float64) float64 {
167-
return float64(netlink.TIME_UNITS_PER_SEC) * (latencyInMillis / 1000.0)
177+
func LatencyInUsec(LatencyInMillis float64) float64 {
178+
return float64(netlink.TIME_UNITS_PER_SEC) * (LatencyInMillis / 1000.0)
168179
}

0 commit comments

Comments
 (0)