Skip to content

Commit 2eb0944

Browse files
committed
refactor remote config add labels
1 parent b5fb1ce commit 2eb0944

21 files changed

+467
-330
lines changed

api/grpc/mpi/v1/helpers.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package v1
77

88
import (
9+
"log/slog"
10+
911
"google.golang.org/protobuf/types/known/structpb"
1012
)
1113

@@ -45,3 +47,28 @@ func ConvertToStructs(input map[string]any) ([]*structpb.Struct, error) {
4547

4648
return structs, nil
4749
}
50+
51+
func ConvertToMap(input []*structpb.Struct) map[string]any {
52+
convertedMap := make(map[string]any)
53+
for _, value := range input {
54+
for key, field := range value.GetFields() {
55+
kind := field.GetKind()
56+
switch kind.(type) {
57+
case *structpb.Value_StringValue:
58+
convertedMap[key] = field.GetStringValue()
59+
case *structpb.Value_NumberValue:
60+
convertedMap[key] = int(field.GetNumberValue())
61+
case *structpb.Value_StructValue:
62+
convertedMap[key] = field.GetStructValue()
63+
case *structpb.Value_ListValue:
64+
convertedMap[key] = field.GetListValue()
65+
case *structpb.Value_BoolValue:
66+
convertedMap[key] = field.GetBoolValue()
67+
default:
68+
slog.Warn("unknown type for map conversion", "value", kind)
69+
}
70+
}
71+
}
72+
73+
return convertedMap
74+
}

api/grpc/mpi/v1/helpers_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,45 @@ func TestConvertToStructs(t *testing.T) {
7070
})
7171
}
7272
}
73+
74+
func TestConvertToMaps(t *testing.T) {
75+
tests := []struct {
76+
name string
77+
expected map[string]any
78+
input []*structpb.Struct
79+
}{
80+
{
81+
name: "Test 1: Valid input with simple key-value pairs",
82+
expected: map[string]any{
83+
"key1": "value1",
84+
"key2": 123,
85+
"key3": true,
86+
},
87+
input: []*structpb.Struct{
88+
{
89+
Fields: map[string]*structpb.Value{
90+
"key1": structpb.NewStringValue("value1"),
91+
},
92+
},
93+
{
94+
Fields: map[string]*structpb.Value{
95+
"key2": structpb.NewNumberValue(123),
96+
},
97+
},
98+
{
99+
Fields: map[string]*structpb.Value{
100+
"key3": structpb.NewBoolValue(true),
101+
},
102+
},
103+
},
104+
},
105+
}
106+
107+
for _, tt := range tests {
108+
t.Run(tt.name, func(t *testing.T) {
109+
got := ConvertToMap(tt.input)
110+
111+
assert.Equal(t, tt.expected, got)
112+
})
113+
}
114+
}

internal/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (a *App) Run(ctx context.Context) error {
5050
slog.String("commit", a.commit),
5151
)
5252

53-
messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize)
53+
messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize, agentConfig)
5454
err = messagePipe.Register(defaultQueueSize, plugin.LoadPlugins(ctx, agentConfig))
5555
if err != nil {
5656
slog.ErrorContext(ctx, "Failed to register plugins", "error", err)

internal/bus/message_pipe.go

Lines changed: 148 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,15 @@ package bus
88
import (
99
"context"
1010
"log/slog"
11+
"reflect"
1112
"sync"
1213

14+
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
15+
"github.com/nginx/agent/v3/internal/config"
16+
"github.com/nginx/agent/v3/internal/logger"
17+
"github.com/nginx/agent/v3/pkg/id"
1318
messagebus "github.com/vardius/message-bus"
19+
"google.golang.org/protobuf/types/known/timestamppb"
1420
)
1521

1622
type (
@@ -45,20 +51,24 @@ type (
4551
Info() *Info
4652
Process(ctx context.Context, msg *Message)
4753
Subscriptions() []string
54+
Reconfigure(ctx context.Context, agentConfig *config.Config) error
4855
}
4956

5057
MessagePipe struct {
58+
agentConfig *config.Config
5159
bus messagebus.MessageBus
5260
messageChannel chan *MessageWithContext
5361
plugins []Plugin
5462
pluginsMutex sync.Mutex
63+
configMutex sync.Mutex
5564
}
5665
)
5766

58-
func NewMessagePipe(size int) *MessagePipe {
67+
func NewMessagePipe(size int, agentConfig *config.Config) *MessagePipe {
5968
return &MessagePipe{
6069
messageChannel: make(chan *MessageWithContext, size),
6170
pluginsMutex: sync.Mutex{},
71+
agentConfig: agentConfig,
6272
}
6373
}
6474

@@ -110,6 +120,7 @@ func (p *MessagePipe) Process(ctx context.Context, messages ...*Message) {
110120
}
111121
}
112122

123+
//nolint:contextcheck,revive // need to use context from the message for the correlationID not use the parent context
113124
func (p *MessagePipe) Run(ctx context.Context) {
114125
p.pluginsMutex.Lock()
115126
p.initPlugins(ctx)
@@ -126,7 +137,17 @@ func (p *MessagePipe) Run(ctx context.Context) {
126137

127138
return
128139
case m := <-p.messageChannel:
129-
p.bus.Publish(m.message.Topic, m.ctx, m.message)
140+
if m.message != nil {
141+
switch m.message.Topic {
142+
case AgentConfigUpdateTopic:
143+
p.handleAgentConfigUpdateTopic(m.ctx, m.message)
144+
case ConnectionAgentConfigUpdateTopic:
145+
p.handleConnectionAgentConfigUpdateTopic(m.ctx, m.message)
146+
default:
147+
slog.InfoContext(ctx, "Publishing message", "topic", m.message.Topic, "message----", m.message)
148+
p.bus.Publish(m.message.Topic, m.ctx, m.message)
149+
}
150+
}
130151
}
131152
}
132153
}
@@ -157,6 +178,114 @@ func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
157178
return -1
158179
}
159180

