Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -91,8 +91,8 @@ type podConfigurator struct {
isSecondaryNetwork bool

containerAccess *containerAccessArbitrator
eventBroadcaster record.EventBroadcaster
recorder record.EventRecorder
eventBroadcaster events.EventBroadcaster
recorder events.EventRecorder
podListerSynced cache.InformerSynced
podLister v1.PodLister
kubeClient clientset.Interface
Expand Down Expand Up @@ -679,11 +679,11 @@ func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConf

if installed {
// Add normal event to record Pod network is ready.
pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules")
pc.recorder.Eventf(pod, nil, corev1.EventTypeNormal, "NetworkReady", "NetworkConfiguration", "Installed Pod network forwarding rules")
return
}

pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed")
pc.recorder.Eventf(pod, nil, corev1.EventTypeWarning, "NetworkNotReady", "NetworkConfiguration", "Pod network forwarding rules not installed")
}

func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) {
Expand Down
11 changes: 6 additions & 5 deletions pkg/agent/cniserver/pod_configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"

"antrea.io/antrea/pkg/agent/interfacestore"
Expand Down Expand Up @@ -74,7 +74,7 @@ type mockClients struct {
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
ofClient *openflowtest.MockClient
recorder *record.FakeRecorder
recorder *events.FakeRecorder
}

func newMockClients(ctrl *gomock.Controller, nodeName string, objects ...runtime.Object) *mockClients {
Expand All @@ -91,8 +91,7 @@ func newMockClients(ctrl *gomock.Controller, nodeName string, objects ...runtime
)
podLister := corelisters.NewPodLister(localPodInformer.GetIndexer())
ofClient := openflowtest.NewMockClient(ctrl)
recorder := record.NewFakeRecorder(100)
recorder.IncludeObject = false
recorder := events.NewFakeRecorder(100)

return &mockClients{
kubeClient: kubeClient,
Expand Down Expand Up @@ -154,7 +153,9 @@ func mockRetryInterval(t *testing.T) {

func newTestPodConfigurator(testClients *mockClients, waiter *asyncWaiter) *podConfigurator {
interfaceStore := interfacestore.NewInterfaceStore()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: testClients.kubeClient.EventsV1(),
})
queue := workqueue.NewTypedDelayingQueueWithConfig[string](
workqueue.TypedDelayingQueueConfig[string]{
Name: "podConfigurator",
Expand Down
14 changes: 6 additions & 8 deletions pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -157,11 +155,13 @@ func (pc *podConfigurator) initPortStatusMonitor(podInformer cache.SharedIndexIn
Name: workerName,
},
)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: pc.kubeClient.EventsV1(),
})
pc.eventBroadcaster = eventBroadcaster
pc.recorder = eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: "AntreaPodConfigurator"},
"AntreaPodConfigurator",
)
pc.statusCh = make(chan *openflow15.PortStatus, 100)
pc.ofClient.SubscribeOFPortStatusMessage(pc.statusCh)
Expand All @@ -177,9 +177,7 @@ func (pc *podConfigurator) Run(stopCh <-chan struct{}) {
return
}
pc.eventBroadcaster.StartStructuredLogging(0)
pc.eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{
Interface: pc.kubeClient.CoreV1().Events(""),
})
pc.eventBroadcaster.StartRecordingToSink(stopCh)
defer pc.eventBroadcaster.Shutdown()

go wait.Until(pc.worker, time.Second, stopCh)
Expand Down
26 changes: 11 additions & 15 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -198,8 +197,8 @@ type EgressController struct {

trafficShapingEnabled bool

eventBroadcaster record.EventBroadcaster
record record.EventRecorder
eventBroadcaster events.EventBroadcaster
record events.EventRecorder
// Whether to support non-default subnets.
supportSeparateSubnet bool
// Used to allocate route table ID.
Expand Down Expand Up @@ -235,11 +234,10 @@ func NewEgressController(
klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.")
}

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: controllerName},
)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: k8sClient.EventsV1(),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, controllerName)

c := &EgressController{
ofClient: ofClient,
Expand Down Expand Up @@ -503,9 +501,7 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
defer klog.Infof("Shutting down %s", controllerName)

c.eventBroadcaster.StartStructuredLogging(0)
c.eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
Interface: c.k8sClient.CoreV1().Events(""),
})
c.eventBroadcaster.StartRecordingToSink(stopCh)
defer c.eventBroadcaster.Shutdown()

