Skip to content
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
5b31494
add auxiliary command server proto
aphralG Jun 11, 2025
c42a042
Merge branch 'main' into add-auxiliary-command-server-proto
aphralG Jun 12, 2025
e07bf64
allow multiple management planes
aphralG Jun 12, 2025
a6b6e52
start of file plugins
aphralG Jun 16, 2025
650a0d9
PR feedback
aphralG Jun 16, 2025
8c422cc
Merge branch 'add-auxiliary-command-server-to-agent-config' into add-…
aphralG Jun 16, 2025
8ef9c26
file plugin
aphralG Jun 16, 2025
a8817ab
file plugin
aphralG Jun 16, 2025
b81fb37
allow second credential watcher
aphralG Jun 17, 2025
f958507
Merge branch 'main' into add-auxiliary-command-server-proto
aphralG Jun 23, 2025
2487a4d
Merge branch 'add-auxiliary-command-server-proto' into add-auxiliary-…
aphralG Jun 23, 2025
b710855
PR feedback
aphralG Jun 23, 2025
90cd7f9
merge main
aphralG Jun 23, 2025
8ef3c45
refactor file plugin to have read only mode
aphralG Jun 23, 2025
5773b42
clean up
aphralG Jun 23, 2025
d065710
Merge branch 'add-read-only-file-plugin' into allow-second-credential…
aphralG Jun 23, 2025
753ee94
feedback
aphralG Jun 23, 2025
d586259
fix race condition
aphralG Jun 23, 2025
a8d5b29
Pr feedback
aphralG Jun 26, 2025
fbbe418
merge main
aphralG Jun 27, 2025
1fa09b9
Merge branch 'add-auxiliary-command-server-proto' into add-auxiliary-…
aphralG Jun 27, 2025
8e26d81
PR feedback
aphralG Jun 27, 2025
4091223
Merge branch 'add-auxiliary-command-server-to-agent-config' into add-…
aphralG Jun 27, 2025
c6c3cca
Merge branch 'add-read-only-file-plugin' into allow-second-credential…
aphralG Jun 27, 2025
f2d1fb2
PR feedback
aphralG Jun 27, 2025
e4423de
Merge branch 'add-auxiliary-command-server-to-agent-config' into add-…
aphralG Jun 27, 2025
6d39669
PR feedback
aphralG Jun 27, 2025
0d86529
PR feedback
aphralG Jun 27, 2025
0a95a38
PR feedback
aphralG Jun 27, 2025
ee42007
Merge branch 'add-read-only-file-plugin' into allow-second-credential…
aphralG Jun 27, 2025
d08ba6f
PR feedback
aphralG Jun 27, 2025
3f39fbf
fix message response
aphralG Jun 27, 2025
d27b85a
start of integration tests
aphralG Jun 30, 2025
30dc211
add file overview endpoint to mock
aphralG Jun 30, 2025
08fa4db
Merge branch 'main' into add-auxiliary-command-server-proto
aphralG Jul 1, 2025
44f1e1e
merge main
aphralG Jul 1, 2025
16b08c0
Merge branch 'add-auxiliary-command-server-proto' into add-auxiliary-…
aphralG Jul 1, 2025
845f3dd
fix instance ID
aphralG Jul 1, 2025
bf98852
Merge branch 'add-auxiliary-command-server-to-agent-config' into add-…
aphralG Jul 1, 2025
f32f34a
Merge branch 'add-read-only-file-plugin' into allow-second-credential…
aphralG Jul 1, 2025
cb58452
Merge branch 'allow-second-credential-watcher' into add-multi-managem…
aphralG Jul 1, 2025
9c61e4c
update message
aphralG Jul 2, 2025
9cc8351
Merge branch 'allow-second-credential-watcher' into add-multi-managem…
aphralG Jul 2, 2025
8f35043
update agent config
aphralG Jul 3, 2025
70da7a5
update agent config
aphralG Jul 3, 2025
cb0e64a
update agent config
aphralG Jul 3, 2025
f85b4c7
try fix test
aphralG Jul 3, 2025
5c55151
try fix test
aphralG Jul 3, 2025
cd92239
Merge branch 'allow-second-credential-watcher' into add-multi-managem…
aphralG Jul 3, 2025
fcbc755
some working tests, config apply test broken
aphralG Jul 3, 2025
885e657
format stuff
aphralG Jul 3, 2025
8a25503
clean up
aphralG Jul 3, 2025
e6bbad5
added config apply test
aphralG Jul 7, 2025
f5d93b8
run on official tests
aphralG Jul 7, 2025
b1d1b4a
PR feedback
aphralG Jul 7, 2025
b728b91
merge main
aphralG Jul 7, 2025
1de9c00
Merge branch 'add-read-only-file-plugin' into allow-second-credential…
aphralG Jul 7, 2025
b70dcc7
merge main
aphralG Jul 7, 2025
5c87492
merge main
aphralG Jul 7, 2025
04c5512
Merge branch 'main' into add-multi-management-plane-integration-test
aphralG Jul 8, 2025
f035a05
fix test and race
aphralG Jul 9, 2025
93e00e6
fix test and race
aphralG Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc
TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) BUILD_TARGET="install-agent-local" CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} \
PACKAGES_REPO=$(OSS_PACKAGES_REPO) PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(DOCKERFILE_PATH) IMAGE_PATH=$(IMAGE_PATH) TAG=${IMAGE_TAG} \
OS_VERSION=$(OS_VERSION) OS_RELEASE=$(OS_RELEASE) \
go test -v ./test/integration/installuninstall ./test/integration/managementplane ./test/integration/nginxless
go test -v ./test/integration/installuninstall ./test/integration/managementplane ./test/integration/auxiliarycommandserver ./test/integration/nginxless

