Skip to content
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
1a17cd5
support reading token from file via config
sean-breen Jan 6, 2025
c4b962d
remove empty file
sean-breen Jan 6, 2025
d68fe7e
simplify token validation and add unit tests
sean-breen Jan 6, 2025
1b2b029
add unit tests for transport credentials funtions
sean-breen Jan 7, 2025
a43cb53
address PR feedback
sean-breen Jan 10, 2025
2ecf102
proto updates
sean-breen Jan 10, 2025
6872207
fix function name
sean-breen Jan 10, 2025
2c30d2c
fix lint error: lll
sean-breen Jan 10, 2025
2ee40ed
add missing PR feedback
sean-breen Jan 10, 2025
3cbf80a
remove error log message
sean-breen Jan 14, 2025
95403c9
fix unit test
sean-breen Jan 14, 2025
b36e5fd
Fix apk test package naming (#961)
sean-breen Jan 14, 2025
36f9e3c
Update agent config defaults and format (#959)
aphralG Jan 15, 2025
81a70e1
Add config apply request queue (#949)
dhurley Jan 15, 2025
e096397
add unit tests for transport credentials funtions
sean-breen Jan 7, 2025
df3b793
fix test name
sean-breen Jan 15, 2025
a44ca08
fix error message
sean-breen Jan 16, 2025
8d61593
fix lint error: lll
sean-breen Jan 16, 2025
459c0d5
modify error messages
sean-breen Jan 16, 2025
f26c45e
remove error logging and modify messages
sean-breen Jan 16, 2025
371c72b
fall back to token field if error occurs when reading file
sean-breen Jan 20, 2025
cd74bd4
Merge branch 'v3' into key-via-file-path
sean-breen Jan 20, 2025
c9d792d
fix bad merge
sean-breen Jan 20, 2025
e19afb4
restarting gRPC conn
sean-breen Jan 22, 2025
e4656a9
remove code from testing
sean-breen Jan 23, 2025
369f19e
fix lint errors: whitespace, revive
sean-breen Jan 23, 2025
1d48001
add new topic for handling Token file updates
sean-breen Jan 16, 2025
4747c13
add CredentialWatcherService
sean-breen Jan 22, 2025
a570d6d
adding initial watcher for credential files
sean-breen Jan 22, 2025
572c529
trigger connection reset after credential update
sean-breen Jan 23, 2025
794a184
added ConnectionResetTopic and event processing
sean-breen Jan 23, 2025
fcdcc39
Automatically add token-path to Credential watcher
sean-breen Jan 23, 2025
f5b6f7f
add function to check credential paths defined in agent config
sean-breen Jan 24, 2025
05b3eae
fix lint
sean-breen Jan 24, 2025
7472cf8
use tokenpath as config option, fixes problem with cli param parsing
sean-breen Jan 24, 2025
d488768
add CredentialWatcherService + tests
sean-breen Jan 27, 2025
1b34be4
fix lint errors
sean-breen Jan 27, 2025
e909d03
updates to generated files
sean-breen Jan 27, 2025
0100645
Send create connection after disconnect from management plane (#967)
aphralG Jan 27, 2025
7c46a7d
correct yaml key in AuthConfig
sean-breen Jan 27, 2025
7b92819
fix: flaky test (#968)
ryepup Jan 28, 2025
2139a0a
update tests
sean-breen Jan 28, 2025
095ac1d
update tests
sean-breen Jan 29, 2025
a011222
wait for create connection
aphralG Jan 29, 2025
5783936
PR feedback
aphralG Jan 30, 2025
082e85d
fix race condition
aphralG Jan 30, 2025
c50e333
clean up
aphralG Jan 30, 2025
94893c7
Merge branch 'v3' into start-subscribe-after-connection-created
aphralG Jan 31, 2025
45a9714
clean up
aphralG Jan 31, 2025
0b7c24b
Merge branch 'v3' into add-token-file-watcher
sean-breen Feb 4, 2025
8620567
update grpc connection in command and file plugins
sean-breen Feb 5, 2025
743924a
Merge branch 'start-subscribe-after-connection-created' into add-toke…
sean-breen Feb 5, 2025
e821c1d
move log message and fix file_plugin_test.go
sean-breen Feb 6, 2025
687ab69
fix lint
sean-breen Feb 6, 2025
0ebaf6b
Merge branch 'v3' into add-token-file-watcher
sean-breen Feb 11, 2025
87e6c56
handle command and file service client updates after grpc reset
sean-breen Feb 11, 2025
b98b24b
update FakeCommandService
sean-breen Feb 11, 2025
0bfa34f
add unit tests
sean-breen Feb 12, 2025
afc0d69
fix bad test
sean-breen Feb 13, 2025
889e5da
formatting
sean-breen Feb 13, 2025
04db672
remove test
sean-breen Feb 13, 2025
61b17d7
increase timeout before checking connection after restart
sean-breen Feb 14, 2025
7bc684a
set isConnected to false when handling subscribe errors
sean-breen Feb 18, 2025
5a69209
increase code coverage
sean-breen Feb 19, 2025
dcda07a
lint fix
sean-breen Feb 19, 2025
cabdd20
fieldalignment
sean-breen Feb 19, 2025
5645b19
remove unused fake
sean-breen Feb 19, 2025
dd559aa
PR feedback
sean-breen Feb 19, 2025
9d60914
update fake command service
sean-breen Feb 19, 2025
562c312
more PR feedback
sean-breen Feb 19, 2025
40cd06c
PR feedback: disable watcher during config apply
sean-breen Feb 20, 2025
85cc17a
Merge branch 'v3' into add-token-file-watcher
sean-breen Feb 20, 2025
005cc02
don't pause credential watcher during config apply
sean-breen Feb 20, 2025
c895042
debug log messages
sean-breen Feb 20, 2025
071dba7
Add mutex around updating client in the CommandService
sean-breen Feb 21, 2025
23d1193
undo change to metric name
sean-breen Feb 21, 2025
74cf2b7
update test
sean-breen Feb 21, 2025
d728fcf
Merge branch 'v3' into add-token-file-watcher
sean-breen Feb 25, 2025
5f579aa
lock watcher when replacing grpc connection
sean-breen Feb 26, 2025
cfefced
add lock around subscibe client access
sean-breen Feb 26, 2025
3c7b001
fix lint error
sean-breen Feb 26, 2025
189ed6d
unlock watcher mutex in error case
sean-breen Feb 26, 2025
3c7913c
return with no blank line
sean-breen Feb 26, 2025
e0f8169
lint error ?
sean-breen Feb 26, 2025
6a421be
handle rename cases
sean-breen Feb 27, 2025
1f816a9
remove unneccessary cases from switch
sean-breen Feb 27, 2025
95b1027
rewrite switch as if
sean-breen Feb 27, 2025
69176f2
handling kubernetes secret update case
sean-breen Feb 27, 2025
a9a14cc
fix test
sean-breen Feb 27, 2025
ef4364c
save connection status when resetting file manager service
sean-breen Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/bus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
CredentialUpdatedTopic = "credential-updated"
ConnectionResetTopic = "connection-reset"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
Expand Down
17 changes: 17 additions & 0 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type (
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
UpdateClient(client mpi.CommandServiceClient)
Subscribe(ctx context.Context)
IsConnected() bool
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
Expand Down Expand Up @@ -86,6 +87,8 @@ func (cp *CommandPlugin) Info() *bus.Info {

func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.ConnectionResetTopic:
cp.processConnectionReset(ctx, msg)
case bus.ResourceUpdateTopic:
cp.processResourceUpdate(ctx, msg)
case bus.InstanceHealthTopic:
Expand Down Expand Up @@ -172,8 +175,22 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
}
}

func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received connection reset")
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
err := cp.conn.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", err)
}
cp.conn = newConnection
cp.commandService.UpdateClient(cp.conn.CommandServiceClient())
slog.DebugContext(ctx, "Command plugin: client reset successfully")
}
}

func (cp *CommandPlugin) Subscriptions() []string {
return []string{
bus.ConnectionResetTopic,
bus.ResourceUpdateTopic,
bus.InstanceHealthTopic,
bus.DataPlaneHealthResponseTopic,
Expand Down
7 changes: 7 additions & 0 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) {
assert.Equal(
t,
[]string{
bus.ConnectionResetTopic,
bus.ResourceUpdateTopic,
bus.InstanceHealthTopic,
bus.DataPlaneHealthResponseTopic,
Expand Down Expand Up @@ -142,6 +143,12 @@ func TestCommandPlugin_Process(t *testing.T) {
})
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount())
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())

commandPlugin.Process(ctx, &bus.Message{
Topic: bus.ConnectionResetTopic,
Data: commandPlugin.conn,
})
require.Equal(t, 1, fakeCommandService.UpdateClientCallCount())
}

func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
Expand Down
10 changes: 9 additions & 1 deletion internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (cs *CommandService) CreateConnection(
slog.InfoContext(ctx, "No Data Plane Instance found")
}

if cs.isConnected.Load() {
return nil, errors.New("command service already connected")
}

request := &mpi.CreateConnectionRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: id.GenerateMessageID(),
Expand All @@ -228,7 +232,6 @@ func (cs *CommandService) CreateConnection(
}

slog.DebugContext(ctx, "Sending create connection request", "request", request)

response, err := backoff.RetryWithData(
cs.connectCallback(ctx, request),
backoffHelpers.Context(ctx, commonSettings),
Expand All @@ -249,6 +252,10 @@ func (cs *CommandService) CreateConnection(
return response, nil
}

func (cs *CommandService) UpdateClient(client mpi.CommandServiceClient) {
cs.commandServiceClient = client
}

// Retry callback for sending a data plane response to the Management Plane.
func (cs *CommandService) sendDataPlaneResponseCallback(
ctx context.Context,
Expand Down Expand Up @@ -427,6 +434,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e
codeError, ok := status.FromError(err)

if ok && codeError.Code() == codes.Unavailable {
cs.isConnected.Store(false)
slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+
"Trying create connection rpc", errorMsg), "error", err)
_, connectionErr := cs.CreateConnection(ctx, cs.resource)
Expand Down
27 changes: 27 additions & 0 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ func TestCommandService_CreateConnection(t *testing.T) {
require.NoError(t, err)
}

func TestCommandService_UpdateClient(t *testing.T) {
commandServiceClient := &v1fakes.FakeCommandServiceClient{}

commandService := NewCommandService(
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)
commandService.UpdateClient(commandServiceClient)
assert.NotNil(t, commandService.commandServiceClient)
}

func TestCommandService_UpdateDataPlaneHealth(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
Expand Down Expand Up @@ -501,3 +513,18 @@ func TestCommandService_isValidRequest(t *testing.T) {
})
}
}

func TestCommandService_handleSubscribeError(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}

commandService := NewCommandService(
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)
require.Error(t, commandService.handleSubscribeError(ctx, errors.New(""), ""))

err := commandService.handleSubscribeError(ctx, errors.New("blah blah blah"), "Testing handleSubscribeError")
require.Error(t, err)
}
39 changes: 39 additions & 0 deletions internal/command/commandfakes/fake_command_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,8 @@ func createConfig() *Config {
Type: Grpc,
},
Auth: &AuthConfig{
Token: "1234",
Token: "1234",
TokenPath: "path/to/my_token",
},
TLS: &TLSConfig{
Cert: "some.cert",
Expand Down
1 change: 1 addition & 0 deletions internal/config/testdata/nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ command:
type: grpc
auth:
token: "1234"
tokenpath: "path/to/my_token"
tls:
cert: "some.cert"
key: "some.key"
Expand Down
16 changes: 16 additions & 0 deletions internal/file/file_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (fp *FilePlugin) Info() *bus.Info {

func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
case bus.ConnectionCreatedTopic:
fp.fileManagerService.SetIsConnected(true)
case bus.NginxConfigUpdateTopic:
Expand All @@ -81,6 +83,7 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {

func (fp *FilePlugin) Subscriptions() []string {
return []string{
bus.ConnectionResetTopic,
bus.ConnectionCreatedTopic,
bus.NginxConfigUpdateTopic,
bus.ConfigUploadRequestTopic,
Expand All @@ -91,6 +94,19 @@ func (fp *FilePlugin) Subscriptions() []string {
}
}

func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above with the lock just wondering if when it is being reset the connection needs to be locked

slog.DebugContext(ctx, "File plugin received connection reset message")
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
err := fp.conn.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "File plugin: unable to close connection", "error", err)
}
fp.conn = newConnection
fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config)
slog.DebugContext(ctx, "File plugin: client reset successfully")
}
}

