@@ -8,9 +8,15 @@ package bus
88import (
99 "context"
1010 "log/slog"
11+ "reflect"
1112 "sync"
1213
14+ mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
15+ "github.com/nginx/agent/v3/internal/config"
16+ "github.com/nginx/agent/v3/internal/logger"
17+ "github.com/nginx/agent/v3/pkg/id"
1318 messagebus "github.com/vardius/message-bus"
19+ "google.golang.org/protobuf/types/known/timestamppb"
1420)
1521
1622type (
@@ -45,20 +51,24 @@ type (
4551 Info () * Info
4652 Process (ctx context.Context , msg * Message )
4753 Subscriptions () []string
54+ Reconfigure (ctx context.Context , agentConfig * config.Config ) error
4855 }
4956
5057 MessagePipe struct {
58+ agentConfig * config.Config
5159 bus messagebus.MessageBus
5260 messageChannel chan * MessageWithContext
5361 plugins []Plugin
5462 pluginsMutex sync.Mutex
63+ configMutex sync.Mutex
5564 }
5665)
5766
58- func NewMessagePipe (size int ) * MessagePipe {
67+ func NewMessagePipe (size int , agentConfig * config. Config ) * MessagePipe {
5968 return & MessagePipe {
6069 messageChannel : make (chan * MessageWithContext , size ),
6170 pluginsMutex : sync.Mutex {},
71+ agentConfig : agentConfig ,
6272 }
6373}
6474
@@ -110,6 +120,7 @@ func (p *MessagePipe) Process(ctx context.Context, messages ...*Message) {
110120 }
111121}
112122
123+ //nolint:contextcheck,revive // need to use context from the message for the correlationID not use the parent context
113124func (p * MessagePipe ) Run (ctx context.Context ) {
114125 p .pluginsMutex .Lock ()
115126 p .initPlugins (ctx )
@@ -126,7 +137,16 @@ func (p *MessagePipe) Run(ctx context.Context) {
126137
127138 return
128139 case m := <- p .messageChannel :
129- p .bus .Publish (m .message .Topic , m .ctx , m .message )
140+ if m .message != nil {
141+ switch m .message .Topic {
142+ case AgentConfigUpdateTopic :
143+ p .handleAgentConfigUpdateTopic (m .ctx , m .message )
144+ case ConnectionAgentConfigUpdateTopic :
145+ p .handleConnectionAgentConfigUpdateTopic (m .ctx , m .message )
146+ default :
147+ p .bus .Publish (m .message .Topic , m .ctx , m .message )
148+ }
149+ }
130150 }
131151 }
132152}
@@ -157,6 +177,115 @@ func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
157177 return - 1
158178}
159179
180+ func (p * MessagePipe ) Reconfigure (ctx context.Context , agentConfig * mpi.AgentConfig , topic , correlationID string ) {
181+ var reconfigureError error
182+ p .configMutex .Lock ()
183+ defer p .configMutex .Unlock ()
184+ currentConfig := p .agentConfig
185+
186+ // convert agent config from *mpi.AgentConfig to *config.Config
187+ updateAgentConfig := config .FromAgentRemoteConfigProto (agentConfig )
188+
189+ // The check for updates to the config needs to be done here as the command plugin needs the latest agent config
190+ // to be sent in response to create connection requests.
191+ p .updateConfig (ctx , updateAgentConfig )
192+
193+ // Reconfigure each plugin with the new agent config
194+ for _ , plugin := range p .plugins {
195+ slog .DebugContext (ctx , "Reconfigure plugin" , "plugin" , plugin .Info ().Name )
196+ reconfigureError = plugin .Reconfigure (ctx , p .agentConfig )
197+ if reconfigureError != nil {
198+ slog .ErrorContext (ctx , "Reconfigure plugin failed" , "plugin" , plugin .Info ().Name )
199+ break
200+ }
201+ }
202+
203+ if reconfigureError != nil {
204+ slog .ErrorContext (ctx , "Error updating plugin with updated agent config, reverting" ,
205+ "error" , reconfigureError .Error ())
206+
207+ // If the agent update was received from a create connection request no data plane response needs to be sent
208+ if topic == AgentConfigUpdateTopic {
209+ response := p .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
210+ "Failed to update agent config" , reconfigureError .Error ())
211+ p .bus .Publish (DataPlaneResponseTopic , ctx , & Message {Topic : DataPlaneResponseTopic , Data : response })
212+ }
213+
214+ p .agentConfig = currentConfig
215+ for _ , plugin := range p .plugins {
216+ err := plugin .Reconfigure (ctx , currentConfig )
217+ if err != nil {
218+ slog .ErrorContext (ctx , "Error reverting agent config" , "error" , err .Error ())
219+ }
220+ }
221+ }
222+
223+ slog .InfoContext (ctx , "Finished reconfiguring plugins" , "plugins" , p .plugins )
224+ if topic == AgentConfigUpdateTopic {
225+ response := p .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
226+ "Successfully updated agent config" , "" )
227+ p .bus .Publish (DataPlaneResponseTopic , ctx , & Message {Topic : DataPlaneResponseTopic , Data : response })
228+ }
229+ }
230+
231+ func (p * MessagePipe ) handleConnectionAgentConfigUpdateTopic (ctx context.Context , msg * Message ) {
232+ slog .DebugContext (ctx , "Handling connection agent config update topic" , "topic" , msg .Topic )
233+ agentConfig , ok := msg .Data .(* mpi.AgentConfig )
234+ if ! ok {
235+ slog .ErrorContext (ctx , "Failed to parse agent config update message" )
236+ return
237+ }
238+
239+ p .Reconfigure (ctx , agentConfig , msg .Topic , "" )
240+ }
241+
242+ func (p * MessagePipe ) handleAgentConfigUpdateTopic (ctx context.Context , msg * Message ) {
243+ slog .DebugContext (ctx , "Received agent config update topic" , "topic" , msg .Topic )
244+ mpRequest , ok := msg .Data .(* mpi.ManagementPlaneRequest )
245+ if ! ok {
246+ slog .ErrorContext (ctx , "Failed to parse agent config update message" )
247+ return
248+ }
249+
250+ reconfigureRequest , ok := mpRequest .GetRequest ().(* mpi.ManagementPlaneRequest_UpdateAgentConfigRequest )
251+ if ! ok {
252+ slog .ErrorContext (ctx , "Failed to parse agent config update message" )
253+ return
254+ }
255+
256+ correlationID := reconfigureRequest .UpdateAgentConfigRequest .GetMessageMeta ().GetCorrelationId ()
257+ p .Reconfigure (ctx , reconfigureRequest .UpdateAgentConfigRequest .GetAgentConfig (), msg .Topic , correlationID )
258+ }
259+
260+ func (p * MessagePipe ) updateConfig (ctx context.Context , updateAgentConfig * config.Config ) {
261+ slog .InfoContext (ctx , "Updating agent config" )
262+ if updateAgentConfig .Log != nil && ! reflect .DeepEqual (p .agentConfig .Log , updateAgentConfig .Log ) {
263+ slog .InfoContext (ctx , "Agent log level has been updated" , "previous" , p .agentConfig .Log ,
264+ "update" , updateAgentConfig .Log )
265+ p .agentConfig .Log = updateAgentConfig .Log
266+
267+ slogger := logger .New (
268+ p .agentConfig .Log .Path ,
269+ p .agentConfig .Log .Level ,
270+ )
271+ slog .SetDefault (slogger )
272+ }
273+
274+ if updateAgentConfig .Labels != nil && validateLabels (updateAgentConfig .Labels ) &&
275+ ! reflect .DeepEqual (p .agentConfig .Labels , updateAgentConfig .Labels ) {
276+ slog .InfoContext (ctx , "Agent labels have been updated" , "previous" , p .agentConfig .Labels ,
277+ "update" , updateAgentConfig .Labels )
278+ p .agentConfig .Labels = updateAgentConfig .Labels
279+
280+ // OTel Headers also need to be updated when labels have been updated
281+ if p .agentConfig .Collector != nil {
282+ slog .DebugContext (ctx , "Agent OTel headers have been updated" )
283+ config .AddLabelsAsOTelHeaders (p .agentConfig .Collector , updateAgentConfig .Labels )
284+ }
285+ }
286+ slog .DebugContext (ctx , "Updated agent config" )
287+ }
288+
160289func (p * MessagePipe ) unsubscribePlugin (ctx context.Context , index int , plugin Plugin ) error {
161290 if index != - 1 {
162291 p .plugins = append (p .plugins [:index ], p .plugins [index + 1 :]... )
@@ -209,3 +338,32 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
209338 }
210339 }
211340}
341+
342+ func (p * MessagePipe ) createDataPlaneResponse (correlationID string , status mpi.CommandResponse_CommandStatus ,
343+ message , err string ,
344+ ) * mpi.DataPlaneResponse {
345+ return & mpi.DataPlaneResponse {
346+ MessageMeta : & mpi.MessageMeta {
347+ MessageId : id .GenerateMessageID (),
348+ CorrelationId : correlationID ,
349+ Timestamp : timestamppb .Now (),
350+ },
351+ CommandResponse : & mpi.CommandResponse {
352+ Status : status ,
353+ Message : message ,
354+ Error : err ,
355+ },
356+ }
357+ }
358+
359+ func validateLabels (labels map [string ]any ) bool {
360+ for _ , value := range labels {
361+ if val , ok := value .(string ); ok {
362+ if ! config .ValidateLabel (val ) {
363+ return false
364+ }
365+ }
366+ }
367+
368+ return true
369+ }
0 commit comments