Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cmd/dranet/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ import (
"sigs.k8s.io/dranet/pkg/inventory"
"sigs.k8s.io/dranet/pkg/pcidb"

v1 "k8s.io/api/core/v1"
resourcev1 "k8s.io/api/resource/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -151,6 +155,17 @@ func main() {
opts = append(opts, driver.WithDBPath(dbPath))
}

// Create the event broadcaster
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
eventBroadcaster.StartLogging(klog.Infof)
eventRecorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: driverName},
)

opts = append(opts, driver.WithEventRecorder(eventRecorder))

if celExpression != "" {
env, err := cel.NewEnv(
ext.NativeTypes(
Expand Down
8 changes: 8 additions & 0 deletions deployments/helm/dranet/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ rules:
- nodes
verbs:
- get
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- update
- apiGroups:
- resource.k8s.io
resources:
Expand Down
8 changes: 8 additions & 0 deletions install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ rules:
- nodes
verbs:
- get
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- update
- apiGroups:
- "resource.k8s.io"
resources:
Expand Down
9 changes: 7 additions & 2 deletions pkg/driver/dra_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"golang.org/x/sys/unix"
"sigs.k8s.io/dranet/internal/nlwrap"

v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -427,9 +428,13 @@ func (np *NetworkDriver) prepareResourceClaim(ctx context.Context, claim *resour
}

if len(errorList) > 0 {
klog.Infof("claim %s contain errors: %v", claim.UID, errors.Join(errorList...))
joinedErr := errors.Join(errorList...)
klog.Infof("claim %s contain errors: %v", claim.UID, joinedErr)
if np.eventRecorder != nil {
np.eventRecorder.Eventf(claim, v1.EventTypeWarning, "ClaimPrepareFailed", "%v", joinedErr)
}
return kubeletplugin.PrepareResult{
Err: fmt.Errorf("claim %s contain errors: %w", claim.UID, errors.Join(errorList...)),
Err: fmt.Errorf("claim %s contain errors: %w", claim.UID, joinedErr),
}
}
return kubeletplugin.PrepareResult{}
Expand Down
52 changes: 52 additions & 0 deletions pkg/driver/dra_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
resourcev1 "k8s.io/api/resource/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"sigs.k8s.io/dranet/pkg/apis"
)
Expand Down Expand Up @@ -240,6 +241,57 @@ func TestUnprepareResourceClaimsMetrics(t *testing.T) {
})
}

func TestClaimPrepareFailedEvent(t *testing.T) {
ctx := context.Background()
fakeRecorder := record.NewFakeRecorder(10)

np := &NetworkDriver{
netdb: newFakeInventoryDB(),
driverName: "test.driver",
eventRecorder: fakeRecorder,
podConfigStore: mustNewPodConfigStore(),
}

claims := []*resourcev1.ResourceClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "my-claim",
Namespace: "default",
UID: "claim-uid-1",
},
Status: resourcev1.ResourceClaimStatus{
ReservedFor: []resourcev1.ResourceClaimConsumerReference{
{APIGroup: "", Resource: "pods", Name: "test-pod", UID: "pod-uid-1"},
},
Allocation: &resourcev1.AllocationResult{
Devices: resourcev1.DeviceAllocationResult{
Results: []resourcev1.DeviceRequestAllocationResult{
{Driver: "test.driver", Device: "device-does-not-exist"},
},
},
},
},
},
}

res, err := np.PrepareResourceClaims(ctx, claims)
if err != nil {
t.Fatalf("PrepareResourceClaims returned unexpected error: %v", err)
}
if res["claim-uid-1"].Err == nil {
t.Fatal("expected per-claim error, got none")
}

select {
case event := <-fakeRecorder.Events:
if !strings.Contains(event, "ClaimPrepareFailed") {
t.Errorf("expected ClaimPrepareFailed event, got: %s", event)
}
default:
t.Error("expected a ClaimPrepareFailed event to be emitted, but none was received")
}
}

func TestPublishResourcesMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
Expand Down
19 changes: 14 additions & 5 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -81,6 +82,13 @@ func WithInventory(db inventoryDB) Option {
}
}

// WithEventRecorder sets the event recorder for the driver.
func WithEventRecorder(recorder record.EventRecorder) Option {
return func(o *NetworkDriver) {
o.eventRecorder = recorder
}
}

