@@ -188,53 +188,165 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi
188188
189189 switch action .GetAction ().(type ) {
190190 case * mpi.NGINXPlusAction_UpdateHttpUpstreamServers :
191- slog .DebugContext (ctx , "Updating http upstream servers" ,
192- "request" , action .GetUpdateHttpUpstreamServers ())
193- add , update , del , err := r .resourceService .UpdateHTTPUpstreams (ctx , instance ,
194- action .GetUpdateHttpUpstreamServers ().GetHttpUpstreamName (),
195- action .GetUpdateHttpUpstreamServers ().GetServers ())
196- if err != nil {
197- slog .ErrorContext (ctx , "Unable to update HTTP servers of upstream" , "request" ,
198- action .GetUpdateHttpUpstreamServers (), "error" , err )
199- resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
200- "" , instanceID , err .Error ())
201- r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
191+ r .handleUpdateHTTPUpstreamServers (ctx , action , instance )
192+ case * mpi.NGINXPlusAction_GetHttpUpstreamServers :
193+ r .handleGetHTTPUpstreamServers (ctx , action , instance )
194+ case * mpi.NGINXPlusAction_UpdateStreamServers :
195+ r .handleUpdateStreamServers (ctx , action , instance )
196+ case * mpi.NGINXPlusAction_GetStreamUpstreams :
197+ r .handleGetStreamUpstreams (ctx , action , instance )
198+ case * mpi.NGINXPlusAction_GetUpstreams :
199+ r .handleGetUpstreams (ctx , action , instance )
200+ default :
201+ slog .DebugContext (ctx , "NGINX Plus action not implemented yet" )
202+ }
203+ }
202204
203- return
204- }
205+ // nolint: dupl
206+ func (r * Resource ) handleUpdateStreamServers (ctx context.Context , action * mpi.NGINXPlusAction , instance * mpi.Instance ) {
207+ correlationID := logger .GetCorrelationID (ctx )
208+ instanceID := instance .GetInstanceMeta ().GetInstanceId ()
205209
206- slog .DebugContext (ctx , "Successfully updated http upstream servers" , "http_upstream_name" ,
207- action .GetUpdateHttpUpstreamServers ().GetHttpUpstreamName (), "add" , len (add ), "update" , len (update ),
208- "delete" , len (del ))
209- resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
210- "Successfully updated HTTP Upstreams" , instanceID , "" )
210+ slog .DebugContext (ctx , "Updating stream servers" , "request" , action .GetUpdateStreamServers ())
211211
212+ add , update , del , err := r .resourceService .UpdateStreamServers (ctx , instance ,
213+ action .GetUpdateStreamServers ().GetUpstreamStreamName (), action .GetUpdateStreamServers ().GetServers ())
214+ if err != nil {
215+ slog .ErrorContext (ctx , "Unable to update stream servers of upstream" , "request" ,
216+ action .GetUpdateHttpUpstreamServers (), "error" , err )
217+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
218+ "" , instanceID , err .Error ())
212219 r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
213220
214- case * mpi.NGINXPlusAction_GetHttpUpstreamServers :
215- slog .DebugContext (ctx , "Getting http upstream servers" , "request" , action .GetGetHttpUpstreamServers ())
216- upstreams , err := r .resourceService .GetUpstreams (ctx , instance ,
217- action .GetGetHttpUpstreamServers ().GetHttpUpstreamName ())
218- if err != nil {
219- slog .ErrorContext (ctx , "Unable to get HTTP servers of upstream" , "error" , err )
220- resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
221- "" , instanceID , err .Error ())
222- r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
221+ return
222+ }
223223
224- return
225- }
224+ slog .DebugContext (ctx , "Successfully updated stream upstream servers" , "http_upstream_name" ,
225+ action .GetUpdateHttpUpstreamServers ().GetHttpUpstreamName (), "add" , len (add ), "update" , len (update ),
226+ "delete" , len (del ))
226227
227- upstreamsJSON , err := json .Marshal (upstreams )
228- if err != nil {
229- slog .ErrorContext (ctx , "Unable to marshal http upstreams" , "err" , err )
230- }
231- resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
232- string (upstreamsJSON ), instanceID , "" )
228+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
229+ "Successfully updated stream upstream servers" , instanceID , "" )
230+
231+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
232+ }
233+
234+ // nolint: dupl
235+ func (r * Resource ) handleGetStreamUpstreams (ctx context.Context , action * mpi.NGINXPlusAction , instance * mpi.Instance ) {
236+ correlationID := logger .GetCorrelationID (ctx )
237+ instanceID := instance .GetInstanceMeta ().GetInstanceId ()
233238
239+ slog .DebugContext (ctx , "Getting Stream Upstreams" , "request" , action .GetUpdateStreamServers ())
240+
241+ streamUpstreams , err := r .resourceService .GetStreamUpstreams (ctx , instance )
242+ if err != nil {
243+ slog .ErrorContext (ctx , "Unable to get stream upstreams" , "error" , err )
244+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
245+ "" , instanceID , err .Error ())
234246 r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
235- default :
236- slog .DebugContext (ctx , "NGINX Plus action not implemented yet" )
247+
248+ return
249+ }
250+
251+ streamUpstreamsJSON , err := json .Marshal (streamUpstreams )
252+ if err != nil {
253+ slog .ErrorContext (ctx , "Unable to marshal stream upstreams" , "err" , err )
254+ }
255+
256+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
257+ string (streamUpstreamsJSON ), instanceID , "" )
258+
259+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
260+ }
261+
262+ // nolint: dupl
263+ func (r * Resource ) handleGetUpstreams (ctx context.Context , action * mpi.NGINXPlusAction , instance * mpi.Instance ) {
264+ correlationID := logger .GetCorrelationID (ctx )
265+ instanceID := instance .GetInstanceMeta ().GetInstanceId ()
266+
267+ slog .DebugContext (ctx , "Getting upstreams" , "request" , action .GetUpdateStreamServers ())
268+
269+ upstreams , err := r .resourceService .GetUpstreams (ctx , instance )
270+ if err != nil {
271+ slog .InfoContext (ctx , "Unable to get upstreams" , "error" , err )
272+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
273+ "" , instanceID , err .Error ())
274+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
275+
276+ return
277+ }
278+
279+ upstreamsJSON , err := json .Marshal (upstreams )
280+ if err != nil {
281+ slog .ErrorContext (ctx , "Unable to marshal upstreams" , "err" , err )
282+ }
283+
284+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
285+ string (upstreamsJSON ), instanceID , "" )
286+
287+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
288+ }
289+
290+ // nolint: dupl
291+ func (r * Resource ) handleUpdateHTTPUpstreamServers (ctx context.Context , action * mpi.NGINXPlusAction ,
292+ instance * mpi.Instance ,
293+ ) {
294+ correlationID := logger .GetCorrelationID (ctx )
295+ instanceID := instance .GetInstanceMeta ().GetInstanceId ()
296+
297+ slog .DebugContext (ctx , "Updating http upstream servers" , "request" , action .GetUpdateHttpUpstreamServers ())
298+
299+ add , update , del , err := r .resourceService .UpdateHTTPUpstreamServers (ctx , instance ,
300+ action .GetUpdateHttpUpstreamServers ().GetHttpUpstreamName (),
301+ action .GetUpdateHttpUpstreamServers ().GetServers ())
302+ if err != nil {
303+ slog .ErrorContext (ctx , "Unable to update HTTP servers of upstream" , "request" ,
304+ action .GetUpdateHttpUpstreamServers (), "error" , err )
305+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
306+ "" , instanceID , err .Error ())
307+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
308+
309+ return
237310 }
311+
312+ slog .DebugContext (ctx , "Successfully updated http upstream servers" , "http_upstream_name" ,
313+ action .GetUpdateHttpUpstreamServers ().GetHttpUpstreamName (), "add" , len (add ), "update" , len (update ),
314+ "delete" , len (del ))
315+
316+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
317+ "Successfully updated HTTP Upstreams" , instanceID , "" )
318+
319+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
320+ }
321+
322+ func (r * Resource ) handleGetHTTPUpstreamServers (ctx context.Context , action * mpi.NGINXPlusAction ,
323+ instance * mpi.Instance ,
324+ ) {
325+ correlationID := logger .GetCorrelationID (ctx )
326+ instanceID := instance .GetInstanceMeta ().GetInstanceId ()
327+
328+ slog .DebugContext (ctx , "Getting http upstream servers" , "request" , action .GetGetHttpUpstreamServers ())
329+
330+ upstreams , err := r .resourceService .GetHTTPUpstreamServers (ctx , instance ,
331+ action .GetGetHttpUpstreamServers ().GetHttpUpstreamName ())
332+ if err != nil {
333+ slog .ErrorContext (ctx , "Unable to get HTTP servers of upstream" , "error" , err )
334+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
335+ "" , instanceID , err .Error ())
336+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
337+
338+ return
339+ }
340+
341+ upstreamsJSON , err := json .Marshal (upstreams )
342+ if err != nil {
343+ slog .ErrorContext (ctx , "Unable to marshal http upstreams" , "err" , err )
344+ }
345+
346+ resp := r .createDataPlaneResponse (correlationID , mpi .CommandResponse_COMMAND_STATUS_OK ,
347+ string (upstreamsJSON ), instanceID , "" )
348+
349+ r .messagePipe .Process (ctx , & bus.Message {Topic : bus .DataPlaneResponseTopic , Data : resp })
238350}
239351
240352func (r * Resource ) handleWriteConfigSuccessful (ctx context.Context , msg * bus.Message ) {
0 commit comments