Skip to content

Commit 8fc60c6

Browse files
committed
Update how watchers are enabled after config apply
1 parent a520f9b commit 8fc60c6

File tree

8 files changed

+155
-150
lines changed

8 files changed

+155
-150
lines changed

internal/bus/topics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ const (
1919
ConnectionResetTopic = "connection-reset"
2020
ConfigApplyRequestTopic = "config-apply-request"
2121
WriteConfigSuccessfulTopic = "write-config-successful"
22-
ConfigApplySuccessfulTopic = "config-apply-successful"
22+
ReloadSuccessfulTopic = "reload-successful"
23+
EnableWatchersTopic = "enable-watchers"
2324
ConfigApplyFailedTopic = "config-apply-failed"
2425
ConfigApplyCompleteTopic = "config-apply-complete"
2526
RollbackWriteTopic = "rollback-write"

internal/file/file_plugin.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
106106
fp.handleConfigApplyRequest(ctxWithMetadata, msg)
107107
case bus.ConfigApplyCompleteTopic:
108108
fp.handleConfigApplyComplete(ctxWithMetadata, msg)
109-
case bus.ConfigApplySuccessfulTopic:
110-
fp.handleConfigApplySuccess(ctxWithMetadata, msg)
109+
case bus.ReloadSuccessfulTopic:
110+
fp.handleReloadSuccess(ctxWithMetadata, msg)
111111
case bus.ConfigApplyFailedTopic:
112112
fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg)
113113
default:
@@ -133,11 +133,28 @@ func (fp *FilePlugin) Subscriptions() []string {
133133
bus.ConfigUploadRequestTopic,
134134
bus.ConfigApplyRequestTopic,
135135
bus.ConfigApplyFailedTopic,
136-
bus.ConfigApplySuccessfulTopic,
136+
bus.ReloadSuccessfulTopic,
137137
bus.ConfigApplyCompleteTopic,
138138
}
139139
}
140140

141+
func (fp *FilePlugin) CleanUpConfigApply(ctx context.Context,
142+
configContext *model.NginxConfigContext,
143+
instanceID string,
144+
) {
145+
enableWatcher := &model.EnableWatchers{
146+
ConfigContext: configContext,
147+
InstanceID: instanceID,
148+
}
149+
150+
fp.fileManagerService.ClearCache()
151+
152+
fp.messagePipe.Process(ctx, &bus.Message{
153+
Data: enableWatcher,
154+
Topic: bus.EnableWatchersTopic,
155+
})
156+
}
157+
141158
func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) {
142159
slog.DebugContext(ctx, "File plugin received connection reset message")
143160
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
@@ -165,20 +182,21 @@ func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Me
165182
return
166183
}
167184

168-
fp.fileManagerService.ClearCache()
169185
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response})
186+
fp.CleanUpConfigApply(ctx, &model.NginxConfigContext{}, response.GetInstanceId())
170187
}
171188

172-
func (fp *FilePlugin) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) {
173-
slog.DebugContext(ctx, "File plugin received config success message")
174-
successMessage, ok := msg.Data.(*model.ConfigApplySuccess)
189+
func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) {
190+
slog.InfoContext(ctx, "File plugin received reload success message", "data", msg.Data)
191+
192+
successMessage, ok := msg.Data.(*model.ReloadSuccess)
175193

176194
if !ok {
177-
slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplySuccess", "payload", msg.Data)
195+
slog.ErrorContext(ctx, "Unable to cast message payload to *model.ReloadSuccess", "payload", msg.Data)
178196
return
179197
}
180198

181-
fp.fileManagerService.ClearCache()
199+
fp.CleanUpConfigApply(ctx, successMessage.ConfigContext, successMessage.DataPlaneResponse.GetInstanceId())
182200

183201
if successMessage.ConfigContext.Files != nil {
184202
slog.DebugContext(ctx, "Changes made during config apply, update files on disk")
@@ -191,6 +209,7 @@ func (fp *FilePlugin) handleConfigApplySuccess(ctx context.Context, msg *bus.Mes
191209
slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError)
192210
}
193211
}
212+
194213
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: successMessage.DataPlaneResponse})
195214
}
196215

@@ -264,13 +283,8 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes
264283
"",
265284
)
266285

267-
successMessage := &model.ConfigApplySuccess{
268-
ConfigContext: &model.NginxConfigContext{},
269-
DataPlaneResponse: dpResponse,
270-
}
271-
272286
fp.fileManagerService.ClearCache()
273-
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: successMessage})
287+
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: dpResponse})
274288

275289
return
276290
case model.Error:

