Skip to content

Commit 72e82ad

Browse files
committed
wip instance watcher refactor
1 parent e43157e commit 72e82ad

17 files changed

+1511
-893
lines changed

internal/plugin/plugin_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func LoadPlugins(ctx context.Context, agentConfig *config.Config) []bus.Plugin {
4040
}
4141

4242
func addResourcePlugin(plugins []bus.Plugin, agentConfig *config.Config) []bus.Plugin {
43-
resourcePlugin := resource.NewResource(agentConfig)
43+
resourcePlugin := resource.NewNginx(agentConfig)
4444
plugins = append(plugins, resourcePlugin)
4545

4646
return plugins

internal/plugin/plugin_manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestLoadPlugins(t *testing.T) {
3434
name: "Test 1: Load plugins",
3535
input: &config.Config{},
3636
expected: []bus.Plugin{
37-
&resource.Resource{},
37+
&resource.Nginx{},
3838
&watcher.Watcher{},
3939
},
4040
},
@@ -58,7 +58,7 @@ func TestLoadPlugins(t *testing.T) {
5858
Features: config.DefaultFeatures(),
5959
},
6060
expected: []bus.Plugin{
61-
&resource.Resource{},
61+
&resource.Nginx{},
6262
&command.CommandPlugin{},
6363
&file.FilePlugin{},
6464
&command.CommandPlugin{},
@@ -77,7 +77,7 @@ func TestLoadPlugins(t *testing.T) {
7777
Features: config.DefaultFeatures(),
7878
},
7979
expected: []bus.Plugin{
80-
&resource.Resource{},
80+
&resource.Nginx{},
8181
&collector.Collector{},
8282
&watcher.Watcher{},
8383
},
@@ -103,7 +103,7 @@ func TestLoadPlugins(t *testing.T) {
103103
},
104104
},
105105
expected: []bus.Plugin{
106-
&resource.Resource{},
106+
&resource.Nginx{},
107107
&command.CommandPlugin{},
108108
&file.FilePlugin{},
109109
&watcher.Watcher{},
@@ -134,7 +134,7 @@ func TestLoadPlugins(t *testing.T) {
134134
},
135135
},
136136
expected: []bus.Plugin{
137-
&resource.Resource{},
137+
&resource.Nginx{},
138138
&command.CommandPlugin{},
139139
&file.FilePlugin{},
140140
&collector.Collector{},
Lines changed: 65 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ import (
2121
"github.com/nginx/agent/v3/internal/bus"
2222
)
2323

24-
// The resource plugin listens for a writeConfigSuccessfulTopic from the file plugin after the config apply
25-
// files have been written. The resource plugin then, validates the config, reloads the instance and monitors the logs.
26-
// This is done in the resource plugin to make the file plugin usable for every type of instance.
24+
// The Nginx plugin listens for a writeConfigSuccessfulTopic from the file plugin after the config apply
25+
// files have been written. The Nginx plugin then, validates the config, reloads the instance and monitors the logs.
26+
// This is done in the Nginx plugin to make the file plugin usable for every type of instance.
2727

28-
type Resource struct {
28+
type Nginx struct {
2929
messagePipe bus.MessagePipeInterface
30-
resourceService resourceServiceInterface
30+
nginxService nginxServiceInterface
3131
agentConfig *config.Config
3232
agentConfigMutex *sync.Mutex
3333
}
@@ -44,88 +44,88 @@ type plusAPIErr struct {
4444
Href string `json:"href"`
4545
}
4646

47-
var _ bus.Plugin = (*Resource)(nil)
47+
var _ bus.Plugin = (*Nginx)(nil)
4848

49-
func NewResource(agentConfig *config.Config) *Resource {
50-
return &Resource{
49+
func NewNginx(agentConfig *config.Config) *Nginx {
50+
return &Nginx{
5151
agentConfig: agentConfig,
5252
agentConfigMutex: &sync.Mutex{},
5353
}
5454
}
5555

56-
func (r *Resource) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error {
57-
slog.DebugContext(ctx, "Starting resource plugin")
56+
func (n *Nginx) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error {
57+
slog.DebugContext(ctx, "Starting nginx plugin")
5858

59-
r.messagePipe = messagePipe
60-
r.resourceService = NewResourceService(ctx, r.agentConfig)
59+
n.messagePipe = messagePipe
60+
n.nginxService = NewNginxService(ctx, n.agentConfig)
6161

6262
return nil
6363
}
6464

65-
func (*Resource) Close(ctx context.Context) error {
66-
slog.InfoContext(ctx, "Closing resource plugin")
65+
func (*Nginx) Close(ctx context.Context) error {
66+
slog.InfoContext(ctx, "Closing nginx plugin")
6767
return nil
6868
}
6969

70-
func (*Resource) Info() *bus.Info {
70+
func (*Nginx) Info() *bus.Info {
7171
return &bus.Info{
72-
Name: "resource",
72+
Name: "nginx",
7373
}
7474
}
7575

7676
// cyclomatic complexity 11 max is 10
7777

78-
func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
78+
func (n *Nginx) Process(ctx context.Context, msg *bus.Message) {
7979
switch msg.Topic {
80-
case bus.UpdatedInstancesTopic:
81-
slog.DebugContext(ctx, "Resource plugin received update instances message")
82-
instanceList, ok := msg.Data.([]*mpi.Instance)
80+
case bus.ResourceUpdateTopic:
81+
resourceUpdate, ok := msg.Data.(*mpi.Resource)
82+
8383
if !ok {
84-
slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data)
84+
slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.Resource", "payload",
85+
msg.Data)
8586

8687
return
8788
}
88-
resource := r.resourceService.UpdateInstances(ctx, instanceList)
89-
90-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})
89+
n.nginxService.UpdateResource(ctx, resourceUpdate)
90+
slog.DebugContext(ctx, "Nginx plugin received update resource message")
9191

9292
return
9393
case bus.WriteConfigSuccessfulTopic:
94-
r.handleWriteConfigSuccessful(ctx, msg)
94+
n.handleWriteConfigSuccessful(ctx, msg)
9595
case bus.RollbackWriteTopic:
96-
r.handleRollbackWrite(ctx, msg)
96+
n.handleRollbackWrite(ctx, msg)
9797
case bus.APIActionRequestTopic:
98-
r.handleAPIActionRequest(ctx, msg)
98+
n.handleAPIActionRequest(ctx, msg)
9999
case bus.AgentConfigUpdateTopic:
100-
r.handleAgentConfigUpdate(ctx, msg)
100+
n.handleAgentConfigUpdate(ctx, msg)
101101
default:
102102
slog.DebugContext(ctx, "Unknown topic", "topic", msg.Topic)
103103
}
104104
}
105105

106-
func (*Resource) Subscriptions() []string {
106+
func (*Nginx) Subscriptions() []string {
107107
return []string{
108-
bus.UpdatedInstancesTopic,
108+
bus.ResourceUpdateTopic,
109109
bus.WriteConfigSuccessfulTopic,
110110
bus.RollbackWriteTopic,
111111
bus.APIActionRequestTopic,
112112
bus.AgentConfigUpdateTopic,
113113
}
114114
}
115115

116-
func (r *Resource) Reconfigure(ctx context.Context, agentConfig *config.Config) error {
117-
slog.DebugContext(ctx, "Resource plugin is reconfiguring to update agent configuration")
116+
func (n *Nginx) Reconfigure(ctx context.Context, agentConfig *config.Config) error {
117+
slog.DebugContext(ctx, "Nginx plugin is reconfiguring to update agent configuration")
118118

119-
r.agentConfigMutex.Lock()
120-
defer r.agentConfigMutex.Unlock()
119+
n.agentConfigMutex.Lock()
120+
defer n.agentConfigMutex.Unlock()
121121

122-
r.agentConfig = agentConfig
122+
n.agentConfig = agentConfig
123123

124124
return nil
125125
}
126126

127-
func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message) {
128-
slog.DebugContext(ctx, "Resource plugin received api action request message")
127+
func (n *Nginx) handleAPIActionRequest(ctx context.Context, msg *bus.Message) {
128+
slog.DebugContext(ctx, "Nginx plugin received api action request message")
129129
managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
130130

131131
if !ok {
@@ -145,25 +145,25 @@ func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message)
145145

146146
switch request.ActionRequest.GetAction().(type) {
147147
case *mpi.APIActionRequest_NginxPlusAction:
148-
r.handleNginxPlusActionRequest(ctx, request.ActionRequest.GetNginxPlusAction(), instanceID)
148+
n.handleNginxPlusActionRequest(ctx, request.ActionRequest.GetNginxPlusAction(), instanceID)
149149
default:
150150
slog.DebugContext(ctx, "API action request not implemented yet")
151151
}
152152
}
153153

154-
func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi.NGINXPlusAction, instanceID string) {
154+
func (n *Nginx) handleNginxPlusActionRequest(ctx context.Context, action *mpi.NGINXPlusAction, instanceID string) {
155155
correlationID := logger.CorrelationID(ctx)
156-
instance := r.resourceService.Instance(instanceID)
156+
instance := n.nginxService.Instance(instanceID)
157157
apiAction := APIAction{
158-
ResourceService: r.resourceService,
158+
NginxService: n.nginxService,
159159
}
160160
if instance == nil {
161161
slog.ErrorContext(ctx, "Unable to find instance with ID", "id", instanceID)
162162
resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
163163
"", instanceID, fmt.Sprintf("failed to preform API "+
164164
"action, could not find instance with ID: %s", instanceID))
165165

166-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
166+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
167167

168168
return
169169
}
@@ -173,7 +173,7 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi
173173
resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
174174
"", instanceID, "failed to preform API action, instance is not NGINX Plus")
175175

176-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
176+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
177177

178178
return
179179
}
@@ -182,46 +182,46 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi
182182
case *mpi.NGINXPlusAction_UpdateHttpUpstreamServers:
183183
slog.DebugContext(ctx, "Updating http upstream servers", "request", action.GetUpdateHttpUpstreamServers())
184184
resp := apiAction.HandleUpdateHTTPUpstreamsRequest(ctx, action, instance)
185-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
185+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
186186
case *mpi.NGINXPlusAction_GetHttpUpstreamServers:
187187
slog.DebugContext(ctx, "Getting http upstream servers", "request", action.GetGetHttpUpstreamServers())
188188
resp := apiAction.HandleGetHTTPUpstreamsServersRequest(ctx, action, instance)
189-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
189+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
190190
case *mpi.NGINXPlusAction_UpdateStreamServers:
191191
slog.DebugContext(ctx, "Updating stream servers", "request", action.GetUpdateStreamServers())
192192
resp := apiAction.HandleUpdateStreamServersRequest(ctx, action, instance)
193-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
193+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
194194
case *mpi.NGINXPlusAction_GetStreamUpstreams:
195195
slog.DebugContext(ctx, "Getting stream upstreams", "request", action.GetGetStreamUpstreams())
196196
resp := apiAction.HandleGetStreamUpstreamsRequest(ctx, instance)
197-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
197+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
198198
case *mpi.NGINXPlusAction_GetUpstreams:
199199
slog.DebugContext(ctx, "Getting upstreams", "request", action.GetGetUpstreams())
200200
resp := apiAction.HandleGetUpstreamsRequest(ctx, instance)
201-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
201+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
202202
default:
203203
slog.DebugContext(ctx, "NGINX Plus action not implemented yet")
204204
}
205205
}
206206

207-
func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Message) {
208-
slog.DebugContext(ctx, "Resource plugin received write config successful message")
207+
func (n *Nginx) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Message) {
208+
slog.DebugContext(ctx, "Nginx plugin received write config successful message")
209209
data, ok := msg.Data.(*model.ConfigApplyMessage)
210210
if !ok {
211211
slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data)
212212

213213
return
214214
}
215-
configContext, err := r.resourceService.ApplyConfig(ctx, data.InstanceID)
215+
configContext, err := n.nginxService.ApplyConfig(ctx, data.InstanceID)
216216
if err != nil {
217217
data.Error = err
218218
slog.ErrorContext(ctx, "errors found during config apply, "+
219219
"sending error status, rolling back config", "err", err)
220220
dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR,
221221
"Config apply failed, rolling back config", data.InstanceID, err.Error())
222-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse})
222+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse})
223223