official-image-integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc
TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} BUILD_TARGET="install" \
PACKAGES_REPO=$(OSS_PACKAGES_REPO) TAG=${TAG} PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(OFFICIAL_IMAGE_DOCKERFILE_PATH) \
OS_VERSION=$(OS_VERSION) OS_RELEASE=$(OS_RELEASE) IMAGE_PATH=$(IMAGE_PATH) \
go test -v ./test/integration/managementplane
go test -v ./test/integration/managementplane ./test/integration/managementplane
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be here twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope should be ./test/integration/auxiliarycommandserver good catch Thank you!


performance-test:
@mkdir -p $(TEST_BUILD_DIR)
Expand Down
2 changes: 1 addition & 1 deletion internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Message: message,
Error: "Can not perform write action as auxiliary command server",
Error: "Unable to process request. Management plane is configured as read only.",
},
InstanceId: instanceID,
})
Expand Down
37 changes: 37 additions & 0 deletions internal/config/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,40 @@ func ToCommandProto(cmd *Command) *mpi.CommandServer {

return protoConfig
}

// ToAuxiliaryCommandServerProto maps the AgentConfig Command struct back to the AuxiliaryCommandServer proto message
func ToAuxiliaryCommandServerProto(cmd *Command) *mpi.AuxiliaryCommandServer {
protoConfig := &mpi.AuxiliaryCommandServer{}

// Map ServerConfig to the ServerSettings
if cmd.Server != nil {
protoServerType := mpi.ServerSettings_SERVER_SETTINGS_TYPE_UNDEFINED
if cmd.Server.Type == Grpc {
protoServerType = mpi.ServerSettings_SERVER_SETTINGS_TYPE_GRPC
}

protoConfig.Server = &mpi.ServerSettings{
Host: cmd.Server.Host,
Port: int32(cmd.Server.Port),
Type: protoServerType,
}
}

// Map AuthConfig to AuthSettings
if cmd.Auth != nil {
protoConfig.Auth = &mpi.AuthSettings{}
}

// Map TLSConfig to TLSSettings
if cmd.TLS != nil {
protoConfig.Tls = &mpi.TLSSettings{
Cert: cmd.TLS.Cert,
Key: cmd.TLS.Key,
Ca: cmd.TLS.Ca,
ServerName: cmd.TLS.ServerName,
SkipVerify: cmd.TLS.SkipVerify,
}
}

return protoConfig
}
3 changes: 1 addition & 2 deletions internal/file/file_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
)
}

if logger.ServerType(ctx) == fp.serverType.String() || logger.ServerType(ctx) == "" {
if logger.ServerType(ctx) == fp.serverType.String() {
switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
Expand Down Expand Up @@ -358,7 +358,6 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess
fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext)
}

// nolint: dupl
func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "File plugin received config upload request message")
managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
Expand Down
86 changes: 64 additions & 22 deletions internal/watcher/credentials/credential_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"sync/atomic"
"time"

"github.com/nginx/agent/v3/internal/model"

"github.com/nginx/agent/v3/internal/grpc"

"github.com/fsnotify/fsnotify"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/logger"
Expand All @@ -28,56 +32,75 @@ var emptyEvent = fsnotify.Event{
}

type CredentialUpdateMessage struct {
CorrelationID slog.Attr
CorrelationID slog.Attr
GrpcConnection *grpc.GrpcConnection
ServerType model.ServerType
}

type CredentialWatcherService struct {
agentConfig *config.Config
watcher *fsnotify.Watcher
filesBeingWatched *sync.Map
filesChanged *atomic.Bool
serverType model.ServerType
watcherMutex sync.Mutex
}

func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService {
func NewCredentialWatcherService(agentConfig *config.Config, serverType model.ServerType) *CredentialWatcherService {
filesChanged := &atomic.Bool{}
filesChanged.Store(false)

return &CredentialWatcherService{
agentConfig: agentConfig,
filesBeingWatched: &sync.Map{},
filesChanged: filesChanged,
serverType: serverType,
watcherMutex: sync.Mutex{},
}
}

func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) {
slog.DebugContext(ctx, "Starting credential watcher monitoring")
newCtx := context.WithValue(
ctx,
logger.ServerTypeContextKey,
slog.Any(logger.ServerTypeKey, cws.serverType.String()),
)
slog.DebugContext(newCtx, "Starting credential watcher monitoring")

ticker := time.NewTicker(monitoringInterval)
watcher, err := fsnotify.NewWatcher()
if err != nil {
slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err)
slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err)
return
}