181+
func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentConfig, topic, correlationID string) {
182+
var reconfigureError error
183+
p.configMutex.Lock()
184+
defer p.configMutex.Unlock()
185+
currentConfig := p.agentConfig
186+
187+
// convert agent config from *mpi.AgentConfig to *config.Config
188+
updateAgentConfig := config.FromAgentRemoteConfigProto(agentConfig)
189+
190+
// The check for updates to the config needs to be done here as the command plugin needs the latest agent config
191+
// to be sent in response to create connection requests.
192+
p.updateConfig(ctx, updateAgentConfig)
193+
194+
// Reconfigure each plugin with the new agent config
195+
for _, plugin := range p.plugins {
196+
slog.InfoContext(ctx, "Reconfigure plugin", "plugin", plugin.Info().Name)
197+
reconfigureError = plugin.Reconfigure(ctx, p.agentConfig)
198+
if reconfigureError != nil {
199+
slog.ErrorContext(ctx, "Reconfigure plugin failed", "plugin", plugin.Info().Name)
200+
break
201+
}
202+
}
203+
204+
if reconfigureError != nil {
205+
slog.ErrorContext(ctx, "Error updating plugin with updated agent config, reverting",
206+
"error", reconfigureError.Error())
207+
208+
// If the agent update was received from a create connection request no data plane response needs to be sent
209+
if topic == AgentConfigUpdateTopic {
210+
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
211+
"Failed to update agent config", reconfigureError.Error())
212+
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
213+
}
214+
215+
p.agentConfig = currentConfig
216+
for _, plugin := range p.plugins {
217+
err := plugin.Reconfigure(ctx, currentConfig)
218+
if err != nil {
219+
slog.ErrorContext(ctx, "Error reverting agent config", "error", err.Error())
220+
}
221+
}
222+
}
223+
224+
slog.InfoContext(ctx, "Finished reconfigure plugin", "plugins", p.plugins)
225+
if topic == AgentConfigUpdateTopic {
226+
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
227+
"Successfully updated agent config", "")
228+
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
229+
}
230+
}
231+
232+
func (p *MessagePipe) handleConnectionAgentConfigUpdateTopic(ctx context.Context, msg *Message) {
233+
slog.DebugContext(ctx, "Handling connection agent config update topic", "topic", msg.Topic)
234+
agentConfig, ok := msg.Data.(*mpi.AgentConfig)
235+
if !ok {
236+
slog.ErrorContext(ctx, "Failed to parse agent config update message")
237+
return
238+
}
239+
240+
p.Reconfigure(ctx, agentConfig, msg.Topic, "")
241+
}
242+
243+
func (p *MessagePipe) handleAgentConfigUpdateTopic(ctx context.Context, msg *Message) {
244+
slog.DebugContext(ctx, "Received agent config update topic", "topic", msg.Topic)
245+
mpRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
246+
if !ok {
247+
slog.ErrorContext(ctx, "Failed to parse agent config update message")
248+
return
249+
}
250+
251+
reconfigureRequest, ok := mpRequest.GetRequest().(*mpi.ManagementPlaneRequest_UpdateAgentConfigRequest)
252+
if !ok {
253+
slog.ErrorContext(ctx, "Failed to parse agent config update message")
254+
return
255+
}
256+
257+
correlationID := reconfigureRequest.UpdateAgentConfigRequest.GetMessageMeta().GetCorrelationId()
258+
p.Reconfigure(ctx, reconfigureRequest.UpdateAgentConfigRequest.GetAgentConfig(), msg.Topic, correlationID)
259+
}
260+
261+
func (p *MessagePipe) updateConfig(ctx context.Context, updateAgentConfig *config.Config) {
262+
slog.InfoContext(ctx, "Updating agent config")
263+
if updateAgentConfig.Log != nil && !reflect.DeepEqual(p.agentConfig.Log, updateAgentConfig.Log) {
264+
slog.DebugContext(ctx, "Agent log level has been updated", "previous", p.agentConfig.Log,
265+
"update", updateAgentConfig.Log)
266+
p.agentConfig.Log = updateAgentConfig.Log
267+
268+
slogger := logger.New(
269+
p.agentConfig.Log.Path,
270+
p.agentConfig.Log.Level,
271+
)
272+
slog.SetDefault(slogger)
273+
}
274+
275+
if updateAgentConfig.Labels != nil && !reflect.DeepEqual(p.agentConfig.Labels, updateAgentConfig.Labels) {
276+
slog.DebugContext(ctx, "Agent labels have been updated", "previous", p.agentConfig.Labels,
277+
"update", updateAgentConfig.Labels)
278+
p.agentConfig.Labels = updateAgentConfig.Labels
279+
280+
// OTel Headers also need to be updated when labels have been updated
281+
if p.agentConfig.Collector != nil {
282+
slog.DebugContext(ctx, "Agent OTel headers have been updated")
283+
config.AddLabelsAsOTelHeaders(p.agentConfig.Collector, updateAgentConfig.Labels)
284+
}
285+
}
286+
slog.DebugContext(ctx, "Updated agent config")
287+
}
288+
160289
func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) error {
161290
if index != -1 {
162291
p.plugins = append(p.plugins[:index], p.plugins[index+1:]...)
@@ -209,3 +338,20 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
209338
}
210339
}
211340
}
341+
342+
func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
343+
message, err string,
344+
) *mpi.DataPlaneResponse {
345+
return &mpi.DataPlaneResponse{
346+
MessageMeta: &mpi.MessageMeta{
347+
MessageId: id.GenerateMessageID(),
348+
CorrelationId: correlationID,
349+
Timestamp: timestamppb.Now(),
350+
},
351+
CommandResponse: &mpi.CommandResponse{
352+
Status: status,
353+
Message: message,
354+
Error: err,
355+
},
356+
}
357+
}