internal/file/file_plugin_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestFilePlugin_Subscriptions(t *testing.T) {
5959
bus.ConfigUploadRequestTopic,
6060
bus.ConfigApplyRequestTopic,
6161
bus.ConfigApplyFailedTopic,
62-
bus.ConfigApplySuccessfulTopic,
62+
bus.ReloadSuccessfulTopic,
6363
bus.ConfigApplyCompleteTopic,
6464
},
6565
filePlugin.Subscriptions(),
@@ -210,13 +210,13 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) {
210210
case test.configApplyStatus == model.NoChange:
211211
assert.Len(t, messages, 1)
212212

213-
response, ok := messages[0].Data.(*model.ConfigApplySuccess)
213+
response, ok := messages[0].Data.(*mpi.DataPlaneResponse)
214214
assert.True(t, ok)
215-
assert.Equal(t, bus.ConfigApplySuccessfulTopic, messages[0].Topic)
215+
assert.Equal(t, bus.ConfigApplyCompleteTopic, messages[0].Topic)
216216
assert.Equal(
217217
t,
218218
mpi.CommandResponse_COMMAND_STATUS_OK,
219-
response.DataPlaneResponse.GetCommandResponse().GetStatus(),
219+
response.GetCommandResponse().GetStatus(),
220220
)
221221
case test.message == nil:
222222
assert.Empty(t, messages)
@@ -432,7 +432,7 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) {
432432
}
433433
}
434434

435-
func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) {
435+
func TestFilePlugin_Process_ConfigApplyReloadSuccessTopic(t *testing.T) {
436436
ctx := context.Background()
437437
instance := protos.NginxOssInstance([]string{})
438438
mockFileManager := &filefakes.FakeFileManagerServiceInterface{}
@@ -460,14 +460,22 @@ func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) {
460460
InstanceId: instance.GetInstanceMeta().GetInstanceId(),
461461
}
462462

463-
filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: &model.ConfigApplySuccess{
463+
filePlugin.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: &model.ReloadSuccess{
464464
ConfigContext: &model.NginxConfigContext{},
465465
DataPlaneResponse: expectedResponse,
466466
}})
467467

468468
messages := messagePipe.Messages()
469-
response, ok := messages[0].Data.(*mpi.DataPlaneResponse)
469+
470+
watchers, ok := messages[0].Data.(*model.EnableWatchers)
471+
assert.True(t, ok)
472+
assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic)
473+
assert.Equal(t, &model.NginxConfigContext{}, watchers.ConfigContext)
474+
assert.Equal(t, instance.GetInstanceMeta().GetInstanceId(), watchers.InstanceID)
475+
476+
response, ok := messages[1].Data.(*mpi.DataPlaneResponse)
470477
assert.True(t, ok)
478+
assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic)
471479

472480
assert.Equal(t, expectedResponse.GetCommandResponse().GetStatus(), response.GetCommandResponse().GetStatus())
473481
assert.Equal(t, expectedResponse.GetCommandResponse().GetMessage(), response.GetCommandResponse().GetMessage())

internal/model/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,16 @@ const (
7777
OK
7878
)
7979

80-
type ConfigApplySuccess struct {
80+
type ReloadSuccess struct {
8181
ConfigContext *NginxConfigContext
8282
DataPlaneResponse *v1.DataPlaneResponse
8383
}
8484

85+
type EnableWatchers struct {
86+
ConfigContext *NginxConfigContext
87+
InstanceID string
88+
}
89+
8590
//nolint:revive,cyclop // cyclomatic complexity is 16
8691
func (ncc *NginxConfigContext) Equal(otherNginxConfigContext *NginxConfigContext) bool {
8792
if ncc.StubStatus != nil && otherNginxConfigContext.StubStatus != nil {

internal/resource/resource_plugin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,12 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes
242242
dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_OK,
243243
"Config apply successful", data.InstanceID, "")
244244

245-
successMessage := &model.ConfigApplySuccess{
245+
successMessage := &model.ReloadSuccess{
246246
ConfigContext: configContext,
247247
DataPlaneResponse: dpResponse,
248248
}
249249

250-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: successMessage})
250+
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: successMessage})
251251
}
252252

253253
func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) {

internal/resource/resource_plugin_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestResource_Process_Apply(t *testing.T) {
142142
},
143143
},
144144
applyErr: nil,
145-
topic: []string{bus.ConfigApplySuccessfulTopic},
145+
topic: []string{bus.ReloadSuccessfulTopic},
146146
},
147147
{
148148
name: "Test 2: Write Config Successful Topic - Fail Status",

internal/watcher/watcher_plugin.go

Lines changed: 35 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,10 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
140140
switch msg.Topic {
141141
case bus.ConfigApplyRequestTopic:
142142
w.handleConfigApplyRequest(ctx, msg)
143-
case bus.ConfigApplySuccessfulTopic:
144-
w.handleConfigApplySuccess(ctx, msg)
145-
case bus.ConfigApplyCompleteTopic:
146-
w.handleConfigApplyComplete(ctx, msg)
147143
case bus.DataPlaneHealthRequestTopic:
148144
w.handleHealthRequest(ctx)
145+
case bus.EnableWatchersTopic:
146+
w.handleEnableWatchers(ctx, msg)
149147
default:
150148
slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic)
151149
}
@@ -154,12 +152,43 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
154152
func (*Watcher) Subscriptions() []string {
155153
return []string{
156154
bus.ConfigApplyRequestTopic,
157-
bus.ConfigApplySuccessfulTopic,
158-
bus.ConfigApplyCompleteTopic,
159155
bus.DataPlaneHealthRequestTopic,
156+
bus.EnableWatchersTopic,
160157
}
161158
}
162159

