Skip to content

Commit 4caf164

Browse files
sean-breenaphralGdhurleyryepup
authored
Add token file watcher (#979)
* support reading token from file via config * remove empty file * simplify token validation and add unit tests * add unit tests for transport credentials funtions * address PR feedback * proto updates * fix function name * fix lint error: lll * add missing PR feedback * remove error log message * fix unit test * Fix apk test package naming (#961) * modify alpine package name: nginx-agent-3.0.0_1234 -> nginx-agent-3.0.0.1234 * protoc-gen update * Update agent config defaults and format (#959) * update config defaults and format * Add config apply request queue (#949) * add unit tests for transport credentials funtions * fix test name * fix error message * fix lint error: lll * modify error messages * remove error logging and modify messages * fall back to token field if error occurs when reading file * fix bad merge * restarting gRPC conn * remove code from testing * fix lint errors: whitespace, revive * add new topic for handling Token file updates * add CredentialWatcherService * adding initial watcher for credential files * trigger connection reset after credential update * added ConnectionResetTopic and event processing * Automatically add token-path to Credential watcher * add function to check credential paths defined in agent config * fix lint * use tokenpath as config option, fixes problem with cli param parsing * add CredentialWatcherService + tests * fix lint errors * updates to generated files * Send create connection after disconnect from management plane (#967) * correct yaml key in AuthConfig * fix: flaky test (#968) `TestConvertToStructs` was occasionally failing because it was expecting `range` over a `map` to be a consistent order, but per [spec]: > The iteration order over maps is not specified and is not guaranteed to be the same from one iteration to the next. Uses `ElementsMatch` so the test passes even when the order of elements is different. [spec]: https://go.dev/ref/spec#RangeClause * update tests * update tests * wait for create connection * PR feedback * fix race condition * clean up * clean up * update grpc connection in command and file plugins * move log message and fix file_plugin_test.go * fix lint * handle command and file service client updates after grpc reset * update FakeCommandService * add unit tests * fix bad test * formatting * remove test * increase timeout before checking connection after restart * set isConnected to false when handling subscribe errors * increase code coverage * lint fix * fieldalignment * remove unused fake * PR feedback * update fake command service * more PR feedback * PR feedback: disable watcher during config apply * don't pause credential watcher during config apply * debug log messages * Add mutex around updating client in the CommandService * undo change to metric name * update test * lock watcher when replacing grpc connection * add lock around subscibe client access * fix lint error * unlock watcher mutex in error case * return with no blank line * lint error ? * handle rename cases * remove unneccessary cases from switch * rewrite switch as if * handling kubernetes secret update case * fix test * save connection status when resetting file manager service --------- Co-authored-by: aphralG <108004222+aphralG@users.noreply.github.com> Co-authored-by: Donal Hurley <djhurley1990@gmail.com> Co-authored-by: Ryan Davis <ry.davis@f5.com> Co-authored-by: Aphral Griffin <a.griffin@f5.com>
1 parent 290c6e8 commit 4caf164

20 files changed

+728
-102
lines changed

internal/bus/topics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const (
1515
ConfigUploadRequestTopic = "config-upload-request"
1616
DataPlaneResponseTopic = "data-plane-response"
1717
ConnectionCreatedTopic = "connection-created"
18+
CredentialUpdatedTopic = "credential-updated"
19+
ConnectionResetTopic = "connection-reset"
1820
ConfigApplyRequestTopic = "config-apply-request"
1921
WriteConfigSuccessfulTopic = "write-config-successful"
2022
ConfigApplySuccessfulTopic = "config-apply-successful"

internal/command/command_plugin.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type (
3131
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
3232
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
3333
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
34+
UpdateClient(client mpi.CommandServiceClient)
3435
Subscribe(ctx context.Context)
3536
IsConnected() bool
3637
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
@@ -86,6 +87,8 @@ func (cp *CommandPlugin) Info() *bus.Info {
8687

8788
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
8889
switch msg.Topic {
90+
case bus.ConnectionResetTopic:
91+
cp.processConnectionReset(ctx, msg)
8992
case bus.ResourceUpdateTopic:
9093
cp.processResourceUpdate(ctx, msg)
9194
case bus.InstanceHealthTopic:
@@ -172,8 +175,22 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
172175
}
173176
}
174177

178+
func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
179+
slog.DebugContext(ctx, "Command plugin received connection reset")
180+
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
181+
err := cp.conn.Close(ctx)
182+
if err != nil {
183+
slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", err)
184+
}
185+
cp.conn = newConnection
186+
cp.commandService.UpdateClient(cp.conn.CommandServiceClient())
187+
slog.DebugContext(ctx, "Command service client reset successfully")
188+
}
189+
}
190+
175191
func (cp *CommandPlugin) Subscriptions() []string {
176192
return []string{
193+
bus.ConnectionResetTopic,
177194
bus.ResourceUpdateTopic,
178195
bus.InstanceHealthTopic,
179196
bus.DataPlaneHealthResponseTopic,

internal/command/command_plugin_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) {
4545
assert.Equal(
4646
t,
4747
[]string{
48+
bus.ConnectionResetTopic,
4849
bus.ResourceUpdateTopic,
4950
bus.InstanceHealthTopic,
5051
bus.DataPlaneHealthResponseTopic,
@@ -142,6 +143,12 @@ func TestCommandPlugin_Process(t *testing.T) {
142143
})
143144
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount())
144145
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())
146+
147+
commandPlugin.Process(ctx, &bus.Message{
148+
Topic: bus.ConnectionResetTopic,
149+
Data: commandPlugin.conn,
150+
})
151+
require.Equal(t, 1, fakeCommandService.UpdateClientCallCount())
145152
}
146153

147154
func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {

internal/command/command_service.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ func (cs *CommandService) UpdateDataPlaneStatus(
9898
sendDataPlaneStatus := func() (*mpi.UpdateDataPlaneStatusResponse, error) {
9999
slog.DebugContext(ctx, "Sending data plane status update request", "request", request,
100100
"parent_correlation_id", correlationID)
101+
102+
cs.subscribeClientMutex.Lock()
101103
if cs.commandServiceClient == nil {
102104
return nil, errors.New("command service client is not initialized")
103105
}
104-
105106
response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(ctx, request)
107+
cs.subscribeClientMutex.Unlock()
106108

107109
validatedError := grpc.ValidateGrpcError(updateError)
108110
if validatedError != nil {
@@ -210,6 +212,10 @@ func (cs *CommandService) CreateConnection(
210212
slog.InfoContext(ctx, "No Data Plane Instance found")
211213
}
212214

215+
if cs.isConnected.Load() {
216+
return nil, errors.New("command service already connected")
217+
}
218+
213219
request := &mpi.CreateConnectionRequest{
214220
MessageMeta: &mpi.MessageMeta{
215221
MessageId: id.GenerateMessageID(),
@@ -228,7 +234,6 @@ func (cs *CommandService) CreateConnection(
228234
}
229235

230236
slog.DebugContext(ctx, "Sending create connection request", "request", request)
231-
232237
response, err := backoff.RetryWithData(
233238
cs.connectCallback(ctx, request),
234239
backoffHelpers.Context(ctx, commonSettings),
@@ -249,6 +254,12 @@ func (cs *CommandService) CreateConnection(
249254
return response, nil
250255
}
251256

257+
func (cs *CommandService) UpdateClient(client mpi.CommandServiceClient) {
258+
cs.subscribeClientMutex.Lock()
259+
defer cs.subscribeClientMutex.Unlock()
260+
cs.commandServiceClient = client
261+
}
262+
252263
// Retry callback for sending a data plane response to the Management Plane.
253264
func (cs *CommandService) sendDataPlaneResponseCallback(
254265
ctx context.Context,
@@ -355,11 +366,14 @@ func (cs *CommandService) dataPlaneHealthCallback(
355366
) func() (*mpi.UpdateDataPlaneHealthResponse, error) {
356367
return func() (*mpi.UpdateDataPlaneHealthResponse, error) {
357368
slog.DebugContext(ctx, "Sending data plane health update request", "request", request)
369+
370+
cs.subscribeClientMutex.Lock()
358371
if cs.commandServiceClient == nil {
359372
return nil, errors.New("command service client is not initialized")
360373
}
361374

362375
response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(ctx, request)
376+
cs.subscribeClientMutex.Unlock()
363377

364378
validatedError := grpc.ValidateGrpcError(updateError)
365379

@@ -427,6 +441,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e
427441
codeError, ok := status.FromError(err)
428442

429443
if ok && codeError.Code() == codes.Unavailable {
444+
cs.isConnected.Store(false)
430445
slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+
431446
"Trying create connection rpc", errorMsg), "error", err)
432447
_, connectionErr := cs.CreateConnection(ctx, cs.resource)
@@ -530,7 +545,9 @@ func (cs *CommandService) connectCallback(
530545
request *mpi.CreateConnectionRequest,
531546
) func() (*mpi.CreateConnectionResponse, error) {
532547
return func() (*mpi.CreateConnectionResponse, error) {
548+
cs.subscribeClientMutex.Lock()
533549
response, connectErr := cs.commandServiceClient.CreateConnection(ctx, request)
550+
cs.subscribeClientMutex.Unlock()
534551

535552
validatedError := grpc.ValidateGrpcError(connectErr)
536553
if validatedError != nil {

internal/command/command_service_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,18 @@ func TestCommandService_CreateConnection(t *testing.T) {
198198
require.NoError(t, err)
199199
}
200200

201+
func TestCommandService_UpdateClient(t *testing.T) {
202+
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
203+
204+
commandService := NewCommandService(
205+
commandServiceClient,
206+
types.AgentConfig(),
207+
make(chan *mpi.ManagementPlaneRequest),
208+
)
209+
commandService.UpdateClient(commandServiceClient)
210+
assert.NotNil(t, commandService.commandServiceClient)
211+
}
212+
201213
func TestCommandService_UpdateDataPlaneHealth(t *testing.T) {
202214
ctx := context.Background()
203215
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
@@ -501,3 +513,18 @@ func TestCommandService_isValidRequest(t *testing.T) {
501513
})
502514
}
503515
}
516+
517+
func TestCommandService_handleSubscribeError(t *testing.T) {
518+
ctx := context.Background()
519+
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
520+
521+
commandService := NewCommandService(
522+
commandServiceClient,
523+
types.AgentConfig(),
524+
make(chan *mpi.ManagementPlaneRequest),
525+
)
526+
require.Error(t,
527+
commandService.handleSubscribeError(ctx,
528+
errors.New("an error occurred when attempting to subscribe"),
529+
"Testing handleSubscribeError"))
530+
}

internal/command/commandfakes/fake_command_service.go

Lines changed: 39 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/config/config_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,8 @@ func createConfig() *Config {
901901
Type: Grpc,
902902
},
903903
Auth: &AuthConfig{
904-
Token: "1234",
904+
Token: "1234",
905+
TokenPath: "path/to/my_token",
905906
},
906907
TLS: &TLSConfig{
907908
Cert: "some.cert",

internal/config/testdata/nginx-agent.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ command:
5656
type: grpc
5757
auth:
5858
token: "1234"
59+
tokenpath: "path/to/my_token"
5960
tls:
6061
cert: "some.cert"
6162
key: "some.key"

internal/file/file_manager_service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type (
5454
UpdateCurrentFilesOnDisk(updateFiles map[string]*mpi.File)
5555
DetermineFileActions(currentFiles, modifiedFiles map[string]*mpi.File) (map[string]*mpi.File,
5656
map[string][]byte, error)
57+
IsConnected() bool
5758
SetIsConnected(isConnected bool)
5859
}
5960
)
@@ -272,6 +273,10 @@ func (fms *FileManagerService) UpdateFile(
272273
return err
273274
}
274275

276+
func (fms *FileManagerService) IsConnected() bool {
277+
return fms.isConnected.Load()
278+
}
279+
275280
func (fms *FileManagerService) SetIsConnected(isConnected bool) {
276281
fms.isConnected.Store(isConnected)
277282
}

internal/file/file_plugin.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func (fp *FilePlugin) Info() *bus.Info {
6262

6363
func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
6464
switch msg.Topic {
65+
case bus.ConnectionResetTopic:
66+
fp.handleConnectionReset(ctx, msg)
6567
case bus.ConnectionCreatedTopic:
6668
fp.fileManagerService.SetIsConnected(true)
6769
case bus.NginxConfigUpdateTopic:
@@ -81,6 +83,7 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
8183

8284
func (fp *FilePlugin) Subscriptions() []string {
8385
return []string{
86+
bus.ConnectionResetTopic,
8487
bus.ConnectionCreatedTopic,
8588
bus.NginxConfigUpdateTopic,
8689
bus.ConfigUploadRequestTopic,
@@ -91,6 +94,24 @@ func (fp *FilePlugin) Subscriptions() []string {
9194
}
9295
}
9396

97+
func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) {
98+
slog.DebugContext(ctx, "File plugin received connection reset message")
99+
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
100+
var reconnect bool
101+
err := fp.conn.Close(ctx)
102+
if err != nil {
103+
slog.ErrorContext(ctx, "File plugin: unable to close connection", "error", err)
104+
}
105+
fp.conn = newConnection
106+
107+
reconnect = fp.fileManagerService.IsConnected()
108+
fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config)
109+
fp.fileManagerService.SetIsConnected(reconnect)
110+
111+
slog.DebugContext(ctx, "File plugin: client reset successfully")
112+
}
113+
}
114+
94115
func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) {
95116
response, ok := msg.Data.(*mpi.DataPlaneResponse)
96117

0 commit comments

Comments
 (0)