Skip to content

Commit f246506

Browse files
committed
feat: deduplicate event writer events for spec/statusupdate
Signed-off-by: Jonathan West <jgwest@gmail.com>
1 parent 2f8cd99 commit f246506

2 files changed

Lines changed: 194 additions & 43 deletions

File tree

internal/event/event_writer.go

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package event
22

33
import (
44
"context"
5+
"math"
6+
"math/rand/v2"
57
"sync"
68
"time"
79

@@ -17,7 +19,7 @@ import (
1719

1820
const (
1921
// maxEventRetries is the maximum number of times an event will be retried before giving up.
20-
maxEventRetries = 12
22+
maxEventRetries = math.MaxInt
2123
)
2224

2325
type streamWriter interface {
@@ -113,9 +115,10 @@ func (ew *EventWriter) Add(ev *cloudevents.Event) {
113115

114116
defaultBackoff := wait.Backoff{
115117
Steps: maxEventRetries,
116-
Duration: 1 * time.Second,
117-
Factor: 1.5,
118-
Jitter: 0.1,
118+
Duration: 5 * time.Second,
119+
Factor: 4,
120+
Jitter: 1,
121+
Cap: 2 * time.Minute, // never wait longer than 2 minutes
119122
}
120123

121124
if resID == "" {
@@ -171,7 +174,7 @@ func (ew *EventWriter) Get(resID string) *eventMessage {
171174

172175
// Then check unsent queue
173176
if eq, exists := ew.unsentEvents[resID]; exists {
174-
return eq.get()
177+
return eq.peek()
175178
}
176179
return nil
177180
}
@@ -196,13 +199,18 @@ func (ew *EventWriter) Remove(ev *cloudevents.Event) {
196199
}
197200
}
198201

202+
// JGW-TODO:
203+
// - A) Should Remove remove from both sentEvents/unsentEvents? (I mean, it seems likely it will be in one or the other, but we could check both?)
204+
// and
205+
// - B) In the unsent case, why does remove only remove from the front of the queue?
206+
199207
// If not in sent events, check unsent queue
200208
eq, exists := ew.unsentEvents[resourceID]
201209
if !exists {
202210
return
203211
}
204212

205-
front := eq.get()
213+
front := eq.peek()
206214
if front == nil {
207215
return
208216
}
@@ -248,6 +256,11 @@ func (ew *EventWriter) SendWaitingEvents(ctx context.Context) {
248256
}
249257
ew.mu.RUnlock()
250258

259+
// Shuffle so no resource is systematically starved
260+
rand.Shuffle(len(resourceIDs), func(i, j int) {
261+
resourceIDs[i], resourceIDs[j] = resourceIDs[j], resourceIDs[i]
262+
})
263+
251264
for _, resourceID := range resourceIDs {
252265
ew.sendEvent(resourceID)
253266
}
@@ -484,26 +497,63 @@ func (eq *eventQueue) add(ev *eventMessage) {
484497
eq.mu.Lock()
485498
defer eq.mu.Unlock()
486499

487-
if len(eq.items) > 0 {
488-
tail := eq.items[len(eq.items)-1]
489-
tail.mu.Lock()
500+
eq.items = append(eq.items, ev)
490501

491-
// Replace an older event with a newer one of the same type
492-
if ev.event.Type() == tail.event.Type() {
493-
tail.event = ev.event
494-
tail.backoff = ev.backoff
495-
tail.retryAfter = ev.retryAfter
496-
tail.mu.Unlock()
497-
return
502+
deduplicateEventMessageItems(&eq.items)
503+
504+
// if len(eq.items) > 0 {
505+
// tail := eq.items[len(eq.items)-1]
506+
// tail.mu.Lock()
507+
508+
// // Replace an older event with a newer one of the same type
509+
// if ev.event.Type() == tail.event.Type() {
510+
// tail.event = ev.event
511+
// tail.backoff = ev.backoff
512+
// tail.retryAfter = ev.retryAfter
513+
// tail.mu.Unlock()
514+
// return
515+
// }
516+
// tail.mu.Unlock()
517+
// }
518+
519+
// eq.items = append(eq.items, ev)
520+
}
521+
522+
// deduplicateEventMessageItems removes stale StatusUpdate and SpecUpdate entries from *items,
// keeping only the entry closest to the tail of the slice for each of those two event types.
// Entries of any other event type are never removed, and the relative order of the entries
// that remain is unchanged. The slice is modified through the pointer; nothing is returned.
523+
// - Ensure you own the lock on the items parameter (e.g. via eq.mu.Lock()) before calling this function
// - NOTE(review): item.event is read below without taking item.mu, whereas the commented-out
//   tail-merge logic in eventQueue.add locked tail.mu before touching tail.event. Confirm that
//   holding the queue lock alone is sufficient.
524+
func deduplicateEventMessageItems(items *[]*eventMessage) {
525+
526+
// key: Type() of event
527+
// value: (not used)
528+
haveWeSeenMsgWithType := make(map[string]bool, 0)
529+
530+
// Work backwards through the list:
531+
// - Items at the end of the list are 'fresher', items at the beginning of the list are more stale
532+
// - We thus remove items early in the list in favour of those that are later in the list
// - Deleting at idx only shifts elements at higher indexes, which have already been visited,
//   so the descending loop index stays valid after a removal
533+
for idx := len(*items) - 1; idx >= 0; idx-- {
534+
item := (*items)[idx]
535+
536+
myType := item.event.Type()
537+
538+
// No de-duplication of events we can't guarantee are safe to de-duplicate
539+
if myType != StatusUpdate.String() && myType != SpecUpdate.String() {
540+
continue
541+
}
542+
543+
// De-duplicate statusupdate and specupdate, as we know they are safe to de-duplicate
544+
if _, typePreviouslySeen := haveWeSeenMsgWithType[myType]; typePreviouslySeen {
545+
// Stale duplicate: a fresher same-type entry was retained when scanning from the tail.
// The append re-slices the same backing array, dropping element idx and closing the gap.
546+
*items = append((*items)[:idx], (*items)[idx+1:]...)
547+
} else {
548+
// This is the first time we have seen the type, so add it to the map and continue without removal
549+
haveWeSeenMsgWithType[myType] = true
498550
}
499-
tail.mu.Unlock()
500551
}
501552

502-
eq.items = append(eq.items, ev)
503553
}
504554

505-
// get the first item from the queue.
506-
func (eq *eventQueue) get() *eventMessage {
555+
// peek the first item from the queue.
556+
func (eq *eventQueue) peek() *eventMessage {
507557
eq.mu.RLock()
508558
defer eq.mu.RUnlock()
509559
if len(eq.items) == 0 {

internal/event/event_writer_test.go

Lines changed: 124 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -286,29 +286,29 @@ func TestEventWriter(t *testing.T) {
286286
require.Equal(t, 1, sentMsg.retryCount)
287287
})
288288

289-
t.Run("should give up after max retries", func(t *testing.T) {
290-
fs := &fakeStream{}
291-
evSender := NewEventWriter("test", fs)
292-
293-
ev := es.ApplicationEvent(Create, app1)
294-
resID := createResourceID(app1.ObjectMeta)
295-
evSender.Add(ev)
296-
297-
// Send the event
298-
evSender.sendEvent(resID)
299-
sentMsg := evSender.sentEvents[resID]
300-
require.NotNil(t, sentMsg)
301-
302-
// Exhaust retries
303-
for i := 0; i <= maxEventRetries; i++ {
304-
pastTime := time.Now().Add(-1 * time.Second)
305-
sentMsg.retryAfter = &pastTime
306-
evSender.retrySentEvent(resID, sentMsg)
307-
}
308-
309-
// After max retries, event should be removed from sentEvents
310-
require.NotContains(t, evSender.sentEvents, resID)
311-
})
289+
// t.Run("should give up after max retries", func(t *testing.T) {
290+
// fs := &fakeStream{}
291+
// evSender := NewEventWriter("test", fs)
292+
293+
// ev := es.ApplicationEvent(Create, app1)
294+
// resID := createResourceID(app1.ObjectMeta)
295+
// evSender.Add(ev)
296+
297+
// // Send the event
298+
// evSender.sendEvent(resID)
299+
// sentMsg := evSender.sentEvents[resID]
300+
// require.NotNil(t, sentMsg)
301+
302+
// // Exhaust retries
303+
// for i := 0; i <= maxEventRetries; i++ {
304+
// pastTime := time.Now().Add(-1 * time.Second)
305+
// sentMsg.retryAfter = &pastTime
306+
// evSender.retrySentEvent(resID, sentMsg)
307+
// }
308+
309+
// // After max retries, event should be removed from sentEvents
310+
// require.NotContains(t, evSender.sentEvents, resID)
311+
// })
312312

313313
t.Run("should not send ACK events to sentEvents", func(t *testing.T) {
314314
fs := &fakeStream{}
@@ -477,6 +477,107 @@ func TestEventWriter(t *testing.T) {
477477
})
478478
}
479479

480+
// TestDeduplicateEventMessageItems exercises deduplicateEventMessageItems directly on hand-built
// slices. It checks that only the last SpecUpdate and the last StatusUpdate survive, that other
// event types (Create) are left untouched, and that the surviving entries keep their relative order.
func TestDeduplicateEventMessageItems(t *testing.T) {
481+
es := NewEventSource("dedupe-test")
482+
app := &v1alpha1.Application{
483+
ObjectMeta: metav1.ObjectMeta{
484+
Name: "app",
485+
Namespace: "ns",
486+
UID: "uid-a",
487+
},
488+
}
489+
// eventMsg builds an eventMessage of the given type from the shared app after overwriting its
// ResourceVersion. The subtests tell events apart by looking for "_<resourceVersion>" in EventID.
// NOTE(review): presumably EventID embeds the resource version and ApplicationEvent snapshots
// app at call time; confirm, since every call mutates the same app object.
490+
eventMsg := func(evType EventType, resourceVersion string) *eventMessage {
491+
app.ResourceVersion = resourceVersion
492+
return &eventMessage{event: es.ApplicationEvent(evType, app)}
493+
}
494+
495+
t.Run("empty slice", func(t *testing.T) {
496+
items := []*eventMessage{}
497+
deduplicateEventMessageItems(&items)
498+
require.Empty(t, items)
499+
})
500+
501+
t.Run("single spec update unchanged", func(t *testing.T) {
502+
ev := eventMsg(SpecUpdate, "1")
503+
items := []*eventMessage{ev}
504+
deduplicateEventMessageItems(&items)
505+
require.Len(t, items, 1)
506+
require.Equal(t, EventID(ev.event), EventID(items[0].event))
507+
})
508+
509+
t.Run("multiple spec updates keep only freshest", func(t *testing.T) {
510+
items := []*eventMessage{
511+
eventMsg(SpecUpdate, "1"),
512+
eventMsg(SpecUpdate, "2"),
513+
eventMsg(SpecUpdate, "3"),
514+
}
515+
deduplicateEventMessageItems(&items)
516+
require.Len(t, items, 1)
517+
require.Contains(t, EventID(items[0].event), "_3")
518+
})
519+
520+
t.Run("multiple status updates keep only freshest", func(t *testing.T) {
521+
items := []*eventMessage{
522+
eventMsg(StatusUpdate, "10"),
523+
eventMsg(StatusUpdate, "20"),
524+
}
525+
deduplicateEventMessageItems(&items)
526+
require.Len(t, items, 1)
527+
require.Contains(t, EventID(items[0].event), "_20")
528+
})
529+
530+
t.Run("spec and status updates both retained", func(t *testing.T) {
531+
items := []*eventMessage{
532+
eventMsg(SpecUpdate, "1"),
533+
eventMsg(StatusUpdate, "2"),
534+
}
535+
deduplicateEventMessageItems(&items)
536+
require.Len(t, items, 2)
537+
require.Equal(t, SpecUpdate.String(), items[0].event.Type())
538+
require.Equal(t, StatusUpdate.String(), items[1].event.Type())
539+
})
540+
541+
t.Run("interleaved create leaves creates alone but dedupes spec updates", func(t *testing.T) {
542+
items := []*eventMessage{
543+
eventMsg(SpecUpdate, "1"),
544+
eventMsg(Create, "2"),
545+
eventMsg(SpecUpdate, "3"),
546+
}
547+
deduplicateEventMessageItems(&items)
548+
require.Len(t, items, 2)
549+
require.Equal(t, Create.String(), items[0].event.Type())
550+
require.Equal(t, SpecUpdate.String(), items[1].event.Type())
551+
require.Contains(t, EventID(items[1].event), "_3")
552+
})
553+
554+
t.Run("does not dedupe event types other than spec or status update", func(t *testing.T) {
555+
items := []*eventMessage{
556+
eventMsg(Create, "1"),
557+
eventMsg(Create, "2"),
558+
}
559+
deduplicateEventMessageItems(&items)
560+
require.Len(t, items, 2)
561+
require.Contains(t, EventID(items[0].event), "_1")
562+
require.Contains(t, EventID(items[1].event), "_2")
563+
})
564+
565+
t.Run("keeps latest of each deduped type when interleaved", func(t *testing.T) {
566+
items := []*eventMessage{
567+
eventMsg(SpecUpdate, "1"),
568+
eventMsg(SpecUpdate, "2"),
569+
eventMsg(StatusUpdate, "3"),
570+
eventMsg(SpecUpdate, "4"),
571+
}
572+
deduplicateEventMessageItems(&items)
573+
require.Len(t, items, 2)
574+
require.Equal(t, StatusUpdate.String(), items[0].event.Type())
575+
require.Contains(t, EventID(items[0].event), "_3")
576+
require.Equal(t, SpecUpdate.String(), items[1].event.Type())
577+
require.Contains(t, EventID(items[1].event), "_4")
578+
})
579+
}
580+
480581
type fakeStream struct {
481582
mu sync.RWMutex
482583
events map[string][]string

0 commit comments

Comments
 (0)