160+
func (w *Watcher) handleEnableWatchers(ctx context.Context, msg *bus.Message) {
161+
slog.DebugContext(ctx, "Watcher plugin received enable watchers message")
162+
enableWatchersMessage, ok := msg.Data.(*model.EnableWatchers)
163+
if !ok {
164+
slog.ErrorContext(ctx, "Unable to cast message payload to *model.EnableWatchers", "payload",
165+
msg.Data, "topic", msg.Topic)
166+
167+
return
168+
}
169+
170+
instanceID := enableWatchersMessage.InstanceID
171+
configContext := enableWatchersMessage.ConfigContext
172+
173+
// if config apply ended in a reload there is no need to reparse the config so an empty config context is sent
174+
// from the file plugin
175+
if configContext.InstanceID != "" {
176+
w.instanceWatcherService.HandleNginxConfigContextUpdate(ctx, instanceID, configContext)
177+
}
178+
179+
w.watcherMutex.Lock()
180+
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
181+
w.instancesWithConfigApplyInProgress,
182+
func(element string) bool {
183+
return element == instanceID
184+
},
185+
)
186+
187+
w.fileWatcherService.SetEnabled(true)
188+
w.instanceWatcherService.SetEnabled(true)
189+
w.watcherMutex.Unlock()
190+
}
191+
163192
func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) {
164193
slog.DebugContext(ctx, "Watcher plugin received config apply request message")
165194
managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
@@ -188,69 +217,13 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message
188217
w.instanceWatcherService.SetEnabled(false)
189218
}
190219

191-
func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) {
192-
slog.DebugContext(ctx, "Watcher plugin received config apply success message")
193-
successMessage, ok := msg.Data.(*model.ConfigApplySuccess)
194-
if !ok {
195-
slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplySuccess", "payload",
196-
msg.Data, "topic", msg.Topic)
197-
198-
return
199-
}
200-
201-
instanceID := successMessage.DataPlaneResponse.GetInstanceId()
202-
203-
// If the config apply had no changes to any files, it is results in a ConfigApplySuccessfulTopic with an empty
204-
// configContext being sent, there is no need to reparse the config as no change has occurred.
205-
if successMessage.ConfigContext.InstanceID != "" {
206-
w.instanceWatcherService.HandleNginxConfigContextUpdate(ctx, instanceID, successMessage.ConfigContext)
207-
}
208-
209-
w.watcherMutex.Lock()
210-
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
211-
w.instancesWithConfigApplyInProgress,
212-
func(element string) bool {
213-
return element == instanceID
214-
},
215-
)
216-
217-
w.fileWatcherService.SetEnabled(true)
218-
w.instanceWatcherService.SetEnabled(true)
219-
w.watcherMutex.Unlock()
220-
}
221-
222220
func (w *Watcher) handleHealthRequest(ctx context.Context) {
223221
slog.DebugContext(ctx, "Watcher plugin received health request message")
224222
w.messagePipe.Process(ctx, &bus.Message{
225223
Topic: bus.DataPlaneHealthResponseTopic, Data: w.healthWatcherService.InstancesHealth(),
226224
})
227225
}
228226

229-
func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) {
230-
slog.DebugContext(ctx, "Watcher plugin received config apply complete message")
231-
response, ok := msg.Data.(*mpi.DataPlaneResponse)
232-
if !ok {
233-
slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload",
234-
msg.Data, "topic", msg.Topic)
235-
236-
return
237-
}
238-
239-
instanceID := response.GetInstanceId()
240-
241-
w.watcherMutex.Lock()
242-
defer w.watcherMutex.Unlock()
243-
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
244-
w.instancesWithConfigApplyInProgress,
245-
func(element string) bool {
246-
return element == instanceID
247-
},
248-
)
249-
250-
w.instanceWatcherService.SetEnabled(true)
251-
w.fileWatcherService.SetEnabled(true)
252-
}
253-
254227
func (w *Watcher) monitorWatchers(ctx context.Context) {
255228
for {
256229
select {

0 commit comments

Comments
 (0)