cws.watcher = watcher

cws.watchFiles(ctx, credentialPaths(cws.agentConfig))
cws.watcherMutex.Lock()
commandServer := cws.agentConfig.Command

if cws.serverType == model.Auxiliary {
commandServer = cws.agentConfig.AuxiliaryCommand
}

cws.watchFiles(newCtx, credentialPaths(commandServer))
cws.watcherMutex.Unlock()

for {
select {
case <-ctx.Done():
case <-newCtx.Done():
closeError := cws.watcher.Close()
if closeError != nil {
slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError)
slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError)
}

return
case event := <-cws.watcher.Events:
cws.handleEvent(ctx, event)
cws.handleEvent(newCtx, event)
case <-ticker.C:
cws.checkForUpdates(ctx, ch)
cws.checkForUpdates(newCtx, ch)
case watcherError := <-cws.watcher.Errors:
slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError)
slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError)
}
}
}
Expand Down Expand Up @@ -146,31 +169,50 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha
slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()),
)

cws.watcherMutex.Lock()
defer cws.watcherMutex.Unlock()

commandServer := cws.agentConfig.Command
if cws.serverType == model.Auxiliary {
commandServer = cws.agentConfig.AuxiliaryCommand
}

conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandServer)
if err != nil {
slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err)
cws.filesChanged.Store(false)

return
}
slog.DebugContext(ctx, "Credential watcher has detected changes")
ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)}
ch <- CredentialUpdateMessage{
CorrelationID: logger.CorrelationIDAttr(newCtx),
ServerType: cws.serverType,
GrpcConnection: conn,
}
cws.filesChanged.Store(false)
}
}

func credentialPaths(agentConfig *config.Config) []string {
func credentialPaths(agentConfig *config.Command) []string {
var paths []string

if agentConfig.Command.Auth != nil {
if agentConfig.Command.Auth.TokenPath != "" {
paths = append(paths, agentConfig.Command.Auth.TokenPath)
if agentConfig.Auth != nil {
if agentConfig.Auth.TokenPath != "" {
paths = append(paths, agentConfig.Auth.TokenPath)
}
}

// agent's tls certs
if agentConfig.Command.TLS != nil {
if agentConfig.Command.TLS.Ca != "" {
paths = append(paths, agentConfig.Command.TLS.Ca)
if agentConfig.TLS != nil {
if agentConfig.TLS.Ca != "" {
paths = append(paths, agentConfig.TLS.Ca)
}
if agentConfig.Command.TLS.Cert != "" {
paths = append(paths, agentConfig.Command.TLS.Cert)
if agentConfig.TLS.Cert != "" {
paths = append(paths, agentConfig.TLS.Cert)
}
if agentConfig.Command.TLS.Key != "" {
paths = append(paths, agentConfig.Command.TLS.Key)
if agentConfig.TLS.Key != "" {
paths = append(paths, agentConfig.TLS.Key)
}
}

Expand Down
18 changes: 10 additions & 8 deletions internal/watcher/credentials/credential_watcher_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"testing"
"time"

"github.com/nginx/agent/v3/internal/model"

"github.com/nginx/agent/v3/internal/config"

"github.com/fsnotify/fsnotify"
Expand All @@ -22,15 +24,15 @@ import (
)

func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) {
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig())
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), model.Command)

assert.Empty(t, credentialWatcherService.filesBeingWatched)
assert.False(t, credentialWatcherService.filesChanged.Load())
}

func TestCredentialWatcherService_Watch(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) {
}

func TestCredentialWatcherService_isWatching(t *testing.T) {
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
assert.False(t, cws.isWatching("test-file"))
cws.filesBeingWatched.Store("test-file", true)
assert.True(t, cws.isWatching("test-file"))
Expand All @@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) {

func TestCredentialWatcherService_addWatcher(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand All @@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
var files []string

ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {

func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand All @@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) {

func TestCredentialWatcherService_handleEvent(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig)
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig)
})
}
}
9 changes: 8 additions & 1 deletion internal/watcher/instance/instance_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan
slog.WarnContext(ctx, "Unable to convert config to labels structure", "error", convertErr)
}

return &mpi.Instance{
instance := &mpi.Instance{
InstanceMeta: &mpi.InstanceMeta{
InstanceId: iw.agentConfig.UUID,
InstanceType: mpi.InstanceMeta_INSTANCE_TYPE_AGENT,
Expand All @@ -334,6 +334,13 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan
Details: nil,
},
}

if iw.agentConfig.IsAuxiliaryCommandGrpcClientConfigured() {
instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config.
ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand)
}

return instance
}

func compareInstances(oldInstancesMap, instancesMap map[string]*mpi.Instance) (
Expand Down
Loading
Loading