// WithDBPath sets the path for the persistent pod config database.
// If not set, an in-memory store is used.
func WithDBPath(path string) Option {
Expand All @@ -90,11 +98,12 @@ func WithDBPath(path string) Option {
}

type NetworkDriver struct {
driverName string
nodeName string
kubeClient kubernetes.Interface
draPlugin pluginHelper
nriPlugin stub.Stub
draPlugin pluginHelper
driverName string
eventRecorder record.EventRecorder
nodeName string
nriPlugin stub.Stub
kubeClient kubernetes.Interface

// contains the host interfaces
netdb inventoryDB
Expand Down
27 changes: 27 additions & 0 deletions pkg/driver/nri_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/containerd/nri/pkg/api"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
metav1apply "k8s.io/client-go/applyconfigurations/meta/v1"
Expand Down Expand Up @@ -176,6 +177,10 @@ func (np *NetworkDriver) runPodSandbox(_ context.Context, pod *api.PodSandbox, p
// Block 1: netdev operations — only when a network interface is present.
if ifName != "" {
if err := attachNetdevToNS(pod, ns, deviceName, config, resourceClaimStatusDevice); err != nil {
if np.eventRecorder != nil {
np.eventRecorder.Eventf(podObjectRef(pod), v1.EventTypeWarning, "NetworkDeviceAttachFailed",
"failed to attach network device %s to pod %s/%s: %v", deviceName, pod.GetNamespace(), pod.GetName(), err)
}
return err
}
}
Expand All @@ -185,6 +190,10 @@ func (np *NetworkDriver) runPodSandbox(_ context.Context, pod *api.PodSandbox, p
// for RoCE (netdev + RDMA) it runs after the netdev block above.
if !np.rdmaSharedMode && config.RDMADevice.LinkDev != "" {
if err := attachRdmaToNS(config.RDMADevice.LinkDev, ns, resourceClaimStatusDevice); err != nil {
if np.eventRecorder != nil {
np.eventRecorder.Eventf(podObjectRef(pod), v1.EventTypeWarning, "RDMADeviceAttachFailed",
"failed to attach RDMA device %s to pod %s/%s: %v", config.RDMADevice.LinkDev, pod.GetNamespace(), pod.GetName(), err)
}
return err
}
}
Expand Down Expand Up @@ -384,6 +393,10 @@ func (np *NetworkDriver) stopPodSandbox(_ context.Context, pod *api.PodSandbox,
if !np.rdmaSharedMode && config.RDMADevice.LinkDev != "" {
if err := nsDetachRdmadev(ns, config.RDMADevice.LinkDev); err != nil {
klog.Errorf("fail to return rdma device %s : %v", deviceName, err)
if np.eventRecorder != nil {
np.eventRecorder.Eventf(podObjectRef(pod), v1.EventTypeWarning, "RDMADeviceDetachFailed",
"failed to detach RDMA device %s from pod %s/%s: %v", config.RDMADevice.LinkDev, pod.GetNamespace(), pod.GetName(), err)
}
} else {
rdmaDetached = true
}
Expand All @@ -394,6 +407,10 @@ func (np *NetworkDriver) stopPodSandbox(_ context.Context, pod *api.PodSandbox,
if ifName != "" {
if err := nsDetachNetdev(ns, ifName, config.NetworkInterfaceConfigInHost.Interface.Name); err != nil {
klog.Errorf("fail to return network device %s : %v", deviceName, err)
if np.eventRecorder != nil {
np.eventRecorder.Eventf(podObjectRef(pod), v1.EventTypeWarning, "NetworkDeviceDetachFailed",
"failed to detach network device %s from pod %s/%s: %v", deviceName, pod.GetNamespace(), pod.GetName(), err)
}
} else {
netdevDetached = true
}
Expand Down Expand Up @@ -467,3 +484,13 @@ func getNetworkNamespace(pod *api.PodSandbox) string {
func podKey(pod *api.PodSandbox) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}

// NRI gives us *api.PodSandbox while we need *v1.Pod for the Eventf.
// As such, we construct the minimal *v1.Pod object reference needed for the event.
func podObjectRef(pod *api.PodSandbox) *v1.Pod {
p := &v1.Pod{}
p.Name = pod.GetName()
p.Namespace = pod.GetNamespace()
p.UID = types.UID(pod.GetUid())
return p
}
Loading