diff --git a/agent/outbound_test.go b/agent/outbound_test.go index 6b4bb256a..f1ec53b55 100644 --- a/agent/outbound_test.go +++ b/agent/outbound_test.go @@ -139,6 +139,7 @@ func Test_addAppDeletionToQueue(t *testing.T) { _ = a.appManager.Manage("agent/guestbook") a.addAppDeletionToQueue(app) ev, _ := a.queues.SendQ(defaultQueueName).Get() + a.queues.SendQ(defaultQueueName).Done(ev) assert.Equal(t, event.Delete.String(), ev.Type()) require.False(t, a.appManager.IsManaged("agent/guestbook")) }) @@ -365,6 +366,7 @@ func Test_addAppProjectCreationToQueue(t *testing.T) { require.Equal(t, 1, a.queues.SendQ(defaultQueueName).Len()) ev, _ := a.queues.SendQ(defaultQueueName).Get() assert.NotNil(t, ev) + a.queues.SendQ(defaultQueueName).Done(ev) assert.Equal(t, event.Create.String(), ev.Type()) // Queue should be empty after get assert.Equal(t, 0, a.queues.SendQ(defaultQueueName).Len()) @@ -512,6 +514,7 @@ func Test_addAppProjectDeletionToQueue(t *testing.T) { a.addAppProjectDeletionToQueue(appProject) ev, _ := a.queues.SendQ(defaultQueueName).Get() + a.queues.SendQ(defaultQueueName).Done(ev) assert.Equal(t, event.Delete.String(), ev.Type()) require.False(t, a.projectManager.IsManaged("test-project")) }) diff --git a/internal/queue/dedupe_queue.go b/internal/queue/dedupe_queue.go new file mode 100644 index 000000000..539ba7d87 --- /dev/null +++ b/internal/queue/dedupe_queue.go @@ -0,0 +1,222 @@ +// Copyright 2026 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "sync" + + internalevent "github.com/argoproj-labs/argocd-agent/internal/event" + "github.com/cloudevents/sdk-go/v2/event" + "k8s.io/client-go/util/workqueue" +) + +// EventKey is the key that goes into the workqueue +type EventKey struct { + ResourceID string + EventType string + EventID string +} + +// reorderQueue is used to customize the functionality of the default workqueue. +// When a duplicate item is added (via Touch), it removes the existing entry and +// appends the item to the tail of the queue. +type reorderQueue[T comparable] struct { + items []T + // index is a map of item to its index in the queue + index map[T]int +} + +func newReorderQueue[T comparable]() workqueue.Queue[T] { + return &reorderQueue[T]{ + index: make(map[T]int), + } +} + +func (q *reorderQueue[T]) Push(item T) { + q.index[item] = len(q.items) + q.items = append(q.items, item) +} + +func (q *reorderQueue[T]) Pop() (item T) { + item = q.items[0] + delete(q.index, item) + + q.items[0] = *new(T) + q.items = q.items[1:] + + // Rebuild indices after shift + for i, it := range q.items { + q.index[it] = i + } + return item +} + +// Touch is a hook that is invoked when queue.Add is called with an item that already exists in the queue. +// It is only called when the item is not being processed i.e in dirty set and not in processing set. +func (q *reorderQueue[T]) Touch(item T) { + idx, ok := q.index[item] + if !ok { + return + } + // Remove from current position + q.items = append(q.items[:idx], q.items[idx+1:]...) + // Re-add at the end + q.items = append(q.items, item) + // Rebuild indices from the moved position onward + for i := idx; i < len(q.items); i++ { + q.index[q.items[i]] = i + } +} + +func (q *reorderQueue[T]) Len() int { + return len(q.items) +} + +type dedupeQueue struct { + queue workqueue.TypedRateLimitingInterface[EventKey] + + maxSize int + + mu sync.Mutex + latestEvents map[EventKey]*event.Event + eventKeys map[*event.Event]EventKey + + notify chan struct{} +} + +func NewDedupeQueue(name string, maxSize int) WorkQueue { + baseQueue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[EventKey]{ + Name: name, + Queue: newReorderQueue[EventKey](), + }) + + delayingQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[EventKey]{ + Queue: baseQueue, + }) + + queue := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[EventKey](), + workqueue.TypedRateLimitingQueueConfig[EventKey]{ + DelayingQueue: delayingQueue, + }, + ) + + return &dedupeQueue{ + queue: queue, + maxSize: maxSize, + latestEvents: make(map[EventKey]*event.Event), + eventKeys: make(map[*event.Event]EventKey), + notify: make(chan struct{}, 10), + } +} + +func getKey(ev *event.Event) EventKey { + resID := internalevent.ResourceID(ev) + evType := ev.Type() + + if canDedupe(ev) { + return EventKey{ + ResourceID: resID, + EventType: evType, + } + } + + // Non-dedupable events get a unique key + return EventKey{ + ResourceID: resID, + EventType: evType, + EventID: internalevent.EventID(ev), + } +} + +func (q *dedupeQueue) Add(item *event.Event) { + key := getKey(item) + + q.mu.Lock() + oldEvent := q.latestEvents[key] + _, exists := q.latestEvents[key] + q.latestEvents[key] = item + q.eventKeys[item] = key + if exists && oldEvent != nil { + delete(q.eventKeys, oldEvent) + } + q.mu.Unlock() + + // Only evict when this is a genuinely new key. If the key already + // exists the workqueue will update it in-place (Touch) without + // growing, so evicting would incorrectly shrink the queue. + if !exists && q.queue.Len() == q.maxSize { + oldest, shutdown := q.queue.Get() + if !shutdown { + q.mu.Lock() + evicted := q.latestEvents[oldest] + delete(q.latestEvents, oldest) + if evicted != nil { + delete(q.eventKeys, evicted) + } + q.mu.Unlock() + q.queue.Done(oldest) + } + } + + q.queue.Add(key) + select { + case q.notify <- struct{}{}: + default: + return + } +} + +func (q *dedupeQueue) Get() (*event.Event, bool) { + key, shutdown := q.queue.Get() + if shutdown { + return nil, shutdown + } + + q.mu.Lock() + ev := q.latestEvents[key] + delete(q.latestEvents, key) + q.mu.Unlock() + + return ev, shutdown +} + +func (q *dedupeQueue) Done(item *event.Event) { + q.mu.Lock() + key, ok := q.eventKeys[item] + if ok { + delete(q.eventKeys, item) + delete(q.latestEvents, key) + } + q.mu.Unlock() + + if ok { + q.queue.Done(key) + } +} + +func (q *dedupeQueue) ShutDown() { + q.queue.ShutDown() +} + +func (q *dedupeQueue) Len() int { + return q.queue.Len() +} + +// canDedupe returns true if the event type supports de-duplication. +func canDedupe(ev *event.Event) bool { + evType := ev.Type() + return evType == internalevent.SpecUpdate.String() || evType == internalevent.StatusUpdate.String() +} diff --git a/internal/queue/dedupe_queue_test.go b/internal/queue/dedupe_queue_test.go new file mode 100644 index 000000000..1c667e8cc --- /dev/null +++ b/internal/queue/dedupe_queue_test.go @@ -0,0 +1,401 @@ +// Copyright 2026 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "sync" + "testing" + + internalevent "github.com/argoproj-labs/argocd-agent/internal/event" + cloudevents "github.com/cloudevents/sdk-go/v2/event" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newDedupableEvent(resourceID, data string) *cloudevents.Event { + ev := cloudevents.New() + ev.SetType(internalevent.SpecUpdate.String()) + ev.SetExtension("resourceid", resourceID) + ev.SetExtension("eventid", fmt.Sprintf("%s_%s", resourceID, data)) + _ = ev.SetData(cloudevents.TextPlain, data) + return &ev +} + +func newStatusUpdateEvent(resourceID, data string) *cloudevents.Event { + ev := cloudevents.New() + ev.SetType(internalevent.StatusUpdate.String()) + ev.SetExtension("resourceid", resourceID) + ev.SetExtension("eventid", fmt.Sprintf("%s_%s", resourceID, data)) + _ = ev.SetData(cloudevents.TextPlain, data) + return &ev +} + +func newNonDedupableEvent(resourceID, eventID, data string) *cloudevents.Event { + ev := cloudevents.New() + ev.SetType(internalevent.Create.String()) + ev.SetExtension("resourceid", resourceID) + ev.SetExtension("eventid", eventID) + _ = ev.SetData(cloudevents.TextPlain, data) + return &ev +} + +func TestDedupeQueue_DifferentResourcesNotDeduplicated(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newDedupableEvent("app2_uid2", "v1") + + q.Add(ev1) + q.Add(ev2) + + assert.Equal(t, 2, q.Len()) +} + +func TestDedupeQueue_DifferentTypesNotDeduplicated(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newStatusUpdateEvent("app1_uid1", "v1") + + q.Add(ev1) + q.Add(ev2) + + assert.Equal(t, 2, q.Len()) +} + +func TestDedupeQueue_NonDedupableEventsNeverCoalesce(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev1 := newNonDedupableEvent("app1_uid1", "evt-1", "data1") + ev2 := newNonDedupableEvent("app1_uid1", "evt-2", "data2") + ev3 := newNonDedupableEvent("app1_uid1", "evt-3", "data3") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + + assert.Equal(t, 3, q.Len()) +} + +func TestDedupeQueue_FIFOOrdering(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev1 := newNonDedupableEvent("app1", "evt-1", "first") + ev2 := newNonDedupableEvent("app2", "evt-2", "second") + ev3 := newNonDedupableEvent("app3", "evt-3", "third") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + + got1, _ := q.Get() + q.Done(got1) + got2, _ := q.Get() + q.Done(got2) + got3, _ := q.Get() + q.Done(got3) + + var d1, d2, d3 string + _ = got1.DataAs(&d1) + _ = got2.DataAs(&d2) + _ = got3.DataAs(&d3) + assert.Equal(t, "first", d1) + assert.Equal(t, "second", d2) + assert.Equal(t, "third", d3) +} + +func TestDedupeQueue_DedupeMovesToBack(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newDedupableEvent("app2_uid2", "v1") + + q.Add(ev1) + q.Add(ev2) + + // Re-add app1 with new data — should move to back + ev1Updated := newDedupableEvent("app1_uid1", "v2") + q.Add(ev1Updated) + + assert.Equal(t, 2, q.Len()) + + // First out should be app2 (app1 moved to back) + got, _ := q.Get() + q.Done(got) + assert.Equal(t, "app2_uid2", internalevent.ResourceID(got)) + + got, _ = q.Get() + q.Done(got) + assert.Equal(t, "app1_uid1", internalevent.ResourceID(got)) + var data string + _ = got.DataAs(&data) + assert.Equal(t, "v2", data, "should have latest payload") +} + +func TestDedupeQueue_BoundedEviction(t *testing.T) { + maxSize := 5 + q := NewDedupeQueue("test", maxSize) + + for i := 0; i < maxSize; i++ { + ev := newNonDedupableEvent(fmt.Sprintf("app%d", i), fmt.Sprintf("evt-%d", i), fmt.Sprintf("data%d", i)) + q.Add(ev) + } + assert.Equal(t, maxSize, q.Len()) + + // Adding one more should evict the oldest + overflow := newNonDedupableEvent("appNew", "evt-new", "new-data") + q.Add(overflow) + assert.Equal(t, maxSize, q.Len()) + + // The oldest (app0) should have been evicted; first available is app1 + got, _ := q.Get() + q.Done(got) + var data string + _ = got.DataAs(&data) + assert.Equal(t, "data1", data) +} + +func TestDedupeQueue_DuplicateDoesNotEvict(t *testing.T) { + maxSize := 3 + q := NewDedupeQueue("test", maxSize) + + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newDedupableEvent("app2_uid2", "v1") + ev3 := newDedupableEvent("app3_uid3", "v1") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + assert.Equal(t, maxSize, q.Len()) + + // Re-adding app1 (deduplicate) should NOT evict anything + ev1Updated := newDedupableEvent("app1_uid1", "v2") + q.Add(ev1Updated) + assert.Equal(t, maxSize, q.Len()) + + // All three resources should still be present + got1, _ := q.Get() + q.Done(got1) + got2, _ := q.Get() + q.Done(got2) + got3, _ := q.Get() + q.Done(got3) + + resources := []string{ + internalevent.ResourceID(got1), + internalevent.ResourceID(got2), + internalevent.ResourceID(got3), + } + assert.Contains(t, resources, "app1_uid1") + assert.Contains(t, resources, "app2_uid2") + assert.Contains(t, resources, "app3_uid3") +} + +func TestDedupeQueue_GetReturnsLatestAfterMultipleUpdates(t *testing.T) { + q := NewDedupeQueue("test", 100) + + for i := 0; i < 10; i++ { + ev := newDedupableEvent("app1_uid1", fmt.Sprintf("version-%d", i)) + q.Add(ev) + } + + assert.Equal(t, 1, q.Len()) + + got, _ := q.Get() + var data string + _ = got.DataAs(&data) + assert.Equal(t, "version-9", data) +} + +func TestDedupeQueue_Done(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev := newDedupableEvent("app1_uid1", "v1") + q.Add(ev) + + got, _ := q.Get() + assert.Equal(t, 0, q.Len()) + q.Done(got) + + // After Done, adding a new event for the same resource should work fresh + ev2 := newDedupableEvent("app1_uid1", "v2") + q.Add(ev2) + assert.Equal(t, 1, q.Len()) +} + +func TestDedupeQueue_ShutDown(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev := newDedupableEvent("app1_uid1", "v1") + q.Add(ev) + + q.ShutDown() + + // Pending items are still returned after shutdown + got, shutdown := q.Get() + assert.False(t, shutdown) + assert.NotNil(t, got) + q.Done(got) + + // Once drained, Get signals shutdown + _, shutdown = q.Get() + assert.True(t, shutdown) +} + +func TestDedupeQueue_EmptyQueueLen(t *testing.T) { + q := NewDedupeQueue("test", 100) + assert.Equal(t, 0, q.Len()) +} + +func TestDedupeQueue_MixedDedupableAndNonDedupable(t *testing.T) { + q := NewDedupeQueue("test", 100) + + spec1 := newDedupableEvent("app1_uid1", "spec-v1") + spec2 := newDedupableEvent("app1_uid1", "spec-v2") + create1 := newNonDedupableEvent("app1_uid1", "create-1", "create-data") + + q.Add(spec1) + q.Add(create1) + q.Add(spec2) + + // spec events coalesce (1 slot), create is separate (1 slot) = 2 total + assert.Equal(t, 2, q.Len()) +} + +func TestDedupeQueue_NotifyOnAdd(t *testing.T) { + dq := NewDedupeQueue("test", 100).(*dedupeQueue) + + ev := newDedupableEvent("app1_uid1", "v1") + q := dq + q.Add(ev) + + select { + case <-dq.notify: + // expected + default: + t.Fatal("expected notification on Add") + } +} + +func TestDedupeQueue_ConcurrentAdds(t *testing.T) { + q := NewDedupeQueue("test", 1000) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + ev := newDedupableEvent(fmt.Sprintf("app%d_uid%d", i, i), fmt.Sprintf("v%d", i)) + q.Add(ev) + }(i) + } + wg.Wait() + + assert.Equal(t, 100, q.Len()) +} + +func TestDedupeQueue_ConcurrentDeduplication(t *testing.T) { + q := NewDedupeQueue("test", 1000) + + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + ev := newDedupableEvent("app1_uid1", fmt.Sprintf("v%d", i)) + q.Add(ev) + }(i) + } + wg.Wait() + + assert.Equal(t, 1, q.Len()) + + got, _ := q.Get() + require.NotNil(t, got) +} + +func TestReorderQueue_Push_Pop(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + q.Push("c") + + assert.Equal(t, 3, q.Len()) + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "b", q.Pop()) + assert.Equal(t, "c", q.Pop()) + assert.Equal(t, 0, q.Len()) +} + +func TestReorderQueue_Touch_MovesToBack(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + q.Push("c") + + q.Touch("a") + + assert.Equal(t, 3, q.Len()) + assert.Equal(t, "b", q.Pop()) + assert.Equal(t, "c", q.Pop()) + assert.Equal(t, "a", q.Pop()) +} + +func TestReorderQueue_Touch_MiddleElement(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + q.Push("c") + q.Push("d") + + q.Touch("b") + + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "c", q.Pop()) + assert.Equal(t, "d", q.Pop()) + assert.Equal(t, "b", q.Pop()) +} + +func TestReorderQueue_Touch_LastElement(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + q.Push("c") + + q.Touch("c") + + // "c" is already last — order shouldn't change + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "b", q.Pop()) + assert.Equal(t, "c", q.Pop()) +} + +func TestReorderQueue_Touch_NonExistent(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + + q.Touch("z") // should be a no-op + + assert.Equal(t, 2, q.Len()) + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "b", q.Pop()) +} diff --git a/internal/queue/mocks/QueuePair.go b/internal/queue/mocks/QueuePair.go index e78e682f4..211b0b5f7 100644 --- a/internal/queue/mocks/QueuePair.go +++ b/internal/queue/mocks/QueuePair.go @@ -6,6 +6,7 @@ import ( event "github.com/cloudevents/sdk-go/v2/event" mock "github.com/stretchr/testify/mock" + queue "github.com/argoproj-labs/argocd-agent/internal/queue" workqueue "k8s.io/client-go/util/workqueue" ) @@ -254,19 +255,19 @@ func (_c *QueuePair_Names_Call) RunAndReturn(run func() []string) *QueuePair_Nam } // RecvQ provides a mock function with given fields: name -func (_m *QueuePair) RecvQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] { +func (_m *QueuePair) RecvQ(name string) queue.WorkQueue { ret := _m.Called(name) if len(ret) == 0 { panic("no return value specified for RecvQ") } - var r0 workqueue.TypedRateLimitingInterface[*event.Event] - if rf, ok := ret.Get(0).(func(string) workqueue.TypedRateLimitingInterface[*event.Event]); ok { + var r0 queue.WorkQueue + if rf, ok := ret.Get(0).(func(string) queue.WorkQueue); ok { r0 = rf(name) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(workqueue.TypedRateLimitingInterface[*event.Event]) + r0 = ret.Get(0).(queue.WorkQueue) } } @@ -291,12 +292,12 @@ func (_c *QueuePair_RecvQ_Call) Run(run func(name string)) *QueuePair_RecvQ_Call return _c } -func (_c *QueuePair_RecvQ_Call) Return(_a0 workqueue.TypedRateLimitingInterface[*event.Event]) *QueuePair_RecvQ_Call { +func (_c *QueuePair_RecvQ_Call) Return(_a0 queue.WorkQueue) *QueuePair_RecvQ_Call { _c.Call.Return(_a0) return _c } -func (_c *QueuePair_RecvQ_Call) RunAndReturn(run func(string) workqueue.TypedRateLimitingInterface[*event.Event]) *QueuePair_RecvQ_Call { +func (_c *QueuePair_RecvQ_Call) RunAndReturn(run func(string) queue.WorkQueue) *QueuePair_RecvQ_Call { _c.Call.Return(run) return _c } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 7d6d967ba..333553703 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -27,13 +27,24 @@ import ( var _ QueuePair = &SendRecvQueues{} +// WorkQueue is the minimal interface used by workqueue consumers. It covers only +// the methods actually called on workqueues (Add, Get, Done, Len, +// ShutDown), avoiding a dependency on the full workqueue interface. +type WorkQueue interface { + Add(item *event.Event) + Get() (*event.Event, bool) + Done(item *event.Event) + Len() int + ShutDown() +} + // QueuePair maintains a map (indexed by name) of send/receive queue pairs type QueuePair interface { Names() []string HasQueuePair(name string) bool Len() int - SendQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] - RecvQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] + SendQ(name string) WorkQueue + RecvQ(name string) WorkQueue Create(name string) error Delete(name string, shutdown bool) error } @@ -44,8 +55,8 @@ const ( ) type queuepair struct { - recvq *boundedQueue - sendq *boundedQueue + recvq WorkQueue + sendq WorkQueue } type boundedQueue struct { @@ -125,7 +136,7 @@ func (q *SendRecvQueues) Len() int { // SendQ will return the send queue from the queue pair named name. If no such // queue pair exists, returns nil -func (q *SendRecvQueues) SendQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] { +func (q *SendRecvQueues) SendQ(name string) WorkQueue { q.queuelock.RLock() defer q.queuelock.RUnlock() qp, ok := q.queues[name] @@ -137,7 +148,7 @@ func (q *SendRecvQueues) SendQ(name string) workqueue.TypedRateLimitingInterface // RecvQ will return the receive queue from the queue pair named name. If no // such queue pair exists, returns nil -func (q *SendRecvQueues) RecvQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] { +func (q *SendRecvQueues) RecvQ(name string) WorkQueue { q.queuelock.RLock() defer q.queuelock.RUnlock() qp, ok := q.queues[name] @@ -171,8 +182,8 @@ func (q *SendRecvQueues) Create(name string) error { }, defaultMaxQueueSize) qp := &queuepair{} - qp.sendq = newBoundedQueue(sendQueueSize, name+"-send") - qp.recvq = newBoundedQueue(recvQueueSize, name+"-recv") + qp.sendq = NewDedupeQueue(name+"-send", sendQueueSize) + qp.recvq = NewDedupeQueue(name+"-recv", recvQueueSize) q.queues[name] = qp return nil @@ -199,8 +210,8 @@ func (q *SendRecvQueues) Delete(name string, shutdown bool) error { // GetWithContext is a wrapper around the workqueue's Get method. // It waits until an item is available in the queue or the context is Done -func GetWithContext(q workqueue.TypedRateLimitingInterface[*event.Event], ctx context.Context) (*event.Event, bool) { - bq, ok := q.(*boundedQueue) +func GetWithContext(q WorkQueue, ctx context.Context) (*event.Event, bool) { + bq, ok := q.(*dedupeQueue) if !ok { return nil, false } diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 8417a4b62..20c4caba0 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -69,12 +69,16 @@ func Test_Queue(t *testing.T) { for i := 1; i <= defaultMaxQueueSize; i++ { ev := event.New() ev.SetID(strconv.Itoa(i)) + ev.SetExtension("eventid", strconv.Itoa(i)) + ev.SetExtension("resourceid", strconv.Itoa(i)) queue.Add(&ev) } // Since the queue is full, check if the oldest item is popped before adding a new item. ev := event.New() ev.SetID(strconv.Itoa(defaultMaxQueueSize + 1)) + ev.SetExtension("eventid", "new-event-id") + ev.SetExtension("resourceid", "new-resource-id") queue.Add(&ev) assert.Equal(t, defaultMaxQueueSize, queue.Len()) front, _ := queue.Get() @@ -94,6 +98,8 @@ func Test_Queue(t *testing.T) { for i := 1; i <= queueSize+20; i++ { ev := event.New() ev.SetID(strconv.Itoa(i)) + ev.SetExtension("eventid", strconv.Itoa(i)) + ev.SetExtension("resourceid", strconv.Itoa(i)) recvQueue.Add(&ev) sendQueue.Add(&ev) } @@ -101,6 +107,8 @@ func Test_Queue(t *testing.T) { // Since the queue is full, check if the oldest item is popped before adding a new item. ev := event.New() ev.SetID(strconv.Itoa(queueSize + 1)) + ev.SetExtension("eventid", "new-event-id") + ev.SetExtension("resourceid", "new-resource-id") recvQueue.Add(&ev) sendQueue.Add(&ev) assert.Equal(t, queueSize, recvQueue.Len()) diff --git a/internal/resync/resync.go b/internal/resync/resync.go index 13867438f..82f21da0c 100644 --- a/internal/resync/resync.go +++ b/internal/resync/resync.go @@ -26,6 +26,7 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/logging/logfields" "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/manager/appproject" + "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/resources" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" cloudevent "github.com/cloudevents/sdk-go/v2/event" @@ -37,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "k8s.io/client-go/util/workqueue" ) // ErrSourceUIDNotFound is returned when a resource does not have the source UID annotation. @@ -49,7 +49,7 @@ var ErrSourceUIDNotFound = errors.New("source UID annotation not found") type RequestHandler struct { dynClient dynamic.Interface - sendQ workqueue.TypedRateLimitingInterface[*cloudevent.Event] + sendQ queue.WorkQueue events *event.EventSource @@ -80,7 +80,7 @@ type RequestHandler struct { peerNamespace string } -func NewRequestHandler(dynClient dynamic.Interface, queue workqueue.TypedRateLimitingInterface[*cloudevent.Event], events *event.EventSource, resources *resources.Resources, log *logrus.Entry, role manager.ManagerRole, namespace string) *RequestHandler { +func NewRequestHandler(dynClient dynamic.Interface, queue queue.WorkQueue, events *event.EventSource, resources *resources.Resources, log *logrus.Entry, role manager.ManagerRole, namespace string) *RequestHandler { return &RequestHandler{ dynClient: dynClient, sendQ: queue, diff --git a/principal/callbacks_test.go b/principal/callbacks_test.go index 170e45509..57435e9f0 100644 --- a/principal/callbacks_test.go +++ b/principal/callbacks_test.go @@ -45,7 +45,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" kubefake "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/util/workqueue" ) func TestMapAppProjectToAgents(t *testing.T) { @@ -2673,7 +2672,7 @@ func TestServer_deleteGPGKeyCallback(t *testing.T) { }) } -func drainQueue(t *testing.T, q workqueue.TypedRateLimitingInterface[*cloudevents.Event]) { +func drainQueue(t *testing.T, q queue.WorkQueue) { for q.Len() > 0 { ev, shutdown := q.Get() if shutdown { diff --git a/principal/event.go b/principal/event.go index 7263feda0..6bd353f20 100644 --- a/principal/event.go +++ b/principal/event.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/metrics" "github.com/argoproj-labs/argocd-agent/internal/namedlock" + "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/resync" "github.com/argoproj-labs/argocd-agent/internal/tracing" "github.com/argoproj-labs/argocd-agent/pkg/replication" @@ -41,7 +42,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/dynamic" - "k8s.io/client-go/util/workqueue" ) // skipReplication returns true for event targets that are operational noise and @@ -59,7 +59,7 @@ func skipReplication(target targets.EventTarget) bool { // processRecvQueue processes an entry from the receiver queue, which holds the // events received by agents. It will trigger updates of resources in the // server's backend. -func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workqueue.TypedRateLimitingInterface[*cloudevents.Event]) (*cloudevents.Event, error) { +func (s *Server) processRecvQueue(ctx context.Context, agentName string, q queue.WorkQueue) (*cloudevents.Event, error) { status := metrics.EventProcessingSuccess ev, _ := q.Get() @@ -805,7 +805,7 @@ func (s *Server) eventProcessor(ctx context.Context) error { queueLogCtx.Trace("Acquired semaphore") - go func(agentName string, q workqueue.TypedRateLimitingInterface[*cloudevents.Event], logCtx *logrus.Entry) { + go func(agentName string, q queue.WorkQueue, logCtx *logrus.Entry) { defer func() { sem.Release(1) queueLock.Unlock(agentName) diff --git a/principal/event_test.go b/principal/event_test.go index 70feb31de..41f4aca54 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -23,11 +23,11 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/event/targets" "github.com/argoproj-labs/argocd-agent/internal/manager" + "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/resources" "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj-labs/argocd-agent/principal/resourceproxy" "github.com/argoproj-labs/argocd-agent/test/fake/kube" - wqmock "github.com/argoproj-labs/argocd-agent/test/mocks/k8s-workqueue" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" @@ -38,6 +38,14 @@ import ( "k8s.io/utils/ptr" ) +// newTestRecvQueue creates a RecvQueue with a single event pre-loaded, for use in +// tests that call processRecvQueue. +func newTestRecvQueue(ev *cloudevents.Event) queue.WorkQueue { + q := queue.NewDedupeQueue("cloud-events", 1000) + q.Add(ev) + return q +} + func Test_EventProcessorRoutesACKToMatchingQueue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -98,9 +106,7 @@ func Test_InvalidEvents(t *testing.T) { t.Run("Unknown event schema", func(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("unknown") - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -112,9 +118,7 @@ func Test_InvalidEvents(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("application") ev.SetType("application") - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -127,9 +131,7 @@ func Test_InvalidEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, "something") - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -143,9 +145,7 @@ func Test_CreateEvents(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("application") ev.SetType(event.Create.String()) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -182,9 +182,7 @@ func Test_CreateEvents(t *testing.T) { // Update the application before sending the event app.Spec.Source.TargetRevision = "test" ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) s.Start(context.Background(), make(chan error)) s.clusterMgr.MapCluster("argocd", &v1alpha1.Cluster{Name: "argocd", Server: "https://argocd.com"}) @@ -236,9 +234,7 @@ func Test_CreateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) @@ -305,9 +301,7 @@ func Test_CreateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) s.clusterMgr.MapCluster("agent-staging", &v1alpha1.Cluster{Name: "agent-staging", Server: "https://staging.com"}) @@ -363,9 +357,7 @@ func Test_CreateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -745,9 +737,7 @@ func Test_UpdateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.SpecUpdate.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -828,9 +818,7 @@ func Test_UpdateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.SpecUpdate.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -889,9 +877,7 @@ func Test_UpdateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.SpecUpdate.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) @@ -957,9 +943,7 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Delete.String()) ev.SetData(cloudevents.ApplicationJSON, delApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -1054,9 +1038,7 @@ func Test_processAppProjectEvent(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("appproject") ev.SetType(event.Create.String()) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -1099,9 +1081,7 @@ func Test_processAppProjectEvent(t *testing.T) { ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, project) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) @@ -1160,9 +1140,7 @@ func Test_processAppProjectEvent(t *testing.T) { updatedProject.Name = "test" // Use original name (will be prefixed) ev.SetData(cloudevents.ApplicationJSON, updatedProject) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) @@ -1205,9 +1183,7 @@ func Test_processAppProjectEvent(t *testing.T) { ev.SetDataSchema("appproject") ev.SetType(event.Delete.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged)