Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions api/grpc/mpi/v1/command_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions api/grpc/mpi/v1/files_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions api/grpc/mpi/v1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package v1

import (
"log/slog"

"google.golang.org/protobuf/types/known/structpb"
)

Expand Down Expand Up @@ -45,3 +47,24 @@ func ConvertToStructs(input map[string]any) ([]*structpb.Struct, error) {

return structs, nil
}

func ConvertToMap(input []*structpb.Struct) map[string]any {
convertedMap := make(map[string]any)
for _, value := range input {
for key, field := range value.GetFields() {
kind := field.GetKind()
switch kind.(type) {
case *structpb.Value_StringValue:
convertedMap[key] = field.GetStringValue()
case *structpb.Value_NumberValue:
convertedMap[key] = int(field.GetNumberValue())
case *structpb.Value_BoolValue:
convertedMap[key] = field.GetBoolValue()
default:
slog.Warn("unknown type for map conversion", "value", kind)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
slog.Warn("unknown type for map conversion", "value", kind)
slog.Warn("Unknown type for map conversion", "key", key, "value", kind)

}
}
}

return convertedMap
}
42 changes: 42 additions & 0 deletions api/grpc/mpi/v1/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,45 @@ func TestConvertToStructs(t *testing.T) {
})
}
}

func TestConvertToMaps(t *testing.T) {
tests := []struct {
name string
expected map[string]any
input []*structpb.Struct
}{
{
name: "Test 1: Valid input with simple key-value pairs",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing cases for slices, and structs

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a test for invalid values? Empty values, blank "" values and special characters.

For valid scenario, should we include "config-sync-group" key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for this function should be pretty generic, the exact strings we place in the tests shouldn't really matter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only to covert to a map validation will be done elsewhere

expected: map[string]any{
"key1": "value1",
"key2": 123,
"key3": true,
},
input: []*structpb.Struct{
{
Fields: map[string]*structpb.Value{
"key1": structpb.NewStringValue("value1"),
},
},
{
Fields: map[string]*structpb.Value{
"key2": structpb.NewNumberValue(123),
},
},
{
Fields: map[string]*structpb.Value{
"key3": structpb.NewBoolValue(true),
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := ConvertToMap(tt.input)

assert.Equal(t, tt.expected, got)
})
}
}
2 changes: 1 addition & 1 deletion internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (a *App) Run(ctx context.Context) error {
slog.String("commit", a.commit),
)

messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize)
messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize, agentConfig)
err = messagePipe.Register(defaultQueueSize, plugin.LoadPlugins(ctx, agentConfig))
if err != nil {
slog.ErrorContext(ctx, "Failed to register plugins", "error", err)
Expand Down
162 changes: 160 additions & 2 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ package bus
import (
"context"
"log/slog"
"reflect"
"sync"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/logger"
"github.com/nginx/agent/v3/pkg/id"
messagebus "github.com/vardius/message-bus"
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Expand Down Expand Up @@ -45,20 +51,24 @@ type (
Info() *Info
Process(ctx context.Context, msg *Message)
Subscriptions() []string
Reconfigure(ctx context.Context, agentConfig *config.Config) error
}

MessagePipe struct {
agentConfig *config.Config
bus messagebus.MessageBus
messageChannel chan *MessageWithContext
plugins []Plugin
pluginsMutex sync.Mutex
configMutex sync.Mutex
}
)

func NewMessagePipe(size int) *MessagePipe {
func NewMessagePipe(size int, agentConfig *config.Config) *MessagePipe {
return &MessagePipe{
messageChannel: make(chan *MessageWithContext, size),
pluginsMutex: sync.Mutex{},
agentConfig: agentConfig,
}
}

Expand Down Expand Up @@ -110,6 +120,7 @@ func (p *MessagePipe) Process(ctx context.Context, messages ...*Message) {
}
}

//nolint:contextcheck,revive // need to use context from the message for the correlationID not use the parent context
func (p *MessagePipe) Run(ctx context.Context) {
p.pluginsMutex.Lock()
p.initPlugins(ctx)
Expand All @@ -126,7 +137,16 @@ func (p *MessagePipe) Run(ctx context.Context) {

return
case m := <-p.messageChannel:
p.bus.Publish(m.message.Topic, m.ctx, m.message)
if m.message != nil {
switch m.message.Topic {
case AgentConfigUpdateTopic:
p.handleAgentConfigUpdateTopic(m.ctx, m.message)
case ConnectionAgentConfigUpdateTopic:
p.handleConnectionAgentConfigUpdateTopic(m.ctx, m.message)
default:
p.bus.Publish(m.message.Topic, m.ctx, m.message)
}
}
}
}
}
Expand Down Expand Up @@ -157,6 +177,115 @@ func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
return -1
}

