Skip to content

Commit ba464a4

Browse files
authored
[extension/apmconfig]: replace sync.Map with freelru (#629)
* feat: replace sync.Map with freelru
* test: add freelru test cases
* replace the FNV hash function with xxhash; benchmarking results when hashing a UUID v7:
  BenchmarkHashString_FNV1A-20 36.27 ns/op
  BenchmarkHashString_XXHASH-20 15.04 ns/op
* set cache capacity to 1024 (2^N)
* feat: make cache config configurable
* fix: set default cache config in integration tests
* feat: purge expired cache entries based on TTL
1 parent 2ed2347 commit ba464a4

File tree

8 files changed

+303
-87
lines changed

8 files changed

+303
-87
lines changed

extension/apmconfigextension/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,18 @@ type OpAMPConfig struct {
6060
// Protocols is the configuration for the supported protocols, currently
6161
// HTTP (TBD: websocket).
6262
Protocols `mapstructure:"protocols"`
63+
// Cache holds configuration related to agents caching
64+
Cache CacheConfig `mapstructure:"cache"`
65+
}
66+
67+
// CacheConfig configures the cache that holds the state of known agents.
type CacheConfig struct {
68+
// Capacity defines the maximum number of agents to cache.
69+
// Once this is reached, the least recently
70+
// used entries will be evicted.
71+
Capacity uint32 `mapstructure:"capacity"`
72+
73+
// TTL defines how long a cached agent entry lives before it is evicted.
// A TTL of zero or less disables the periodic purge of expired entries.
74+
TTL time.Duration `mapstructure:"ttl"`
6375
}
6476

6577
// Protocols is the configuration for the supported protocols.

extension/apmconfigextension/extension.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ func (op *apmConfigExtension) Start(ctx context.Context, host component.Host) er
6363
return err
6464
}
6565

66-
opampHandler, conContext, err := server.New(newLoggerFromZap(op.telemetrySettings.Logger)).Attach(server.Settings{Callbacks: *newRemoteConfigCallbacks(remoteConfigClient, op.telemetrySettings.Logger).Callbacks})
66+
opampCallbacks, err := newRemoteConfigCallbacks(ctx, remoteConfigClient, op.extensionConfig.OpAMP.Cache, op.telemetrySettings.Logger)
67+
if err != nil {
68+
return err
69+
}
70+
71+
opampHandler, conContext, err := server.New(newLoggerFromZap(op.telemetrySettings.Logger)).Attach(server.Settings{Callbacks: *opampCallbacks.Callbacks})
6772
if err != nil {
6873
return err
6974
}

extension/apmconfigextension/factory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ func NewFactory() extension.Factory {
4242
)
4343
}
4444

45+
// defaultCacheConfig is the agent cache configuration used by
// createDefaultConfig and by the integration tests.
var defaultCacheConfig = CacheConfig{
46+
// Maximum number of active agents kept in the cache (a power of two).
47+
Capacity: 1024,
48+
// TTL for each cached agent entry: 4 x the 30s heartbeat interval (2 minutes),
49+
// which allows ~4 missed heartbeats before cache eviction.
50+
TTL: 30 * 4 * time.Second,
51+
}
52+
4553
func createDefaultConfig() component.Config {
4654
defaultElasticSearchClient := configelasticsearch.NewDefaultClientConfig()
4755
httpCfg := confighttp.NewDefaultServerConfig()
@@ -60,6 +68,7 @@ func createDefaultConfig() component.Config {
6068
Protocols: Protocols{
6169
ServerConfig: &httpCfg,
6270
},
71+
Cache: defaultCacheConfig,
6372
},
6473
}
6574
}

extension/apmconfigextension/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ module github.com/elastic/opentelemetry-collector-components/extension/apmconfig
33
go 1.23.8
44

