Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/cloudhut/kminion/v2/logging"
"github.com/cloudhut/kminion/v2/minion"
"github.com/cloudhut/kminion/v2/prometheus"
"github.com/knadh/koanf"
"github.com/go-viper/mapstructure/v2"
"github.com/knadh/koanf/v2"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/env/v2"
"github.com/knadh/koanf/providers/file"
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -86,16 +86,18 @@ func newConfig(logger *zap.Logger) (Config, error) {
return Config{}, err
}

err = k.Load(env.ProviderWithValue("", ".", func(s string, v string) (string, interface{}) {
key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
// Check to exist if we have a configuration option already and see if it's a slice
// If there is a comma in the value, split the value into a slice by the comma.
if strings.Contains(v, ",") {
return key, strings.Split(v, ",")
}

// Otherwise return the new key with the unaltered value
return key, v
err = k.Load(env.Provider(".", env.Opt{
TransformFunc: func(s string, v string) (string, interface{}) {
key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
// Check to exist if we have a configuration option already and see if it's a slice
// If there is a comma in the value, split the value into a slice by the comma.
if strings.Contains(v, ",") {
return key, strings.Split(v, ",")
}

// Otherwise return the new key with the unaltered value
return key, v
},
}), nil)
if err != nil {
return Config{}, err
Expand Down
78 changes: 38 additions & 40 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package e2e

import (
"context"
"fmt"
"strconv"
"time"

"github.com/jellydator/ttlcache/v2"
"github.com/jellydator/ttlcache/v3"

"go.uber.org/zap"
)
Expand All @@ -22,79 +23,78 @@ import (
type messageTracker struct {
svc *Service
logger *zap.Logger
cache *ttlcache.Cache
cache *ttlcache.Cache[string, *EndToEndMessage]
}

func newMessageTracker(svc *Service) *messageTracker {
defaultExpirationDuration := svc.config.Consumer.RoundtripSla
cache := ttlcache.NewCache()
_ = cache.SetTTL(defaultExpirationDuration)
cache := ttlcache.New[string, *EndToEndMessage](
ttlcache.WithTTL[string, *EndToEndMessage](defaultExpirationDuration),
)

t := &messageTracker{
svc: svc,
logger: svc.logger.Named("message_tracker"),
cache: cache,
}
t.cache.SetExpirationReasonCallback(func(key string, reason ttlcache.EvictionReason, value interface{}) {
t.onMessageExpired(key, reason, value.(*EndToEndMessage))

cache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, *EndToEndMessage]) {
t.onMessageExpired(item.Key(), reason, item.Value())
})

// Start the cache's automatic cleanup
go cache.Start()

return t
}

func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
_ = t.cache.Set(msg.MessageID, msg)
t.cache.Set(msg.MessageID, msg, ttlcache.DefaultTTL)
}

// updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
// be refreshed.
// If it doesn't exist an ttlcache.ErrNotFound error will be returned.
// If it doesn't exist an error will be returned.
//
//nolint:unused
func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
_, ttl, err := t.cache.GetWithTTL(msg.MessageID)
if err != nil {
if err == ttlcache.ErrNotFound {
return err
}
panic(err)
item := t.cache.Get(msg.MessageID)
if item == nil {
return fmt.Errorf("item not found")
}

// Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
// the remaining TTL now as we want to update the existing cache item without changing the remaining time to live.
expiryTimestamp := msg.creationTime().Add(ttl)
remainingTTL := time.Until(expiryTimestamp)
// Calculate the remaining TTL to preserve the original expiration time
remainingTTL := time.Until(item.ExpiresAt())
if remainingTTL < 0 {
// This entry should have been deleted already. Race condition.
return ttlcache.ErrNotFound
return fmt.Errorf("item expired")
}

err = t.cache.SetWithTTL(msg.MessageID, msg, remainingTTL)
if err != nil {
panic(err)
}
// Set the updated message with the remaining TTL
t.cache.Set(msg.MessageID, msg, remainingTTL)

return nil
}

// removeFromTracker removes an entry from the cache. If the key does not exist it will return an ttlcache.ErrNotFound error.
// removeFromTracker removes an entry from the cache. If the key does not exist it will return an error.
func (t *messageTracker) removeFromTracker(messageID string) error {
return t.cache.Remove(messageID)
// Check if the item exists before trying to delete it
if !t.cache.Has(messageID) {
return fmt.Errorf("item not found")
}
t.cache.Delete(messageID)
return nil
}

func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
cm, err := t.cache.Get(arrivedMessage.MessageID)
if err != nil {
if err == ttlcache.ErrNotFound {
// message expired and was removed from the cache
// it arrived too late, nothing to do here...
return
} else {
panic(fmt.Errorf("failed to get message from cache: %w", err))
}
item := t.cache.Get(arrivedMessage.MessageID)
if item == nil {
// message expired and was removed from the cache
// it arrived too late, nothing to do here...
return
}

msg := cm.(*EndToEndMessage)
msg := item.Value()

expireTime := msg.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
isExpired := time.Now().Before(expireTime)
Expand All @@ -116,17 +116,15 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())

// Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
_ = t.cache.Remove(msg.MessageID)
t.cache.Delete(msg.MessageID)
}

func (t *messageTracker) onMessageExpired(_ string, reason ttlcache.EvictionReason, value interface{}) {
if reason == ttlcache.Removed {
func (t *messageTracker) onMessageExpired(key string, reason ttlcache.EvictionReason, msg *EndToEndMessage) {
if reason == ttlcache.EvictionReasonDeleted {
// We are not interested in messages that have been removed by us!
return
}

msg := value.(*EndToEndMessage)

created := msg.creationTime()
age := time.Since(created)
t.svc.lostMessages.WithLabelValues(strconv.Itoa(msg.partition)).Inc()
Expand Down
18 changes: 11 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ go 1.25

require (
github.com/adobe/ims-go v0.19.1
github.com/go-viper/mapstructure/v2 v2.4.0
github.com/google/uuid v1.6.0
github.com/jcmturner/gokrb5/v8 v8.4.4
github.com/jellydator/ttlcache/v2 v2.11.1
github.com/knadh/koanf v1.5.0
github.com/mitchellh/mapstructure v1.5.0
github.com/orcaman/concurrent-map v1.0.0
github.com/jellydator/ttlcache/v3 v3.4.0
github.com/knadh/koanf/parsers/yaml v1.1.0
github.com/knadh/koanf/providers/env/v2 v2.0.0
github.com/knadh/koanf/providers/file v1.2.0
github.com/knadh/koanf/v2 v2.3.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.19.5
github.com/twmb/franz-go/pkg/kadm v1.16.1
github.com/twmb/franz-go/pkg/kmsg v1.12.0
github.com/twmb/franz-go/pkg/kmsg v1.11.2
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
Expand All @@ -27,25 +30,26 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml v1.9.1 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
Expand Down
Loading