internal/bus/message_pipe_test.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package bus
66

77
import (
88
"context"
9+
"log/slog"
910
"testing"
1011
"time"
1112

13+
"github.com/nginx/agent/v3/internal/config"
14+
"github.com/nginx/agent/v3/test/types"
1215
"github.com/stretchr/testify/require"
1316

1417
"github.com/stretchr/testify/assert"
@@ -41,6 +44,11 @@ func (*testPlugin) Subscriptions() []string {
4144
return []string{"test.message"}
4245
}
4346

47+
func (p *testPlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error {
48+
p.Called()
49+
return nil
50+
}
51+
4452
func TestMessagePipe(t *testing.T) {
4553
messages := []*Message{
4654
{Topic: "test.message", Data: 1},
@@ -58,7 +66,7 @@ func TestMessagePipe(t *testing.T) {
5866
ctx, cancel := context.WithCancel(context.Background())
5967
pipelineDone := make(chan bool)
6068

61-
messagePipe := NewMessagePipe(100)
69+
messagePipe := NewMessagePipe(100, types.AgentConfig())
6270
err := messagePipe.Register(10, []Plugin{plugin})
6371

6472
require.NoError(t, err)
@@ -84,7 +92,7 @@ func TestMessagePipe_DeRegister(t *testing.T) {
8492
ctx, cancel := context.WithCancel(context.Background())
8593
defer cancel()
8694

87-
messagePipe := NewMessagePipe(100)
95+
messagePipe := NewMessagePipe(100, types.AgentConfig())
8896
err := messagePipe.Register(100, []Plugin{plugin})
8997

9098
require.NoError(t, err)
@@ -105,7 +113,7 @@ func TestMessagePipe_IsPluginRegistered(t *testing.T) {
105113
ctx, cancel := context.WithCancel(context.Background())
106114
pipelineDone := make(chan bool)
107115

108-
messagePipe := NewMessagePipe(100)
116+
messagePipe := NewMessagePipe(100, types.AgentConfig())
109117
err := messagePipe.Register(10, []Plugin{plugin})
110118

111119
require.NoError(t, err)
@@ -121,3 +129,27 @@ func TestMessagePipe_IsPluginRegistered(t *testing.T) {
121129
assert.True(t, messagePipe.IsPluginRegistered(plugin.Info().Name))
122130
assert.False(t, messagePipe.IsPluginRegistered("metrics"))
123131
}
132+
133+
func TestMessagePipe_updateConfig(t *testing.T) {
134+
initialConfig := types.AgentConfig()
135+
initialConfig.Log.Level = "INFO"
136+
initialConfig.Log.Path = ""
137+
138+
messagePipe := NewMessagePipe(100, initialConfig)
139+
originalLogger := slog.Default()
140+
141+
updatedConfig := &config.Config{
142+
Log: &config.Log{
143+
Path: "/etc/nginx-agent/",
144+
Level: "DEBUG",
145+
},
146+
}
147+
148+
messagePipe.updateConfig(context.Background(), updatedConfig)
149+
150+
require.Equal(t, messagePipe.agentConfig.Log.Path, updatedConfig.Log.Path)
151+
require.Equal(t, messagePipe.agentConfig.Log.Level, updatedConfig.Log.Level)
152+
153+
newLogger := slog.Default()
154+
require.NotEqual(t, originalLogger, newLogger)
155+
}

0 commit comments

Comments
 (0)