-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathdb.go
More file actions
574 lines (506 loc) · 19.9 KB
/
db.go
File metadata and controls
574 lines (506 loc) · 19.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
/*
Copyright The Kubernetes Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package inventory
import (
"context"
"fmt"
"maps"
"net"
"sort"
"strings"
"sync"
"time"
"sigs.k8s.io/dranet/pkg/apis"
"sigs.k8s.io/dranet/pkg/cloudprovider"
"sigs.k8s.io/dranet/pkg/names"
"github.com/Mellanox/rdmamap"
"github.com/jaypipes/ghw"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"golang.org/x/time/rate"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/deviceattribute"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/dranet/internal/nlwrap"
)
const (
// defaultMinPollInterval is the default minimum interval between two
// consecutive polls of the inventory.
defaultMinPollInterval = 2 * time.Second
// defaultMaxPollInterval is the default maximum interval between two
// consecutive polls of the inventory.
defaultMaxPollInterval = 1 * time.Minute
// defaultPollBurst is the default number of polls that can be run in a
// burst.
defaultPollBurst = 5
)
var (
// ignoredInterfaceNames is a set of network interface names that are typically
// created by CNI plugins or are otherwise not relevant for DRA resource exposure.
ignoredInterfaceNames = sets.New("cilium_net", "cilium_host", "docker0")
)
type DB struct {
instance cloudprovider.CloudInstance
// TODO: it is not common but may happen in edge cases that the default
// gateway changes revisit once we have more evidence this can be a
// potential problem or break some use cases.
gwInterfaces sets.Set[string]
mu sync.RWMutex
// podNetNsStore gives the network namespace for a pod, indexed by the pods
// "namespaced/name".
podNetNsStore map[string]string
// deviceStore is an in-memory cache of the available devices on the node.
// It is keyed by the normalized PCI address of the device. The value is a
// resourceapi.Device object that contains the device's attributes.
// The deviceStore is periodically updated by the Run method.
deviceStore map[string]resourceapi.Device
// deviceConfigStore caches cloud-provider network configuration per device.
// This helps us avoid repeatedly querying the provider APIs. Keyed by device name.
deviceConfigStore map[string]*apis.NetworkConfig
rateLimiter *rate.Limiter
maxPollInterval time.Duration
notifications chan []resourceapi.Device
hasDevices bool
// moveIBInterfaces controls whether IPoIB network interfaces are
// associated with their PCI devices. When true (default), IPoIB interfaces
// are treated like regular network interfaces and moved into pod namespaces.
// When false, IPoIB interfaces are skipped and the underlying device is
// exposed as an IB-only RDMA device.
moveIBInterfaces bool
}
type Option func(*DB)
func WithRateLimiter(limiter *rate.Limiter) Option {
return func(db *DB) {
db.rateLimiter = limiter
}
}
func WithMaxPollInterval(d time.Duration) Option {
return func(db *DB) {
db.maxPollInterval = d
}
}
func WithMoveIBInterfaces(move bool) Option {
return func(db *DB) {
db.moveIBInterfaces = move
}
}
func New(opts ...Option) *DB {
db := &DB{
podNetNsStore: map[string]string{},
deviceStore: map[string]resourceapi.Device{},
deviceConfigStore: map[string]*apis.NetworkConfig{},
rateLimiter: rate.NewLimiter(rate.Every(defaultMinPollInterval), defaultPollBurst),
notifications: make(chan []resourceapi.Device),
maxPollInterval: defaultMaxPollInterval,
moveIBInterfaces: true,
}
for _, o := range opts {
o(db)
}
return db
}
func (db *DB) AddPodNetNs(pod string, netNsPath string) {
db.mu.Lock()
defer db.mu.Unlock()
ns, err := netns.GetFromPath(netNsPath)
if err != nil {
klog.Errorf("Failed to get pod %s network namespace %s handle: %v", pod, netNsPath, err)
return
}
defer ns.Close()
db.podNetNsStore[pod] = netNsPath
}
func (db *DB) RemovePodNetNs(pod string) {
db.mu.Lock()
defer db.mu.Unlock()
delete(db.podNetNsStore, pod)
}
// GetPodNamespace allows to get the Pod network namespace
func (db *DB) GetPodNetNs(pod string) string {
db.mu.RLock()
defer db.mu.RUnlock()
return db.podNetNsStore[pod]
}
func (db *DB) Run(ctx context.Context) error {
registerMetrics()
begin := time.Now()
defer close(db.notifications)
// Resources are published periodically or if there is a netlink notification
// indicating a new interfaces was added or changed
nlChannel := make(chan netlink.LinkUpdate)
doneCh := make(chan struct{})
defer close(doneCh)
if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", db.maxPollInterval.String())
}
// Obtain data that will not change after the startup
db.instance = getInstanceProperties(ctx)
db.gwInterfaces = getDefaultGwInterfaces()
klog.V(2).Infof("Default gateway interfaces: %v", db.gwInterfaces.UnsortedList())
inventoryStartupDurationSeconds.Set(time.Since(begin).Seconds())
for {
err := db.rateLimiter.Wait(ctx)
if err != nil {
klog.Error(err, "unexpected rate limited error trying to get system interfaces")
}
filteredDevices := db.scan()
if len(filteredDevices) > 0 || db.hasDevices {
db.hasDevices = len(filteredDevices) > 0
db.notifications <- filteredDevices
}
select {
// trigger a reconcile
case <-nlChannel:
// drain the channel so we only sync once
for len(nlChannel) > 0 {
<-nlChannel
}
case <-time.After(db.maxPollInterval):
case <-ctx.Done():
return ctx.Err()
}
}
}
// scan discovers the available devices on the node.
// It discovers PCI, network, and RDMA devices, adds cloud attributes,
// filters out default interfaces, and updates the device store.
func (db *DB) scan() []resourceapi.Device {
devices := db.discoverPCIDevices()
devices = db.discoverNetworkInterfaces(devices)
devices = db.discoverRDMADevices(devices)
devices = db.addCloudAttributes(devices)
// Remove default interface.
filteredDevices := []resourceapi.Device{}
for _, device := range devices {
ifName := device.Attributes[apis.AttrInterfaceName].StringValue
if ifName != nil && db.gwInterfaces.Has(string(*ifName)) {
klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName)
continue
}
filteredDevices = append(filteredDevices, device)
}
sort.Slice(filteredDevices, func(i, j int) bool {
return filteredDevices[i].Name < filteredDevices[j].Name
})
klog.V(4).Infof("Found %d devices", len(filteredDevices))
db.updateDeviceStore(filteredDevices)
return filteredDevices
}
func (db *DB) GetResources(ctx context.Context) <-chan []resourceapi.Device {
return db.notifications
}
func (db *DB) discoverPCIDevices() []resourceapi.Device {
devices := []resourceapi.Device{}
pci, err := ghw.PCI(
ghw.WithDisableTools(),
)
if err != nil {
klog.Errorf("Could not get PCI devices: %v", err)
return devices
}
for _, pciDev := range pci.Devices {
if !isNetworkDevice(pciDev) {
continue
}
device := resourceapi.Device{
Name: names.NormalizePCIAddress(pciDev.Address),
Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute),
Capacity: make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity),
}
device.Attributes[apis.AttrPCIAddress] = resourceapi.DeviceAttribute{StringValue: &pciDev.Address}
if pciDev.Vendor != nil {
device.Attributes[apis.AttrPCIVendor] = resourceapi.DeviceAttribute{StringValue: &pciDev.Vendor.Name}
}
if pciDev.Product != nil {
device.Attributes[apis.AttrPCIDevice] = resourceapi.DeviceAttribute{StringValue: &pciDev.Product.Name}
}
if pciDev.Subsystem != nil {
device.Attributes[apis.AttrPCISubsystem] = resourceapi.DeviceAttribute{StringValue: &pciDev.Subsystem.ID}
}
if pciDev.Node != nil {
device.Attributes[apis.AttrNUMANode] = resourceapi.DeviceAttribute{IntValue: ptr.To(int64(pciDev.Node.ID))}
}
pcieRootAttr, err := deviceattribute.GetPCIeRootAttributeByPCIBusID(pciDev.Address)
if err != nil {
klog.Infof("Could not get pci root attribute: %v", err)
} else {
device.Attributes[pcieRootAttr.Name] = pcieRootAttr.Value
}
devices = append(devices, device)
}
return devices
}
// discoveryNetworkInterfaces updates the devices based on information retried
// from network interfaces. For each network interface, the two possible
// outcomes are:
// - If the network interface is associated with some parent PCI device, the
// existing PCI device is modified with additional attributes related to the
// network interface.
// - For Network interfaces which are not associated with a PCI Device (like
// virtual interfaces), they are added as their own device.
func (db *DB) discoverNetworkInterfaces(pciDevices []resourceapi.Device) []resourceapi.Device {
links, err := nlwrap.LinkList()
if err != nil {
klog.Errorf("Could not list network interfaces: %v", err)
return pciDevices
}
pciDeviceMap := make(map[string]*resourceapi.Device)
for i := range pciDevices {
pciDeviceMap[pciDevices[i].Name] = &pciDevices[i]
}
otherDevices := []resourceapi.Device{}
for _, link := range links {
ifName := link.Attrs().Name
if ignoredInterfaceNames.Has(ifName) {
klog.V(4).Infof("Network Interface %s is in the list of ignored interfaces, excluding it from discovery", ifName)
continue
}
// skip loopback interfaces
if link.Attrs().Flags&net.FlagLoopback != 0 {
klog.V(4).Infof("Network Interface %s is a loopback interface, excluding it from discovery", ifName)
continue
}
// When moveIBInterfaces is false, skip IPoIB interfaces.
// The underlying PCI device will be discovered as an IB-only RDMA
// device (no netdev) via discoverRDMADevices. Associating the IPoIB
// netdev with the PCI device would mask the IB-only nature of the
// device and prevent correct RDMA char device injection into pods.
// When moveIBInterfaces is true (default), IPoIB interfaces
// are associated with their PCI device so they can be moved into pod namespace.
if link.Type() == "ipoib" && !db.moveIBInterfaces {
klog.V(4).Infof("Network Interface %s is IPoIB, skipping netdev association (will be discovered as IB-only RDMA device)", ifName)
continue
}
pciAddr, err := pciAddressForNetInterface(ifName)
if err == nil {
// It's a PCI device.
normalizedAddress := names.NormalizePCIAddress(pciAddr.String())
var exists bool
device, exists := pciDeviceMap[normalizedAddress]
if !exists {
// We don't expect this to happen.
klog.Errorf("Network interface %s has PCI address %q, but it was not found in initial PCI scan.", ifName, pciAddr)
continue
}
addLinkAttributes(device, link)
} else {
// Not a PCI device.
if !isVirtual(ifName, sysnetPath) {
// If we failed to identify the PCI address of the network
// interface and the network interface is also not a virtual
// device, use a best-effort strategy where the network
// interface is assumed to be virtual.
klog.Warningf("PCI address not found for non-virtual interface %s, proceeding as if it were virtual. Error: %v", ifName, err)
}
newDevice := &resourceapi.Device{
Name: names.NormalizeInterfaceName(ifName),
Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute),
}
addLinkAttributes(newDevice, link)
otherDevices = append(otherDevices, *newDevice)
}
}
return append(pciDevices, otherDevices...)
}
func addLinkAttributes(device *resourceapi.Device, link netlink.Link) {
ifName := link.Attrs().Name
device.Attributes[apis.AttrInterfaceName] = resourceapi.DeviceAttribute{StringValue: &ifName}
device.Attributes[apis.AttrMac] = resourceapi.DeviceAttribute{StringValue: ptr.To(link.Attrs().HardwareAddr.String())}
device.Attributes[apis.AttrMTU] = resourceapi.DeviceAttribute{IntValue: ptr.To(int64(link.Attrs().MTU))}
device.Attributes[apis.AttrEncapsulation] = resourceapi.DeviceAttribute{StringValue: ptr.To(link.Attrs().EncapType)}
device.Attributes[apis.AttrAlias] = resourceapi.DeviceAttribute{StringValue: ptr.To(link.Attrs().Alias)}
device.Attributes[apis.AttrState] = resourceapi.DeviceAttribute{StringValue: ptr.To(link.Attrs().OperState.String())}
device.Attributes[apis.AttrType] = resourceapi.DeviceAttribute{StringValue: ptr.To(link.Type())}
v4 := sets.Set[string]{}
v6 := sets.Set[string]{}
if ips, err := nlwrap.AddrList(link, netlink.FAMILY_ALL); err == nil && len(ips) > 0 {
for _, address := range ips {
if !address.IP.IsGlobalUnicast() {
continue
}
if address.IP.To4() == nil && address.IP.To16() != nil {
v6.Insert(address.IPNet.String())
} else if address.IP.To4() != nil {
v4.Insert(address.IPNet.String())
}
}
if v4.Len() > 0 {
device.Attributes[apis.AttrIPv4] = resourceapi.DeviceAttribute{StringValue: ptr.To(strings.Join(v4.UnsortedList(), ","))}
}
if v6.Len() > 0 {
device.Attributes[apis.AttrIPv6] = resourceapi.DeviceAttribute{StringValue: ptr.To(strings.Join(v6.UnsortedList(), ","))}
}
}
isEbpf := false
filterNames, ok := getTcFilters(link)
if ok {
isEbpf = true
device.Attributes[apis.AttrTCFilterNames] = resourceapi.DeviceAttribute{StringValue: ptr.To(strings.Join(filterNames, ","))}
}
programNames, ok := getTcxFilters(link)
if ok {
isEbpf = true
device.Attributes[apis.AttrTCXProgramNames] = resourceapi.DeviceAttribute{StringValue: ptr.To(strings.Join(programNames, ","))}
}
device.Attributes[apis.AttrEBPF] = resourceapi.DeviceAttribute{BoolValue: &isEbpf}
isSRIOV := sriovTotalVFs(ifName) > 0
device.Attributes[apis.AttrSRIOV] = resourceapi.DeviceAttribute{BoolValue: &isSRIOV}
if isSRIOV {
vfs := int64(sriovNumVFs(ifName))
device.Attributes[apis.AttrSRIOVVfs] = resourceapi.DeviceAttribute{IntValue: &vfs}
}
isSriovVirtualFunction := isSriovVf(ifName, sysnetPath)
if isSriovVirtualFunction {
device.Attributes[apis.AttrIsSriovVf] = resourceapi.DeviceAttribute{BoolValue: &isSriovVirtualFunction}
}
if isVirtual(ifName, sysnetPath) {
device.Attributes[apis.AttrVirtual] = resourceapi.DeviceAttribute{BoolValue: ptr.To(true)}
} else {
device.Attributes[apis.AttrVirtual] = resourceapi.DeviceAttribute{BoolValue: ptr.To(false)}
}
}
func (db *DB) discoverRDMADevices(devices []resourceapi.Device) []resourceapi.Device {
for i := range devices {
isRDMA := false
if ifName := devices[i].Attributes[apis.AttrInterfaceName].StringValue; ifName != nil && *ifName != "" {
// Try rdmamap library first
isRDMA = rdmamap.IsRDmaDeviceForNetdevice(*ifName)
// Fallback to sysfs check if rdmamap fails. This is particularly
// needed for InfiniBand interfaces where rdmamap has a bug comparing
// against node GUID instead of port GUID:
// https://github.com/Mellanox/rdmamap/issues/15
if !isRDMA {
isRDMA = isRdmaDeviceInSysfs(*ifName)
}
} else if pciAddr := devices[i].Attributes[apis.AttrPCIAddress].StringValue; pciAddr != nil && *pciAddr != "" {
rdmaDevices := rdmamap.GetRdmaDevicesForPcidev(*pciAddr)
isRDMA = len(rdmaDevices) != 0
if isRDMA {
// IB-only device: has RDMA capability but no netdev interface.
rdmaDevName := rdmaDevices[0]
devices[i].Attributes[apis.AttrRDMADevice] = resourceapi.DeviceAttribute{StringValue: &rdmaDevName}
}
}
devices[i].Attributes[apis.AttrRDMA] = resourceapi.DeviceAttribute{BoolValue: &isRDMA}
}
return devices
}
func (db *DB) addCloudAttributes(devices []resourceapi.Device) []resourceapi.Device {
for i := range devices {
device := &devices[i]
maps.Copy(device.Attributes, getProviderAttributes(device, db.instance))
}
return devices
}
func (db *DB) updateDeviceStore(devices []resourceapi.Device) {
deviceStore := map[string]resourceapi.Device{}
deviceConfigStore := map[string]*apis.NetworkConfig{}
for _, device := range devices {
deviceStore[device.Name] = device
// Cache the configuration if the provider returns one.
if db.instance != nil {
id := cloudprovider.DeviceIdentifiers{
Name: device.Name,
}
if macAttr, ok := device.Attributes[apis.AttrMac]; ok && macAttr.StringValue != nil {
id.MAC = *macAttr.StringValue
}
if pciAttr, ok := device.Attributes[apis.AttrPCIAddress]; ok && pciAttr.StringValue != nil {
id.PCIAddress = *pciAttr.StringValue
}
if conf := db.instance.GetDeviceConfig(id); conf != nil {
deviceConfigStore[device.Name] = conf
}
}
}
db.mu.Lock()
defer db.mu.Unlock()
db.deviceStore = deviceStore
db.deviceConfigStore = deviceConfigStore
}
func (db *DB) GetDevice(deviceName string) (resourceapi.Device, bool) {
db.mu.RLock()
defer db.mu.RUnlock()
device, exists := db.deviceStore[deviceName]
return device, exists
}
// GetDeviceConfig returns the network configuration associated with the device, if any.
func (db *DB) GetDeviceConfig(deviceName string) (*apis.NetworkConfig, bool) {
db.mu.RLock()
defer db.mu.RUnlock()
conf, exists := db.deviceConfigStore[deviceName]
return conf, exists
}
// GetNetInterfaceName returns the network interface name for a given device. It
// first attempts to retrieve the name from the local device store. If the
// device is not found, it triggers a rescan of the system's devices and retries
// the lookup. This can happen when a device was recently released by a previous
// pod and a scan had not happened yet. This ensures that the function can find
// newly added devices that were not present in the store at the time of the
// initial call.
func (db *DB) GetNetInterfaceName(deviceName string) (string, error) {
name, err := db.getNetInterfaceNameWithoutRescan(deviceName)
if err != nil {
klog.V(3).Infof("Device %q not found in local store, rescanning.", deviceName)
db.scan()
name, err = db.getNetInterfaceNameWithoutRescan(deviceName)
}
return name, err
}
// getNetInterfaceNameWithoutRescan returns the network interface name for a
// given device from the local device store without triggering a rescan if the
// device is not found.
func (db *DB) getNetInterfaceNameWithoutRescan(deviceName string) (string, error) {
device, exists := db.GetDevice(deviceName)
if !exists {
return "", fmt.Errorf("device %s not found in store", deviceName)
}
if device.Attributes[apis.AttrInterfaceName].StringValue == nil {
return "", fmt.Errorf("device %s has no interface name in local store", deviceName)
}
return *device.Attributes[apis.AttrInterfaceName].StringValue, nil
}
// IsIBOnlyDevice returns true if the device has RDMA capability but no netdev
// interface (i.e. an InfiniBand-only device). Derived from existing attributes:
// a device with a non-empty rdmaDevice and no ifName is IB-only.
func (db *DB) IsIBOnlyDevice(deviceName string) bool {
device, exists := db.GetDevice(deviceName)
if !exists {
return false
}
rdmaAttr := device.Attributes[apis.AttrRDMADevice]
ifAttr := device.Attributes[apis.AttrInterfaceName]
return rdmaAttr.StringValue != nil && *rdmaAttr.StringValue != "" &&
(ifAttr.StringValue == nil || *ifAttr.StringValue == "")
}
// GetRDMADeviceName returns the RDMA link name (e.g. "mlx5_0") for an IB-only
// device. It returns an error if the device is not found or has no RDMA device
// name recorded.
func (db *DB) GetRDMADeviceName(deviceName string) (string, error) {
device, exists := db.GetDevice(deviceName)
if !exists {
return "", fmt.Errorf("device %s not found in store", deviceName)
}
attr, ok := device.Attributes[apis.AttrRDMADevice]
if !ok || attr.StringValue == nil {
return "", fmt.Errorf("device %s has no RDMA device name in local store", deviceName)
}
return *attr.StringValue, nil
}
// isNetworkDevice checks the class is 0x2, defined for all types of network controllers
// https://pcisig.com/sites/default/files/files/PCI_Code-ID_r_1_11__v24_Jan_2019.pdf
func isNetworkDevice(dev *ghw.PCIDevice) bool {
return dev.Class.ID == "02"
}