Skip to content

Commit cf4537c

Browse files
feat(inventory): Add configurable discovery rate limiting
This commit introduces configurable rate limiting for the inventory discovery process. Previously, a fixed 5-second rate limit with a burst of 1 could delay processing of netlink updates, leading to failures during high pod churn scenarios. Command-line flags have been added to control the inventory discovery rate limit and burst size. The default values have been adjusted to be more responsive to rapid pod lifecycle events, ensuring that device state is updated promptly.
1 parent 51b8a6c commit cf4537c

File tree

4 files changed

+67
-15
lines changed

4 files changed

+67
-15
lines changed

cmd/dranet/app.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@ import (
2727
"runtime/debug"
2828
"sync/atomic"
2929
"syscall"
30+
"time"
3031

3132
"github.com/google/cel-go/cel"
3233
"github.com/google/cel-go/ext"
3334
"github.com/google/dranet/pkg/driver"
35+
"github.com/google/dranet/pkg/inventory"
3436
"github.com/google/dranet/pkg/pcidb"
3537
"github.com/prometheus/client_golang/prometheus/promhttp"
38+
"golang.org/x/time/rate"
3639

3740
resourcev1 "k8s.io/api/resource/v1"
3841
"k8s.io/client-go/kubernetes"
@@ -51,6 +54,9 @@ var (
5154
kubeconfig string
5255
bindAddress string
5356
celExpression string
57+
minPollInterval time.Duration
58+
maxPollInterval time.Duration
59+
pollBurst int
5460

5561
ready atomic.Bool
5662
)
@@ -60,6 +66,9 @@ func init() {
6066
flag.StringVar(&bindAddress, "bind-address", ":9177", "The IP address and port for the metrics and healthz server to serve on")
6167
flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that kube-network-policies is running on. If unset, the node name is assumed to be the same as the node's hostname.")
6268
flag.StringVar(&celExpression, "filter", `!("dra.net/type" in attributes) || attributes["dra.net/type"].StringValue != "veth"`, "CEL expression to filter network interface attributes (v1.DeviceAttribute).")
69+
flag.DurationVar(&minPollInterval, "inventory-min-poll-interval", 5*time.Second, "The minimum interval between two consecutive polls of the inventory.")
70+
flag.DurationVar(&maxPollInterval, "inventory-max-poll-interval", 1*time.Minute, "The maximum interval between two consecutive polls of the inventory.")
71+
flag.IntVar(&pollBurst, "inventory-poll-burst", 5, "The number of polls that can be run in a burst.")
6372

6473
flag.Usage = func() {
6574
fmt.Fprint(os.Stderr, "Usage: dranet [options]\n\n")
@@ -150,6 +159,11 @@ func main() {
150159
}
151160
opts = append(opts, driver.WithFilter(prg))
152161
}
162+
db := inventory.New(
163+
inventory.WithRateLimiter(rate.NewLimiter(rate.Every(minPollInterval), pollBurst)),
164+
inventory.WithMaxPollInterval(maxPollInterval),
165+
)
166+
opts = append(opts, driver.WithInventory(db))
153167
dranet, err := driver.Start(ctx, driverName, clientset, nodeName, opts...)
154168
if err != nil {
155169
klog.Fatalf("driver failed to start: %v", err)

pkg/driver/driver.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ func WithFilter(filter cel.Program) Option {
7373
}
7474
}
7575

76+
// WithInventory sets the inventory database for the driver.
77+
func WithInventory(db inventoryDB) Option {
78+
return func(o *NetworkDriver) {
79+
o.netdb = db
80+
}
81+
}
82+
7683
type NetworkDriver struct {
7784
driverName string
7885
nodeName string
@@ -174,7 +181,9 @@ func Start(ctx context.Context, driverName string, kubeClient kubernetes.Interfa
174181
}()
175182

176183
// register the host network interfaces
177-
plugin.netdb = inventory.New()
184+
if plugin.netdb == nil {
185+
plugin.netdb = inventory.New()
186+
}
178187
go func() {
179188
for i := 0; i < maxAttempts; i++ {
180189
err = plugin.netdb.Run(ctx)

pkg/inventory/db.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,15 @@ import (
4242
)
4343

4444
const (
45-
// database poll period
46-
minInterval = 5 * time.Second
47-
maxInterval = 1 * time.Minute
45+
// defaultMinPollInterval is the default minimum interval between two
46+
// consecutive polls of the inventory.
47+
defaultMinPollInterval = 2 * time.Second
48+
// defaultMaxPollInterval is the default maximum interval between two
49+
// consecutive polls of the inventory.
50+
defaultMaxPollInterval = 1 * time.Minute
51+
// defaultPollBurst is the default number of polls that can be run in a
52+
// burst.
53+
defaultPollBurst = 5
4854
)
4955

5056
var (
@@ -66,18 +72,38 @@ type DB struct {
6672
// The deviceStore is periodically updated by the Run method.
6773
deviceStore map[string]resourceapi.Device
6874

69-
rateLimiter *rate.Limiter
70-
notifications chan []resourceapi.Device
71-
hasDevices bool
75+
rateLimiter *rate.Limiter
76+
maxPollInterval time.Duration
77+
notifications chan []resourceapi.Device
78+
hasDevices bool
7279
}
7380

74-
func New() *DB {
75-
return &DB{
76-
podNetNsStore: map[string]string{},
77-
deviceStore: map[string]resourceapi.Device{},
78-
rateLimiter: rate.NewLimiter(rate.Every(minInterval), 1),
79-
notifications: make(chan []resourceapi.Device),
81+
type Option func(*DB)
82+
83+
func WithRateLimiter(limiter *rate.Limiter) Option {
84+
return func(db *DB) {
85+
db.rateLimiter = limiter
86+
}
87+
}
88+
89+
func WithMaxPollInterval(d time.Duration) Option {
90+
return func(db *DB) {
91+
db.maxPollInterval = d
92+
}
93+
}
94+
95+
func New(opts ...Option) *DB {
96+
db := &DB{
97+
podNetNsStore: map[string]string{},
98+
deviceStore: map[string]resourceapi.Device{},
99+
rateLimiter: rate.NewLimiter(rate.Every(defaultMinPollInterval), defaultPollBurst),
100+
notifications: make(chan []resourceapi.Device),
101+
maxPollInterval: defaultMaxPollInterval,
102+
}
103+
for _, o := range opts {
104+
o(db)
80105
}
106+
return db
81107
}
82108

83109
func (db *DB) AddPodNetNs(pod string, netNsPath string) {
@@ -114,7 +140,7 @@ func (db *DB) Run(ctx context.Context) error {
114140
doneCh := make(chan struct{})
115141
defer close(doneCh)
116142
if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
117-
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String())
143+
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", db.maxPollInterval.String())
118144
}
119145

120146
// Obtain data that will not change after the startup
@@ -159,7 +185,7 @@ func (db *DB) Run(ctx context.Context) error {
159185
for len(nlChannel) > 0 {
160186
<-nlChannel
161187
}
162-
case <-time.After(maxInterval):
188+
case <-time.After(db.maxPollInterval):
163189
case <-ctx.Done():
164190
return ctx.Err()
165191
}

tests/e2e.bats

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ setup_tcx_filter() {
110110
docker exec "$CLUSTER_NAME"-worker bash -c "ip link add mlx5_6 type dummy"
111111
docker exec "$CLUSTER_NAME"-worker bash -c "ip link set up dev mlx5_6"
112112

113+
echo "DEBUG: This is debug info before creation of deviceclass and resourceclaim"
114+
dump_debug_info_on_failure
115+
113116
kubectl apply -f "$BATS_TEST_DIRNAME"/../tests/manifests/deviceclass.yaml
114117
kubectl apply -f "$BATS_TEST_DIRNAME"/../tests/manifests/resourceclaim.yaml
115118
kubectl wait --timeout=30s --for=condition=ready pods -l app=pod

0 commit comments

Comments
 (0)