Skip to content

Commit 030d859

Browse files
committed
[CORE-10981] Move bandwidth code to pkg/bandwidth
Move bandwidth functionality to pkg/bandwidth, make it an importable module. Import it and use it in plugins/meta/bandwidth/main.go. Add a qdiscType arg to CreateEgressQdisc() so that both "ingress" and "clsact" qdiscs may be used ("ingress" is the default used by the bandwidth plugin). All of this has the objective of enabling Calico to apply the netlink qdisc configuration natively by importing this module.
1 parent 44c7ddc commit 030d859

File tree

4 files changed

+220
-182
lines changed

4 files changed

+220
-182
lines changed

pkg/bandwidth/bandwidth.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2025 CNI authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bandwidth
16+
17+
import (
18+
"fmt"
19+
"math"
20+
21+
"github.com/vishvananda/netlink"
22+
23+
"github.com/containernetworking/cni/pkg/types"
24+
current "github.com/containernetworking/cni/pkg/types/100"
25+
"github.com/containernetworking/plugins/pkg/ip"
26+
"github.com/containernetworking/plugins/pkg/ns"
27+
"github.com/containernetworking/plugins/pkg/utils"
28+
)
29+
30+
const (
31+
MaxIfbDeviceLength = 15
32+
IfbDevicePrefix = "bwp"
33+
)
34+
35+
// BandwidthEntry corresponds to a single entry in the bandwidth argument,
36+
// see CONVENTIONS.md
37+
type BandwidthEntry struct {
38+
IngressRate uint64 `json:"ingressRate"` // Bandwidth rate in bps for traffic through container. 0 for no limit. If ingressRate is set, ingressBurst must also be set
39+
IngressBurst uint64 `json:"ingressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If ingressBurst is set, ingressRate must also be set
40+
41+
EgressRate uint64 `json:"egressRate"` // Bandwidth rate in bps for traffic through container. 0 for no limit. If egressRate is set, egressBurst must also be set
42+
EgressBurst uint64 `json:"egressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If egressBurst is set, egressRate must also be set
43+
}
44+
45+
func (bw *BandwidthEntry) IsZero() bool {
46+
return bw.IngressBurst == 0 && bw.IngressRate == 0 && bw.EgressBurst == 0 && bw.EgressRate == 0
47+
}
48+
49+
type PluginConf struct {
50+
types.NetConf
51+
52+
RuntimeConfig struct {
53+
Bandwidth *BandwidthEntry `json:"bandwidth,omitempty"`
54+
} `json:"runtimeConfig,omitempty"`
55+
56+
*BandwidthEntry
57+
}
58+
59+
func SafeQdiscList(link netlink.Link) ([]netlink.Qdisc, error) {
60+
qdiscs, err := netlink.QdiscList(link)
61+
if err != nil {
62+
return nil, err
63+
}
64+
result := []netlink.Qdisc{}
65+
for _, qdisc := range qdiscs {
66+
// filter out pfifo_fast qdiscs because
67+
// older kernels don't return them
68+
_, pfifo := qdisc.(*netlink.PfifoFast)
69+
if !pfifo {
70+
result = append(result, qdisc)
71+
}
72+
}
73+
return result, nil
74+
}
75+
76+
func ValidateRateAndBurst(rate, burst uint64) error {
77+
switch {
78+
case burst == 0 && rate != 0:
79+
return fmt.Errorf("if rate is set, burst must also be set")
80+
case rate == 0 && burst != 0:
81+
return fmt.Errorf("if burst is set, rate must also be set")
82+
case burst/8 >= math.MaxUint32:
83+
return fmt.Errorf("burst cannot be more than 4GB")
84+
}
85+
86+
return nil
87+
}
88+
89+
func GetBandwidth(conf *PluginConf) *BandwidthEntry {
90+
if conf.BandwidthEntry == nil && conf.RuntimeConfig.Bandwidth != nil {
91+
return conf.RuntimeConfig.Bandwidth
92+
}
93+
return conf.BandwidthEntry
94+
}
95+
96+
func GetIfbDeviceName(networkName string, containerID string) string {
97+
return utils.MustFormatHashWithPrefix(MaxIfbDeviceLength, IfbDevicePrefix, networkName+containerID)
98+
}
99+
100+
func GetMTU(deviceName string) (int, error) {
101+
link, err := netlink.LinkByName(deviceName)
102+
if err != nil {
103+
return -1, err
104+
}
105+
106+
return link.Attrs().MTU, nil
107+
}
108+
109+
// get the veth peer of container interface in host namespace
110+
func GetHostInterface(interfaces []*current.Interface, containerIfName string, netns ns.NetNS) (*current.Interface, error) {
111+
if len(interfaces) == 0 {
112+
return nil, fmt.Errorf("no interfaces provided")
113+
}
114+
115+
// get veth peer index of container interface
116+
var peerIndex int
117+
var err error
118+
_ = netns.Do(func(_ ns.NetNS) error {
119+
_, peerIndex, err = ip.GetVethPeerIfindex(containerIfName)
120+
return nil
121+
})
122+
if peerIndex <= 0 {
123+
return nil, fmt.Errorf("container interface %s has no veth peer: %v", containerIfName, err)
124+
}
125+
126+
// find host interface by index
127+
link, err := netlink.LinkByIndex(peerIndex)
128+
if err != nil {
129+
return nil, fmt.Errorf("veth peer with index %d is not in host ns", peerIndex)
130+
}
131+
for _, iface := range interfaces {
132+
if iface.Sandbox == "" && iface.Name == link.Attrs().Name {
133+
return iface, nil
134+
}
135+
}
136+
137+
return nil, fmt.Errorf("no veth peer of container interface found in host ns")
138+
}
Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package main
15+
package bandwidth
1616

