Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions pkg/kubeletclient/kubeletclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/checkpoint"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
Expand All @@ -41,6 +43,11 @@ const (
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
defaultPodResourcesPath = "/var/lib/kubelet/pod-resources"
unixProtocol = "unix"
// Retry configuration for rate limiting
maxRetries = 5
initialRetryDelay = 100 * time.Millisecond
maxRetryDelay = 2 * time.Second
retryBackoffFactor = 2
)

// LocalEndpoint returns the full path to a unix socket at the given endpoint
Expand Down Expand Up @@ -111,17 +118,43 @@ type kubeletClient struct {
}

func (rc *kubeletClient) getPodResources(client podresourcesapi.PodResourcesListerClient) error {
var resp *podresourcesapi.ListPodResourcesResponse
var err error
retryDelay := initialRetryDelay

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for attempt := 0; attempt <= maxRetries; attempt++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
resp, err = client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
cancel()

if err == nil {
rc.resources = resp.PodResources
return nil
}

// Check if this is a rate limit error
if st, ok := status.FromError(err); ok && st.Code() == codes.ResourceExhausted {
if attempt < maxRetries {
logging.Debugf("getPodResources: rate limit hit (attempt %d/%d), retrying after %v: %v",
attempt+1, maxRetries+1, retryDelay, err)
time.Sleep(retryDelay)

// Exponential backoff with cap
retryDelay *= retryBackoffFactor
if retryDelay > maxRetryDelay {
retryDelay = maxRetryDelay
}
continue
}
logging.Errorf("getPodResources: rate limit exceeded after %d attempts", maxRetries+1)
}

// For non-rate-limit errors or final retry attempt, return the error
return logging.Errorf("getPodResources: failed to list pod resources, %v.Get(_) = _, %v", client, err)
Comment on lines +137 to 154

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The error handling for retries can be simplified and made more robust. Currently, when the final retry attempt fails due to rate limiting, a specific error message is logged (line 150) but its value is discarded. Instead, a more generic error from line 154 is returned, which can be misleading during debugging. It would be better to return the more specific error message that indicates the rate limit was exceeded after all attempts.

Additionally, the format string "%v.Get(_) = _, %v" on line 154 is unconventional and could be simplified to just include the error itself for better readability.

The suggested change addresses these points. Note that applying this suggestion will require updating the assertion in the test should fail after max retries with continuous rate limiting to check for the new, more specific error message.

                if attempt < maxRetries {
                    logging.Debugf("getPodResources: rate limit hit (attempt %d/%d), retrying after %v: %v",
                        attempt+1, maxRetries+1, retryDelay, err)
                    time.Sleep(retryDelay)

                    // Exponential backoff with cap
                    retryDelay *= retryBackoffFactor
                    if retryDelay > maxRetryDelay {
                        retryDelay = maxRetryDelay
                    }
                    continue
                }
                return logging.Errorf("getPodResources: rate limit exceeded after %d attempts: %v", maxRetries+1, err)
            }

            // For non-rate-limit errors, return the error
            return logging.Errorf("getPodResources: failed to list pod resources: %v", err)

}

rc.resources = resp.PodResources
return nil
return logging.Errorf("getPodResources: failed to list pod resources, %v.Get(_) = _, %v", client, err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This return statement appears to be unreachable. The for loop above is structured to always exit via a return statement within the loop body if an error occurs, or if the call is successful. The loop will never complete its iterations and fall through to this line. This statement can be safely removed.

}

// GetPodResourceMap returns an instance of a map of Pod ResourceInfo given a (Pod name, namespace) tuple
Expand Down
139 changes: 139 additions & 0 deletions pkg/kubeletclient/kubeletclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"net/url"
"os"
"path/filepath"
"sync/atomic"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sTypes "k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -340,4 +343,140 @@ var _ = Describe("Kubelet resource endpoint data read operations", func() {
Expect(resourceMap).To(Equal(emptyRMap))
})
})

