11package e2e
22
33import (
4+ "context"
45 "fmt"
56 "strconv"
67 "time"
78
8- "github.com/jellydator/ttlcache/v2 "
9+ "github.com/jellydator/ttlcache/v3 "
910
1011 "go.uber.org/zap"
1112)
@@ -22,79 +23,78 @@ import (
2223type messageTracker struct {
2324 svc * Service
2425 logger * zap.Logger
25- cache * ttlcache.Cache
26+ cache * ttlcache.Cache [ string , * EndToEndMessage ]
2627}
2728
2829func newMessageTracker (svc * Service ) * messageTracker {
2930 defaultExpirationDuration := svc .config .Consumer .RoundtripSla
30- cache := ttlcache .NewCache ()
31- _ = cache .SetTTL (defaultExpirationDuration )
31+ cache := ttlcache.New [string , * EndToEndMessage ](
32+ ttlcache.WithTTL [string , * EndToEndMessage ](defaultExpirationDuration ),
33+ )
3234
3335 t := & messageTracker {
3436 svc : svc ,
3537 logger : svc .logger .Named ("message_tracker" ),
3638 cache : cache ,
3739 }
38- t .cache .SetExpirationReasonCallback (func (key string , reason ttlcache.EvictionReason , value interface {}) {
39- t .onMessageExpired (key , reason , value .(* EndToEndMessage ))
40+
41+ cache .OnEviction (func (ctx context.Context , reason ttlcache.EvictionReason , item * ttlcache.Item [string , * EndToEndMessage ]) {
42+ t .onMessageExpired (item .Key (), reason , item .Value ())
4043 })
4144
45+ // Start the cache's automatic cleanup
46+ go cache .Start ()
47+
4248 return t
4349}
4450
4551func (t * messageTracker ) addToTracker (msg * EndToEndMessage ) {
46- _ = t .cache .Set (msg .MessageID , msg )
52+ t .cache .Set (msg .MessageID , msg , ttlcache . DefaultTTL )
4753}
4854
4955// updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
5056// be refreshed.
51- // If it doesn't exist an ttlcache.ErrNotFound error will be returned.
57+ // If it doesn't exist an error will be returned.
5258//
5359//nolint:unused
5460func (t * messageTracker ) updateItemIfExists (msg * EndToEndMessage ) error {
55- _ , ttl , err := t .cache .GetWithTTL (msg .MessageID )
56- if err != nil {
57- if err == ttlcache .ErrNotFound {
58- return err
59- }
60- panic (err )
61+ item := t .cache .Get (msg .MessageID )
62+ if item == nil {
63+ return fmt .Errorf ("item not found" )
6164 }
6265
63- // Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
64- // the remaining TTL now as we want to update the existing cache item without changing the remaining time to live.
65- expiryTimestamp := msg .creationTime ().Add (ttl )
66- remainingTTL := time .Until (expiryTimestamp )
66+ // Calculate the remaining TTL to preserve the original expiration time
67+ remainingTTL := time .Until (item .ExpiresAt ())
6768 if remainingTTL < 0 {
6869 // This entry should have been deleted already. Race condition.
69- return ttlcache . ErrNotFound
70+ return fmt . Errorf ( "item expired" )
7071 }
7172
72- err = t .cache .SetWithTTL (msg .MessageID , msg , remainingTTL )
73- if err != nil {
74- panic (err )
75- }
73+ // Set the updated message with the remaining TTL
74+ t .cache .Set (msg .MessageID , msg , remainingTTL )
7675
7776 return nil
7877}
7978
80- // removeFromTracker removes an entry from the cache. If the key does not exist it will return an ttlcache.ErrNotFound error.
79+ // removeFromTracker removes an entry from the cache. If the key does not exist it will return an error.
8180func (t * messageTracker ) removeFromTracker (messageID string ) error {
82- return t .cache .Remove (messageID )
81+ // Check if the item exists before trying to delete it
82+ if ! t .cache .Has (messageID ) {
83+ return fmt .Errorf ("item not found" )
84+ }
85+ t .cache .Delete (messageID )
86+ return nil
8387}
8488
8589func (t * messageTracker ) onMessageArrived (arrivedMessage * EndToEndMessage ) {
86- cm , err := t .cache .Get (arrivedMessage .MessageID )
87- if err != nil {
88- if err == ttlcache .ErrNotFound {
89- // message expired and was removed from the cache
90- // it arrived too late, nothing to do here...
91- return
92- } else {
93- panic (fmt .Errorf ("failed to get message from cache: %w" , err ))
94- }
90+ item := t .cache .Get (arrivedMessage .MessageID )
91+ if item == nil {
92+ // message expired and was removed from the cache
93+ // it arrived too late, nothing to do here...
94+ return
9595 }
9696
97- msg := cm .( * EndToEndMessage )
97+ msg := item . Value ( )
9898
9999 expireTime := msg .creationTime ().Add (t .svc .config .Consumer .RoundtripSla )
100100 isExpired := time .Now ().Before (expireTime )
@@ -116,17 +116,15 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
116116 t .svc .roundtripLatency .WithLabelValues (pID ).Observe (latency .Seconds ())
117117
118118 // Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
119- _ = t .cache .Remove (msg .MessageID )
119+ t .cache .Delete (msg .MessageID )
120120}
121121
122- func (t * messageTracker ) onMessageExpired (_ string , reason ttlcache.EvictionReason , value interface {} ) {
123- if reason == ttlcache .Removed {
122+ func (t * messageTracker ) onMessageExpired (key string , reason ttlcache.EvictionReason , msg * EndToEndMessage ) {
123+ if reason == ttlcache .EvictionReasonDeleted {
124124 // We are not interested in messages that have been removed by us!
125125 return
126126 }
127127
128- msg := value .(* EndToEndMessage )
129-
130128 created := msg .creationTime ()
131129 age := time .Since (created )
132130 t .svc .lostMessages .WithLabelValues (strconv .Itoa (msg .partition )).Inc ()
0 commit comments