func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) {
response, ok := msg.Data.(*mpi.DataPlaneResponse)

Expand Down
1 change: 1 addition & 0 deletions internal/file/file_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestFilePlugin_Subscriptions(t *testing.T) {
assert.Equal(
t,
[]string{
bus.ConnectionResetTopic,
bus.ConnectionCreatedTopic,
bus.NginxConfigUpdateTopic,
bus.ConfigUploadRequestTopic,
Expand Down
18 changes: 1 addition & 17 deletions internal/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type (
CommandServiceClient() mpi.CommandServiceClient
FileServiceClient() mpi.FileServiceClient
Close(ctx context.Context) error
Restart(ctx context.Context) (*GrpcConnection, error)
}

GrpcConnection struct {
Expand All @@ -68,6 +67,7 @@ var (
_ GrpcConnectionInterface = (*GrpcConnection)(nil)
)

// nolint: ireturn
func NewGrpcConnection(ctx context.Context, agentConfig *config.Config) (*GrpcConnection, error) {
if agentConfig == nil || agentConfig.Command.Server.Type != config.Grpc {
return nil, errors.New("invalid command server settings")
Expand Down Expand Up @@ -131,22 +131,6 @@ func (gc *GrpcConnection) Close(ctx context.Context) error {
return nil
}

func (gc *GrpcConnection) Restart(ctx context.Context) (*GrpcConnection, error) {
slog.InfoContext(ctx, "Restarting grpc connection")
err := gc.Close(ctx)
if err != nil {
return nil, err
}

slog.InfoContext(ctx, "Creating grpc connection")
newConn, err := NewGrpcConnection(ctx, gc.config)
if err != nil {
return nil, err
}

return newConn, nil
}

func (w *wrappedStream) RecvMsg(message any) error {
err := w.ClientStream.RecvMsg(message)
if err == nil {
Expand Down
Loading