Skip to content

Commit fc0b027

Browse files
renovate[bot]amuraru
andauthored
fix(deps): update go modules (major) (#20)
Co-authored-by: amuraru <[email protected]>
1 parent 5123a6b commit fc0b027

File tree

5 files changed

+101
-443
lines changed

5 files changed

+101
-443
lines changed

config.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ import (
99
"github.com/cloudhut/kminion/v2/logging"
1010
"github.com/cloudhut/kminion/v2/minion"
1111
"github.com/cloudhut/kminion/v2/prometheus"
12-
"github.com/knadh/koanf"
12+
"github.com/go-viper/mapstructure/v2"
13+
"github.com/knadh/koanf/v2"
1314
"github.com/knadh/koanf/parsers/yaml"
14-
"github.com/knadh/koanf/providers/env"
15+
"github.com/knadh/koanf/providers/env/v2"
1516
"github.com/knadh/koanf/providers/file"
16-
"github.com/mitchellh/mapstructure"
1717
"go.uber.org/zap"
1818
)
1919

@@ -86,16 +86,18 @@ func newConfig(logger *zap.Logger) (Config, error) {
8686
return Config{}, err
8787
}
8888

89-
err = k.Load(env.ProviderWithValue("", ".", func(s string, v string) (string, interface{}) {
90-
key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
91-
// Check to exist if we have a configuration option already and see if it's a slice
92-
// If there is a comma in the value, split the value into a slice by the comma.
93-
if strings.Contains(v, ",") {
94-
return key, strings.Split(v, ",")
95-
}
96-
97-
// Otherwise return the new key with the unaltered value
98-
return key, v
89+
err = k.Load(env.Provider(".", env.Opt{
90+
TransformFunc: func(s string, v string) (string, interface{}) {
91+
key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
92+
// Check to exist if we have a configuration option already and see if it's a slice
93+
// If there is a comma in the value, split the value into a slice by the comma.
94+
if strings.Contains(v, ",") {
95+
return key, strings.Split(v, ",")
96+
}
97+
98+
// Otherwise return the new key with the unaltered value
99+
return key, v
100+
},
99101
}), nil)
100102
if err != nil {
101103
return Config{}, err

e2e/message_tracker.go

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package e2e
22

33
import (
4+
"context"
45
"fmt"
56
"strconv"
67
"time"
78

8-
"github.com/jellydator/ttlcache/v2"
9+
"github.com/jellydator/ttlcache/v3"
910

1011
"go.uber.org/zap"
1112
)
@@ -22,79 +23,78 @@ import (
2223
type messageTracker struct {
2324
svc *Service
2425
logger *zap.Logger
25-
cache *ttlcache.Cache
26+
cache *ttlcache.Cache[string, *EndToEndMessage]
2627
}
2728

2829
func newMessageTracker(svc *Service) *messageTracker {
2930
defaultExpirationDuration := svc.config.Consumer.RoundtripSla
30-
cache := ttlcache.NewCache()
31-
_ = cache.SetTTL(defaultExpirationDuration)
31+
cache := ttlcache.New[string, *EndToEndMessage](
32+
ttlcache.WithTTL[string, *EndToEndMessage](defaultExpirationDuration),
33+
)
3234

3335
t := &messageTracker{
3436
svc: svc,
3537
logger: svc.logger.Named("message_tracker"),
3638
cache: cache,
3739
}
38-
t.cache.SetExpirationReasonCallback(func(key string, reason ttlcache.EvictionReason, value interface{}) {
39-
t.onMessageExpired(key, reason, value.(*EndToEndMessage))
40+
41+
cache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, *EndToEndMessage]) {
42+
t.onMessageExpired(item.Key(), reason, item.Value())
4043
})
4144

45+
// Start the cache's automatic cleanup
46+
go cache.Start()
47+
4248
return t
4349
}
4450

4551
func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
46-
_ = t.cache.Set(msg.MessageID, msg)
52+
t.cache.Set(msg.MessageID, msg, ttlcache.DefaultTTL)
4753
}
4854

4955
// updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
5056
// be refreshed.
51-
// If it doesn't exist an ttlcache.ErrNotFound error will be returned.
57+
// If it doesn't exist an error will be returned.
5258
//
5359
//nolint:unused
5460
func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
55-
_, ttl, err := t.cache.GetWithTTL(msg.MessageID)
56-
if err != nil {
57-
if err == ttlcache.ErrNotFound {
58-
return err
59-
}
60-
panic(err)
61+
item := t.cache.Get(msg.MessageID)
62+
if item == nil {
63+
return fmt.Errorf("item not found")
6164
}
6265

63-
// Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
64-
// the remaining TTL now as we want to update the existing cache item without changing the remaining time to live.
65-
expiryTimestamp := msg.creationTime().Add(ttl)
66-
remainingTTL := time.Until(expiryTimestamp)
66+
// Calculate the remaining TTL to preserve the original expiration time
67+
remainingTTL := time.Until(item.ExpiresAt())
6768
if remainingTTL < 0 {
6869
// This entry should have been deleted already. Race condition.
69-
return ttlcache.ErrNotFound
70+
return fmt.Errorf("item expired")
7071
}
7172

72-
err = t.cache.SetWithTTL(msg.MessageID, msg, remainingTTL)
73-
if err != nil {
74-
panic(err)
75-
}
73+
// Set the updated message with the remaining TTL
74+
t.cache.Set(msg.MessageID, msg, remainingTTL)
7675

7776
return nil
7877
}
7978

80-
// removeFromTracker removes an entry from the cache. If the key does not exist it will return an ttlcache.ErrNotFound error.
79+
// removeFromTracker removes an entry from the cache. If the key does not exist it will return an error.
8180
func (t *messageTracker) removeFromTracker(messageID string) error {
82-
return t.cache.Remove(messageID)
81+
// Check if the item exists before trying to delete it
82+
if !t.cache.Has(messageID) {
83+
return fmt.Errorf("item not found")
84+
}
85+
t.cache.Delete(messageID)
86+
return nil
8387
}
8488

8589
func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
86-
cm, err := t.cache.Get(arrivedMessage.MessageID)
87-
if err != nil {
88-
if err == ttlcache.ErrNotFound {
89-
// message expired and was removed from the cache
90-
// it arrived too late, nothing to do here...
91-
return
92-
} else {
93-
panic(fmt.Errorf("failed to get message from cache: %w", err))
94-
}
90+
item := t.cache.Get(arrivedMessage.MessageID)
91+
if item == nil {
92+
// message expired and was removed from the cache
93+
// it arrived too late, nothing to do here...
94+
return
9595
}
9696

97-
msg := cm.(*EndToEndMessage)
97+
msg := item.Value()
9898

9999
expireTime := msg.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
100100
isExpired := time.Now().Before(expireTime)
@@ -116,17 +116,15 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
116116
t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())
117117

118118
// Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
119-
_ = t.cache.Remove(msg.MessageID)
119+
t.cache.Delete(msg.MessageID)
120120
}
121121

122-
func (t *messageTracker) onMessageExpired(_ string, reason ttlcache.EvictionReason, value interface{}) {
123-
if reason == ttlcache.Removed {
122+
func (t *messageTracker) onMessageExpired(key string, reason ttlcache.EvictionReason, msg *EndToEndMessage) {
123+
if reason == ttlcache.EvictionReasonDeleted {
124124
// We are not interested in messages that have been removed by us!
125125
return
126126
}
127127

128-
msg := value.(*EndToEndMessage)
129-
130128
created := msg.creationTime()
131129
age := time.Since(created)
132130
t.svc.lostMessages.WithLabelValues(strconv.Itoa(msg.partition)).Inc()

go.mod

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@ go 1.25
44

55
require (
66
github.com/adobe/ims-go v0.19.1
7+
github.com/go-viper/mapstructure/v2 v2.4.0
78
github.com/google/uuid v1.6.0
89
github.com/jcmturner/gokrb5/v8 v8.4.4
9-
github.com/jellydator/ttlcache/v2 v2.11.1
10-
github.com/knadh/koanf v1.5.0
11-
github.com/mitchellh/mapstructure v1.5.0
12-
github.com/orcaman/concurrent-map v1.0.0
10+
github.com/jellydator/ttlcache/v3 v3.4.0
11+
github.com/knadh/koanf/parsers/yaml v1.1.0
12+
github.com/knadh/koanf/providers/env/v2 v2.0.0
13+
github.com/knadh/koanf/providers/file v1.2.0
14+
github.com/knadh/koanf/v2 v2.3.0
15+
github.com/orcaman/concurrent-map/v2 v2.0.1
1316
github.com/pkg/errors v0.9.1
1417
github.com/prometheus/client_golang v1.23.2
1518
github.com/stretchr/testify v1.11.1
1619
github.com/twmb/franz-go v1.19.5
1720
github.com/twmb/franz-go/pkg/kadm v1.16.1
18-
github.com/twmb/franz-go/pkg/kmsg v1.12.0
21+
github.com/twmb/franz-go/pkg/kmsg v1.11.2
1922
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
2023
go.uber.org/atomic v1.11.0
2124
go.uber.org/automaxprocs v1.6.0
@@ -27,25 +30,26 @@ require (
2730
github.com/beorn7/perks v1.0.1 // indirect
2831
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2932
github.com/davecgh/go-spew v1.1.1 // indirect
30-
github.com/fsnotify/fsnotify v1.8.0 // indirect
33+
github.com/fsnotify/fsnotify v1.9.0 // indirect
3134
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
3235
github.com/hashicorp/go-uuid v1.0.3 // indirect
3336
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
3437
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
3538
github.com/jcmturner/gofork v1.7.6 // indirect
3639
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
3740
github.com/klauspost/compress v1.18.0 // indirect
41+
github.com/knadh/koanf/maps v0.1.2 // indirect
3842
github.com/mitchellh/copystructure v1.2.0 // indirect
3943
github.com/mitchellh/reflectwalk v1.0.2 // indirect
4044
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
41-
github.com/pelletier/go-toml v1.9.1 // indirect
4245
github.com/pierrec/lz4/v4 v4.1.22 // indirect
4346
github.com/pmezard/go-difflib v1.0.0 // indirect
4447
github.com/prometheus/client_model v0.6.2 // indirect
4548
github.com/prometheus/common v0.66.1 // indirect
4649
github.com/prometheus/procfs v0.16.1 // indirect
4750
go.uber.org/multierr v1.11.0 // indirect
4851
go.yaml.in/yaml/v2 v2.4.2 // indirect
52+
go.yaml.in/yaml/v3 v3.0.4 // indirect
4953
golang.org/x/crypto v0.43.0 // indirect
5054
golang.org/x/net v0.46.0 // indirect
5155
golang.org/x/sys v0.37.0 // indirect

0 commit comments

Comments
 (0)