go c.localIPDetector.Run(stopCh)
Expand Down Expand Up @@ -1069,7 +1065,7 @@ func (c *EgressController) syncEgress(egressName string) error {
return err
}
if assigned {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPAssigned", "Assigned Egress %s with IP %s on Node %s", egress.Name, desiredEgressIP, desiredNode)
c.record.Eventf(egress, nil, corev1.EventTypeNormal, "IPAssigned", "NodeAssignment", "Assigned Egress %s with IP %s on Node %s", egress.Name, desiredEgressIP, desiredNode)
}
} else {
// Unassign the Egress IP from the local Node if it was assigned by the agent.
Expand All @@ -1078,7 +1074,7 @@ func (c *EgressController) syncEgress(egressName string) error {
return err
}
if unassigned {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned Egress %s with IP %s from Node %s", egress.Name, desiredEgressIP, c.nodeName)
c.record.Eventf(egress, nil, corev1.EventTypeNormal, "IPUnassigned", "NodeAssignment", "Unassigned Egress %s with IP %s from Node %s", egress.Name, desiredEgressIP, c.nodeName)
}
}

Expand Down Expand Up @@ -1180,7 +1176,7 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat
return err
}
if unassigned && egress != nil {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned Egress %s with IP %s from Node %s", egressName, eState.egressIP, c.nodeName)
c.record.Eventf(egress, nil, corev1.EventTypeNormal, "IPUnassigned", "NodeAssignment", "Unassigned Egress %s with IP %s from Node %s", egressName, eState.egressIP, c.nodeName)
}
// Remove the Egress's state.
c.deleteEgressState(egressName)
Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -1118,19 +1119,19 @@ func TestSyncEgress(t *testing.T) {
if tt.maxEgressIPsPerNode > 0 {
c.egressIPScheduler.maxEgressIPsPerNode = tt.maxEgressIPsPerNode
}
events := make([]*v1.Event, 0)
events := make([]*eventsv1.Event, 0)
var eventsMutex sync.Mutex
c.eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
c.eventBroadcaster.StartEventWatcher(func(e runtime.Object) {
eventsMutex.Lock()
defer eventsMutex.Unlock()
events = append(events, e)
events = append(events, e.(*eventsv1.Event))
})
getEventMessages := func() []string {
eventsMutex.Lock()
defer eventsMutex.Unlock()
messages := make([]string, len(events))
for idx := range events {
messages[idx] = events[idx].Message
messages[idx] = events[idx].Note
}
return messages
}
Expand Down
13 changes: 8 additions & 5 deletions test/e2e/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/client/clientset/versioned/scheme"
"antrea.io/antrea/pkg/features"
"antrea.io/antrea/pkg/util/k8s"
)
Expand Down Expand Up @@ -486,9 +485,11 @@ func testEgressCRUD(t *testing.T, data *TestData) {
// Testing the events recorded during creation of an Egress resource.
expectedMessage := fmt.Sprintf("Assigned Egress %s with IP %s on Node %v", egress.Name, tt.expectedEgressIP, egress.Status.EgressNode)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
events, err := data.clientset.CoreV1().Events("").SearchWithContext(context.TODO(), scheme.Scheme, egress)
events, err := data.clientset.EventsV1().Events("").List(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("regarding.name=%s", egress.Name),
})
if assert.NoError(c, err) && assert.Len(c, events.Items, 1) {
assert.Contains(c, events.Items[0].Message, expectedMessage)
assert.Contains(c, events.Items[0].Note, expectedMessage)
}
}, 2*time.Second, 200*time.Millisecond)
}
Expand Down Expand Up @@ -605,11 +606,13 @@ func testEgressUpdateEgressIP(t *testing.T, data *TestData) {
fmt.Sprintf("Assigned Egress %s with IP %s on Node %v", egress.Name, tt.newEgressIP, tt.newNode),
}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
events, err := data.clientset.CoreV1().Events("").SearchWithContext(context.TODO(), scheme.Scheme, egress)
events, err := data.clientset.EventsV1().Events("").List(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("regarding.name=%s", egress.Name),
})
if assert.NoError(c, err) && assert.Len(c, events.Items, len(expectedMessages)) {
recordedMessages := []string{}
for _, items := range events.Items {
recordedMessages = append(recordedMessages, items.Message)
recordedMessages = append(recordedMessages, items.Note)
}
assert.Equal(c, expectedMessages[0], recordedMessages[0])
// The order of unassigning from original Node and assigning on new Node is random.
Expand Down