-
Notifications
You must be signed in to change notification settings - Fork 105
Expand file tree
/
Copy pathresource_service.go
More file actions
415 lines (336 loc) · 13.3 KB
/
resource_service.go
File metadata and controls
415 lines (336 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
// Copyright (c) F5, Inc.
//
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.
package resource
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"strings"
"sync"
parser "github.com/nginx/agent/v3/internal/datasource/config"
datasource "github.com/nginx/agent/v3/internal/datasource/proto"
"github.com/nginx/agent/v3/internal/model"
"google.golang.org/protobuf/proto"
"github.com/nginxinc/nginx-plus-go-client/v2/client"
"google.golang.org/protobuf/types/known/structpb"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/datasource/host"
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
)
const (
	// apiFormat is the endpoint template used by createPlusClient for non-unix
	// listeners: the first verb receives the Plus API listen address and the
	// second the API location.
	apiFormat = "http://%s%s"
	// unixPlusAPIFormat is the endpoint template used by createPlusClient when
	// the Plus API listens on a unix socket; only the API location is
	// substituted because the connection itself is made through the socket.
	unixPlusAPIFormat = "http://nginx-plus-api%s"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate
//counterfeiter:generate . resourceServiceInterface
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate
//counterfeiter:generate . logTailerOperator
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate
//counterfeiter:generate . instanceOperator
// resourceServiceInterface describes the operations offered by the resource
// service: tracking NGINX instances on the resource, applying configuration to
// an instance and querying/updating upstreams through the NGINX Plus API.
// ResourceService in this file provides methods with these signatures.
type resourceServiceInterface interface {
	// AddInstances adds the given instances to the resource and returns the resource.
	AddInstances(instanceList []*mpi.Instance) *mpi.Resource
	// UpdateInstances updates already known instances (matched by instance ID)
	// and returns the resource.
	UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
	// DeleteInstances removes the given instances (matched by instance ID) and
	// returns the resource.
	DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
	// ApplyConfig applies the configuration of the instance with the given ID
	// and returns the resulting config context.
	ApplyConfig(ctx context.Context, instanceID string) (*model.NginxConfigContext, error)
	// Instance returns the instance with the given ID.
	Instance(instanceID string) *mpi.Instance
	// GetHTTPUpstreamServers returns the servers of the named HTTP upstream.
	GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstreams string) ([]client.UpstreamServer,
		error)
	// UpdateHTTPUpstreamServers updates the servers of the named HTTP upstream
	// and reports which servers were added, updated and deleted.
	UpdateHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstream string,
		upstreams []*structpb.Struct) (added, updated, deleted []client.UpstreamServer, err error)
	// GetUpstreams returns the HTTP upstreams of the instance.
	GetUpstreams(ctx context.Context, instance *mpi.Instance) (*client.Upstreams, error)
	// GetStreamUpstreams returns the stream upstreams of the instance.
	GetStreamUpstreams(ctx context.Context, instance *mpi.Instance) (*client.StreamUpstreams, error)
	// UpdateStreamServers updates the servers of the named stream upstream and
	// reports which servers were added, updated and deleted.
	UpdateStreamServers(ctx context.Context, instance *mpi.Instance, upstream string,
		upstreams []*structpb.Struct) (added, updated, deleted []client.StreamUpstreamServer, err error)
}
type (
	// instanceOperator performs per-instance actions. ApplyConfig calls
	// Validate first and only calls Reload when validation returned no error.
	instanceOperator interface {
		// Validate checks the instance and returns an error when the check fails.
		// NOTE(review): presumably validates the NGINX configuration — confirm against the implementation.
		Validate(ctx context.Context, instance *mpi.Instance) error
		// Reload reloads the instance and returns an error when that fails.
		Reload(ctx context.Context, instance *mpi.Instance) error
	}

	// logTailerOperator tails error logs.
	// NOTE(review): presumably errors found in the logs are sent on errorChannel — confirm against the implementation.
	logTailerOperator interface {
		Tail(ctx context.Context, errorLogs string, errorChannel chan error)
	}
)
// ResourceService keeps track of the resource (host or container) the agent
// runs on, the NGINX instances found on it and one operator per instance.
type ResourceService struct {
	// resource is the current resource, including its instances.
	resource *mpi.Resource
	// nginxConfigParser parses an instance's NGINX configuration in ApplyConfig.
	nginxConfigParser parser.ConfigParser
	// agentConfig is the agent configuration handed to new instance operators.
	agentConfig       *config.Config
	instanceOperators map[string]instanceOperator // key is instance ID
	// info supplies the host/container information used by updateResourceInfo.
	info host.InfoInterface
	// resourceMutex is held by AddInstances, UpdateInstances, DeleteInstances
	// and updateResourceInfo while they modify resource.
	resourceMutex sync.Mutex
	// operatorsMutex is held by AddOperator and RemoveOperator while they
	// modify instanceOperators.
	operatorsMutex sync.Mutex
}
// NewResourceService builds a ResourceService for the given agent
// configuration and populates its resource with the host or container
// information before returning it.
func NewResourceService(ctx context.Context, agentConfig *config.Config) *ResourceService {
	// The two mutexes are left at their zero value, which is an unlocked mutex.
	service := &ResourceService{
		resource:          &mpi.Resource{},
		info:              host.NewInfo(),
		instanceOperators: map[string]instanceOperator{},
		nginxConfigParser: parser.NewNginxConfigParser(agentConfig),
		agentConfig:       agentConfig,
	}
	service.updateResourceInfo(ctx)

	return service
}
// AddInstances appends instanceList to the resource's instances, registers an
// operator for each added instance and returns the resource. The resource
// mutex is held for the duration of the call.
func (r *ResourceService) AddInstances(instanceList []*mpi.Instance) *mpi.Resource {
	r.resourceMutex.Lock()
	defer r.resourceMutex.Unlock()

	current := r.resource.GetInstances()
	r.resource.Instances = append(current, instanceList...)
	r.AddOperator(instanceList)

	return r.resource
}
// Instance returns the first instance of the resource whose instance ID equals
// instanceID, or nil when no instance matches.
func (r *ResourceService) Instance(instanceID string) *mpi.Instance {
	instances := r.resource.GetInstances()
	for i := range instances {
		if instances[i].GetInstanceMeta().GetInstanceId() == instanceID {
			return instances[i]
		}
	}

	return nil
}
// AddOperator stores a new instance operator for every instance in
// instanceList, keyed by instance ID. An existing entry for the same ID is
// replaced. The operators mutex is held for the duration of the call.
func (r *ResourceService) AddOperator(instanceList []*mpi.Instance) {
	r.operatorsMutex.Lock()
	defer r.operatorsMutex.Unlock()

	for i := range instanceList {
		id := instanceList[i].GetInstanceMeta().GetInstanceId()
		r.instanceOperators[id] = NewInstanceOperator(r.agentConfig)
	}
}
// RemoveOperator deletes the operator registered for every instance in
// instanceList, looked up by instance ID. IDs without a registered operator
// are ignored. The operators mutex is held for the duration of the call.
func (r *ResourceService) RemoveOperator(instanceList []*mpi.Instance) {
	r.operatorsMutex.Lock()
	defer r.operatorsMutex.Unlock()

	for i := range instanceList {
		id := instanceList[i].GetInstanceMeta().GetInstanceId()
		delete(r.instanceOperators, id)
	}
}
// UpdateInstances replaces the meta, runtime and config of every known
// instance whose ID matches an entry of instanceList and returns the resource.
// Each update is applied to a clone of the resource which then becomes the
// current resource; entries with an unknown ID change nothing.
func (r *ResourceService) UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
	r.resourceMutex.Lock()
	defer r.resourceMutex.Unlock()

	for _, incoming := range instanceList {
		cloned, isResource := proto.Clone(r.resource).(*mpi.Resource)
		if !isResource {
			slog.WarnContext(ctx, "Unable to clone resource while updating instances", "resource",
				r.resource, "instances", instanceList)

			continue
		}

		for _, existing := range cloned.GetInstances() {
			if incoming.GetInstanceMeta().GetInstanceId() != existing.GetInstanceMeta().GetInstanceId() {
				continue
			}
			existing.InstanceMeta = incoming.GetInstanceMeta()
			existing.InstanceRuntime = incoming.GetInstanceRuntime()
			existing.InstanceConfig = incoming.GetInstanceConfig()
		}
		r.resource = cloned
	}

	return r.resource
}
// DeleteInstances removes every instance whose ID matches an entry of
// instanceList from the resource, unregisters the matching operators and
// returns the resource. IDs that are not present are ignored.
//
// The instances are filtered in a single pass into a fresh slice. Deleting by
// index while iterating a snapshot goes out of range as soon as two stored
// instances share an ID, because the live slice shrinks under the snapshot's
// indices.
func (r *ResourceService) DeleteInstances(_ context.Context, instanceList []*mpi.Instance) *mpi.Resource {
	r.resourceMutex.Lock()
	defer r.resourceMutex.Unlock()

	// Collect the IDs to delete once so each stored instance needs one lookup.
	doomed := make(map[string]struct{}, len(instanceList))
	for _, deletedInstance := range instanceList {
		doomed[deletedInstance.GetInstanceMeta().GetInstanceId()] = struct{}{}
	}

	current := r.resource.GetInstances()
	kept := make([]*mpi.Instance, 0, len(current))
	for _, instance := range current {
		if _, remove := doomed[instance.GetInstanceMeta().GetInstanceId()]; !remove {
			kept = append(kept, instance)
		}
	}
	r.resource.Instances = kept

	r.RemoveOperator(instanceList)

	return r.resource
}
// ApplyConfig parses the configuration of the instance identified by
// instanceID, updates the instance runtime from the parsed configuration,
// validates the configuration and finally reloads NGINX.
//
// It returns the parsed config context on success. An error is returned when
// no operator or no instance is registered for instanceID, when parsing fails
// or yields no config context, when validation fails, or when the reload
// fails; the reload is not attempted after a failed validation.
func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) (*model.NginxConfigContext, error) {
	// The operators map is written under operatorsMutex, so read it under the
	// same lock. The lock is released before any long-running work starts.
	r.operatorsMutex.Lock()
	operator := r.instanceOperators[instanceID]
	r.operatorsMutex.Unlock()

	if operator == nil {
		return nil, fmt.Errorf("instance %s not found", instanceID)
	}

	var instance *mpi.Instance
	for _, resourceInstance := range r.resource.GetInstances() {
		if resourceInstance.GetInstanceMeta().GetInstanceId() == instanceID {
			instance = resourceInstance
		}
	}

	// An operator can exist without a matching instance; fail here instead of
	// handing a nil instance to the parser and the runtime update.
	if instance == nil {
		return nil, fmt.Errorf("instance %s not found", instanceID)
	}

	nginxConfigContext, parseErr := r.nginxConfigParser.Parse(ctx, instance)
	if parseErr != nil {
		return nil, fmt.Errorf("failed to parse config %w", parseErr)
	}
	// Wrapping a nil error with %w would render as "%!w(<nil>)".
	if nginxConfigContext == nil {
		return nil, errors.New("failed to parse config: parser returned no config context")
	}

	datasource.UpdateNginxInstanceRuntime(instance, nginxConfigContext)

	slog.DebugContext(ctx, "Updated Instance Runtime after parsing config", "instance", instance.GetInstanceRuntime())

	if valErr := operator.Validate(ctx, instance); valErr != nil {
		return nil, fmt.Errorf("failed validating config %w", valErr)
	}

	if reloadErr := operator.Reload(ctx, instance); reloadErr != nil {
		return nil, fmt.Errorf("failed to reload NGINX %w", reloadErr)
	}

	return nginxConfigContext, nil
}
// GetHTTPUpstreamServers asks the NGINX Plus API of instance for the servers
// of the HTTP upstream named upstream.
//
// It returns an error when no Plus client can be created for the instance.
// An error from the Plus client is logged as a warning and returned converted
// by createPlusAPIError; nothing is logged when the call succeeds.
func (r *ResourceService) GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance,
	upstream string,
) ([]client.UpstreamServer, error) {
	plusClient, err := r.createPlusClient(instance)
	if err != nil {
		slog.ErrorContext(ctx, "Failed to create plus client ", "error", err)
		return nil, err
	}

	servers, getServersErr := plusClient.GetHTTPServers(ctx, upstream)
	if getServersErr != nil {
		slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetHTTPUpstreamServers", "err", getServersErr)
	}

	return servers, createPlusAPIError(getServersErr)
}
// GetUpstreams asks the NGINX Plus API of instance for its HTTP upstreams.
//
// It returns an error when no Plus client can be created for the instance.
// An error from the Plus client is logged as a warning and returned converted
// by createPlusAPIError; nothing is logged when the call succeeds.
func (r *ResourceService) GetUpstreams(ctx context.Context, instance *mpi.Instance,
) (*client.Upstreams, error) {
	plusClient, err := r.createPlusClient(instance)
	if err != nil {
		slog.ErrorContext(ctx, "Failed to create plus client ", "error", err)
		return nil, err
	}

	servers, getUpstreamsErr := plusClient.GetUpstreams(ctx)
	if getUpstreamsErr != nil {
		slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetUpstreams", "err", getUpstreamsErr)
	}

	return servers, createPlusAPIError(getUpstreamsErr)
}
// GetStreamUpstreams asks the NGINX Plus API of instance for its stream
// upstreams.
//
// It returns an error when no Plus client can be created for the instance.
// An error from the Plus client is logged as a warning and returned converted
// by createPlusAPIError; nothing is logged when the call succeeds.
func (r *ResourceService) GetStreamUpstreams(ctx context.Context, instance *mpi.Instance,
) (*client.StreamUpstreams, error) {
	plusClient, err := r.createPlusClient(instance)
	if err != nil {
		slog.ErrorContext(ctx, "Failed to create plus client ", "error", err)
		return nil, err
	}

	streamUpstreams, getServersErr := plusClient.GetStreamUpstreams(ctx)
	if getServersErr != nil {
		slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetStreamUpstreams", "err", getServersErr)
	}

	return streamUpstreams, createPlusAPIError(getServersErr)
}
// UpdateStreamServers converts upstreams to stream upstream servers and sends
// them to the NGINX Plus API of instance as the servers of the stream upstream
// named upstream. It returns the servers the Plus client reports as added,
// updated and deleted.
//
// It returns an error when no Plus client can be created for the instance.
// An error from the Plus client is logged as a warning and returned converted
// by createPlusAPIError; nothing is logged when the call succeeds.
//
// max number of returns from function is 3
// nolint: revive
func (r *ResourceService) UpdateStreamServers(ctx context.Context, instance *mpi.Instance, upstream string,
	upstreams []*structpb.Struct,
) (added, updated, deleted []client.StreamUpstreamServer, err error) {
	plusClient, err := r.createPlusClient(instance)
	if err != nil {
		slog.ErrorContext(ctx, "Failed to create plus client ", "error", err)
		return nil, nil, nil, err
	}

	servers := convertToStreamUpstreamServer(upstreams)

	added, updated, deleted, updateError := plusClient.UpdateStreamServers(ctx, upstream, servers)
	if updateError != nil {
		slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateStreamServers", "err", updateError)
	}

	return added, updated, deleted, createPlusAPIError(updateError)
}
// UpdateHTTPUpstreamServers converts upstreams to HTTP upstream servers and
// sends them to the NGINX Plus API of instance as the servers of the HTTP
// upstream named upstream. It returns the servers the Plus client reports as
// added, updated and deleted.
//
// It returns an error when no Plus client can be created for the instance.
// An error from the Plus client is logged as a warning and returned converted
// by createPlusAPIError.
//
// max number of returns from function is 3
// nolint: revive
func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstream string,
	upstreams []*structpb.Struct,
) (added, updated, deleted []client.UpstreamServer, err error) {
	nginxClient, err := r.createPlusClient(instance)
	if err != nil {
		slog.ErrorContext(ctx, "Failed to create plus client ", "error", err)
		return nil, nil, nil, err
	}

	added, updated, deleted, clientErr := nginxClient.UpdateHTTPServers(
		ctx, upstream, convertToUpstreamServer(upstreams),
	)
	if clientErr != nil {
		slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", clientErr)
	}

	return added, updated, deleted, createPlusAPIError(clientErr)
}
// convertToUpstreamServer converts upstreams into Plus client upstream servers
// by encoding them to JSON and decoding that JSON into []client.UpstreamServer.
// Encoding and decoding failures are logged, not returned; the servers decoded
// so far (possibly none) are returned in that case.
func convertToUpstreamServer(upstreams []*structpb.Struct) []client.UpstreamServer {
	encoded, marshalErr := json.Marshal(upstreams)
	if marshalErr != nil {
		slog.Error("Failed to marshal upstreams", "error", marshalErr, "upstreams", upstreams)
	}

	var servers []client.UpstreamServer
	if unmarshalErr := json.Unmarshal(encoded, &servers); unmarshalErr != nil {
		slog.Error("Failed to unmarshal upstreams", "error", unmarshalErr, "servers", servers)
	}

	return servers
}
// convertToStreamUpstreamServer converts streamUpstreams into Plus client
// stream upstream servers by encoding them to JSON and decoding that JSON into
// []client.StreamUpstreamServer. Encoding and decoding failures are logged,
// not returned; the servers decoded so far (possibly none) are returned in
// that case.
func convertToStreamUpstreamServer(streamUpstreams []*structpb.Struct) []client.StreamUpstreamServer {
	encoded, marshalErr := json.Marshal(streamUpstreams)
	if marshalErr != nil {
		slog.Error("Failed to marshal stream upstream server", "error", marshalErr, "stream_upstreams", streamUpstreams)
	}

	var servers []client.StreamUpstreamServer
	if unmarshalErr := json.Unmarshal(encoded, &servers); unmarshalErr != nil {
		slog.Error("Failed to unmarshal stream upstream server", "error", unmarshalErr, "stream_upstreams", streamUpstreams)
	}

	return servers
}
// createPlusClient builds an NGINX Plus API client for instance from the Plus
// API listen address and location found in the instance runtime.
//
// When the listen address starts with "unix:" the client connects through that
// unix socket and uses unixPlusAPIFormat for the endpoint; otherwise it uses
// http.DefaultClient and apiFormat. An error is returned when the listen
// address or the location is empty, or when the Plus client cannot be created.
func (r *ResourceService) createPlusClient(instance *mpi.Instance) (*client.NginxClient, error) {
	plusAPI := instance.GetInstanceRuntime().GetNginxPlusRuntimeInfo().GetPlusApi()
	listen, location := plusAPI.GetListen(), plusAPI.GetLocation()

	if location == "" || listen == "" {
		return nil, errors.New("failed to preform API action, NGINX Plus API is not configured")
	}

	// Logged at debug level with named attributes; this runs on every Plus API call.
	slog.Debug("Creating NGINX Plus API client", "listen", listen, "location", location)

	endpoint := fmt.Sprintf(apiFormat, listen, location)
	httpClient := http.DefaultClient

	// A unix listener needs both a socket-backed HTTP client and an endpoint
	// that omits the listen address.
	if socketPath, isUnix := strings.CutPrefix(listen, "unix:"); isUnix {
		endpoint = fmt.Sprintf(unixPlusAPIFormat, location)
		httpClient = socketClient(socketPath)
	}

	return client.NewNginxClient(endpoint,
		client.WithMaxAPIVersion(), client.WithHTTPClient(httpClient),
	)
}
// updateResourceInfo fills the resource with container information when the
// agent runs in a container and with host information otherwise, sets the
// resource ID to the container or host ID accordingly and resets the instance
// list to an empty slice. The resource mutex is held for the duration of the
// call.
func (r *ResourceService) updateResourceInfo(ctx context.Context) {
	r.resourceMutex.Lock()
	defer r.resourceMutex.Unlock()

	switch {
	case r.info.IsContainer():
		r.resource.Info = r.info.ContainerInfo(ctx)
		r.resource.ResourceId = r.resource.GetContainerInfo().GetContainerId()
	default:
		r.resource.Info = r.info.HostInfo(ctx)
		r.resource.ResourceId = r.resource.GetHostInfo().GetHostId()
	}

	r.resource.Instances = []*mpi.Instance{}
}
// socketClient returns an HTTP client whose transport connects to the unix
// domain socket at socketPath. The network and address derived from the
// request URL are ignored, so any host name may be used in the URL.
//
// The request context is passed to the dialer so that cancellation and
// deadlines also apply while the connection is being established.
func socketClient(socketPath string) *http.Client {
	dialer := &net.Dialer{}

	return &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return dialer.DialContext(ctx, "unix", socketPath)
			},
		},
	}
}
// createPlusAPIError converts the error returned by the plus go client into the json format used by the NGINX Plus API.
//
// The client error is expected to contain "error.status" followed by five
// ";"-separated "key=value" fields (status, text, code, request ID, href), in
// that order. The returned error's message is the JSON encoding of those
// values.
//
// A nil apiErr yields nil. An error that does not carry those five fields, for
// example a connection failure that never reached the API, is returned
// unchanged instead of being indexed out of range. apiErr is also returned
// unchanged when the JSON encoding fails.
func createPlusAPIError(apiErr error) error {
	if apiErr == nil {
		return nil
	}

	// Number of fields read from the error text below.
	const plusErrFieldCount = 5

	_, after, found := strings.Cut(apiErr.Error(), "error.status")
	if !found {
		return apiErr
	}

	errorSlice := strings.Split(after, ";")
	if len(errorSlice) < plusErrFieldCount {
		return apiErr
	}

	// Keep only the value part of each "key=value" field.
	for i, errStr := range errorSlice {
		_, value, _ := strings.Cut(errStr, "=")
		errorSlice[i] = value
	}

	plusErr := plusAPIErr{
		Error: errResponse{
			Status: errorSlice[0],
			Text:   errorSlice[1],
			Code:   errorSlice[2],
		},
		RequestID: errorSlice[3],
		Href:      errorSlice[4],
	}

	r, err := json.Marshal(plusErr)
	if err != nil {
		slog.Error("Unable to marshal NGINX Plus API error", "error", err)
		return apiErr
	}

	return errors.New(string(r))
}