Skip to content

Commit b81fb37

Browse files
committed
allow second credential watcher
1 parent 8c422cc commit b81fb37

File tree

4 files changed

+110
-64
lines changed

4 files changed

+110
-64
lines changed

internal/watcher/credentials/credential_watcher_service.go

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/nginx/agent/v3/internal/command"
17+
"github.com/nginx/agent/v3/internal/grpc"
18+
1619
"github.com/fsnotify/fsnotify"
1720
"github.com/nginx/agent/v3/internal/config"
1821
"github.com/nginx/agent/v3/internal/logger"
@@ -29,55 +32,74 @@ var emptyEvent = fsnotify.Event{
2932

3033
type CredentialUpdateMessage struct {
3134
CorrelationID slog.Attr
35+
Conn *grpc.GrpcConnection
36+
SeverType command.ServerType
3237
}
3338

3439
type CredentialWatcherService struct {
3540
agentConfig *config.Config
3641
watcher *fsnotify.Watcher
3742
filesBeingWatched *sync.Map
3843
filesChanged *atomic.Bool
44+
serverType command.ServerType
45+
watcherMutex sync.Mutex
3946
}
4047

41-
func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService {
48+
func NewCredentialWatcherService(agentConfig *config.Config, serverType command.ServerType) *CredentialWatcherService {
4249
filesChanged := &atomic.Bool{}
4350
filesChanged.Store(false)
4451

4552
return &CredentialWatcherService{
4653
agentConfig: agentConfig,
4754
filesBeingWatched: &sync.Map{},
4855
filesChanged: filesChanged,
56+
serverType: serverType,
57+
watcherMutex: sync.Mutex{},
4958
}
5059
}
5160

5261
func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) {
53-
slog.DebugContext(ctx, "Starting credential watcher monitoring")
62+
newCtx := context.WithValue(
63+
ctx,
64+
logger.ServerTypeContextKey,
65+
slog.Any(logger.ServerTypeKey, cws.serverType.String()),
66+
)
67+
slog.DebugContext(newCtx, "Starting credential watcher monitoring")
5468

5569
ticker := time.NewTicker(monitoringInterval)
5670
watcher, err := fsnotify.NewWatcher()
5771
if err != nil {
58-
slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err)
72+
slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err)
5973
return
6074
}
6175

6276
cws.watcher = watcher
6377

64-
cws.watchFiles(ctx, credentialPaths(cws.agentConfig))
78+
cws.watcherMutex.Lock()
79+
commandSever := cws.agentConfig.Command
80+
81+
if cws.serverType == command.Auxiliary {
82+
commandSever = cws.agentConfig.AuxiliaryCommand
83+
}
84+
85+
cws.watchFiles(newCtx, credentialPaths(commandSever))
86+
cws.watcherMutex.Unlock()
6587

6688
for {
6789
select {
68-
case <-ctx.Done():
90+
case <-newCtx.Done():
6991
closeError := cws.watcher.Close()
7092
if closeError != nil {
71-
slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError)
93+
slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError)
7294
}
7395

7496
return
7597
case event := <-cws.watcher.Events:
76-
cws.handleEvent(ctx, event)
98+
cws.handleEvent(newCtx, event)
7799
case <-ticker.C:
78-
cws.checkForUpdates(ctx, ch)
100+
cws.checkForUpdates(newCtx, ch)
79101
case watcherError := <-cws.watcher.Errors:
80-
slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError)
102+
slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError)
81103
}
82104
}
83105
}
@@ -146,31 +168,49 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha
146168
slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()),
147169
)
148170

171+
cws.watcherMutex.Lock()
172+
defer cws.watcherMutex.Unlock()
173+
174+
commandSever := cws.agentConfig.Command
175+
if cws.serverType == command.Auxiliary {
176+
commandSever = cws.agentConfig.AuxiliaryCommand
177+
}
178+
179+
conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandSever)
180+
if err != nil {
181+
slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err)
182+
cws.filesChanged.Store(false)
183+
184+
return
185+
}
149186
slog.DebugContext(ctx, "Credential watcher has detected changes")
150-
ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)}
187+
ch <- CredentialUpdateMessage{
188+
CorrelationID: logger.CorrelationIDAttr(newCtx),
189+
SeverType: cws.serverType, Conn: conn,
190+
}
151191
cws.filesChanged.Store(false)
152192
}
153193
}
154194

