Skip to content

Commit 1a4e321

Browse files
committed
WIP
1 parent 96b0501 commit 1a4e321

File tree

5 files changed

+28
-2
lines changed

5 files changed

+28
-2
lines changed

internal/resource/resource_plugin.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"log/slog"
13+
"sync"
1314

1415
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1516
"github.com/nginx/agent/v3/internal/config"
@@ -28,6 +29,7 @@ type Resource struct {
2829
messagePipe bus.MessagePipeInterface
2930
resourceService resourceServiceInterface
3031
agentConfig *config.Config
32+
resourceMutex sync.Mutex
3133
}
3234

3335
type errResponse struct {
@@ -46,7 +48,8 @@ var _ bus.Plugin = (*Resource)(nil)
4648

4749
func NewResource(agentConfig *config.Config) *Resource {
4850
return &Resource{
49-
agentConfig: agentConfig,
51+
agentConfig: agentConfig,
52+
resourceMutex: sync.Mutex{},
5053
}
5154
}
5255

@@ -94,6 +97,8 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
9497

9598
return
9699
}
100+
r.resourceMutex.Lock()
101+
defer r.resourceMutex.Unlock()
97102
resource := r.resourceService.UpdateInstances(instanceList)
98103

99104
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})

internal/resource/resource_service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ func (r *ResourceService) DeleteInstances(instanceList []*mpi.Instance) *mpi.Res
160160
}
161161

162162
func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) error {
163+
r.resourceMutex.Lock()
164+
defer r.resourceMutex.Unlock()
165+
163166
var instance *mpi.Instance
164167
operator := r.instanceOperators[instanceID]
165168

internal/watcher/health/health_watcher_service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService {
5252
}
5353

5454
func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
55+
hw.healthWatcherMutex.Lock()
56+
defer hw.healthWatcherMutex.Unlock()
5557
for _, instance := range instances {
5658
switch instance.GetInstanceMeta().GetInstanceType() {
5759
case mpi.InstanceMeta_INSTANCE_TYPE_NGINX, mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS:

internal/watcher/health/nginx_health_watcher_operator.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package health
88
import (
99
"context"
1010
"fmt"
11+
"sync"
1112

1213
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1314
"github.com/nginx/agent/v3/internal/datasource/host/exec"
@@ -18,6 +19,7 @@ import (
1819
type NginxHealthWatcher struct {
1920
executer exec.ExecInterface
2021
processOperator processwatcher.ProcessOperatorInterface
22+
healthMutex sync.Mutex
2123
}
2224

2325
var _ healthWatcherOperator = (*NginxHealthWatcher)(nil)
@@ -26,10 +28,14 @@ func NewNginxHealthWatcher() *NginxHealthWatcher {
2628
return &NginxHealthWatcher{
2729
executer: &exec.Exec{},
2830
processOperator: processwatcher.NewProcessOperator(),
31+
healthMutex: sync.Mutex{},
2932
}
3033
}
3134

3235
func (nhw *NginxHealthWatcher) Health(ctx context.Context, instance *mpi.Instance) (*mpi.InstanceHealth, error) {
36+
nhw.healthMutex.Lock()
37+
defer nhw.healthMutex.Unlock()
38+
3339
health := &mpi.InstanceHealth{
3440
InstanceId: instance.GetInstanceMeta().GetInstanceId(),
3541
InstanceHealthStatus: mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY,

internal/watcher/watcher_plugin.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"log/slog"
1111
"slices"
12+
"sync"
1213

1314
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1415

@@ -39,6 +40,7 @@ type (
3940
fileUpdatesChannel chan file.FileUpdateMessage
4041
cancel context.CancelFunc
4142
instancesWithConfigApplyInProgress []string
43+
watcherMutex sync.Mutex
4244
}
4345

4446
instanceWatcherServiceInterface interface {
@@ -65,6 +67,7 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
6567
instanceHealthChannel: make(chan health.InstanceHealthMessage),
6668
fileUpdatesChannel: make(chan file.FileUpdateMessage),
6769
instancesWithConfigApplyInProgress: []string{},
70+
watcherMutex: sync.Mutex{},
6871
}
6972
}
7073

@@ -148,6 +151,8 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message
148151

149152
instanceID := request.ConfigApplyRequest.GetOverview().GetConfigVersion().GetInstanceId()
150153

154+
w.watcherMutex.Lock()
155+
defer w.watcherMutex.Unlock()
151156
w.instancesWithConfigApplyInProgress = append(w.instancesWithConfigApplyInProgress, instanceID)
152157
w.fileWatcherService.SetEnabled(false)
153158
}
@@ -163,6 +168,8 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message
163168

164169
instanceID := response.GetInstanceId()
165170

171+
w.watcherMutex.Lock()
172+
defer w.watcherMutex.Unlock()
166173
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
167174
w.instancesWithConfigApplyInProgress,
168175
func(element string) bool {
@@ -191,6 +198,8 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag
191198

192199
instanceID := response.GetInstanceId()
193200

201+
w.watcherMutex.Lock()
202+
defer w.watcherMutex.Unlock()
194203
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
195204
w.instancesWithConfigApplyInProgress,
196205
func(element string) bool {
@@ -210,7 +219,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
210219
w.handleInstanceUpdates(newCtx, message)
211220
case message := <-w.nginxConfigContextChannel:
212221
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
213-
222+
w.watcherMutex.Lock()
214223
if !slices.Contains(w.instancesWithConfigApplyInProgress, message.NginxConfigContext.InstanceID) {
215224
slog.DebugContext(
216225
newCtx,
@@ -229,6 +238,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
229238
"nginx_config_context", message.NginxConfigContext,
230239
)
231240
}
241+
w.watcherMutex.Unlock()
232242
case message := <-w.instanceHealthChannel:
233243
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
234244
w.messagePipe.Process(newCtx, &bus.Message{

0 commit comments

Comments
 (0)