Context("Rate limit handling with retries", func() {
var (
rateLimitSocketDir string
rateLimitSocketName string
rateLimitSocket *url.URL
rateLimitServer *rateLimitResourceServer
)

BeforeEach(func() {
tempSocketDir, err := os.MkdirTemp("", "kubelet-rate-limit-test")
Expect(err).NotTo(HaveOccurred())
testingPodResourcesPath := filepath.Join(tempSocketDir, defaultPodResourcesPath)

err = os.MkdirAll(testingPodResourcesPath, os.ModeDir)
Expect(err).NotTo(HaveOccurred())

rateLimitSocketDir = testingPodResourcesPath
rateLimitSocketName = filepath.Join(rateLimitSocketDir, "kubelet-ratelimit.sock")
rateLimitSocket = localEndpoint(filepath.Join(rateLimitSocketDir, "kubelet-ratelimit"))

rateLimitServer = &rateLimitResourceServer{
server: grpc.NewServer(),
failCount: 3,
currentCount: 0,
}
podresourcesapi.RegisterPodResourcesListerServer(rateLimitServer.server, rateLimitServer)
lis, err := CreateListener(rateLimitSocketName)
Expect(err).NotTo(HaveOccurred())
go rateLimitServer.server.Serve(lis)
})

AfterEach(func() {
if rateLimitServer != nil {
rateLimitServer.server.Stop()
}
os.RemoveAll(rateLimitSocketDir)
})

It("should retry and succeed after rate limit errors", func() {
client, err := getKubeletClient(rateLimitSocket)
Expect(err).NotTo(HaveOccurred())
Expect(client).NotTo(BeNil())

// Verify that retries occurred
finalCount := atomic.LoadInt32(&rateLimitServer.currentCount)
Expect(finalCount).To(BeNumerically(">", rateLimitServer.failCount))
})

It("should fail after max retries with continuous rate limiting", func() {
// Create a server that always fails
alwaysFailServer := &rateLimitResourceServer{
server: grpc.NewServer(),
failCount: 100, // Always fail
currentCount: 0,
}

tempSocketDir, err := os.MkdirTemp("", "kubelet-always-fail-test")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(tempSocketDir)

testingPodResourcesPath := filepath.Join(tempSocketDir, defaultPodResourcesPath)
err = os.MkdirAll(testingPodResourcesPath, os.ModeDir)
Expect(err).NotTo(HaveOccurred())

alwaysFailSocketName := filepath.Join(testingPodResourcesPath, "kubelet-always-fail.sock")
alwaysFailSocket := localEndpoint(filepath.Join(testingPodResourcesPath, "kubelet-always-fail"))

podresourcesapi.RegisterPodResourcesListerServer(alwaysFailServer.server, alwaysFailServer)
lis, err := CreateListener(alwaysFailSocketName)
Expect(err).NotTo(HaveOccurred())
go alwaysFailServer.server.Serve(lis)
defer alwaysFailServer.server.Stop()

_, err = getKubeletClient(alwaysFailSocket)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("failed to list pod resources"))

// Verify that max retries were attempted
finalCount := atomic.LoadInt32(&alwaysFailServer.currentCount)
Expect(finalCount).To(Equal(int32(maxRetries + 1)))
})
Comment on lines +395 to +427

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test case duplicates a significant amount of server setup logic that is already present in the BeforeEach block for the preceding test context. This repetition makes the tests harder to read and maintain.

Consider refactoring the common server setup into a helper function. This function could accept parameters like failCount and return the necessary components for the test (e.g., the server URL, the server instance, and a cleanup function), which would reduce code duplication and improve the overall structure of the test file.

})
})

// rateLimitResourceServer simulates a kubelet server that returns rate limit errors
// for the first N calls, then succeeds
type rateLimitResourceServer struct {
	server *grpc.Server // the gRPC server instance; stopped by the tests during cleanup
	failCount int32 // number of initial List calls to reject with ResourceExhausted
	currentCount int32 // total List calls observed so far; accessed atomically
}

// GetAllocatableResources implements the PodResourcesLister API. The test
// server carries no allocatable-resource data, so it always answers with an
// empty response and no error.
func (m *rateLimitResourceServer) GetAllocatableResources(_ context.Context, _ *podresourcesapi.AllocatableResourcesRequest) (*podresourcesapi.AllocatableResourcesResponse, error) {
	resp := &podresourcesapi.AllocatableResourcesResponse{}
	return resp, nil
}

// Get implements the PodResourcesLister API. The test server does not serve
// per-pod lookups, so it always answers with an empty response and no error.
func (m *rateLimitResourceServer) Get(_ context.Context, _ *podresourcesapi.GetPodResourcesRequest) (*podresourcesapi.GetPodResourcesResponse, error) {
	resp := &podresourcesapi.GetPodResourcesResponse{}
	return resp, nil
}

// List rejects the first failCount calls with a gRPC ResourceExhausted error
// (simulating kubelet rate limiting) and then serves a fixed single-pod
// response, so callers' retry logic can be exercised.
func (m *rateLimitResourceServer) List(_ context.Context, _ *podresourcesapi.ListPodResourcesRequest) (*podresourcesapi.ListPodResourcesResponse, error) {
	// Atomically bump the call counter and fail while under the threshold.
	if atomic.AddInt32(&m.currentCount, 1) <= m.failCount {
		return nil, status.Error(codes.ResourceExhausted, "rejected by rate limit")
	}

	// Past the failure threshold: answer with a canned single-pod payload.
	devices := []*podresourcesapi.ContainerDevices{
		{
			ResourceName: "resource",
			DeviceIds:    []string{"dev0", "dev1"},
		},
	}
	return &podresourcesapi.ListPodResourcesResponse{
		PodResources: []*podresourcesapi.PodResources{
			{
				Name:      "pod-name",
				Namespace: "pod-namespace",
				Containers: []*podresourcesapi.ContainerResources{
					{
						Name:    "container-name",
						Devices: devices,
					},
				},
			},
		},
	}, nil
}
Loading