Skip to content

Commit 67dda52

Browse files
committed
fix resource race condition
1 parent 1a4e321 commit 67dda52

File tree

4 files changed

+26
-27
lines changed

4 files changed

+26
-27
lines changed

internal/resource/resource_plugin.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@ import (
99
"context"
1010
"errors"
1111
"fmt"
12-
"log/slog"
13-
"sync"
14-
1512
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1613
"github.com/nginx/agent/v3/internal/config"
1714
response "github.com/nginx/agent/v3/internal/datasource/proto"
1815
"github.com/nginx/agent/v3/internal/logger"
1916
"github.com/nginx/agent/v3/internal/model"
17+
"log/slog"
2018

2119
"github.com/nginx/agent/v3/internal/bus"
2220
)
@@ -29,7 +27,6 @@ type Resource struct {
2927
messagePipe bus.MessagePipeInterface
3028
resourceService resourceServiceInterface
3129
agentConfig *config.Config
32-
resourceMutex sync.Mutex
3330
}
3431

3532
type errResponse struct {
@@ -48,8 +45,7 @@ var _ bus.Plugin = (*Resource)(nil)
4845

4946
func NewResource(agentConfig *config.Config) *Resource {
5047
return &Resource{
51-
agentConfig: agentConfig,
52-
resourceMutex: sync.Mutex{},
48+
agentConfig: agentConfig,
5349
}
5450
}
5551

@@ -97,8 +93,6 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
9793

9894
return
9995
}
100-
r.resourceMutex.Lock()
101-
defer r.resourceMutex.Unlock()
10296
resource := r.resourceService.UpdateInstances(instanceList)
10397

10498
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})

internal/resource/resource_service.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/json"
1111
"errors"
1212
"fmt"
13+
"google.golang.org/protobuf/proto"
1314
"log/slog"
1415
"net"
1516
"net/http"
@@ -131,13 +132,20 @@ func (r *ResourceService) UpdateInstances(instanceList []*mpi.Instance) *mpi.Res
131132
defer r.resourceMutex.Unlock()
132133

133134
for _, updatedInstance := range instanceList {
134-
for _, instance := range r.resource.GetInstances() {
135-
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
136-
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
137-
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
138-
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
135+
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
136+
if ok {
137+
for _, instance := range resourceCopy.GetInstances() {
138+
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
139+
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
140+
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
141+
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
142+
}
139143
}
144+
r.resource = resourceCopy
145+
} else {
146+
slog.Warn("Error updating resource instances", "instances", instanceList)
140147
}
148+
141149
}
142150

143151
return r.resource
@@ -148,21 +156,24 @@ func (r *ResourceService) DeleteInstances(instanceList []*mpi.Instance) *mpi.Res
148156
defer r.resourceMutex.Unlock()
149157

150158
for _, deletedInstance := range instanceList {
151-
for index, instance := range r.resource.GetInstances() {
152-
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
153-
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
159+
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
160+
if ok {
161+
for index, instance := range resourceCopy.GetInstances() {
162+
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
163+
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
164+
}
154165
}
166+
} else {
167+
slog.Warn("Error deleting instances from resource", "instances", instanceList)
155168
}
169+
156170
}
157171
r.RemoveOperator(instanceList)
158172

159173
return r.resource
160174
}
161175

162176
func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) error {
163-
r.resourceMutex.Lock()
164-
defer r.resourceMutex.Unlock()
165-
166177
var instance *mpi.Instance
167178
operator := r.instanceOperators[instanceID]
168179

internal/watcher/health/nginx_health_watcher_operator.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ package health
88
import (
99
"context"
1010
"fmt"
11-
"sync"
12-
1311
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1412
"github.com/nginx/agent/v3/internal/datasource/host/exec"
1513
processwatcher "github.com/nginx/agent/v3/internal/watcher/process"
@@ -19,7 +17,6 @@ import (
1917
type NginxHealthWatcher struct {
2018
executer exec.ExecInterface
2119
processOperator processwatcher.ProcessOperatorInterface
22-
healthMutex sync.Mutex
2320
}
2421

2522
var _ healthWatcherOperator = (*NginxHealthWatcher)(nil)
@@ -28,13 +25,10 @@ func NewNginxHealthWatcher() *NginxHealthWatcher {
2825
return &NginxHealthWatcher{
2926
executer: &exec.Exec{},
3027
processOperator: processwatcher.NewProcessOperator(),
31-
healthMutex: sync.Mutex{},
3228
}
3329
}
3430

3531
func (nhw *NginxHealthWatcher) Health(ctx context.Context, instance *mpi.Instance) (*mpi.InstanceHealth, error) {
36-
nhw.healthMutex.Lock()
37-
defer nhw.healthMutex.Unlock()
3832

3933
health := &mpi.InstanceHealth{
4034
InstanceId: instance.GetInstanceMeta().GetInstanceId(),

internal/watcher/instance/instance_watcher_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,6 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID
167167
func (iw *InstanceWatcherService) checkForUpdates(
168168
ctx context.Context,
169169
) {
170-
iw.cacheMutex.Lock()
171-
defer iw.cacheMutex.Unlock()
172170

173171
var instancesToParse []*mpi.Instance
174172
correlationID := logger.GenerateCorrelationID()
@@ -245,6 +243,8 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
245243
instanceUpdates InstanceUpdates,
246244
err error,
247245
) {
246+
iw.cacheMutex.Lock()
247+
defer iw.cacheMutex.Unlock()
248248
processes, err := iw.processOperator.Processes(ctx)
249249
if err != nil {
250250
return instanceUpdates, err

0 commit comments

Comments
 (0)