155-
func credentialPaths(agentConfig *config.Config) []string {
195+
func credentialPaths(agentConfig *config.Command) []string {
156196
var paths []string
157197

158-
if agentConfig.Command.Auth != nil {
159-
if agentConfig.Command.Auth.TokenPath != "" {
160-
paths = append(paths, agentConfig.Command.Auth.TokenPath)
198+
if agentConfig.Auth != nil {
199+
if agentConfig.Auth.TokenPath != "" {
200+
paths = append(paths, agentConfig.Auth.TokenPath)
161201
}
162202
}
163203

164204
// agent's tls certs
165-
if agentConfig.Command.TLS != nil {
166-
if agentConfig.Command.TLS.Ca != "" {
167-
paths = append(paths, agentConfig.Command.TLS.Ca)
205+
if agentConfig.TLS != nil {
206+
if agentConfig.TLS.Ca != "" {
207+
paths = append(paths, agentConfig.TLS.Ca)
168208
}
169-
if agentConfig.Command.TLS.Cert != "" {
170-
paths = append(paths, agentConfig.Command.TLS.Cert)
209+
if agentConfig.TLS.Cert != "" {
210+
paths = append(paths, agentConfig.TLS.Cert)
171211
}
172-
if agentConfig.Command.TLS.Key != "" {
173-
paths = append(paths, agentConfig.Command.TLS.Key)
212+
if agentConfig.TLS.Key != "" {
213+
paths = append(paths, agentConfig.TLS.Key)
174214
}
175215
}
176216

internal/watcher/credentials/credential_watcher_service_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"testing"
1414
"time"
1515

16+
"github.com/nginx/agent/v3/internal/command"
17+
1618
"github.com/nginx/agent/v3/internal/config"
1719

1820
"github.com/fsnotify/fsnotify"
@@ -22,15 +24,15 @@ import (
2224
)
2325

2426
func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) {
25-
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig())
27+
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), command.Command)
2628

2729
assert.Empty(t, credentialWatcherService.filesBeingWatched)
2830
assert.False(t, credentialWatcherService.filesChanged.Load())
2931
}
3032

3133
func TestCredentialWatcherService_Watch(t *testing.T) {
3234
ctx := context.Background()
33-
cws := NewCredentialWatcherService(types.AgentConfig())
35+
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
3436
watcher, err := fsnotify.NewWatcher()
3537
require.NoError(t, err)
3638
cws.watcher = watcher
@@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) {
6163
}
6264

6365
func TestCredentialWatcherService_isWatching(t *testing.T) {
64-
cws := NewCredentialWatcherService(types.AgentConfig())
66+
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
6567
assert.False(t, cws.isWatching("test-file"))
6668
cws.filesBeingWatched.Store("test-file", true)
6769
assert.True(t, cws.isWatching("test-file"))
@@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) {
8082

8183
func TestCredentialWatcherService_addWatcher(t *testing.T) {
8284
ctx := context.Background()
83-
cws := NewCredentialWatcherService(types.AgentConfig())
85+
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
8486
watcher, err := fsnotify.NewWatcher()
8587
require.NoError(t, err)
8688
cws.watcher = watcher
@@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
105107
var files []string
106108

107109
ctx := context.Background()
108-
cws := NewCredentialWatcherService(types.AgentConfig())
110+
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
109111
watcher, err := fsnotify.NewWatcher()
110112
require.NoError(t, err)
111113
cws.watcher = watcher
@@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
137139

138140
func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
139141
ctx := context.Background()
140-
cws := NewCredentialWatcherService(types.AgentConfig())
142+
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
141143
watcher, err := fsnotify.NewWatcher()
142144
require.NoError(t, err)
143145
cws.watcher = watcher
@@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
164166

165167
func TestCredentialWatcherService_handleEvent(t *testing.T) {
166168
ctx := context.Background()
167-
cws := NewCredentialWatcherService(types.AgentConfig())
169+
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
168170
watcher, err := fsnotify.NewWatcher()
169171
require.NoError(t, err)
170172
cws.watcher = watcher
@@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) {
232234
}
233235
for _, tt := range tests {
234236
t.Run(tt.name, func(t *testing.T) {
235-
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig)
237+
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig)
236238
})
237239
}
238240
}

internal/watcher/watcher_plugin.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
"slices"
1212
"sync"
1313

14-
"github.com/nginx/agent/v3/internal/model"
14+
"github.com/nginx/agent/v3/internal/command"
1515

16-
"github.com/nginx/agent/v3/internal/grpc"
16+
"github.com/nginx/agent/v3/internal/model"
1717

1818
"github.com/nginx/agent/v3/internal/watcher/credentials"
1919

@@ -40,12 +40,14 @@ type (
4040
instanceWatcherService instanceWatcherServiceInterface
4141
healthWatcherService *health.HealthWatcherService
4242
fileWatcherService *file.FileWatcherService
43-
credentialWatcherService credentialWatcherServiceInterface
43+
commandCredentialWatcherService credentialWatcherServiceInterface
44+
auxiliaryCredentialWatcherService credentialWatcherServiceInterface
4445
instanceUpdatesChannel chan instance.InstanceUpdatesMessage
4546
nginxConfigContextChannel chan instance.NginxConfigContextMessage
4647
instanceHealthChannel chan health.InstanceHealthMessage
4748
fileUpdatesChannel chan file.FileUpdateMessage
48-
credentialUpdatesChannel chan credentials.CredentialUpdateMessage
49+
commandCredentialUpdatesChannel chan credentials.CredentialUpdateMessage
50+
auxiliaryCredentialUpdatesChannel chan credentials.CredentialUpdateMessage
4951
cancel context.CancelFunc
5052
instancesWithConfigApplyInProgress []string
5153
watcherMutex sync.Mutex
@@ -78,12 +80,14 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
7880
instanceWatcherService: instance.NewInstanceWatcherService(agentConfig),
7981
healthWatcherService: health.NewHealthWatcherService(agentConfig),
8082
fileWatcherService: file.NewFileWatcherService(agentConfig),
81-
credentialWatcherService: credentials.NewCredentialWatcherService(agentConfig),
83+
commandCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Command),
84+
auxiliaryCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Auxiliary),
8285
instanceUpdatesChannel: make(chan instance.InstanceUpdatesMessage),
8386
nginxConfigContextChannel: make(chan instance.NginxConfigContextMessage),
8487
instanceHealthChannel: make(chan health.InstanceHealthMessage),
8588
fileUpdatesChannel: make(chan file.FileUpdateMessage),
86-
credentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage),
89+
commandCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage),
90+
auxiliaryCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage),
8791
instancesWithConfigApplyInProgress: []string{},
8892
watcherMutex: sync.Mutex{},
8993
}
@@ -100,7 +104,11 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface
100104

