From f48190fe380996f9d0086c96e50aea8c15d57f50 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Wed, 10 Jun 2026 18:05:54 +0530 Subject: [PATCH 1/5] feat: dedupe incoming events on the principal Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt --- internal/metrics/queue.go | 58 ++++++ internal/queue/dedupe_queue.go | 199 ++++++++++++++++++ internal/queue/dedupe_queue_test.go | 304 ++++++++++++++++++++++++++++ internal/queue/mocks/QueuePair.go | 13 +- internal/queue/queue.go | 19 +- principal/event.go | 6 +- principal/event_test.go | 74 +++---- principal/server.go | 1 + 8 files changed, 612 insertions(+), 62 deletions(-) create mode 100644 internal/queue/dedupe_queue.go create mode 100644 internal/queue/dedupe_queue_test.go diff --git a/internal/metrics/queue.go b/internal/metrics/queue.go index 60c275770..444f9d595 100644 --- a/internal/metrics/queue.go +++ b/internal/metrics/queue.go @@ -21,6 +21,8 @@ import ( var _ workqueue.MetricsProvider = &QueueMetrics{} +var dqMetricsProvider *DedupeQueueMetrics + // QueueMetrics implements the workqueue.MetricsProvider interface, // providing metrics for our send/recv queues. type QueueMetrics struct { @@ -142,3 +144,59 @@ func RegisterQueueMetrics(prefix string) { ) workqueue.SetProvider(provider) } + +// DedupeQueueMetrics holds Prometheus metrics for dedupeQueue instances. +type DedupeQueueMetrics struct { + Depth *prometheus.GaugeVec + Adds *prometheus.CounterVec + EventsDeduped *prometheus.CounterVec + Duration *prometheus.HistogramVec +} + +func NewDedupeQueueMetrics(prefix string) *DedupeQueueMetrics { + return &DedupeQueueMetrics{ + Depth: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: prefix + "_queue_depth", + Help: "Current depth of queue", + }, + []string{"queue"}, + ), + Adds: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: prefix + "_queue_adds_total", + Help: "Total number of adds to the deduped queue", + }, + []string{"queue"}, + ), + EventsDeduped: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: prefix + "_queue_events_deduped_total", + Help: "Total number of events deduped from the queue", + }, + []string{"queue"}, + ), + Duration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: prefix + "_queue_duration_seconds", + Help: "Time an event spent waiting in the queue before being dequeued", + Buckets: prometheus.DefBuckets, + }, + []string{"queue"}, + ), + } +} + +func RegisterDedupeQueueMetrics(prefix string) { + dqMetricsProvider = NewDedupeQueueMetrics(prefix) + prometheus.DefaultRegisterer.MustRegister( + dqMetricsProvider.Depth, + dqMetricsProvider.Adds, + dqMetricsProvider.EventsDeduped, + dqMetricsProvider.Duration, + ) +} + +func GetDedupeQueueMetrics() *DedupeQueueMetrics { + return dqMetricsProvider +} diff --git a/internal/queue/dedupe_queue.go b/internal/queue/dedupe_queue.go new file mode 100644 index 000000000..f91ee5c1f --- /dev/null +++ b/internal/queue/dedupe_queue.go @@ -0,0 +1,199 @@ +// Copyright 2026 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "sync" + "time" + + internalevent "github.com/argoproj-labs/argocd-agent/internal/event" + "github.com/argoproj-labs/argocd-agent/internal/metrics" + "github.com/cloudevents/sdk-go/v2/event" +) + +// dedupeQueue is a bounded FIFO queue that de-duplicates resource events. When a new event +// arrives for a resource of the same type, the older events are removed and the +// newer event is appended to the tail of the queue. +type dedupeQueue struct { + mu sync.Mutex + items []*queueItem + notify chan struct{} + name string + maxSize int + isShutdown bool + metrics *metrics.DedupeQueueMetrics +} + +type queueItem struct { + event *event.Event + enqueuedAt time.Time +} + +// NewDedupeQueueForTest creates a dedupeQueue for use in tests. +func NewDedupeQueueForTest(maxSize int) *dedupeQueue { + return newDedupeQueue(maxSize, "test") +} + +func newDedupeQueue(maxSize int, name string) *dedupeQueue { + return &dedupeQueue{ + items: make([]*queueItem, 0), + notify: make(chan struct{}, 10), + name: name, + maxSize: maxSize, + metrics: metrics.GetDedupeQueueMetrics(), + } +} + +// canDedupe returns true if the event type supports de-duplication. +func canDedupe(ev *event.Event) bool { + evType := ev.Type() + return evType == internalevent.SpecUpdate.String() || evType == internalevent.StatusUpdate.String() +} + +// dedupeKey returns a composite key of resourceID and event type for +// identifying duplicate events. +func dedupeKey(ev *event.Event) (string, string) { + return internalevent.ResourceID(ev), ev.Type() +} + +func (dq *dedupeQueue) Add(item *event.Event) { + dq.mu.Lock() + defer dq.mu.Unlock() + + if item == nil || dq.isShutdown { + return + } + + if canDedupe(item) { + dq.removeDuplicates(item) + } + + if len(dq.items) >= dq.maxSize { + dq.pop() + } + + dq.items = append(dq.items, &queueItem{ + event: item, + enqueuedAt: time.Now(), + }) + + if dq.metrics != nil { + dq.metrics.Adds.WithLabelValues(dq.name).Inc() + dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) + } + + select { + case dq.notify <- struct{}{}: + default: + } +} + +func (dq *dedupeQueue) Get() (*event.Event, bool) { + dq.mu.Lock() + if dq.isShutdown && len(dq.items) == 0 { + dq.mu.Unlock() + return nil, true + } + + if len(dq.items) > 0 { + item := dq.pop() + + if dq.metrics != nil { + dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) + dq.metrics.Duration.WithLabelValues(dq.name).Observe(time.Since(item.enqueuedAt).Seconds()) + } + + dq.mu.Unlock() + return item.event, false + } + dq.mu.Unlock() + + // Block until an item is available or shutdown + for { + select { + case <-dq.notify: + dq.mu.Lock() + if len(dq.items) > 0 { + item := dq.pop() + + if dq.metrics != nil { + dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) + dq.metrics.Duration.WithLabelValues(dq.name).Observe(time.Since(item.enqueuedAt).Seconds()) + } + + dq.mu.Unlock() + return item.event, false + } + + if dq.isShutdown { + dq.mu.Unlock() + return nil, true + } + dq.mu.Unlock() + } + } +} + +func (dq *dedupeQueue) Done(_ *event.Event) { + // No-op: the dedupe queue does not track in-flight items. +} + +func (dq *dedupeQueue) Len() int { + dq.mu.Lock() + defer dq.mu.Unlock() + return len(dq.items) +} + +func (dq *dedupeQueue) ShutDown() { + dq.mu.Lock() + defer dq.mu.Unlock() + if dq.isShutdown { + return + } + dq.isShutdown = true + close(dq.notify) +} + +// removeDuplicates removes all queued events matching the same resourceID and +// event type as item. Must be called with dq.mu held. +func (dq *dedupeQueue) removeDuplicates(incoming *event.Event) { + resID, evType := dedupeKey(incoming) + if resID == "" { + return + } + + for i := len(dq.items) - 1; i >= 0; i-- { + existingID, existingEvType := dedupeKey(dq.items[i].event) + if existingID == resID && existingEvType == evType { + dq.items[i] = nil + dq.items = append(dq.items[:i], dq.items[i+1:]...) + if dq.metrics != nil { + dq.metrics.EventsDeduped.WithLabelValues(dq.name).Inc() + } + } + } +} + +// pop the first item from the queue. +// Must be called with queue lock held. +func (dq *dedupeQueue) pop() *queueItem { + if len(dq.items) == 0 { + return nil + } + item := dq.items[0] + dq.items[0] = nil + dq.items = dq.items[1:] + return item +} diff --git a/internal/queue/dedupe_queue_test.go b/internal/queue/dedupe_queue_test.go new file mode 100644 index 000000000..248e4fafe --- /dev/null +++ b/internal/queue/dedupe_queue_test.go @@ -0,0 +1,304 @@ +// Copyright 2026 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "strconv" + "testing" + + internalevent "github.com/argoproj-labs/argocd-agent/internal/event" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/stretchr/testify/assert" +) + +func newTestEvent(id string, eventType string, resourceID string) *event.Event { + ev := event.New() + ev.SetID(id) + ev.SetType(eventType) + if resourceID != "" { + ev.SetExtension("resourceid", resourceID) + } + return &ev +} + +func TestDedupeQueue_BasicFIFO(t *testing.T) { + t.Run("Events dequeue in FIFO order", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev1 := newTestEvent("1", internalevent.Create.String(), "app-a") + ev2 := newTestEvent("2", internalevent.Create.String(), "app-b") + ev3 := newTestEvent("3", internalevent.Delete.String(), "app-c") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + + assert.Equal(t, 3, q.Len()) + + got, shutdown := q.Get() + assert.False(t, shutdown) + assert.Equal(t, "1", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "2", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "3", got.ID()) + + assert.Equal(t, 0, q.Len()) + }) +} + +func TestDedupeQueue_SpecUpdateDedup(t *testing.T) { + t.Run("Newer SpecUpdate replaces older for same resource", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev1 := newTestEvent("v1", internalevent.SpecUpdate.String(), "app-x_uid1") + ev2 := newTestEvent("v2", internalevent.SpecUpdate.String(), "app-x_uid1") + ev3 := newTestEvent("v3", internalevent.SpecUpdate.String(), "app-x_uid1") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + + assert.Equal(t, 1, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "v3", got.ID()) + }) + + t.Run("SpecUpdates for different resources are not deduped", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev1 := newTestEvent("v1", internalevent.SpecUpdate.String(), "app-x_uid1") + ev2 := newTestEvent("v2", internalevent.SpecUpdate.String(), "app-y_uid2") + + q.Add(ev1) + q.Add(ev2) + + assert.Equal(t, 2, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "v1", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "v2", got.ID()) + }) +} + +func TestDedupeQueue_StatusUpdateDedup(t *testing.T) { + t.Run("Newer StatusUpdate replaces older for same resource", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev1 := newTestEvent("s1", internalevent.StatusUpdate.String(), "app-x_uid1") + ev2 := newTestEvent("s2", internalevent.StatusUpdate.String(), "app-x_uid1") + + q.Add(ev1) + q.Add(ev2) + + assert.Equal(t, 1, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "s2", got.ID()) + }) +} + +func TestDedupeQueue_DifferentTypesNotDeduped(t *testing.T) { + t.Run("SpecUpdate and StatusUpdate for same resource are both kept (different types)", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + evSpec := newTestEvent("spec1", internalevent.SpecUpdate.String(), "app-x_uid1") + evStatus := newTestEvent("status1", internalevent.StatusUpdate.String(), "app-x_uid1") + + q.Add(evSpec) + q.Add(evStatus) + + assert.Equal(t, 2, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "spec1", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "status1", got.ID()) + }) + + t.Run("Create event is never deduped even for same resource", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + evCreate := newTestEvent("c1", internalevent.Create.String(), "app-x_uid1") + evSpec := newTestEvent("spec1", internalevent.SpecUpdate.String(), "app-x_uid1") + + q.Add(evCreate) + q.Add(evSpec) + + assert.Equal(t, 2, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "c1", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "spec1", got.ID()) + }) + + t.Run("Delete event is never deduped", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + evDel1 := newTestEvent("d1", internalevent.Delete.String(), "app-x_uid1") + evDel2 := newTestEvent("d2", internalevent.Delete.String(), "app-x_uid1") + + q.Add(evDel1) + q.Add(evDel2) + + assert.Equal(t, 2, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "d1", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "d2", got.ID()) + }) +} + +func TestDedupeQueue_MoveToTail(t *testing.T) { + t.Run("Deduped event moves to tail preserving order", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + + evSpecX := newTestEvent("spec-x-v1", internalevent.SpecUpdate.String(), "app-x_uid1") + evCreate := newTestEvent("create-y", internalevent.Create.String(), "app-y_uid2") + evSpecXv2 := newTestEvent("spec-x-v2", internalevent.SpecUpdate.String(), "app-x_uid1") + + q.Add(evSpecX) + q.Add(evCreate) + // This should remove evSpecX from position 0 and append evSpecXv2 at tail + q.Add(evSpecXv2) + + assert.Equal(t, 2, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "create-y", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "spec-x-v2", got.ID()) + }) +} + +func TestDedupeQueue_NonDedupEligibleEvents(t *testing.T) { + t.Run("Heartbeat events are never deduped", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev1 := newTestEvent("hb1", internalevent.Ping.String(), "uuid-1") + ev2 := newTestEvent("hb2", internalevent.Ping.String(), "uuid-2") + + q.Add(ev1) + q.Add(ev2) + + assert.Equal(t, 2, q.Len()) + }) + + t.Run("Events with empty resourceID are not deduped", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev1 := newTestEvent("1", internalevent.SpecUpdate.String(), "") + ev2 := newTestEvent("2", internalevent.SpecUpdate.String(), "") + + q.Add(ev1) + q.Add(ev2) + + assert.Equal(t, 2, q.Len()) + }) +} + +func TestDedupeQueue_MaxSize(t *testing.T) { + t.Run("Oldest item is dropped when max size is exceeded", func(t *testing.T) { + maxSize := 5 + q := newDedupeQueue(maxSize, "test") + + for i := 1; i <= maxSize; i++ { + q.Add(newTestEvent(strconv.Itoa(i), internalevent.Create.String(), "app-"+strconv.Itoa(i))) + } + + assert.Equal(t, maxSize, q.Len()) + + q.Add(newTestEvent("6", internalevent.Create.String(), "app-6")) + + assert.Equal(t, maxSize, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "2", got.ID()) + }) +} + +func TestDedupeQueue_Done(t *testing.T) { + t.Run("Done is a no-op", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + ev := newTestEvent("1", internalevent.Create.String(), "app-a") + q.Add(ev) + + got, _ := q.Get() + q.Done(got) // should not panic + assert.Equal(t, 0, q.Len()) + }) +} + +func TestDedupeQueue_ShutDown(t *testing.T) { + t.Run("Get returns shutdown signal after ShutDown", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + q.ShutDown() + + _, shutdown := q.Get() + assert.True(t, shutdown) + }) + + t.Run("Double ShutDown does not panic", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + q.ShutDown() + assert.NotPanics(t, func() { q.ShutDown() }) + }) + + t.Run("Add after ShutDown does not panic and is ignored", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + q.ShutDown() + assert.NotPanics(t, func() { + q.Add(newTestEvent("1", internalevent.SpecUpdate.String(), "app-x")) + }) + assert.Equal(t, 0, q.Len()) + }) +} + +func TestDedupeQueue_MixedScenario(t *testing.T) { + t.Run("Mixed dedup-eligible and non-dedup-eligible events", func(t *testing.T) { + q := newDedupeQueue(1000, "test") + + q.Add(newTestEvent("create-x", internalevent.Create.String(), "app-x_uid1")) + q.Add(newTestEvent("spec-x-v1", internalevent.SpecUpdate.String(), "app-x_uid1")) + q.Add(newTestEvent("status-y-v1", internalevent.StatusUpdate.String(), "app-y_uid2")) + q.Add(newTestEvent("spec-x-v2", internalevent.SpecUpdate.String(), "app-x_uid1")) + q.Add(newTestEvent("status-y-v2", internalevent.StatusUpdate.String(), "app-y_uid2")) + q.Add(newTestEvent("delete-z", internalevent.Delete.String(), "app-z_uid3")) + + // Expected order: + // create-x (non-dedup-eligible, stays at original position) + // spec-x-v2 (replaced spec-x-v1, moved to tail of where spec-x-v1 was... no, moved to tail) + // status-y-v2 (replaced status-y-v1, moved to tail) + // delete-z (non-dedup-eligible, appended) + + assert.Equal(t, 4, q.Len()) + + got, _ := q.Get() + assert.Equal(t, "create-x", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "spec-x-v2", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "status-y-v2", got.ID()) + + got, _ = q.Get() + assert.Equal(t, "delete-z", got.ID()) + + assert.Equal(t, 0, q.Len()) + }) +} diff --git a/internal/queue/mocks/QueuePair.go b/internal/queue/mocks/QueuePair.go index e78e682f4..fbe9a1186 100644 --- a/internal/queue/mocks/QueuePair.go +++ b/internal/queue/mocks/QueuePair.go @@ -6,6 +6,7 @@ import ( event "github.com/cloudevents/sdk-go/v2/event" mock "github.com/stretchr/testify/mock" + queue "github.com/argoproj-labs/argocd-agent/internal/queue" workqueue "k8s.io/client-go/util/workqueue" ) @@ -254,19 +255,19 @@ func (_c *QueuePair_Names_Call) RunAndReturn(run func() []string) *QueuePair_Nam } // RecvQ provides a mock function with given fields: name -func (_m *QueuePair) RecvQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] { +func (_m *QueuePair) RecvQ(name string) queue.RecvQueue { ret := _m.Called(name) if len(ret) == 0 { panic("no return value specified for RecvQ") } - var r0 workqueue.TypedRateLimitingInterface[*event.Event] - if rf, ok := ret.Get(0).(func(string) workqueue.TypedRateLimitingInterface[*event.Event]); ok { + var r0 queue.RecvQueue + if rf, ok := ret.Get(0).(func(string) queue.RecvQueue); ok { r0 = rf(name) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(workqueue.TypedRateLimitingInterface[*event.Event]) + r0 = ret.Get(0).(queue.RecvQueue) } } @@ -291,12 +292,12 @@ func (_c *QueuePair_RecvQ_Call) Run(run func(name string)) *QueuePair_RecvQ_Call return _c } -func (_c *QueuePair_RecvQ_Call) Return(_a0 workqueue.TypedRateLimitingInterface[*event.Event]) *QueuePair_RecvQ_Call { +func (_c *QueuePair_RecvQ_Call) Return(_a0 queue.RecvQueue) *QueuePair_RecvQ_Call { _c.Call.Return(_a0) return _c } -func (_c *QueuePair_RecvQ_Call) RunAndReturn(run func(string) workqueue.TypedRateLimitingInterface[*event.Event]) *QueuePair_RecvQ_Call { +func (_c *QueuePair_RecvQ_Call) RunAndReturn(run func(string) queue.RecvQueue) *QueuePair_RecvQ_Call { _c.Call.Return(run) return _c } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 7d6d967ba..813046664 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -27,13 +27,24 @@ import ( var _ QueuePair = &SendRecvQueues{} +// RecvQueue is the minimal interface used by RecvQ consumers. It covers only +// the methods actually called on receive queues (Add, Get, Done, Len, +// ShutDown), avoiding a dependency on the full workqueue interface. +type RecvQueue interface { + Add(item *event.Event) + Get() (*event.Event, bool) + Done(item *event.Event) + Len() int + ShutDown() +} + // QueuePair maintains a map (indexed by name) of send/receive queue pairs type QueuePair interface { Names() []string HasQueuePair(name string) bool Len() int SendQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] - RecvQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] + RecvQ(name string) RecvQueue Create(name string) error Delete(name string, shutdown bool) error } @@ -44,7 +55,7 @@ const ( ) type queuepair struct { - recvq *boundedQueue + recvq *dedupeQueue sendq *boundedQueue } @@ -137,7 +148,7 @@ func (q *SendRecvQueues) SendQ(name string) workqueue.TypedRateLimitingInterface // RecvQ will return the receive queue from the queue pair named name. If no // such queue pair exists, returns nil -func (q *SendRecvQueues) RecvQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] { +func (q *SendRecvQueues) RecvQ(name string) RecvQueue { q.queuelock.RLock() defer q.queuelock.RUnlock() qp, ok := q.queues[name] @@ -172,7 +183,7 @@ func (q *SendRecvQueues) Create(name string) error { qp := &queuepair{} qp.sendq = newBoundedQueue(sendQueueSize, name+"-send") - qp.recvq = newBoundedQueue(recvQueueSize, name+"-recv") + qp.recvq = newDedupeQueue(recvQueueSize, name+"-recv") q.queues[name] = qp return nil diff --git a/principal/event.go b/principal/event.go index 7263feda0..12245c8f0 100644 --- a/principal/event.go +++ b/principal/event.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/metrics" "github.com/argoproj-labs/argocd-agent/internal/namedlock" + "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/resync" "github.com/argoproj-labs/argocd-agent/internal/tracing" "github.com/argoproj-labs/argocd-agent/pkg/replication" @@ -41,7 +42,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/dynamic" - "k8s.io/client-go/util/workqueue" ) // skipReplication returns true for event targets that are operational noise and @@ -59,7 +59,7 @@ func skipReplication(target targets.EventTarget) bool { // processRecvQueue processes an entry from the receiver queue, which holds the // events received by agents. It will trigger updates of resources in the // server's backend. -func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workqueue.TypedRateLimitingInterface[*cloudevents.Event]) (*cloudevents.Event, error) { +func (s *Server) processRecvQueue(ctx context.Context, agentName string, q queue.RecvQueue) (*cloudevents.Event, error) { status := metrics.EventProcessingSuccess ev, _ := q.Get() @@ -805,7 +805,7 @@ func (s *Server) eventProcessor(ctx context.Context) error { queueLogCtx.Trace("Acquired semaphore") - go func(agentName string, q workqueue.TypedRateLimitingInterface[*cloudevents.Event], logCtx *logrus.Entry) { + go func(agentName string, q queue.RecvQueue, logCtx *logrus.Entry) { defer func() { sem.Release(1) queueLock.Unlock(agentName) diff --git a/principal/event_test.go b/principal/event_test.go index 70feb31de..2ec702d32 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -23,11 +23,11 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/event/targets" "github.com/argoproj-labs/argocd-agent/internal/manager" + "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/resources" "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj-labs/argocd-agent/principal/resourceproxy" "github.com/argoproj-labs/argocd-agent/test/fake/kube" - wqmock "github.com/argoproj-labs/argocd-agent/test/mocks/k8s-workqueue" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" @@ -38,6 +38,14 @@ import ( "k8s.io/utils/ptr" ) +// newTestRecvQueue creates a RecvQueue with a single event pre-loaded, for use in +// tests that call processRecvQueue. +func newTestRecvQueue(ev *cloudevents.Event) queue.RecvQueue { + q := queue.NewDedupeQueueForTest(1000) + q.Add(ev) + return q +} + func Test_EventProcessorRoutesACKToMatchingQueue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -98,9 +106,7 @@ func Test_InvalidEvents(t *testing.T) { t.Run("Unknown event schema", func(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("unknown") - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -112,9 +118,7 @@ func Test_InvalidEvents(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("application") ev.SetType("application") - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -127,9 +131,7 @@ func Test_InvalidEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, "something") - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -143,9 +145,7 @@ func Test_CreateEvents(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("application") ev.SetType(event.Create.String()) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -182,9 +182,7 @@ func Test_CreateEvents(t *testing.T) { // Update the application before sending the event app.Spec.Source.TargetRevision = "test" ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) s.Start(context.Background(), make(chan error)) s.clusterMgr.MapCluster("argocd", &v1alpha1.Cluster{Name: "argocd", Server: "https://argocd.com"}) @@ -236,9 +234,7 @@ func Test_CreateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) @@ -305,9 +301,7 @@ func Test_CreateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) s.clusterMgr.MapCluster("agent-staging", &v1alpha1.Cluster{Name: "agent-staging", Server: "https://staging.com"}) @@ -363,9 +357,7 @@ func Test_CreateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, app) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -745,9 +737,7 @@ func Test_UpdateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.SpecUpdate.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -828,9 +818,7 @@ func Test_UpdateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.SpecUpdate.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -889,9 +877,7 @@ func Test_UpdateEvents(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.SpecUpdate.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) @@ -957,9 +943,7 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { ev.SetDataSchema("application") ev.SetType(event.Delete.String()) ev.SetData(cloudevents.ApplicationJSON, delApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -1054,9 +1038,7 @@ func Test_processAppProjectEvent(t *testing.T) { ev := cloudevents.NewEvent() ev.SetDataSchema("appproject") ev.SetType(event.Create.String()) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -1099,9 +1081,7 @@ func Test_processAppProjectEvent(t *testing.T) { ev.SetType(event.Create.String()) ev.SetData(cloudevents.ApplicationJSON, project) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) @@ -1160,9 +1140,7 @@ func Test_processAppProjectEvent(t *testing.T) { updatedProject.Name = "test" // Use original name (will be prefixed) ev.SetData(cloudevents.ApplicationJSON, updatedProject) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) @@ -1205,9 +1183,7 @@ func Test_processAppProjectEvent(t *testing.T) { ev.SetDataSchema("appproject") ev.SetType(event.Delete.String()) ev.SetData(cloudevents.ApplicationJSON, upApp) - wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) - wq.On("Get").Return(&ev, false) - wq.On("Done", &ev) + wq := newTestRecvQueue(&ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) diff --git a/principal/server.go b/principal/server.go index 4f7a08d35..e4e008a91 100644 --- a/principal/server.go +++ b/principal/server.go @@ -262,6 +262,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace metricsRegistered.Do(func() { metrics.RegisterK8sClientMetrics() metrics.RegisterQueueMetrics("argocd_principal") + metrics.RegisterDedupeQueueMetrics("argocd_principal") }) } From f31869e8689807a1d2db07c863d86d949165c7d3 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 11 Jun 2026 17:59:46 +0530 Subject: [PATCH 2/5] address review comments Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt --- internal/queue/dedupe_queue.go | 41 +++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/internal/queue/dedupe_queue.go b/internal/queue/dedupe_queue.go index f91ee5c1f..1c6046d5d 100644 --- a/internal/queue/dedupe_queue.go +++ b/internal/queue/dedupe_queue.go @@ -15,6 +15,7 @@ package queue import ( + "fmt" "sync" "time" @@ -47,6 +48,10 @@ func NewDedupeQueueForTest(maxSize int) *dedupeQueue { } func newDedupeQueue(maxSize int, name string) *dedupeQueue { + if maxSize <= 0 { + panic(fmt.Sprintf("maxSize must be positive, got %d", maxSize)) + } + return &dedupeQueue{ items: make([]*queueItem, 0), notify: make(chan struct{}, 10), @@ -121,29 +126,29 @@ func (dq *dedupeQueue) Get() (*event.Event, bool) { dq.mu.Unlock() // Block until an item is available or shutdown - for { - select { - case <-dq.notify: - dq.mu.Lock() - if len(dq.items) > 0 { - item := dq.pop() - - if dq.metrics != nil { - dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) - dq.metrics.Duration.WithLabelValues(dq.name).Observe(time.Since(item.enqueuedAt).Seconds()) - } - - dq.mu.Unlock() - return item.event, false - } + for range dq.notify { + dq.mu.Lock() + if len(dq.items) > 0 { + item := dq.pop() - if dq.isShutdown { - dq.mu.Unlock() - return nil, true + if dq.metrics != nil { + dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) + dq.metrics.Duration.WithLabelValues(dq.name).Observe(time.Since(item.enqueuedAt).Seconds()) } + dq.mu.Unlock() + return item.event, false } + + if dq.isShutdown { + dq.mu.Unlock() + return nil, true + } + dq.mu.Unlock() } + + // notify channel was closed (shutdown) + return nil, true } func (dq *dedupeQueue) Done(_ *event.Event) { From 93bf6087247c4b2214fda4c9f78f33a8fd983321 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 11 Jun 2026 18:14:56 +0530 Subject: [PATCH 3/5] re-run e2e tests job Signed-off-by: Chetan Banavikalmutt From 2acca0e5b6370f1bfe0d8700659923256e65ad3e Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 11 Jun 2026 19:00:19 +0530 Subject: [PATCH 4/5] add TODO for future optimization Signed-off-by: Chetan Banavikalmutt --- internal/queue/dedupe_queue.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/queue/dedupe_queue.go b/internal/queue/dedupe_queue.go index 1c6046d5d..bb928fd17 100644 --- a/internal/queue/dedupe_queue.go +++ b/internal/queue/dedupe_queue.go @@ -179,6 +179,7 @@ func (dq *dedupeQueue) removeDuplicates(incoming *event.Event) { return } + // TODO: avoid this linear scan/shift for every incoming event. for i := len(dq.items) - 1; i >= 0; i-- { existingID, existingEvType := dedupeKey(dq.items[i].event) if existingID == resID && existingEvType == evType { From 409766e47f8d012c8ea43afe7ea81b188f1967b6 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Wed, 17 Jun 2026 18:15:15 +0530 Subject: [PATCH 5/5] feat: customize workqueue with de-duplication logic Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt --- agent/outbound_test.go | 3 + internal/metrics/queue.go | 58 ---- internal/queue/dedupe_queue.go | 289 ++++++++-------- internal/queue/dedupe_queue_test.go | 513 +++++++++++++++++----------- internal/queue/mocks/QueuePair.go | 12 +- internal/queue/queue.go | 26 +- internal/queue/queue_test.go | 8 + internal/resync/resync.go | 6 +- principal/callbacks_test.go | 3 +- principal/event.go | 4 +- principal/event_test.go | 4 +- principal/server.go | 1 - 12 files changed, 496 insertions(+), 431 deletions(-) diff --git a/agent/outbound_test.go b/agent/outbound_test.go index 6b4bb256a..f1ec53b55 100644 --- a/agent/outbound_test.go +++ b/agent/outbound_test.go @@ -139,6 +139,7 @@ func Test_addAppDeletionToQueue(t *testing.T) { _ = a.appManager.Manage("agent/guestbook") a.addAppDeletionToQueue(app) ev, _ := a.queues.SendQ(defaultQueueName).Get() + a.queues.SendQ(defaultQueueName).Done(ev) assert.Equal(t, event.Delete.String(), ev.Type()) require.False(t, a.appManager.IsManaged("agent/guestbook")) }) @@ -365,6 +366,7 @@ func Test_addAppProjectCreationToQueue(t *testing.T) { require.Equal(t, 1, a.queues.SendQ(defaultQueueName).Len()) ev, _ := a.queues.SendQ(defaultQueueName).Get() assert.NotNil(t, ev) + a.queues.SendQ(defaultQueueName).Done(ev) assert.Equal(t, event.Create.String(), ev.Type()) // Queue should be empty after get assert.Equal(t, 0, a.queues.SendQ(defaultQueueName).Len()) @@ -512,6 +514,7 @@ func Test_addAppProjectDeletionToQueue(t *testing.T) { a.addAppProjectDeletionToQueue(appProject) ev, _ := a.queues.SendQ(defaultQueueName).Get() + a.queues.SendQ(defaultQueueName).Done(ev) assert.Equal(t, event.Delete.String(), ev.Type()) require.False(t, a.projectManager.IsManaged("test-project")) }) diff --git a/internal/metrics/queue.go b/internal/metrics/queue.go index 444f9d595..60c275770 100644 --- a/internal/metrics/queue.go +++ b/internal/metrics/queue.go @@ -21,8 +21,6 @@ import ( var _ workqueue.MetricsProvider = &QueueMetrics{} -var dqMetricsProvider *DedupeQueueMetrics - // QueueMetrics implements the workqueue.MetricsProvider interface, // providing metrics for our send/recv queues. type QueueMetrics struct { @@ -144,59 +142,3 @@ func RegisterQueueMetrics(prefix string) { ) workqueue.SetProvider(provider) } - -// DedupeQueueMetrics holds Prometheus metrics for dedupeQueue instances. -type DedupeQueueMetrics struct { - Depth *prometheus.GaugeVec - Adds *prometheus.CounterVec - EventsDeduped *prometheus.CounterVec - Duration *prometheus.HistogramVec -} - -func NewDedupeQueueMetrics(prefix string) *DedupeQueueMetrics { - return &DedupeQueueMetrics{ - Depth: prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: prefix + "_queue_depth", - Help: "Current depth of queue", - }, - []string{"queue"}, - ), - Adds: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: prefix + "_queue_adds_total", - Help: "Total number of adds to the deduped queue", - }, - []string{"queue"}, - ), - EventsDeduped: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: prefix + "_queue_events_deduped_total", - Help: "Total number of events deduped from the queue", - }, - []string{"queue"}, - ), - Duration: prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: prefix + "_queue_duration_seconds", - Help: "Time an event spent waiting in the queue before being dequeued", - Buckets: prometheus.DefBuckets, - }, - []string{"queue"}, - ), - } -} - -func RegisterDedupeQueueMetrics(prefix string) { - dqMetricsProvider = NewDedupeQueueMetrics(prefix) - prometheus.DefaultRegisterer.MustRegister( - dqMetricsProvider.Depth, - dqMetricsProvider.Adds, - dqMetricsProvider.EventsDeduped, - dqMetricsProvider.Duration, - ) -} - -func GetDedupeQueueMetrics() *DedupeQueueMetrics { - return dqMetricsProvider -} diff --git a/internal/queue/dedupe_queue.go b/internal/queue/dedupe_queue.go index bb928fd17..539ba7d87 100644 --- a/internal/queue/dedupe_queue.go +++ b/internal/queue/dedupe_queue.go @@ -15,191 +15,208 @@ package queue import ( - "fmt" "sync" - "time" internalevent "github.com/argoproj-labs/argocd-agent/internal/event" - "github.com/argoproj-labs/argocd-agent/internal/metrics" "github.com/cloudevents/sdk-go/v2/event" + "k8s.io/client-go/util/workqueue" ) -// dedupeQueue is a bounded FIFO queue that de-duplicates resource events. When a new event -// arrives for a resource of the same type, the older events are removed and the -// newer event is appended to the tail of the queue. -type dedupeQueue struct { - mu sync.Mutex - items []*queueItem - notify chan struct{} - name string - maxSize int - isShutdown bool - metrics *metrics.DedupeQueueMetrics +// EventKey is the key that goes into the workqueue +type EventKey struct { + ResourceID string + EventType string + EventID string } -type queueItem struct { - event *event.Event - enqueuedAt time.Time +// reorderQueue is used to customize the functionality of the default workqueue. +// When a duplicate item is added (via Touch), it removes the existing entry and +// appends the item to the tail of the queue. +type reorderQueue[T comparable] struct { + items []T + // index is a map of item to its index in the queue + index map[T]int } -// NewDedupeQueueForTest creates a dedupeQueue for use in tests. -func NewDedupeQueueForTest(maxSize int) *dedupeQueue { - return newDedupeQueue(maxSize, "test") +func newReorderQueue[T comparable]() workqueue.Queue[T] { + return &reorderQueue[T]{ + index: make(map[T]int), + } } -func newDedupeQueue(maxSize int, name string) *dedupeQueue { - if maxSize <= 0 { - panic(fmt.Sprintf("maxSize must be positive, got %d", maxSize)) - } +func (q *reorderQueue[T]) Push(item T) { + q.index[item] = len(q.items) + q.items = append(q.items, item) +} - return &dedupeQueue{ - items: make([]*queueItem, 0), - notify: make(chan struct{}, 10), - name: name, - maxSize: maxSize, - metrics: metrics.GetDedupeQueueMetrics(), +func (q *reorderQueue[T]) Pop() (item T) { + item = q.items[0] + delete(q.index, item) + + q.items[0] = *new(T) + q.items = q.items[1:] + + // Rebuild indices after shift + for i, it := range q.items { + q.index[it] = i } + return item } -// canDedupe returns true if the event type supports de-duplication. -func canDedupe(ev *event.Event) bool { - evType := ev.Type() - return evType == internalevent.SpecUpdate.String() || evType == internalevent.StatusUpdate.String() +// Touch is a hook that is invoked when queue.Add is called with an item that already exists in the queue. +// It is only called when the item is not being processed i.e in dirty set and not in processing set. +func (q *reorderQueue[T]) Touch(item T) { + idx, ok := q.index[item] + if !ok { + return + } + // Remove from current position + q.items = append(q.items[:idx], q.items[idx+1:]...) + // Re-add at the end + q.items = append(q.items, item) + // Rebuild indices from the moved position onward + for i := idx; i < len(q.items); i++ { + q.index[q.items[i]] = i + } } -// dedupeKey returns a composite key of resourceID and event type for -// identifying duplicate events. -func dedupeKey(ev *event.Event) (string, string) { - return internalevent.ResourceID(ev), ev.Type() +func (q *reorderQueue[T]) Len() int { + return len(q.items) } -func (dq *dedupeQueue) Add(item *event.Event) { - dq.mu.Lock() - defer dq.mu.Unlock() +type dedupeQueue struct { + queue workqueue.TypedRateLimitingInterface[EventKey] - if item == nil || dq.isShutdown { - return - } + maxSize int - if canDedupe(item) { - dq.removeDuplicates(item) - } + mu sync.Mutex + latestEvents map[EventKey]*event.Event + eventKeys map[*event.Event]EventKey - if len(dq.items) >= dq.maxSize { - dq.pop() - } + notify chan struct{} +} - dq.items = append(dq.items, &queueItem{ - event: item, - enqueuedAt: time.Now(), +func NewDedupeQueue(name string, maxSize int) WorkQueue { + baseQueue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[EventKey]{ + Name: name, + Queue: newReorderQueue[EventKey](), }) - if dq.metrics != nil { - dq.metrics.Adds.WithLabelValues(dq.name).Inc() - dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) - } + delayingQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[EventKey]{ + Queue: baseQueue, + }) - select { - case dq.notify <- struct{}{}: - default: - } -} + queue := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[EventKey](), + workqueue.TypedRateLimitingQueueConfig[EventKey]{ + DelayingQueue: delayingQueue, + }, + ) -func (dq *dedupeQueue) Get() (*event.Event, bool) { - dq.mu.Lock() - if dq.isShutdown && len(dq.items) == 0 { - dq.mu.Unlock() - return nil, true + return &dedupeQueue{ + queue: queue, + maxSize: maxSize, + latestEvents: make(map[EventKey]*event.Event), + eventKeys: make(map[*event.Event]EventKey), + notify: make(chan struct{}, 10), } +} - if len(dq.items) > 0 { - item := dq.pop() +func getKey(ev *event.Event) EventKey { + resID := internalevent.ResourceID(ev) + evType := ev.Type() - if dq.metrics != nil { - dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) - dq.metrics.Duration.WithLabelValues(dq.name).Observe(time.Since(item.enqueuedAt).Seconds()) + if canDedupe(ev) { + return EventKey{ + ResourceID: resID, + EventType: evType, } + } - dq.mu.Unlock() - return item.event, false + // Non-dedupable events get a unique key + return EventKey{ + ResourceID: resID, + EventType: evType, + EventID: internalevent.EventID(ev), } - dq.mu.Unlock() +} - // Block until an item is available or shutdown - for range dq.notify { - dq.mu.Lock() - if len(dq.items) > 0 { - item := dq.pop() +func (q *dedupeQueue) Add(item *event.Event) { + key := getKey(item) - if dq.metrics != nil { - dq.metrics.Depth.WithLabelValues(dq.name).Set(float64(len(dq.items))) - dq.metrics.Duration.WithLabelValues(dq.name).Observe(time.Since(item.enqueuedAt).Seconds()) + q.mu.Lock() + oldEvent := q.latestEvents[key] + _, exists := q.latestEvents[key] + q.latestEvents[key] = item + q.eventKeys[item] = key + if exists && oldEvent != nil { + delete(q.eventKeys, oldEvent) + } + q.mu.Unlock() + + // Only evict when this is a genuinely new key. If the key already + // exists the workqueue will update it in-place (Touch) without + // growing, so evicting would incorrectly shrink the queue. + if !exists && q.queue.Len() == q.maxSize { + oldest, shutdown := q.queue.Get() + if !shutdown { + q.mu.Lock() + evicted := q.latestEvents[oldest] + delete(q.latestEvents, oldest) + if evicted != nil { + delete(q.eventKeys, evicted) } - - dq.mu.Unlock() - return item.event, false - } - - if dq.isShutdown { - dq.mu.Unlock() - return nil, true + q.mu.Unlock() + q.queue.Done(oldest) } - dq.mu.Unlock() } - // notify channel was closed (shutdown) - return nil, true + q.queue.Add(key) + select { + case q.notify <- struct{}{}: + default: + return + } } -func (dq *dedupeQueue) Done(_ *event.Event) { - // No-op: the dedupe queue does not track in-flight items. -} +func (q *dedupeQueue) Get() (*event.Event, bool) { + key, shutdown := q.queue.Get() + if shutdown { + return nil, shutdown + } -func (dq *dedupeQueue) Len() int { - dq.mu.Lock() - defer dq.mu.Unlock() - return len(dq.items) -} + q.mu.Lock() + ev := q.latestEvents[key] + delete(q.latestEvents, key) + q.mu.Unlock() -func (dq *dedupeQueue) ShutDown() { - dq.mu.Lock() - defer dq.mu.Unlock() - if dq.isShutdown { - return - } - dq.isShutdown = true - close(dq.notify) + return ev, shutdown } -// removeDuplicates removes all queued events matching the same resourceID and -// event type as item. Must be called with dq.mu held. -func (dq *dedupeQueue) removeDuplicates(incoming *event.Event) { - resID, evType := dedupeKey(incoming) - if resID == "" { - return +func (q *dedupeQueue) Done(item *event.Event) { + q.mu.Lock() + key, ok := q.eventKeys[item] + if ok { + delete(q.eventKeys, item) + delete(q.latestEvents, key) } + q.mu.Unlock() - // TODO: avoid this linear scan/shift for every incoming event. - for i := len(dq.items) - 1; i >= 0; i-- { - existingID, existingEvType := dedupeKey(dq.items[i].event) - if existingID == resID && existingEvType == evType { - dq.items[i] = nil - dq.items = append(dq.items[:i], dq.items[i+1:]...) - if dq.metrics != nil { - dq.metrics.EventsDeduped.WithLabelValues(dq.name).Inc() - } - } + if ok { + q.queue.Done(key) } } -// pop the first item from the queue. -// Must be called with queue lock held. -func (dq *dedupeQueue) pop() *queueItem { - if len(dq.items) == 0 { - return nil - } - item := dq.items[0] - dq.items[0] = nil - dq.items = dq.items[1:] - return item +func (q *dedupeQueue) ShutDown() { + q.queue.ShutDown() +} + +func (q *dedupeQueue) Len() int { + return q.queue.Len() +} + +// canDedupe returns true if the event type supports de-duplication. +func canDedupe(ev *event.Event) bool { + evType := ev.Type() + return evType == internalevent.SpecUpdate.String() || evType == internalevent.StatusUpdate.String() } diff --git a/internal/queue/dedupe_queue_test.go b/internal/queue/dedupe_queue_test.go index 248e4fafe..1c667e8cc 100644 --- a/internal/queue/dedupe_queue_test.go +++ b/internal/queue/dedupe_queue_test.go @@ -15,290 +15,387 @@ package queue import ( - "strconv" + "fmt" + "sync" "testing" internalevent "github.com/argoproj-labs/argocd-agent/internal/event" - "github.com/cloudevents/sdk-go/v2/event" + cloudevents "github.com/cloudevents/sdk-go/v2/event" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func newTestEvent(id string, eventType string, resourceID string) *event.Event { - ev := event.New() - ev.SetID(id) - ev.SetType(eventType) - if resourceID != "" { - ev.SetExtension("resourceid", resourceID) - } +func newDedupableEvent(resourceID, data string) *cloudevents.Event { + ev := cloudevents.New() + ev.SetType(internalevent.SpecUpdate.String()) + ev.SetExtension("resourceid", resourceID) + ev.SetExtension("eventid", fmt.Sprintf("%s_%s", resourceID, data)) + _ = ev.SetData(cloudevents.TextPlain, data) return &ev } -func TestDedupeQueue_BasicFIFO(t *testing.T) { - t.Run("Events dequeue in FIFO order", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev1 := newTestEvent("1", internalevent.Create.String(), "app-a") - ev2 := newTestEvent("2", internalevent.Create.String(), "app-b") - ev3 := newTestEvent("3", internalevent.Delete.String(), "app-c") - - q.Add(ev1) - q.Add(ev2) - q.Add(ev3) +func newStatusUpdateEvent(resourceID, data string) *cloudevents.Event { + ev := cloudevents.New() + ev.SetType(internalevent.StatusUpdate.String()) + ev.SetExtension("resourceid", resourceID) + ev.SetExtension("eventid", fmt.Sprintf("%s_%s", resourceID, data)) + _ = ev.SetData(cloudevents.TextPlain, data) + return &ev +} - assert.Equal(t, 3, q.Len()) +func newNonDedupableEvent(resourceID, eventID, data string) *cloudevents.Event { + ev := cloudevents.New() + ev.SetType(internalevent.Create.String()) + ev.SetExtension("resourceid", resourceID) + ev.SetExtension("eventid", eventID) + _ = ev.SetData(cloudevents.TextPlain, data) + return &ev +} - got, shutdown := q.Get() - assert.False(t, shutdown) - assert.Equal(t, "1", got.ID()) +func TestDedupeQueue_DifferentResourcesNotDeduplicated(t *testing.T) { + q := NewDedupeQueue("test", 100) - got, _ = q.Get() - assert.Equal(t, "2", got.ID()) + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newDedupableEvent("app2_uid2", "v1") - got, _ = q.Get() - assert.Equal(t, "3", got.ID()) + q.Add(ev1) + q.Add(ev2) - assert.Equal(t, 0, q.Len()) - }) + assert.Equal(t, 2, q.Len()) } -func TestDedupeQueue_SpecUpdateDedup(t *testing.T) { - t.Run("Newer SpecUpdate replaces older for same resource", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev1 := newTestEvent("v1", internalevent.SpecUpdate.String(), "app-x_uid1") - ev2 := newTestEvent("v2", internalevent.SpecUpdate.String(), "app-x_uid1") - ev3 := newTestEvent("v3", internalevent.SpecUpdate.String(), "app-x_uid1") +func TestDedupeQueue_DifferentTypesNotDeduplicated(t *testing.T) { + q := NewDedupeQueue("test", 100) - q.Add(ev1) - q.Add(ev2) - q.Add(ev3) + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newStatusUpdateEvent("app1_uid1", "v1") - assert.Equal(t, 1, q.Len()) + q.Add(ev1) + q.Add(ev2) - got, _ := q.Get() - assert.Equal(t, "v3", got.ID()) - }) + assert.Equal(t, 2, q.Len()) +} - t.Run("SpecUpdates for different resources are not deduped", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev1 := newTestEvent("v1", internalevent.SpecUpdate.String(), "app-x_uid1") - ev2 := newTestEvent("v2", internalevent.SpecUpdate.String(), "app-y_uid2") +func TestDedupeQueue_NonDedupableEventsNeverCoalesce(t *testing.T) { + q := NewDedupeQueue("test", 100) - q.Add(ev1) - q.Add(ev2) + ev1 := newNonDedupableEvent("app1_uid1", "evt-1", "data1") + ev2 := newNonDedupableEvent("app1_uid1", "evt-2", "data2") + ev3 := newNonDedupableEvent("app1_uid1", "evt-3", "data3") - assert.Equal(t, 2, q.Len()) + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) - got, _ := q.Get() - assert.Equal(t, "v1", got.ID()) + assert.Equal(t, 3, q.Len()) +} - got, _ = q.Get() - assert.Equal(t, "v2", got.ID()) - }) +func TestDedupeQueue_FIFOOrdering(t *testing.T) { + q := NewDedupeQueue("test", 100) + + ev1 := newNonDedupableEvent("app1", "evt-1", "first") + ev2 := newNonDedupableEvent("app2", "evt-2", "second") + ev3 := newNonDedupableEvent("app3", "evt-3", "third") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + + got1, _ := q.Get() + q.Done(got1) + got2, _ := q.Get() + q.Done(got2) + got3, _ := q.Get() + q.Done(got3) + + var d1, d2, d3 string + _ = got1.DataAs(&d1) + _ = got2.DataAs(&d2) + _ = got3.DataAs(&d3) + assert.Equal(t, "first", d1) + assert.Equal(t, "second", d2) + assert.Equal(t, "third", d3) } -func TestDedupeQueue_StatusUpdateDedup(t *testing.T) { - t.Run("Newer StatusUpdate replaces older for same resource", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev1 := newTestEvent("s1", internalevent.StatusUpdate.String(), "app-x_uid1") - ev2 := newTestEvent("s2", internalevent.StatusUpdate.String(), "app-x_uid1") +func TestDedupeQueue_DedupeMovesToBack(t *testing.T) { + q := NewDedupeQueue("test", 100) - q.Add(ev1) - q.Add(ev2) + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newDedupableEvent("app2_uid2", "v1") - assert.Equal(t, 1, q.Len()) + q.Add(ev1) + q.Add(ev2) - got, _ := q.Get() - assert.Equal(t, "s2", got.ID()) - }) -} + // Re-add app1 with new data — should move to back + ev1Updated := newDedupableEvent("app1_uid1", "v2") + q.Add(ev1Updated) -func TestDedupeQueue_DifferentTypesNotDeduped(t *testing.T) { - t.Run("SpecUpdate and StatusUpdate for same resource are both kept (different types)", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - evSpec := newTestEvent("spec1", internalevent.SpecUpdate.String(), "app-x_uid1") - evStatus := newTestEvent("status1", internalevent.StatusUpdate.String(), "app-x_uid1") + assert.Equal(t, 2, q.Len()) - q.Add(evSpec) - q.Add(evStatus) + // First out should be app2 (app1 moved to back) + got, _ := q.Get() + q.Done(got) + assert.Equal(t, "app2_uid2", internalevent.ResourceID(got)) - assert.Equal(t, 2, q.Len()) - - got, _ := q.Get() - assert.Equal(t, "spec1", got.ID()) + got, _ = q.Get() + q.Done(got) + assert.Equal(t, "app1_uid1", internalevent.ResourceID(got)) + var data string + _ = got.DataAs(&data) + assert.Equal(t, "v2", data, "should have latest payload") +} - got, _ = q.Get() - assert.Equal(t, "status1", got.ID()) - }) +func TestDedupeQueue_BoundedEviction(t *testing.T) { + maxSize := 5 + q := NewDedupeQueue("test", maxSize) - t.Run("Create event is never deduped even for same resource", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - evCreate := newTestEvent("c1", internalevent.Create.String(), "app-x_uid1") - evSpec := newTestEvent("spec1", internalevent.SpecUpdate.String(), "app-x_uid1") + for i := 0; i < maxSize; i++ { + ev := newNonDedupableEvent(fmt.Sprintf("app%d", i), fmt.Sprintf("evt-%d", i), fmt.Sprintf("data%d", i)) + q.Add(ev) + } + assert.Equal(t, maxSize, q.Len()) + + // Adding one more should evict the oldest + overflow := newNonDedupableEvent("appNew", "evt-new", "new-data") + q.Add(overflow) + assert.Equal(t, maxSize, q.Len()) + + // The oldest (app0) should have been evicted; first available is app1 + got, _ := q.Get() + q.Done(got) + var data string + _ = got.DataAs(&data) + assert.Equal(t, "data1", data) +} - q.Add(evCreate) - q.Add(evSpec) +func TestDedupeQueue_DuplicateDoesNotEvict(t *testing.T) { + maxSize := 3 + q := NewDedupeQueue("test", maxSize) + + ev1 := newDedupableEvent("app1_uid1", "v1") + ev2 := newDedupableEvent("app2_uid2", "v1") + ev3 := newDedupableEvent("app3_uid3", "v1") + + q.Add(ev1) + q.Add(ev2) + q.Add(ev3) + assert.Equal(t, maxSize, q.Len()) + + // Re-adding app1 (deduplicate) should NOT evict anything + ev1Updated := newDedupableEvent("app1_uid1", "v2") + q.Add(ev1Updated) + assert.Equal(t, maxSize, q.Len()) + + // All three resources should still be present + got1, _ := q.Get() + q.Done(got1) + got2, _ := q.Get() + q.Done(got2) + got3, _ := q.Get() + q.Done(got3) + + resources := []string{ + internalevent.ResourceID(got1), + internalevent.ResourceID(got2), + internalevent.ResourceID(got3), + } + assert.Contains(t, resources, "app1_uid1") + assert.Contains(t, resources, "app2_uid2") + assert.Contains(t, resources, "app3_uid3") +} - assert.Equal(t, 2, q.Len()) +func TestDedupeQueue_GetReturnsLatestAfterMultipleUpdates(t *testing.T) { + q := NewDedupeQueue("test", 100) - got, _ := q.Get() - assert.Equal(t, "c1", got.ID()) + for i := 0; i < 10; i++ { + ev := newDedupableEvent("app1_uid1", fmt.Sprintf("version-%d", i)) + q.Add(ev) + } - got, _ = q.Get() - assert.Equal(t, "spec1", got.ID()) - }) + assert.Equal(t, 1, q.Len()) - t.Run("Delete event is never deduped", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - evDel1 := newTestEvent("d1", internalevent.Delete.String(), "app-x_uid1") - evDel2 := newTestEvent("d2", internalevent.Delete.String(), "app-x_uid1") + got, _ := q.Get() + var data string + _ = got.DataAs(&data) + assert.Equal(t, "version-9", data) +} - q.Add(evDel1) - q.Add(evDel2) +func TestDedupeQueue_Done(t *testing.T) { + q := NewDedupeQueue("test", 100) - assert.Equal(t, 2, q.Len()) + ev := newDedupableEvent("app1_uid1", "v1") + q.Add(ev) - got, _ := q.Get() - assert.Equal(t, "d1", got.ID()) + got, _ := q.Get() + assert.Equal(t, 0, q.Len()) + q.Done(got) - got, _ = q.Get() - assert.Equal(t, "d2", got.ID()) - }) + // After Done, adding a new event for the same resource should work fresh + ev2 := newDedupableEvent("app1_uid1", "v2") + q.Add(ev2) + assert.Equal(t, 1, q.Len()) } -func TestDedupeQueue_MoveToTail(t *testing.T) { - t.Run("Deduped event moves to tail preserving order", func(t *testing.T) { - q := newDedupeQueue(1000, "test") +func TestDedupeQueue_ShutDown(t *testing.T) { + q := NewDedupeQueue("test", 100) - evSpecX := newTestEvent("spec-x-v1", internalevent.SpecUpdate.String(), "app-x_uid1") - evCreate := newTestEvent("create-y", internalevent.Create.String(), "app-y_uid2") - evSpecXv2 := newTestEvent("spec-x-v2", internalevent.SpecUpdate.String(), "app-x_uid1") + ev := newDedupableEvent("app1_uid1", "v1") + q.Add(ev) - q.Add(evSpecX) - q.Add(evCreate) - // This should remove evSpecX from position 0 and append evSpecXv2 at tail - q.Add(evSpecXv2) + q.ShutDown() - assert.Equal(t, 2, q.Len()) + // Pending items are still returned after shutdown + got, shutdown := q.Get() + assert.False(t, shutdown) + assert.NotNil(t, got) + q.Done(got) - got, _ := q.Get() - assert.Equal(t, "create-y", got.ID()) + // Once drained, Get signals shutdown + _, shutdown = q.Get() + assert.True(t, shutdown) +} - got, _ = q.Get() - assert.Equal(t, "spec-x-v2", got.ID()) - }) +func TestDedupeQueue_EmptyQueueLen(t *testing.T) { + q := NewDedupeQueue("test", 100) + assert.Equal(t, 0, q.Len()) } -func TestDedupeQueue_NonDedupEligibleEvents(t *testing.T) { - t.Run("Heartbeat events are never deduped", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev1 := newTestEvent("hb1", internalevent.Ping.String(), "uuid-1") - ev2 := newTestEvent("hb2", internalevent.Ping.String(), "uuid-2") +func TestDedupeQueue_MixedDedupableAndNonDedupable(t *testing.T) { + q := NewDedupeQueue("test", 100) - q.Add(ev1) - q.Add(ev2) + spec1 := newDedupableEvent("app1_uid1", "spec-v1") + spec2 := newDedupableEvent("app1_uid1", "spec-v2") + create1 := newNonDedupableEvent("app1_uid1", "create-1", "create-data") - assert.Equal(t, 2, q.Len()) - }) + q.Add(spec1) + q.Add(create1) + q.Add(spec2) - t.Run("Events with empty resourceID are not deduped", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev1 := newTestEvent("1", internalevent.SpecUpdate.String(), "") - ev2 := newTestEvent("2", internalevent.SpecUpdate.String(), "") + // spec events coalesce (1 slot), create is separate (1 slot) = 2 total + assert.Equal(t, 2, q.Len()) +} - q.Add(ev1) - q.Add(ev2) +func TestDedupeQueue_NotifyOnAdd(t *testing.T) { + dq := NewDedupeQueue("test", 100).(*dedupeQueue) - assert.Equal(t, 2, q.Len()) - }) -} + ev := newDedupableEvent("app1_uid1", "v1") + q := dq + q.Add(ev) -func TestDedupeQueue_MaxSize(t *testing.T) { - t.Run("Oldest item is dropped when max size is exceeded", func(t *testing.T) { - maxSize := 5 - q := newDedupeQueue(maxSize, "test") + select { + case <-dq.notify: + // expected + default: + t.Fatal("expected notification on Add") + } +} - for i := 1; i <= maxSize; i++ { - q.Add(newTestEvent(strconv.Itoa(i), internalevent.Create.String(), "app-"+strconv.Itoa(i))) - } +func TestDedupeQueue_ConcurrentAdds(t *testing.T) { + q := NewDedupeQueue("test", 1000) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + ev := newDedupableEvent(fmt.Sprintf("app%d_uid%d", i, i), fmt.Sprintf("v%d", i)) + q.Add(ev) + }(i) + } + wg.Wait() - assert.Equal(t, maxSize, q.Len()) + assert.Equal(t, 100, q.Len()) +} - q.Add(newTestEvent("6", internalevent.Create.String(), "app-6")) +func TestDedupeQueue_ConcurrentDeduplication(t *testing.T) { + q := NewDedupeQueue("test", 1000) + + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + ev := newDedupableEvent("app1_uid1", fmt.Sprintf("v%d", i)) + q.Add(ev) + }(i) + } + wg.Wait() - assert.Equal(t, maxSize, q.Len()) + assert.Equal(t, 1, q.Len()) - got, _ := q.Get() - assert.Equal(t, "2", got.ID()) - }) + got, _ := q.Get() + require.NotNil(t, got) } -func TestDedupeQueue_Done(t *testing.T) { - t.Run("Done is a no-op", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - ev := newTestEvent("1", internalevent.Create.String(), "app-a") - q.Add(ev) +func TestReorderQueue_Push_Pop(t *testing.T) { + q := newReorderQueue[string]() - got, _ := q.Get() - q.Done(got) // should not panic - assert.Equal(t, 0, q.Len()) - }) + q.Push("a") + q.Push("b") + q.Push("c") + + assert.Equal(t, 3, q.Len()) + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "b", q.Pop()) + assert.Equal(t, "c", q.Pop()) + assert.Equal(t, 0, q.Len()) } -func TestDedupeQueue_ShutDown(t *testing.T) { - t.Run("Get returns shutdown signal after ShutDown", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - q.ShutDown() - - _, shutdown := q.Get() - assert.True(t, shutdown) - }) - - t.Run("Double ShutDown does not panic", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - q.ShutDown() - assert.NotPanics(t, func() { q.ShutDown() }) - }) - - t.Run("Add after ShutDown does not panic and is ignored", func(t *testing.T) { - q := newDedupeQueue(1000, "test") - q.ShutDown() - assert.NotPanics(t, func() { - q.Add(newTestEvent("1", internalevent.SpecUpdate.String(), "app-x")) - }) - assert.Equal(t, 0, q.Len()) - }) +func TestReorderQueue_Touch_MovesToBack(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + q.Push("c") + + q.Touch("a") + + assert.Equal(t, 3, q.Len()) + assert.Equal(t, "b", q.Pop()) + assert.Equal(t, "c", q.Pop()) + assert.Equal(t, "a", q.Pop()) } -func TestDedupeQueue_MixedScenario(t *testing.T) { - t.Run("Mixed dedup-eligible and non-dedup-eligible events", func(t *testing.T) { - q := newDedupeQueue(1000, "test") +func TestReorderQueue_Touch_MiddleElement(t *testing.T) { + q := newReorderQueue[string]() - q.Add(newTestEvent("create-x", internalevent.Create.String(), "app-x_uid1")) - q.Add(newTestEvent("spec-x-v1", internalevent.SpecUpdate.String(), "app-x_uid1")) - q.Add(newTestEvent("status-y-v1", internalevent.StatusUpdate.String(), "app-y_uid2")) - q.Add(newTestEvent("spec-x-v2", internalevent.SpecUpdate.String(), "app-x_uid1")) - q.Add(newTestEvent("status-y-v2", internalevent.StatusUpdate.String(), "app-y_uid2")) - q.Add(newTestEvent("delete-z", internalevent.Delete.String(), "app-z_uid3")) + q.Push("a") + q.Push("b") + q.Push("c") + q.Push("d") - // Expected order: - // create-x (non-dedup-eligible, stays at original position) - // spec-x-v2 (replaced spec-x-v1, moved to tail of where spec-x-v1 was... no, moved to tail) - // status-y-v2 (replaced status-y-v1, moved to tail) - // delete-z (non-dedup-eligible, appended) + q.Touch("b") - assert.Equal(t, 4, q.Len()) + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "c", q.Pop()) + assert.Equal(t, "d", q.Pop()) + assert.Equal(t, "b", q.Pop()) +} + +func TestReorderQueue_Touch_LastElement(t *testing.T) { + q := newReorderQueue[string]() + + q.Push("a") + q.Push("b") + q.Push("c") - got, _ := q.Get() - assert.Equal(t, "create-x", got.ID()) + q.Touch("c") + + // "c" is already last — order shouldn't change + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "b", q.Pop()) + assert.Equal(t, "c", q.Pop()) +} - got, _ = q.Get() - assert.Equal(t, "spec-x-v2", got.ID()) +func TestReorderQueue_Touch_NonExistent(t *testing.T) { + q := newReorderQueue[string]() - got, _ = q.Get() - assert.Equal(t, "status-y-v2", got.ID()) + q.Push("a") + q.Push("b") - got, _ = q.Get() - assert.Equal(t, "delete-z", got.ID()) + q.Touch("z") // should be a no-op - assert.Equal(t, 0, q.Len()) - }) + assert.Equal(t, 2, q.Len()) + assert.Equal(t, "a", q.Pop()) + assert.Equal(t, "b", q.Pop()) } diff --git a/internal/queue/mocks/QueuePair.go b/internal/queue/mocks/QueuePair.go index fbe9a1186..211b0b5f7 100644 --- a/internal/queue/mocks/QueuePair.go +++ b/internal/queue/mocks/QueuePair.go @@ -255,19 +255,19 @@ func (_c *QueuePair_Names_Call) RunAndReturn(run func() []string) *QueuePair_Nam } // RecvQ provides a mock function with given fields: name -func (_m *QueuePair) RecvQ(name string) queue.RecvQueue { +func (_m *QueuePair) RecvQ(name string) queue.WorkQueue { ret := _m.Called(name) if len(ret) == 0 { panic("no return value specified for RecvQ") } - var r0 queue.RecvQueue - if rf, ok := ret.Get(0).(func(string) queue.RecvQueue); ok { + var r0 queue.WorkQueue + if rf, ok := ret.Get(0).(func(string) queue.WorkQueue); ok { r0 = rf(name) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(queue.RecvQueue) + r0 = ret.Get(0).(queue.WorkQueue) } } @@ -292,12 +292,12 @@ func (_c *QueuePair_RecvQ_Call) Run(run func(name string)) *QueuePair_RecvQ_Call return _c } -func (_c *QueuePair_RecvQ_Call) Return(_a0 queue.RecvQueue) *QueuePair_RecvQ_Call { +func (_c *QueuePair_RecvQ_Call) Return(_a0 queue.WorkQueue) *QueuePair_RecvQ_Call { _c.Call.Return(_a0) return _c } -func (_c *QueuePair_RecvQ_Call) RunAndReturn(run func(string) queue.RecvQueue) *QueuePair_RecvQ_Call { +func (_c *QueuePair_RecvQ_Call) RunAndReturn(run func(string) queue.WorkQueue) *QueuePair_RecvQ_Call { _c.Call.Return(run) return _c } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 813046664..333553703 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -27,10 +27,10 @@ import ( var _ QueuePair = &SendRecvQueues{} -// RecvQueue is the minimal interface used by RecvQ consumers. It covers only -// the methods actually called on receive queues (Add, Get, Done, Len, +// WorkQueue is the minimal interface used by workqueue consumers. It covers only +// the methods actually called on workqueues (Add, Get, Done, Len, // ShutDown), avoiding a dependency on the full workqueue interface. -type RecvQueue interface { +type WorkQueue interface { Add(item *event.Event) Get() (*event.Event, bool) Done(item *event.Event) @@ -43,8 +43,8 @@ type QueuePair interface { Names() []string HasQueuePair(name string) bool Len() int - SendQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] - RecvQ(name string) RecvQueue + SendQ(name string) WorkQueue + RecvQ(name string) WorkQueue Create(name string) error Delete(name string, shutdown bool) error } @@ -55,8 +55,8 @@ const ( ) type queuepair struct { - recvq *dedupeQueue - sendq *boundedQueue + recvq WorkQueue + sendq WorkQueue } type boundedQueue struct { @@ -136,7 +136,7 @@ func (q *SendRecvQueues) Len() int { // SendQ will return the send queue from the queue pair named name. If no such // queue pair exists, returns nil -func (q *SendRecvQueues) SendQ(name string) workqueue.TypedRateLimitingInterface[*event.Event] { +func (q *SendRecvQueues) SendQ(name string) WorkQueue { q.queuelock.RLock() defer q.queuelock.RUnlock() qp, ok := q.queues[name] @@ -148,7 +148,7 @@ func (q *SendRecvQueues) SendQ(name string) workqueue.TypedRateLimitingInterface // RecvQ will return the receive queue from the queue pair named name. If no // such queue pair exists, returns nil -func (q *SendRecvQueues) RecvQ(name string) RecvQueue { +func (q *SendRecvQueues) RecvQ(name string) WorkQueue { q.queuelock.RLock() defer q.queuelock.RUnlock() qp, ok := q.queues[name] @@ -182,8 +182,8 @@ func (q *SendRecvQueues) Create(name string) error { }, defaultMaxQueueSize) qp := &queuepair{} - qp.sendq = newBoundedQueue(sendQueueSize, name+"-send") - qp.recvq = newDedupeQueue(recvQueueSize, name+"-recv") + qp.sendq = NewDedupeQueue(name+"-send", sendQueueSize) + qp.recvq = NewDedupeQueue(name+"-recv", recvQueueSize) q.queues[name] = qp return nil @@ -210,8 +210,8 @@ func (q *SendRecvQueues) Delete(name string, shutdown bool) error { // GetWithContext is a wrapper around the workqueue's Get method. // It waits until an item is available in the queue or the context is Done -func GetWithContext(q workqueue.TypedRateLimitingInterface[*event.Event], ctx context.Context) (*event.Event, bool) { - bq, ok := q.(*boundedQueue) +func GetWithContext(q WorkQueue, ctx context.Context) (*event.Event, bool) { + bq, ok := q.(*dedupeQueue) if !ok { return nil, false } diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 8417a4b62..20c4caba0 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -69,12 +69,16 @@ func Test_Queue(t *testing.T) { for i := 1; i <= defaultMaxQueueSize; i++ { ev := event.New() ev.SetID(strconv.Itoa(i)) + ev.SetExtension("eventid", strconv.Itoa(i)) + ev.SetExtension("resourceid", strconv.Itoa(i)) queue.Add(&ev) } // Since the queue is full, check if the oldest item is popped before adding a new item. ev := event.New() ev.SetID(strconv.Itoa(defaultMaxQueueSize + 1)) + ev.SetExtension("eventid", "new-event-id") + ev.SetExtension("resourceid", "new-resource-id") queue.Add(&ev) assert.Equal(t, defaultMaxQueueSize, queue.Len()) front, _ := queue.Get() @@ -94,6 +98,8 @@ func Test_Queue(t *testing.T) { for i := 1; i <= queueSize+20; i++ { ev := event.New() ev.SetID(strconv.Itoa(i)) + ev.SetExtension("eventid", strconv.Itoa(i)) + ev.SetExtension("resourceid", strconv.Itoa(i)) recvQueue.Add(&ev) sendQueue.Add(&ev) } @@ -101,6 +107,8 @@ func Test_Queue(t *testing.T) { // Since the queue is full, check if the oldest item is popped before adding a new item. ev := event.New() ev.SetID(strconv.Itoa(queueSize + 1)) + ev.SetExtension("eventid", "new-event-id") + ev.SetExtension("resourceid", "new-resource-id") recvQueue.Add(&ev) sendQueue.Add(&ev) assert.Equal(t, queueSize, recvQueue.Len()) diff --git a/internal/resync/resync.go b/internal/resync/resync.go index 13867438f..82f21da0c 100644 --- a/internal/resync/resync.go +++ b/internal/resync/resync.go @@ -26,6 +26,7 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/logging/logfields" "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/manager/appproject" + "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/resources" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" cloudevent "github.com/cloudevents/sdk-go/v2/event" @@ -37,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "k8s.io/client-go/util/workqueue" ) // ErrSourceUIDNotFound is returned when a resource does not have the source UID annotation. @@ -49,7 +49,7 @@ var ErrSourceUIDNotFound = errors.New("source UID annotation not found") type RequestHandler struct { dynClient dynamic.Interface - sendQ workqueue.TypedRateLimitingInterface[*cloudevent.Event] + sendQ queue.WorkQueue events *event.EventSource @@ -80,7 +80,7 @@ type RequestHandler struct { peerNamespace string } -func NewRequestHandler(dynClient dynamic.Interface, queue workqueue.TypedRateLimitingInterface[*cloudevent.Event], events *event.EventSource, resources *resources.Resources, log *logrus.Entry, role manager.ManagerRole, namespace string) *RequestHandler { +func NewRequestHandler(dynClient dynamic.Interface, queue queue.WorkQueue, events *event.EventSource, resources *resources.Resources, log *logrus.Entry, role manager.ManagerRole, namespace string) *RequestHandler { return &RequestHandler{ dynClient: dynClient, sendQ: queue, diff --git a/principal/callbacks_test.go b/principal/callbacks_test.go index 170e45509..57435e9f0 100644 --- a/principal/callbacks_test.go +++ b/principal/callbacks_test.go @@ -45,7 +45,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" kubefake "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/util/workqueue" ) func TestMapAppProjectToAgents(t *testing.T) { @@ -2673,7 +2672,7 @@ func TestServer_deleteGPGKeyCallback(t *testing.T) { }) } -func drainQueue(t *testing.T, q workqueue.TypedRateLimitingInterface[*cloudevents.Event]) { +func drainQueue(t *testing.T, q queue.WorkQueue) { for q.Len() > 0 { ev, shutdown := q.Get() if shutdown { diff --git a/principal/event.go b/principal/event.go index 12245c8f0..6bd353f20 100644 --- a/principal/event.go +++ b/principal/event.go @@ -59,7 +59,7 @@ func skipReplication(target targets.EventTarget) bool { // processRecvQueue processes an entry from the receiver queue, which holds the // events received by agents. It will trigger updates of resources in the // server's backend. -func (s *Server) processRecvQueue(ctx context.Context, agentName string, q queue.RecvQueue) (*cloudevents.Event, error) { +func (s *Server) processRecvQueue(ctx context.Context, agentName string, q queue.WorkQueue) (*cloudevents.Event, error) { status := metrics.EventProcessingSuccess ev, _ := q.Get() @@ -805,7 +805,7 @@ func (s *Server) eventProcessor(ctx context.Context) error { queueLogCtx.Trace("Acquired semaphore") - go func(agentName string, q queue.RecvQueue, logCtx *logrus.Entry) { + go func(agentName string, q queue.WorkQueue, logCtx *logrus.Entry) { defer func() { sem.Release(1) queueLock.Unlock(agentName) diff --git a/principal/event_test.go b/principal/event_test.go index 2ec702d32..41f4aca54 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -40,8 +40,8 @@ import ( // newTestRecvQueue creates a RecvQueue with a single event pre-loaded, for use in // tests that call processRecvQueue. -func newTestRecvQueue(ev *cloudevents.Event) queue.RecvQueue { - q := queue.NewDedupeQueueForTest(1000) +func newTestRecvQueue(ev *cloudevents.Event) queue.WorkQueue { + q := queue.NewDedupeQueue("cloud-events", 1000) q.Add(ev) return q } diff --git a/principal/server.go b/principal/server.go index e4e008a91..4f7a08d35 100644 --- a/principal/server.go +++ b/principal/server.go @@ -262,7 +262,6 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace metricsRegistered.Do(func() { metrics.RegisterK8sClientMetrics() metrics.RegisterQueueMetrics("argocd_principal") - metrics.RegisterDedupeQueueMetrics("argocd_principal") }) }