Skip to content

Commit 1bdc156

Browse files
authored
Fix watcher & resource race conditions (#981)
1 parent 0024261 commit 1bdc156

File tree

10 files changed

+96
-95
lines changed

10 files changed

+96
-95
lines changed

internal/resource/resource_plugin.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
9494

9595
return
9696
}
97-
resource := r.resourceService.UpdateInstances(instanceList)
97+
resource := r.resourceService.UpdateInstances(ctx, instanceList)
9898

9999
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})
100100

@@ -107,7 +107,7 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
107107

108108
return
109109
}
110-
resource := r.resourceService.DeleteInstances(instanceList)
110+
resource := r.resourceService.DeleteInstances(ctx, instanceList)
111111

112112
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})
113113

internal/resource/resource_service.go

Lines changed: 27 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@ import (
1616
"strings"
1717
"sync"
1818

19+
"google.golang.org/protobuf/proto"
20+
1921
"github.com/nginxinc/nginx-plus-go-client/v2/client"
2022
"google.golang.org/protobuf/types/known/structpb"
2123

@@ -42,8 +44,8 @@ const (
4244

4345
type resourceServiceInterface interface {
4446
AddInstances(instanceList []*mpi.Instance) *mpi.Resource
45-
UpdateInstances(instanceList []*mpi.Instance) *mpi.Resource
46-
DeleteInstances(instanceList []*mpi.Instance) *mpi.Resource
47+
UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
48+
DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
4749
ApplyConfig(ctx context.Context, instanceID string) error
4850
Instance(instanceID string) *mpi.Instance
4951
GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstreams string) ([]client.UpstreamServer,
@@ -126,32 +128,45 @@ func (r *ResourceService) RemoveOperator(instanceList []*mpi.Instance) {
126128
}
127129
}
128130

129-
func (r *ResourceService) UpdateInstances(instanceList []*mpi.Instance) *mpi.Resource {
131+
func (r *ResourceService) UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
130132
r.resourceMutex.Lock()
131133
defer r.resourceMutex.Unlock()
132134

133135
for _, updatedInstance := range instanceList {
134-
for _, instance := range r.resource.GetInstances() {
135-
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
136-
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
137-
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
138-
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
136+
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
137+
if ok {
138+
for _, instance := range resourceCopy.GetInstances() {
139+
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
140+
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
141+
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
142+
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
143+
}
139144
}
145+
r.resource = resourceCopy
146+
} else {
147+
slog.WarnContext(ctx, "Unable to clone resource while updating instances", "resource",
148+
r.resource, "instances", instanceList)
140149
}
141150
}
142151

143152
return r.resource
144153
}
145154

146-
func (r *ResourceService) DeleteInstances(instanceList []*mpi.Instance) *mpi.Resource {
155+
func (r *ResourceService) DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
147156
r.resourceMutex.Lock()
148157
defer r.resourceMutex.Unlock()
149158

150159
for _, deletedInstance := range instanceList {
151-
for index, instance := range r.resource.GetInstances() {
152-
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
153-
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
160+
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
161+
if ok {
162+
for index, instance := range resourceCopy.GetInstances() {
163+
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
164+
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
165+
}
154166
}
167+
} else {
168+
slog.WarnContext(ctx, "Unable to clone resource while deleting instances", "resource",
169+
r.resource, "instances", instanceList)
155170
}
156171
}
157172
r.RemoveOperator(instanceList)

internal/resource/resource_service_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -104,7 +104,7 @@ func TestResourceService_UpdateInstance(t *testing.T) {
104104
t.Run(test.name, func(tt *testing.T) {
105105
resourceService := NewResourceService(ctx, types.AgentConfig())
106106
resourceService.resource.Instances = []*v1.Instance{protos.GetNginxOssInstance([]string{})}
107-
resource := resourceService.UpdateInstances(test.instanceList)
107+
resource := resourceService.UpdateInstances(ctx, test.instanceList)
108108
assert.Equal(tt, test.resource.GetInstances(), resource.GetInstances())
109109
})
110110
}
@@ -141,7 +141,7 @@ func TestResourceService_DeleteInstance(t *testing.T) {
141141
protos.GetNginxOssInstance([]string{}),
142142
protos.GetNginxPlusInstance([]string{}),
143143
}
144-
resource := resourceService.DeleteInstances(test.instanceList)
144+
resource := resourceService.DeleteInstances(ctx, test.instanceList)
145145
assert.Equal(tt, test.resource.GetInstances(), resource.GetInstances())
146146
})
147147
}

internal/resource/resourcefakes/fake_resource_service_interface.go

Lines changed: 32 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/watcher/health/health_watcher_service.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,8 @@ func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService {
5252
}
5353

