Skip to content

Commit 2e4a227

Browse files
committed
[CORE-10981] Move bandwidth code to pkg/bandwidth
Move bandwidth functionality to pkg/bandwidth, make it an importable module. Import it and use it in plugins/meta/bandwidth/main.go. Add a qdiscType arg to CreateEgressQdisc() so that both "ingress" and "clsact" qdiscs may be used ("ingress" is the default used by the bandwidth plugin). All of this has the objective of enabling Calico to apply the netlink qdisc configuration natively by importing this module.
1 parent 56432a2 commit 2e4a227

File tree

4 files changed

+217
-175
lines changed

4 files changed

+217
-175
lines changed

pkg/bandwidth/bandwidth.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2025 CNI authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bandwidth
16+
17+
import (
18+
"fmt"
19+
"math"
20+
21+
"github.com/vishvananda/netlink"
22+
23+
"github.com/containernetworking/cni/pkg/types"
24+
current "github.com/containernetworking/cni/pkg/types/100"
25+
"github.com/containernetworking/plugins/pkg/ip"
26+
"github.com/containernetworking/plugins/pkg/ns"
27+
"github.com/containernetworking/plugins/pkg/utils"
28+
)
29+
30+
const (
31+
MaxIfbDeviceLength = 15
32+
IfbDevicePrefix = "bwp"
33+
)
34+
35+
// BandwidthEntry corresponds to a single entry in the bandwidth argument,
36+
// see CONVENTIONS.md
37+
type BandwidthEntry struct {
38+
IngressRate uint64 `json:"ingressRate"` // Bandwidth rate in bps for traffic through container. 0 for no limit. If ingressRate is set, ingressBurst must also be set
39+
IngressBurst uint64 `json:"ingressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If ingressBurst is set, ingressRate must also be set
40+
41+
EgressRate uint64 `json:"egressRate"` // Bandwidth rate in bps for traffic through container. 0 for no limit. If egressRate is set, egressBurst must also be set
42+
EgressBurst uint64 `json:"egressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If egressBurst is set, egressRate must also be set
43+
}
44+
45+
func (bw *BandwidthEntry) IsZero() bool {
46+
return bw.IngressBurst == 0 && bw.IngressRate == 0 && bw.EgressBurst == 0 && bw.EgressRate == 0
47+
}
48+
49+
type PluginConf struct {
50+
types.NetConf
51+
52+
RuntimeConfig struct {
53+
Bandwidth *BandwidthEntry `json:"bandwidth,omitempty"`
54+
} `json:"runtimeConfig,omitempty"`
55+
56+
*BandwidthEntry
57+
}
58+
59+
func SafeQdiscList(link netlink.Link) ([]netlink.Qdisc, error) {
60+
qdiscs, err := netlink.QdiscList(link)
61+
if err != nil {
62+
return nil, err
63+
}
64+
result := []netlink.Qdisc{}
65+
for _, qdisc := range qdiscs {
66+
// filter out pfifo_fast qdiscs because
67+
// older kernels don't return them
68+
_, pfifo := qdisc.(*netlink.PfifoFast)
69+
if !pfifo {
70+
result = append(result, qdisc)
71+
}
72+
}
73+
return result, nil
74+
}
75+
76+
func ValidateRateAndBurst(rate, burst uint64) error {
77+
switch {
78+
case burst == 0 && rate != 0:
79+
return fmt.Errorf("if rate is set, burst must also be set")
80+
case rate == 0 && burst != 0:
81+
return fmt.Errorf("if burst is set, rate must also be set")
82+
case burst/8 >= math.MaxUint32:
83+
return fmt.Errorf("burst cannot be more than 4GB")
84+
}
85+
86+
return nil
87+
}
88+
89+
func GetBandwidth(conf *PluginConf) *BandwidthEntry {
90+
if conf.BandwidthEntry == nil && conf.RuntimeConfig.Bandwidth != nil {
91+
return conf.RuntimeConfig.Bandwidth
92+
}
93+
return conf.BandwidthEntry
94+
}
95+
96+
func GetIfbDeviceName(networkName string, containerID string) string {
97+
return utils.MustFormatHashWithPrefix(MaxIfbDeviceLength, IfbDevicePrefix, networkName+containerID)
98+
}
99+
100+
func GetMTU(deviceName string) (int, error) {
101+
link, err := netlink.LinkByName(deviceName)
102+
if err != nil {
103+
return -1, err
104+
}
105+
106+
return link.Attrs().MTU, nil
107+
}
108+
109+
// get the veth peer of container interface in host namespace
110+
func GetHostInterface(interfaces []*current.Interface, containerIfName string, netns ns.NetNS) (*current.Interface, error) {
111+
if len(interfaces) == 0 {
112+
return nil, fmt.Errorf("no interfaces provided")
113+
}
114+
115+
// get veth peer index of container interface
116+
var peerIndex int
117+
var err error
118+
_ = netns.Do(func(_ ns.NetNS) error {
119+
_, peerIndex, err = ip.GetVethPeerIfindex(containerIfName)
120+
return nil
121+
})
122+
if peerIndex <= 0 {
123+
return nil, fmt.Errorf("container interface %s has no veth peer: %v", containerIfName, err)
124+
}
125+
126+
// find host interface by index
127+
link, err := netlink.LinkByIndex(peerIndex)
128+
if err != nil {
129+
return nil, fmt.Errorf("veth peer with index %d is not in host ns", peerIndex)
130+
}
131+
for _, iface := range interfaces {
132+
if iface.Sandbox == "" && iface.Name == link.Attrs().Name {
133+
return iface, nil
134+
}
135+
}
136+
137+
return nil, fmt.Errorf("no veth peer of container interface found in host ns")
138+
}
Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package main
15+
package bandwidth
1616