55
require (
6+
github.com/cespare/xxhash v1.1.0
67
github.com/elastic/go-elasticsearch/v8 v8.18.0
8+
github.com/elastic/go-freelru v0.16.0
79
github.com/elastic/opentelemetry-collector-components/internal/testutil v0.0.0-20250613082151-282de5af1c9b
810
github.com/elastic/opentelemetry-lib v0.18.0
911
github.com/stretchr/testify v1.10.0

extension/apmconfigextension/go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
66
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
77
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
88
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
9+
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
10+
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
911
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
1012
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
13+
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
14+
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
1115
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
1216
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
1317
github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
@@ -34,6 +38,8 @@ github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+P
3438
github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
3539
github.com/elastic/go-elasticsearch/v8 v8.18.0 h1:ANNq1h7DEiPUaALb8+5w3baQzaS08WfHV0DNzp0VG4M=
3640
github.com/elastic/go-elasticsearch/v8 v8.18.0/go.mod h1:WLqwXsJmQoYkoA9JBFeEwPkQhCfAZuUvfpdU/NvSSf0=
41+
github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs=
42+
github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
3743
github.com/elastic/opentelemetry-collector-components/internal/testutil v0.0.0-20250613082151-282de5af1c9b h1:NWuTKdMCJlU9ehRH8V0w1Kk1QI5Vn+9OcJWIO9wI+pE=
3844
github.com/elastic/opentelemetry-collector-components/internal/testutil v0.0.0-20250613082151-282de5af1c9b/go.mod h1:R1WWATZlmmkryuE5hLQNHj89rdPPi4ErobBZYx2LmGs=
3945
github.com/elastic/opentelemetry-lib v0.18.0 h1:LZqpQE++kt+0yovIjKeKRVc8gqURDegaqaF1saNuYwc=
@@ -134,6 +140,8 @@ github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0Zqm
134140
github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI=
135141
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
136142
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
143+
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
144+
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
137145
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
138146
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
139147
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=

extension/apmconfigextension/integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ func apmConfigintegrationTest(name string) func(t *testing.T) {
450450
},
451451
},
452452
OpAMP: OpAMPConfig{
453+
Cache: defaultCacheConfig,
453454
Protocols: Protocols{
454455
ServerConfig: func() *confighttp.ServerConfig {
455456
httpCfg := confighttp.NewDefaultServerConfig()

extension/apmconfigextension/opamp_callbacks.go

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import (
2323
"errors"
2424
"fmt"
2525
"net/http"
26-
"sync"
26+
"time"
2727

28+
"github.com/cespare/xxhash"
29+
"github.com/elastic/go-freelru"
2830
"github.com/elastic/opentelemetry-collector-components/extension/apmconfigextension/apmconfig"
2931
"github.com/open-telemetry/opamp-go/protobufs"
3032
"github.com/open-telemetry/opamp-go/server/types"
@@ -35,8 +37,10 @@ type remoteConfigCallbacks struct {
3537
*types.Callbacks
3638
configClient apmconfig.RemoteConfigClient
3739

38-
agentState sync.Map
39-
logger *zap.Logger
40+
agentState freelru.Cache[string, *agentInfo]
41+
ttl time.Duration
42+
43+
logger *zap.Logger
4044
}
4145

4246
type agentInfo struct {
@@ -45,11 +49,35 @@ type agentInfo struct {
4549
lastConfigHash apmconfig.LastConfigHash
4650
}
4751

48-
func newRemoteConfigCallbacks(configClient apmconfig.RemoteConfigClient, logger *zap.Logger) *remoteConfigCallbacks {
52+
func newRemoteConfigCallbacks(ctx context.Context, configClient apmconfig.RemoteConfigClient, ttlConfig CacheConfig, logger *zap.Logger) (*remoteConfigCallbacks, error) {
53+
cache, err := freelru.NewSharded[string, *agentInfo](ttlConfig.Capacity, func(key string) uint32 {
54+
return uint32(xxhash.Sum64String(key))
55+
})
56+
if err != nil {
57+
return nil, err
58+
}
59+
cache.SetLifetime(ttlConfig.TTL)
60+
// Purge expired entries from the cache
61+
if ttlConfig.TTL > 0 {
62+
go func() {
63+
ticker := time.NewTicker(ttlConfig.TTL)
64+
defer ticker.Stop()
65+
for {
66+
select {
67+
case <-ctx.Done():
68+
return
69+
case <-ticker.C:
70+
cache.PurgeExpired()
71+
}
72+
}
73+
}()
74+
}
75+
4976
opampCallbacks := &remoteConfigCallbacks{
5077
configClient: configClient,
51-
agentState: sync.Map{},
78+
agentState: cache,
5279
logger: logger,
80+
ttl: ttlConfig.TTL,
5381
}
5482

5583
connectionCallbacks := types.ConnectionCallbacks{}
@@ -67,7 +95,7 @@ func newRemoteConfigCallbacks(configClient apmconfig.RemoteConfigClient, logger
6795
},
6896
}
6997

70-
return opampCallbacks
98+
return opampCallbacks, nil
7199
}
72100

73101
func (rc *remoteConfigCallbacks) serverError(msg string, message *protobufs.ServerToAgent, logFields ...zap.Field) *protobufs.ServerToAgent {
@@ -97,7 +125,7 @@ func (rc *remoteConfigCallbacks) onMessage(ctx context.Context, conn types.Conne
97125
agentUid := hex.EncodeToString(message.GetInstanceUid())
98126
if message.GetAgentDescription() != nil {
99127
// new description might lead to another remote configuration
100-
rc.agentState.Store(agentUid, agentInfo{
128+
_ = rc.agentState.Add(agentUid, &agentInfo{
101129
agentUid: message.GetInstanceUid(),
102130
identifyingAttributes: message.AgentDescription.IdentifyingAttributes,
103131
})
@@ -106,26 +134,26 @@ func (rc *remoteConfigCallbacks) onMessage(ctx context.Context, conn types.Conne
106134
agentUidField := zap.String("instance_uid", agentUid)
107135
if message.GetAgentDisconnect() != nil {
108136
rc.logger.Info("Disconnecting the agent from the remote configuration service", agentUidField)
109-
rc.agentState.Delete(agentUid)
137+
_ = rc.agentState.Remove(agentUid)
110138
return &serverToAgent
111139
}
112140

113-
loadedAgent, _ := rc.agentState.LoadOrStore(agentUid, agentInfo{
114-
agentUid: message.GetInstanceUid(),
115-
})
116-
agent, ok := loadedAgent.(agentInfo)
117-
if !ok {
118-
rc.logger.Warn("unexpected type in agentState cache", agentUidField)
119-
return rc.serverError("internal error: invalid agent state", &serverToAgent)
141+
loadedAgent, found := rc.agentState.GetAndRefresh(agentUid, rc.ttl)
142+
if !found {
143+
loadedAgent = &agentInfo{
144+
agentUid: message.InstanceUid,
145+
}
146+
_ = rc.agentState.Add(agentUid, loadedAgent)
120147
}
148+
121149
remoteConfigStatus := message.GetRemoteConfigStatus()
122150
if remoteConfigStatus != nil {
123-
agent.lastConfigHash = remoteConfigStatus.GetLastRemoteConfigHash()
124-
rc.logger.Info("Remote config status", agentUidField, zap.String("lastRemoteConfigHash", hex.EncodeToString(agent.lastConfigHash)), zap.String("status", remoteConfigStatus.GetStatus().String()), zap.String("errorMessage", remoteConfigStatus.ErrorMessage))
125-
rc.agentState.Store(agentUid, agent)
151+
loadedAgent.lastConfigHash = remoteConfigStatus.GetLastRemoteConfigHash()
152+
rc.logger.Info("Remote config status", agentUidField, zap.String("lastRemoteConfigHash", hex.EncodeToString(loadedAgent.lastConfigHash)), zap.String("status", remoteConfigStatus.GetStatus().String()), zap.String("errorMessage", remoteConfigStatus.ErrorMessage))
153+
rc.agentState.Add(agentUid, loadedAgent)
126154
}
127155

128-
remoteConfig, err := rc.configClient.RemoteConfig(ctx, agent.identifyingAttributes, agent.lastConfigHash)
156+
remoteConfig, err := rc.configClient.RemoteConfig(ctx, loadedAgent.identifyingAttributes, loadedAgent.lastConfigHash)
129157
if err != nil {
130158
// remote config client could not identify the agent
131159
if errors.Is(err, apmconfig.UnidentifiedAgent) {

0 commit comments

Comments
 (0)