224-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data})
224+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data})
225225

226226
return
227227
}
@@ -234,18 +234,18 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes
234234
DataPlaneResponse: dpResponse,
235235
}
236236

237-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: successMessage})
237+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: successMessage})
238238
}
239239

240-
func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) {
241-
slog.DebugContext(ctx, "Resource plugin received rollback write message")
240+
func (n *Nginx) handleRollbackWrite(ctx context.Context, msg *bus.Message) {
241+
slog.DebugContext(ctx, "Nginx plugin received rollback write message")
242242
data, ok := msg.Data.(*model.ConfigApplyMessage)
243243
if !ok {
244244
slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data)
245245

246246
return
247247
}
248-
_, err := r.resourceService.ApplyConfig(ctx, data.InstanceID)
248+
_, err := n.nginxService.ApplyConfig(ctx, data.InstanceID)
249249
if err != nil {
250250
slog.ErrorContext(ctx, "errors found during rollback, sending failure status", "err", err)
251251

@@ -256,8 +256,8 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) {
256256
mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback failed",
257257
data.InstanceID, data.Error.Error())
258258

259-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse})
260-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
259+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse})
260+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
261261

262262
return
263263
}
@@ -266,20 +266,20 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) {
266266
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
267267
"Config apply failed, rollback successful", data.InstanceID, data.Error.Error())
268268

269-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
269+
n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
270270
}
271271

272-
func (r *Resource) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) {
273-
slog.DebugContext(ctx, "Resource plugin received agent config update message")
272+
func (n *Nginx) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) {
273+
slog.DebugContext(ctx, "Nginx plugin received agent config update message")
274274

275-
r.agentConfigMutex.Lock()
276-
defer r.agentConfigMutex.Unlock()
275+
n.agentConfigMutex.Lock()
276+
defer n.agentConfigMutex.Unlock()
277277

278278
agentConfig, ok := msg.Data.(*config.Config)
279279
if !ok {
280280
slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data)
281281
return
282282
}
283283

284-
r.agentConfig = agentConfig
284+
n.agentConfig = agentConfig
285285
}

0 commit comments

Comments
 (0)