1717
import (
1818
"fmt"
@@ -24,7 +24,7 @@ import (
2424
"github.com/vishvananda/netlink"
2525
)
2626

27-
const latencyInMillis = 25
27+
const LatencyInMillis = 25
2828

2929
func CreateIfb(ifbDeviceName string, mtu int) error {
3030
err := netlink.LinkAdd(&netlink.Ifb{
@@ -58,7 +58,7 @@ func CreateIngressQdisc(rateInBits, burstInBits uint64, hostDeviceName string) e
5858
return createTBF(rateInBits, burstInBits, hostDevice.Attrs().Index)
5959
}
6060

61-
func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
61+
func CreateEgressQdisc(qdiscType string, rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
6262
ifbDevice, err := netlink.LinkByName(ifbDeviceName)
6363
if err != nil {
6464
return fmt.Errorf("get ifb device: %s", err)
@@ -68,41 +68,52 @@ func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, if
6868
return fmt.Errorf("get host device: %s", err)
6969
}
7070

71-
// add qdisc ingress on host device
72-
ingress := &netlink.Ingress{
73-
QdiscAttrs: netlink.QdiscAttrs{
74-
LinkIndex: hostDevice.Attrs().Index,
75-
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
76-
Parent: netlink.HANDLE_INGRESS,
77-
},
71+
var qdisc netlink.Qdisc
72+
var filterHandle uint32
73+
switch qdiscType {
74+
case "ingress":
75+
// add ingress qdisc on host device
76+
qdisc = &netlink.Ingress{
77+
QdiscAttrs: netlink.QdiscAttrs{
78+
LinkIndex: hostDevice.Attrs().Index,
79+
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
80+
Parent: netlink.HANDLE_INGRESS,
81+
},
82+
}
83+
filterHandle = qdisc.Attrs().Handle
84+
case "clsact":
85+
// add clsact qdisc on host device
86+
qdisc = &netlink.Clsact{
87+
QdiscAttrs: netlink.QdiscAttrs{
88+
LinkIndex: hostDevice.Attrs().Index,
89+
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
90+
Parent: netlink.HANDLE_CLSACT,
91+
},
92+
}
93+
filterHandle = netlink.HANDLE_MIN_EGRESS
94+
default:
95+
return fmt.Errorf("unknown qdisc type: %s", qdiscType)
7896
}
7997

80-
err = netlink.QdiscAdd(ingress)
98+
err = netlink.QdiscAdd(qdisc)
8199
if err != nil {
82-
return fmt.Errorf("create ingress qdisc: %s", err)
100+
return fmt.Errorf("create %s qdisc: %s", qdisc.Type(), err)
83101
}
84102

85-
// add filter on host device to mirror traffic to ifb device
86103
filter := &netlink.U32{
87104
FilterAttrs: netlink.FilterAttrs{
88105
LinkIndex: hostDevice.Attrs().Index,
89-
Parent: ingress.QdiscAttrs.Handle,
106+
Parent: filterHandle,
90107
Priority: 1,
91108
Protocol: syscall.ETH_P_ALL,
92109
},
93110
ClassId: netlink.MakeHandle(1, 1),
94111
RedirIndex: ifbDevice.Attrs().Index,
95-
Actions: []netlink.Action{
96-
&netlink.MirredAction{
97-
ActionAttrs: netlink.ActionAttrs{},
98-
MirredAction: netlink.TCA_EGRESS_REDIR,
99-
Ifindex: ifbDevice.Attrs().Index,
100-
},
101-
},
112+
Actions: []netlink.Action{netlink.NewMirredAction(ifbDevice.Attrs().Index)},
102113
}
103114
err = netlink.FilterAdd(filter)
104115
if err != nil {
105-
return fmt.Errorf("add filter: %s", err)
116+
return fmt.Errorf("add egress filter: %s", err)
106117
}
107118

108119
// throttle traffic on ifb device
@@ -126,9 +137,9 @@ func createTBF(rateInBits, burstInBits uint64, linkIndex int) error {
126137
}
127138
rateInBytes := rateInBits / 8
128139
burstInBytes := burstInBits / 8
129-
bufferInBytes := buffer(uint64(rateInBytes), uint32(burstInBytes))
130-
latency := latencyInUsec(latencyInMillis)
131-
limitInBytes := limit(uint64(rateInBytes), latency, uint32(burstInBytes))
140+
bufferInBytes := Buffer(rateInBytes, uint32(burstInBytes))
141+
latency := LatencyInUsec(LatencyInMillis)
142+
limitInBytes := Limit(rateInBytes, latency, uint32(burstInBytes))
132143

133144
qdisc := &netlink.Tbf{
134145
QdiscAttrs: netlink.QdiscAttrs{
@@ -155,14 +166,14 @@ func time2Tick(time uint32) uint32 {
155166
return uint32(float64(time) * float64(netlink.TickInUsec()))
156167
}
157168

158-
func buffer(rate uint64, burst uint32) uint32 {
169+
func Buffer(rate uint64, burst uint32) uint32 {
159170
return time2Tick(uint32(float64(burst) * float64(netlink.TIME_UNITS_PER_SEC) / float64(rate)))
160171
}
161172

162-
func limit(rate uint64, latency float64, buffer uint32) uint32 {
173+
func Limit(rate uint64, latency float64, buffer uint32) uint32 {
163174
return uint32(float64(rate)*latency/float64(netlink.TIME_UNITS_PER_SEC)) + buffer
164175
}
165176

166-
func latencyInUsec(latencyInMillis float64) float64 {
177+
func LatencyInUsec(latencyInMillis float64) float64 {
167178
return float64(netlink.TIME_UNITS_PER_SEC) * (latencyInMillis / 1000.0)
168179
}

plugins/meta/bandwidth/bandwidth_linux_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import (
2828
"github.com/containernetworking/cni/pkg/invoke"
2929
"github.com/containernetworking/cni/pkg/skel"
3030
"github.com/containernetworking/cni/pkg/types"
31-
"github.com/containernetworking/cni/pkg/types/100"
31+
types100 "github.com/containernetworking/cni/pkg/types/100"
32+
bw "github.com/containernetworking/plugins/pkg/bandwidth"
3233
"github.com/containernetworking/plugins/pkg/ns"
3334
"github.com/containernetworking/plugins/pkg/testutils"
3435

@@ -37,7 +38,7 @@ import (
3738
"github.com/onsi/gomega/gexec"
3839
)
3940

40-
func buildOneConfig(name, cniVersion string, orig *PluginConf, prevResult types.Result) (*PluginConf, []byte, error) {
41+
func buildOneConfig(name, cniVersion string, orig *bw.PluginConf, prevResult types.Result) (*bw.PluginConf, []byte, error) {
4142
var err error
4243

4344
inject := map[string]interface{}{
@@ -73,7 +74,7 @@ func buildOneConfig(name, cniVersion string, orig *PluginConf, prevResult types.
7374
return nil, nil, err
7475
}
7576

76-
conf := &PluginConf{}
77+
conf := &bw.PluginConf{}
7778
if err := json.Unmarshal(newBytes, &conf); err != nil {
7879
return nil, nil, fmt.Errorf("error parsing configuration: %s", err)
7980
}
@@ -948,11 +949,11 @@ var _ = Describe("bandwidth test", func() {
948949
containerWithTbfResult, err := types100.GetResult(containerWithTbfRes)
949950
Expect(err).NotTo(HaveOccurred())
950951

951-
tbfPluginConf := &PluginConf{}
952+
tbfPluginConf := &bw.PluginConf{}
952953
err = json.Unmarshal([]byte(ptpConf), &tbfPluginConf)
953954
Expect(err).NotTo(HaveOccurred())
954955

955-
tbfPluginConf.RuntimeConfig.Bandwidth = &BandwidthEntry{
956+
tbfPluginConf.RuntimeConfig.Bandwidth = &bw.BandwidthEntry{
956957
IngressBurst: burstInBits,
957958
IngressRate: rateInBits,
958959
EgressBurst: burstInBits,
@@ -974,11 +975,11 @@ var _ = Describe("bandwidth test", func() {
974975

975976
if testutils.SpecVersionHasCHECK(ver) {
976977
// Do CNI Check
977-
checkConf := &PluginConf{}
978+
checkConf := &bw.PluginConf{}
978979
err = json.Unmarshal([]byte(ptpConf), &checkConf)
979980
Expect(err).NotTo(HaveOccurred())
980981

981-
checkConf.RuntimeConfig.Bandwidth = &BandwidthEntry{
982+
checkConf.RuntimeConfig.Bandwidth = &bw.BandwidthEntry{
982983
IngressBurst: burstInBits,
983984
IngressRate: rateInBits,
984985
EgressBurst: burstInBits,
@@ -1056,15 +1057,15 @@ var _ = Describe("bandwidth test", func() {
10561057

10571058
Describe("Validating input", func() {
10581059
It("Should allow only 4GB burst rate", func() {
1059-
err := validateRateAndBurst(5000, 4*1024*1024*1024*8-16) // 2 bytes less than the max should pass
1060+
err := bw.ValidateRateAndBurst(5000, 4*1024*1024*1024*8-16) // 2 bytes less than the max should pass
10601061
Expect(err).NotTo(HaveOccurred())
1061-
err = validateRateAndBurst(5000, 4*1024*1024*1024*8) // we're 1 bit above MaxUint32
1062+
err = bw.ValidateRateAndBurst(5000, 4*1024*1024*1024*8) // we're 1 bit above MaxUint32
10621063
Expect(err).To(HaveOccurred())
1063-
err = validateRateAndBurst(0, 1)
1064+
err = bw.ValidateRateAndBurst(0, 1)
10641065
Expect(err).To(HaveOccurred())
1065-
err = validateRateAndBurst(1, 0)
1066+
err = bw.ValidateRateAndBurst(1, 0)
10661067
Expect(err).To(HaveOccurred())
1067-
err = validateRateAndBurst(0, 0)
1068+
err = bw.ValidateRateAndBurst(0, 0)
10681069
Expect(err).NotTo(HaveOccurred())
10691070
})
10701071
})

0 commit comments

Comments
 (0)