func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentConfig, topic, correlationID string) {
var reconfigureError error
p.configMutex.Lock()
defer p.configMutex.Unlock()
currentConfig := p.agentConfig

// convert agent config from *mpi.AgentConfig to *config.Config
updateAgentConfig := config.FromAgentRemoteConfigProto(agentConfig)

// The check for updates to the config needs to be done here as the command plugin needs the latest agent config
// to be sent in response to create connection requests.
p.updateConfig(ctx, updateAgentConfig)

// Reconfigure each plugin with the new agent config
for _, plugin := range p.plugins {
slog.DebugContext(ctx, "Reconfigure plugin", "plugin", plugin.Info().Name)
reconfigureError = plugin.Reconfigure(ctx, p.agentConfig)
if reconfigureError != nil {
slog.ErrorContext(ctx, "Reconfigure plugin failed", "plugin", plugin.Info().Name)
break
}
}

if reconfigureError != nil {
slog.ErrorContext(ctx, "Error updating plugin with updated agent config, reverting",
"error", reconfigureError.Error())

// If the agent update was received from a create connection request no data plane response needs to be sent
if topic == AgentConfigUpdateTopic {
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Failed to update agent config", reconfigureError.Error())
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}

p.agentConfig = currentConfig
for _, plugin := range p.plugins {
err := plugin.Reconfigure(ctx, currentConfig)
if err != nil {
slog.ErrorContext(ctx, "Error reverting agent config", "error", err.Error())
}
}
}

slog.InfoContext(ctx, "Finished reconfiguring plugins", "plugins", p.plugins)
if topic == AgentConfigUpdateTopic {
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully updated agent config", "")
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}
}

func (p *MessagePipe) handleConnectionAgentConfigUpdateTopic(ctx context.Context, msg *Message) {
slog.DebugContext(ctx, "Handling connection agent config update topic", "topic", msg.Topic)
agentConfig, ok := msg.Data.(*mpi.AgentConfig)
if !ok {
slog.ErrorContext(ctx, "Failed to parse agent config update message")
return
}

p.Reconfigure(ctx, agentConfig, msg.Topic, "")
}

func (p *MessagePipe) handleAgentConfigUpdateTopic(ctx context.Context, msg *Message) {
slog.DebugContext(ctx, "Received agent config update topic", "topic", msg.Topic)
mpRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
if !ok {
slog.ErrorContext(ctx, "Failed to parse agent config update message")
return
}

reconfigureRequest, ok := mpRequest.GetRequest().(*mpi.ManagementPlaneRequest_UpdateAgentConfigRequest)
if !ok {
slog.ErrorContext(ctx, "Failed to parse agent config update message")
return
}

correlationID := reconfigureRequest.UpdateAgentConfigRequest.GetMessageMeta().GetCorrelationId()
p.Reconfigure(ctx, reconfigureRequest.UpdateAgentConfigRequest.GetAgentConfig(), msg.Topic, correlationID)
}

func (p *MessagePipe) updateConfig(ctx context.Context, updateAgentConfig *config.Config) {
slog.InfoContext(ctx, "Updating agent config")
if updateAgentConfig.Log != nil && !reflect.DeepEqual(p.agentConfig.Log, updateAgentConfig.Log) {
slog.InfoContext(ctx, "Agent log level has been updated", "previous", p.agentConfig.Log,
"update", updateAgentConfig.Log)
p.agentConfig.Log = updateAgentConfig.Log

slogger := logger.New(
p.agentConfig.Log.Path,
p.agentConfig.Log.Level,
)
slog.SetDefault(slogger)
}

if updateAgentConfig.Labels != nil && validateLabels(updateAgentConfig.Labels) &&
!reflect.DeepEqual(p.agentConfig.Labels, updateAgentConfig.Labels) {
slog.InfoContext(ctx, "Agent labels have been updated", "previous", p.agentConfig.Labels,
"update", updateAgentConfig.Labels)
p.agentConfig.Labels = updateAgentConfig.Labels

// OTel Headers also need to be updated when labels have been updated
if p.agentConfig.Collector != nil {
slog.DebugContext(ctx, "Agent OTel headers have been updated")
config.AddLabelsAsOTelHeaders(p.agentConfig.Collector, updateAgentConfig.Labels)
}
}
slog.DebugContext(ctx, "Updated agent config")
}

func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) error {
if index != -1 {
p.plugins = append(p.plugins[:index], p.plugins[index+1:]...)
Expand Down Expand Up @@ -209,3 +338,32 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
}
}
}

func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: id.GenerateMessageID(),
CorrelationId: correlationID,
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: status,
Message: message,
Error: err,
},
}
}

func validateLabels(labels map[string]any) bool {
for _, value := range labels {
if val, ok := value.(string); ok {
if !config.ValidateLabel(val) {
return false
}
}
}

return true
}
Loading
Loading