Skip to content

Commit aadb015

Browse files
committed
PR feedback
1 parent 85115e3 commit aadb015

File tree

5 files changed

+197
-191
lines changed

5 files changed

+197
-191
lines changed

internal/datasource/proto/message.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"log/slog"
1010
"time"
1111

12+
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
13+
"google.golang.org/protobuf/types/known/timestamppb"
14+
1215
"github.com/google/uuid"
1316
agentUuid "github.com/nginx/agent/v3/pkg/uuid"
1417
)
@@ -29,3 +32,21 @@ func GenerateMessageID() string {
2932

3033
return uuidv7.String()
3134
}
35+
36+
func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
37+
message, instanceID, err string,
38+
) *mpi.DataPlaneResponse {
39+
return &mpi.DataPlaneResponse{
40+
MessageMeta: &mpi.MessageMeta{
41+
MessageId: GenerateMessageID(),
42+
CorrelationId: correlationID,
43+
Timestamp: timestamppb.Now(),
44+
},
45+
CommandResponse: &mpi.CommandResponse{
46+
Status: status,
47+
Message: message,
48+
Error: err,
49+
},
50+
InstanceId: instanceID,
51+
}
52+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright (c) F5, Inc.
2+
//
3+
// This source code is licensed under the Apache License, Version 2.0 license found in the
4+
// LICENSE file in the root directory of this source tree.
5+
6+
package resource
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"log/slog"
12+
13+
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
14+
"github.com/nginx/agent/v3/internal/datasource/proto"
15+
"github.com/nginx/agent/v3/internal/logger"
16+
)
17+
18+
const emptyResponse = "{}"
19+
20+
type APIAction struct {
21+
ResourceService resourceServiceInterface
22+
}
23+
24+
func (a *APIAction) HandleUpdateStreamServersRequest(ctx context.Context, action *mpi.NGINXPlusAction,
25+
instance *mpi.Instance,
26+
) *mpi.DataPlaneResponse {
27+
correlationID := logger.GetCorrelationID(ctx)
28+
instanceID := instance.GetInstanceMeta().GetInstanceId()
29+
30+
add, update, del, err := a.ResourceService.UpdateStreamServers(ctx, instance,
31+
action.GetUpdateStreamServers().GetUpstreamStreamName(), action.GetUpdateStreamServers().GetServers())
32+
if err != nil {
33+
slog.ErrorContext(ctx, "Unable to update stream servers of upstream", "request",
34+
action.GetUpdateHttpUpstreamServers(), "error", err)
35+
36+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
37+
"", instanceID, err.Error())
38+
}
39+
40+
slog.DebugContext(ctx, "Successfully updated stream upstream servers", "http_upstream_name",
41+
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update),
42+
"delete", len(del))
43+
44+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
45+
"Successfully updated stream upstream servers", instanceID, "")
46+
}
47+
48+
func (a *APIAction) HandleGetStreamUpstreams(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse {
49+
correlationID := logger.GetCorrelationID(ctx)
50+
instanceID := instance.GetInstanceMeta().GetInstanceId()
51+
streamUpstreamsResponse := emptyResponse
52+
53+
streamUpstreams, err := a.ResourceService.GetStreamUpstreams(ctx, instance)
54+
if err != nil {
55+
slog.ErrorContext(ctx, "Unable to get stream upstreams", "error", err)
56+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
57+
"", instanceID, err.Error())
58+
}
59+
60+
if streamUpstreams != nil {
61+
streamUpstreamsJSON, jsonErr := json.Marshal(streamUpstreams)
62+
if jsonErr != nil {
63+
slog.ErrorContext(ctx, "Unable to marshal stream upstreams", "err", err)
64+
}
65+
streamUpstreamsResponse = string(streamUpstreamsJSON)
66+
}
67+
68+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
69+
streamUpstreamsResponse, instanceID, "")
70+
}
71+
72+
func (a *APIAction) HandleGetUpstreams(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse {
73+
correlationID := logger.GetCorrelationID(ctx)
74+
instanceID := instance.GetInstanceMeta().GetInstanceId()
75+
upstreamsResponse := emptyResponse
76+
77+
upstreams, err := a.ResourceService.GetUpstreams(ctx, instance)
78+
if err != nil {
79+
slog.InfoContext(ctx, "Unable to get upstreams", "error", err)
80+
81+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
82+
"", instanceID, err.Error())
83+
}
84+
85+
if upstreams != nil {
86+
upstreamsJSON, jsonErr := json.Marshal(upstreams)
87+
if jsonErr != nil {
88+
slog.ErrorContext(ctx, "Unable to marshal upstreams", "err", err)
89+
}
90+
upstreamsResponse = string(upstreamsJSON)
91+
}
92+
93+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
94+
upstreamsResponse, instanceID, "")
95+
}
96+
97+
func (a *APIAction) HandleUpdateHTTPUpstreams(ctx context.Context, action *mpi.NGINXPlusAction,
98+
instance *mpi.Instance,
99+
) *mpi.DataPlaneResponse {
100+
correlationID := logger.GetCorrelationID(ctx)
101+
instanceID := instance.GetInstanceMeta().GetInstanceId()
102+
103+
add, update, del, err := a.ResourceService.UpdateHTTPUpstreamServers(ctx, instance,
104+
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(),
105+
action.GetUpdateHttpUpstreamServers().GetServers())
106+
if err != nil {
107+
slog.ErrorContext(ctx, "Unable to update HTTP servers of upstream", "request",
108+
action.GetUpdateHttpUpstreamServers(), "error", err)
109+
110+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
111+
"", instanceID, err.Error())
112+
}
113+
114+
slog.DebugContext(ctx, "Successfully updated http upstream servers", "http_upstream_name",
115+
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update),
116+
"delete", len(del))
117+
118+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
119+
"Successfully updated HTTP Upstreams", instanceID, "")
120+
}
121+
122+
func (a *APIAction) HandleGetHTTPUpstreamsServers(ctx context.Context, action *mpi.NGINXPlusAction,
123+
instance *mpi.Instance,
124+
) *mpi.DataPlaneResponse {
125+
correlationID := logger.GetCorrelationID(ctx)
126+
instanceID := instance.GetInstanceMeta().GetInstanceId()
127+
upstreamsResponse := emptyResponse
128+
129+
upstreams, err := a.ResourceService.GetHTTPUpstreamServers(ctx, instance,
130+
action.GetGetHttpUpstreamServers().GetHttpUpstreamName())
131+
if err != nil {
132+
slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err)
133+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
134+
"", instanceID, err.Error())
135+
}
136+
137+
if upstreams != nil {
138+
upstreamsJSON, jsonErr := json.Marshal(upstreams)
139+
if jsonErr != nil {
140+
slog.ErrorContext(ctx, "Unable to marshal http upstreams", "err", err)
141+
}
142+
upstreamsResponse = string(upstreamsJSON)
143+
}
144+
145+
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
146+
upstreamsResponse, instanceID, "")
147+
}

0 commit comments

Comments
 (0)