1717
import (
1818
"fmt"
@@ -24,7 +24,7 @@ import (
2424
"github.com/containernetworking/plugins/pkg/ip"
2525
)
2626

27-
const latencyInMillis = 25
27+
const LatencyInMillis = 25
2828

2929
func CreateIfb(ifbDeviceName string, mtu int) error {
3030
// do not set TxQLen > 0 nor TxQLen == -1 until issues have been fixed with numrxqueues / numtxqueues across interfaces
@@ -60,7 +60,7 @@ func CreateIngressQdisc(rateInBits, burstInBits uint64, hostDeviceName string) e
6060
return createTBF(rateInBits, burstInBits, hostDevice.Attrs().Index)
6161
}
6262

63-
func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
63+
func CreateEgressQdisc(qdiscType string, rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
6464
ifbDevice, err := netlink.LinkByName(ifbDeviceName)
6565
if err != nil {
6666
return fmt.Errorf("get ifb device: %s", err)
@@ -70,41 +70,52 @@ func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, if
7070
return fmt.Errorf("get host device: %s", err)
7171
}
7272

73-
// add qdisc ingress on host device
74-
ingress := &netlink.Ingress{
75-
QdiscAttrs: netlink.QdiscAttrs{
76-
LinkIndex: hostDevice.Attrs().Index,
77-
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
78-
Parent: netlink.HANDLE_INGRESS,
79-
},
73+
var qdisc netlink.Qdisc
74+
var filterHandle uint32
75+
switch qdiscType {
76+
case "ingress":
77+
// add ingress qdisc on host device
78+
qdisc = &netlink.Ingress{
79+
QdiscAttrs: netlink.QdiscAttrs{
80+
LinkIndex: hostDevice.Attrs().Index,
81+
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
82+
Parent: netlink.HANDLE_INGRESS,
83+
},
84+
}
85+
filterHandle = qdisc.Attrs().Handle
86+
case "clsact":
87+
// add clsact qdisc on host device
88+
qdisc = &netlink.Clsact{
89+
QdiscAttrs: netlink.QdiscAttrs{
90+
LinkIndex: hostDevice.Attrs().Index,
91+
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
92+
Parent: netlink.HANDLE_CLSACT,
93+
},
94+
}
95+
filterHandle = netlink.HANDLE_MIN_EGRESS
96+
default:
97+
return fmt.Errorf("unknown qdisc type: %s", qdiscType)
8098
}
8199

82-
err = netlink.QdiscAdd(ingress)
100+
err = netlink.QdiscAdd(qdisc)
83101
if err != nil {
84-
return fmt.Errorf("create ingress qdisc: %s", err)
102+
return fmt.Errorf("create %s qdisc: %s", qdisc.Type(), err)
85103
}
86104

87-
// add filter on host device to mirror traffic to ifb device
88105
filter := &netlink.U32{
89106
FilterAttrs: netlink.FilterAttrs{
90107
LinkIndex: hostDevice.Attrs().Index,
91-
Parent: ingress.QdiscAttrs.Handle,
108+
Parent: filterHandle,
92109
Priority: 1,
93110
Protocol: syscall.ETH_P_ALL,
94111
},
95112
ClassId: netlink.MakeHandle(1, 1),
96113
RedirIndex: ifbDevice.Attrs().Index,
97-
Actions: []netlink.Action{
98-
&netlink.MirredAction{
99-
ActionAttrs: netlink.ActionAttrs{},
100-
MirredAction: netlink.TCA_EGRESS_REDIR,
101-
Ifindex: ifbDevice.Attrs().Index,
102-
},
103-
},
114+
Actions: []netlink.Action{netlink.NewMirredAction(ifbDevice.Attrs().Index)},
104115
}
105116
err = netlink.FilterAdd(filter)
106117
if err != nil {
107-
return fmt.Errorf("add filter: %s", err)
118+
return fmt.Errorf("add egress filter: %s", err)
108119
}
109120

110121
// throttle traffic on ifb device
@@ -128,9 +139,9 @@ func createTBF(rateInBits, burstInBits uint64, linkIndex int) error {
128139
}
129140
rateInBytes := rateInBits / 8
130141
burstInBytes := burstInBits / 8
131-
bufferInBytes := buffer(rateInBytes, uint32(burstInBytes))
132-
latency := latencyInUsec(latencyInMillis)
133-
limitInBytes := limit(rateInBytes, latency, uint32(burstInBytes))
142+
bufferInBytes := Buffer(rateInBytes, uint32(burstInBytes))
143+
latency := LatencyInUsec(LatencyInMillis)
144+
limitInBytes := Limit(rateInBytes, latency, uint32(burstInBytes))
134145

135146
qdisc := &netlink.Tbf{
136147
QdiscAttrs: netlink.QdiscAttrs{
@@ -153,14 +164,14 @@ func time2Tick(time uint32) uint32 {
153164
return uint32(float64(time) * netlink.TickInUsec())
154165
}
155166

156-
func buffer(rate uint64, burst uint32) uint32 {
167+
func Buffer(rate uint64, burst uint32) uint32 {
157168
return time2Tick(uint32(float64(burst) * float64(netlink.TIME_UNITS_PER_SEC) / float64(rate)))
158169
}
159170

160-
func limit(rate uint64, latency float64, buffer uint32) uint32 {
171+
func Limit(rate uint64, latency float64, buffer uint32) uint32 {
161172
return uint32(float64(rate)*latency/float64(netlink.TIME_UNITS_PER_SEC)) + buffer
162173
}
163174

164-
func latencyInUsec(latencyInMillis float64) float64 {
175+
func LatencyInUsec(latencyInMillis float64) float64 {
165176
return float64(netlink.TIME_UNITS_PER_SEC) * (latencyInMillis / 1000.0)
166177
}

plugins/meta/bandwidth/bandwidth_linux_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ import (
3131
"github.com/containernetworking/cni/pkg/skel"
3232
"github.com/containernetworking/cni/pkg/types"
3333
types100 "github.com/containernetworking/cni/pkg/types/100"
34+
bw "github.com/containernetworking/plugins/pkg/bandwidth"
3435
"github.com/containernetworking/plugins/pkg/ns"
3536
"github.com/containernetworking/plugins/pkg/testutils"
3637
)
3738

38-
func buildOneConfig(name, cniVersion string, orig *PluginConf, prevResult types.Result) ([]byte, error) {
39+
func buildOneConfig(name, cniVersion string, orig *bw.PluginConf, prevResult types.Result) ([]byte, error) {
3940
var err error
4041

4142
inject := map[string]interface{}{
@@ -71,7 +72,7 @@ func buildOneConfig(name, cniVersion string, orig *PluginConf, prevResult types.
7172
return nil, err
7273
}
7374

74-
conf := &PluginConf{}
75+
conf := &bw.PluginConf{}
7576
if err := json.Unmarshal(newBytes, &conf); err != nil {
7677
return nil, fmt.Errorf("error parsing configuration: %s", err)
7778
}
@@ -939,11 +940,11 @@ var _ = Describe("bandwidth test", func() {
939940
containerWithTbfResult, err := types100.GetResult(containerWithTbfRes)
940941
Expect(err).NotTo(HaveOccurred())
941942

942-
tbfPluginConf := &PluginConf{}
943+
tbfPluginConf := &bw.PluginConf{}
943944
err = json.Unmarshal([]byte(ptpConf), &tbfPluginConf)
944945
Expect(err).NotTo(HaveOccurred())
945946

946-
tbfPluginConf.RuntimeConfig.Bandwidth = &BandwidthEntry{
947+
tbfPluginConf.RuntimeConfig.Bandwidth = &bw.BandwidthEntry{
947948
IngressBurst: burstInBits,
948949
IngressRate: rateInBits,
949950
EgressBurst: burstInBits,
@@ -965,11 +966,11 @@ var _ = Describe("bandwidth test", func() {
965966

966967
if testutils.SpecVersionHasCHECK(ver) {
967968
// Do CNI Check
968-
checkConf := &PluginConf{}
969+
checkConf := &bw.PluginConf{}
969970
err = json.Unmarshal([]byte(ptpConf), &checkConf)
970971
Expect(err).NotTo(HaveOccurred())
971972

972-
checkConf.RuntimeConfig.Bandwidth = &BandwidthEntry{
973+
checkConf.RuntimeConfig.Bandwidth = &bw.BandwidthEntry{
973974
IngressBurst: burstInBits,
974975
IngressRate: rateInBits,
975976
EgressBurst: burstInBits,
@@ -1045,15 +1046,15 @@ var _ = Describe("bandwidth test", func() {
10451046

10461047
Describe("Validating input", func() {
10471048
It("Should allow only 4GB burst rate", func() {
1048-
err := validateRateAndBurst(5000, 4*1024*1024*1024*8-16) // 2 bytes less than the max should pass
1049+
err := bw.ValidateRateAndBurst(5000, 4*1024*1024*1024*8-16) // 2 bytes less than the max should pass
10491050
Expect(err).NotTo(HaveOccurred())
1050-
err = validateRateAndBurst(5000, 4*1024*1024*1024*8) // we're 1 bit above MaxUint32
1051+
err = bw.ValidateRateAndBurst(5000, 4*1024*1024*1024*8) // we're 1 bit above MaxUint32
10511052
Expect(err).To(HaveOccurred())
1052-
err = validateRateAndBurst(0, 1)
1053+
err = bw.ValidateRateAndBurst(0, 1)
10531054
Expect(err).To(HaveOccurred())
1054-
err = validateRateAndBurst(1, 0)
1055+
err = bw.ValidateRateAndBurst(1, 0)
10551056
Expect(err).To(HaveOccurred())
1056-
err = validateRateAndBurst(0, 0)
1057+
err = bw.ValidateRateAndBurst(0, 0)
10571058
Expect(err).NotTo(HaveOccurred())
10581059
})
10591060
})

0 commit comments

Comments
 (0)