101105
go w.instanceWatcherService.Watch(watcherContext, w.instanceUpdatesChannel, w.nginxConfigContextChannel)
102106
go w.healthWatcherService.Watch(watcherContext, w.instanceHealthChannel)
103-
go w.credentialWatcherService.Watch(watcherContext, w.credentialUpdatesChannel)
107+
go w.commandCredentialWatcherService.Watch(watcherContext, w.commandCredentialUpdatesChannel)
108+
109+
if w.agentConfig.AuxiliaryCommand != nil {
110+
go w.auxiliaryCredentialWatcherService.Watch(watcherContext, w.auxiliaryCredentialUpdatesChannel)
111+
}
104112

105113
if w.agentConfig.IsFeatureEnabled(pkgConfig.FeatureFileWatcher) {
106114
go w.fileWatcherService.Watch(watcherContext, w.fileUpdatesChannel)
@@ -132,8 +140,6 @@ func (*Watcher) Info() *bus.Info {
132140

133141
func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
134142
switch msg.Topic {
135-
case bus.CredentialUpdatedTopic:
136-
w.handleCredentialUpdate(ctx)
137143
case bus.ConfigApplyRequestTopic:
138144
w.handleConfigApplyRequest(ctx, msg)
139145
case bus.ConfigApplySuccessfulTopic:
@@ -149,7 +155,6 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
149155

150156
func (*Watcher) Subscriptions() []string {
151157
return []string{
152-
bus.CredentialUpdatedTopic,
153158
bus.ConfigApplyRequestTopic,
154159
bus.ConfigApplySuccessfulTopic,
155160
bus.ConfigApplyCompleteTopic,
@@ -248,34 +253,29 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag
248253
w.fileWatcherService.SetEnabled(true)
249254
}
250255

251-
func (w *Watcher) handleCredentialUpdate(ctx context.Context) {
252-
slog.DebugContext(ctx, "Watcher plugin received credential update message")
253-
254-
w.watcherMutex.Lock()
255-
// This will be changed/moved during the credential watcher PR
256-
conn, err := grpc.NewGrpcConnection(ctx, w.agentConfig, w.agentConfig.Command)
257-
if err != nil {
258-
slog.ErrorContext(ctx, "Unable to create new grpc connection", "error", err)
259-
w.watcherMutex.Unlock()
260-
261-
return
262-
}
263-
w.watcherMutex.Unlock()
264-
w.messagePipe.Process(ctx, &bus.Message{
265-
Topic: bus.ConnectionResetTopic, Data: conn,
266-
})
267-
}
268-
269256
func (w *Watcher) monitorWatchers(ctx context.Context) {
270257
for {
271258
select {
272259
case <-ctx.Done():
273260
return
274-
case message := <-w.credentialUpdatesChannel:
275-
slog.DebugContext(ctx, "Received credential update event")
276-
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
261+
case message := <-w.commandCredentialUpdatesChannel:
262+
slog.DebugContext(ctx, "Received credential update event for command server")
263+
newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID),
264+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey,
265+
message.SeverType.String()))
266+
267+
w.messagePipe.Process(newCtx, &bus.Message{
268+
Topic: bus.ConnectionResetTopic, Data: message.Conn,
269+
})
270+
271+
case message := <-w.auxiliaryCredentialUpdatesChannel:
272+
slog.DebugContext(ctx, "Received credential update event for auxiliary command server")
273+
newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID),
274+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey,
275+
message.SeverType.String()))
276+
277277
w.messagePipe.Process(newCtx, &bus.Message{
278-
Topic: bus.CredentialUpdatedTopic, Data: nil,
278+
Topic: bus.ConnectionResetTopic, Data: message.Conn,
279279
})
280280
case message := <-w.instanceUpdatesChannel:
281281
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)

0 commit comments

Comments
 (0)