5454
func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
55+
hw.healthWatcherMutex.Lock()
56+
defer hw.healthWatcherMutex.Unlock()
5557
for _, instance := range instances {
5658
switch instance.GetInstanceMeta().GetInstanceType() {
5759
case mpi.InstanceMeta_INSTANCE_TYPE_NGINX, mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS:

internal/watcher/instance/instance_watcher_service.go

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -167,9 +167,6 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID
167167
func (iw *InstanceWatcherService) checkForUpdates(
168168
ctx context.Context,
169169
) {
170-
iw.cacheMutex.Lock()
171-
defer iw.cacheMutex.Unlock()
172-
173170
var instancesToParse []*mpi.Instance
174171
correlationID := logger.GenerateCorrelationID()
175172
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, correlationID)
@@ -245,6 +242,8 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
245242
instanceUpdates InstanceUpdates,
246243
err error,
247244
) {
245+
iw.cacheMutex.Lock()
246+
defer iw.cacheMutex.Unlock()
248247
processes, err := iw.processOperator.Processes(ctx)
249248
if err != nil {
250249
return instanceUpdates, err

internal/watcher/watcher_plugin.go

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"log/slog"
1111
"slices"
12+
"sync"
1213

1314
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1415

@@ -39,6 +40,7 @@ type (
3940
fileUpdatesChannel chan file.FileUpdateMessage
4041
cancel context.CancelFunc
4142
instancesWithConfigApplyInProgress []string
43+
watcherMutex sync.Mutex
4244
}
4345

4446
instanceWatcherServiceInterface interface {
@@ -65,6 +67,7 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
6567
instanceHealthChannel: make(chan health.InstanceHealthMessage),
6668
fileUpdatesChannel: make(chan file.FileUpdateMessage),
6769
instancesWithConfigApplyInProgress: []string{},
70+
watcherMutex: sync.Mutex{},
6871
}
6972
}
7073

@@ -148,6 +151,8 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message
148151

149152
instanceID := request.ConfigApplyRequest.GetOverview().GetConfigVersion().GetInstanceId()
150153

154+
w.watcherMutex.Lock()
155+
defer w.watcherMutex.Unlock()
151156
w.instancesWithConfigApplyInProgress = append(w.instancesWithConfigApplyInProgress, instanceID)
152157
w.fileWatcherService.SetEnabled(false)
153158
}
@@ -163,13 +168,15 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message
163168

164169
instanceID := response.GetInstanceId()
165170

171+
w.watcherMutex.Lock()
166172
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
167173
w.instancesWithConfigApplyInProgress,
168174
func(element string) bool {
169175
return element == instanceID
170176
},
171177
)
172178
w.fileWatcherService.SetEnabled(true)
179+
w.watcherMutex.Unlock()
173180

174181
w.instanceWatcherService.ReparseConfig(ctx, instanceID)
175182
}
@@ -191,6 +198,8 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag
191198

192199
instanceID := response.GetInstanceId()
193200

201+
w.watcherMutex.Lock()
202+
defer w.watcherMutex.Unlock()
194203
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
195204
w.instancesWithConfigApplyInProgress,
196205
func(element string) bool {
@@ -210,7 +219,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
210219
w.handleInstanceUpdates(newCtx, message)
211220
case message := <-w.nginxConfigContextChannel:
212221
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
213-
222+
w.watcherMutex.Lock()
214223
if !slices.Contains(w.instancesWithConfigApplyInProgress, message.NginxConfigContext.InstanceID) {
215224
slog.DebugContext(
216225
newCtx,
@@ -229,6 +238,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
229238
"nginx_config_context", message.NginxConfigContext,
230239
)
231240
}
241+
w.watcherMutex.Unlock()
232242
case message := <-w.instanceHealthChannel:
233243
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
234244
w.messagePipe.Process(newCtx, &bus.Message{

test/config/nginx/nginx-plus.conf

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
worker_processes 1;
2+
error_log /var/log/nginx/error.log;
23

34
events {
45
worker_connections 1024;

test/helpers/test_containers_utils.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -277,7 +277,7 @@ func LogAndTerminateContainers(
277277
) {
278278
tb.Helper()
279279

280-
tb.Log("Logging nginx agent container logs")
280+
tb.Log("======================== Logging Agent Container Logs ========================")
281281
logReader, err := agentContainer.Logs(ctx)
282282
require.NoError(tb, err)
283283

@@ -294,7 +294,7 @@ func LogAndTerminateContainers(
294294
require.NoError(tb, err)
295295

296296
if mockManagementPlaneContainer != nil {
297-
tb.Log("Logging mock management container logs")
297+
tb.Log("======================== Logging Mock Management Container Logs ========================")
298298
logReader, err = mockManagementPlaneContainer.Logs(ctx)
299299
require.NoError(tb, err)
300300

0 commit comments

Comments
 (0)