Skip to content

Commit e43157e

Browse files
committed
WIP watcher changes
1 parent 6d95d70 commit e43157e

10 files changed

+403
-570
lines changed

internal/bus/topics.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
package bus
77

88
const (
9-
AddInstancesTopic = "add-instances"
109
UpdatedInstancesTopic = "updated-instances"
11-
DeletedInstancesTopic = "deleted-instances"
1210
ResourceUpdateTopic = "resource-update"
1311
NginxConfigUpdateTopic = "nginx-config-update"
1412
InstanceHealthTopic = "instance-health"

internal/resource/resource_plugin.go

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,6 @@ func (*Resource) Info() *bus.Info {
7777

7878
func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
7979
switch msg.Topic {
80-
case bus.AddInstancesTopic:
81-
slog.DebugContext(ctx, "Resource plugin received add instances message")
82-
instanceList, ok := msg.Data.([]*mpi.Instance)
83-
if !ok {
84-
slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data)
85-
86-
return
87-
}
88-
89-
resource := r.resourceService.AddInstances(instanceList)
90-
91-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})
92-
93-
return
9480
case bus.UpdatedInstancesTopic:
9581
slog.DebugContext(ctx, "Resource plugin received update instances message")
9682
instanceList, ok := msg.Data.([]*mpi.Instance)
@@ -103,20 +89,6 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
10389

10490
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})
10591

106-
return
107-
108-
case bus.DeletedInstancesTopic:
109-
slog.DebugContext(ctx, "Resource plugin received delete instances message")
110-
instanceList, ok := msg.Data.([]*mpi.Instance)
111-
if !ok {
112-
slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data)
113-
114-
return
115-
}
116-
resource := r.resourceService.DeleteInstances(ctx, instanceList)
117-
118-
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})
119-
12092
return
12193
case bus.WriteConfigSuccessfulTopic:
12294
r.handleWriteConfigSuccessful(ctx, msg)
@@ -133,9 +105,7 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {
133105

134106
func (*Resource) Subscriptions() []string {
135107
return []string{
136-
bus.AddInstancesTopic,
137108
bus.UpdatedInstancesTopic,
138-
bus.DeletedInstancesTopic,
139109
bus.WriteConfigSuccessfulTopic,
140110
bus.RollbackWriteTopic,
141111
bus.APIActionRequestTopic,

internal/resource/resource_service.go

Lines changed: 13 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ const (
5656
//counterfeiter:generate . processOperator
5757

5858
type resourceServiceInterface interface {
59-
AddInstances(instanceList []*mpi.Instance) *mpi.Resource
6059
UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
61-
DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
6260
ApplyConfig(ctx context.Context, instanceID string) (*model.NginxConfigContext, error)
6361
Instance(instanceID string) *mpi.Instance
6462
GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstreams string) ([]client.UpstreamServer,
@@ -91,9 +89,10 @@ type (
9189

9290
type ResourceService struct {
9391
resource *mpi.Resource
92+
napInstance *mpi.Instance
9493
nginxConfigParser parser.ConfigParser
9594
agentConfig *config.Config
96-
instanceOperators map[string]instanceOperator // key is instance ID
95+
instanceOperator instanceOperator
9796
info host.InfoInterface
9897
manifestFilePath string
9998
resourceMutex sync.Mutex
@@ -106,7 +105,7 @@ func NewResourceService(ctx context.Context, agentConfig *config.Config) *Resour
106105
resourceMutex: sync.Mutex{},
107106
info: host.NewInfo(),
108107
operatorsMutex: sync.Mutex{},
109-
instanceOperators: make(map[string]instanceOperator),
108+
instanceOperator: NewInstanceOperator(agentConfig),
110109
nginxConfigParser: parser.NewNginxConfigParser(agentConfig),
111110
agentConfig: agentConfig,
112111
manifestFilePath: agentConfig.LibDir + "/manifest.json",
@@ -117,15 +116,6 @@ func NewResourceService(ctx context.Context, agentConfig *config.Config) *Resour
117116
return resourceService
118117
}
119118

120-
func (r *ResourceService) AddInstances(instanceList []*mpi.Instance) *mpi.Resource {
121-
r.resourceMutex.Lock()
122-
defer r.resourceMutex.Unlock()
123-
r.resource.Instances = append(r.resource.GetInstances(), instanceList...)
124-
r.AddOperator(instanceList)
125-
126-
return r.resource
127-
}
128-
129119
func (r *ResourceService) Instance(instanceID string) *mpi.Instance {
130120
for _, instance := range r.resource.GetInstances() {
131121
if instance.GetInstanceMeta().GetInstanceId() == instanceID {
@@ -136,73 +126,25 @@ func (r *ResourceService) Instance(instanceID string) *mpi.Instance {
136126
return nil
137127
}
138128

139-
func (r *ResourceService) AddOperator(instanceList []*mpi.Instance) {
140-
r.operatorsMutex.Lock()
141-
defer r.operatorsMutex.Unlock()
142-
for _, instance := range instanceList {
143-
r.instanceOperators[instance.GetInstanceMeta().GetInstanceId()] = NewInstanceOperator(r.agentConfig)
144-
}
145-
}
146-
147-
func (r *ResourceService) RemoveOperator(instanceList []*mpi.Instance) {
148-
r.operatorsMutex.Lock()
149-
defer r.operatorsMutex.Unlock()
150-
for _, instance := range instanceList {
151-
delete(r.instanceOperators, instance.GetInstanceMeta().GetInstanceId())
152-
}
153-
}
154-
155129
func (r *ResourceService) UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
156130
r.resourceMutex.Lock()
157131
defer r.resourceMutex.Unlock()
158-
159-
for _, updatedInstance := range instanceList {
160-
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
161-
if ok {
162-
for _, instance := range resourceCopy.GetInstances() {
163-
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
164-
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
165-
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
166-
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
167-
}
168-
}
169-
r.resource = resourceCopy
170-
} else {
171-
slog.WarnContext(ctx, "Unable to clone resource while updating instances", "resource",
172-
r.resource, "instances", instanceList)
173-
}
174-
}
175-
176-
return r.resource
177-
}
178-
179-
func (r *ResourceService) DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
180-
r.resourceMutex.Lock()
181-
defer r.resourceMutex.Unlock()
182-
183-
for _, deletedInstance := range instanceList {
184-
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
185-
if ok {
186-
for index, instance := range resourceCopy.GetInstances() {
187-
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
188-
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
189-
}
190-
}
191-
} else {
192-
slog.WarnContext(ctx, "Unable to clone resource while deleting instances", "resource",
193-
r.resource, "instances", instanceList)
194-
}
132+
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
133+
if ok {
134+
resourceCopy.Instances = instanceList
135+
r.resource = resourceCopy
136+
} else {
137+
slog.WarnContext(ctx, "Unable to clone resource while updating instances", "resource",
138+
r.resource, "instances", instanceList)
195139
}
196-
r.RemoveOperator(instanceList)
197140

198141
return r.resource
199142
}
200143

201144
func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) (*model.NginxConfigContext, error) {
202145
var instance *mpi.Instance
203-
operator := r.instanceOperators[instanceID]
204146

205-
if operator == nil {
147+
if r.instanceOperator == nil {
206148
return nil, fmt.Errorf("instance %s not found", instanceID)
207149
}
208150

@@ -224,12 +166,12 @@ func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) (*
224166

225167
slog.DebugContext(ctx, "Updated Instance Runtime after parsing config", "instance", instance.GetInstanceRuntime())
226168

227-
valErr := operator.Validate(ctx, instance)
169+
valErr := r.instanceOperator.Validate(ctx, instance)
228170
if valErr != nil {
229171
return nil, fmt.Errorf("failed validating config %w", valErr)
230172
}
231173

232-
reloadErr := operator.Reload(ctx, instance)
174+
reloadErr := r.instanceOperator.Reload(ctx, instance)
233175
if reloadErr != nil {
234176
return nil, fmt.Errorf("failed to reload NGINX %w", reloadErr)
235177
}

internal/watcher/health/health_watcher_service.go

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ type (
3030

3131
HealthWatcherService struct {
3232
agentConfig *config.Config
33-
cache map[string]*mpi.InstanceHealth // key is instanceID
34-
watchers map[string]healthWatcherOperator // key is instanceID
35-
instances map[string]*mpi.Instance // key is instanceID
33+
cache map[string]*mpi.InstanceHealth // key is instanceID
34+
watcher healthWatcherOperator // key is instanceID
35+
instances map[string]*mpi.Instance // key is instanceID
3636
healthWatcherMutex sync.Mutex
3737
}
3838

@@ -44,51 +44,21 @@ type (
4444

4545
func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService {
4646
return &HealthWatcherService{
47-
watchers: make(map[string]healthWatcherOperator),
47+
watcher: NewNginxHealthWatcher(),
4848
cache: make(map[string]*mpi.InstanceHealth),
4949
instances: make(map[string]*mpi.Instance),
5050
agentConfig: agentConfig,
5151
}
5252
}
5353

54-
func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
55-
hw.healthWatcherMutex.Lock()
56-
defer hw.healthWatcherMutex.Unlock()
57-
58-
for _, instance := range instances {
59-
switch instance.GetInstanceMeta().GetInstanceType() {
60-
case mpi.InstanceMeta_INSTANCE_TYPE_NGINX, mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS:
61-
watcher := NewNginxHealthWatcher()
62-
hw.watchers[instance.GetInstanceMeta().GetInstanceId()] = watcher
63-
case mpi.InstanceMeta_INSTANCE_TYPE_AGENT:
64-
case mpi.InstanceMeta_INSTANCE_TYPE_UNSPECIFIED,
65-
mpi.InstanceMeta_INSTANCE_TYPE_UNIT,
66-
mpi.InstanceMeta_INSTANCE_TYPE_NGINX_APP_PROTECT:
67-
fallthrough
68-
default:
69-
slog.Warn("Health watcher not implemented", "instance_type",
70-
instance.GetInstanceMeta().GetInstanceType())
71-
}
72-
hw.instances[instance.GetInstanceMeta().GetInstanceId()] = instance
73-
}
74-
}
75-
7654
func (hw *HealthWatcherService) UpdateHealthWatcher(instances []*mpi.Instance) {
7755
hw.healthWatcherMutex.Lock()
7856
defer hw.healthWatcherMutex.Unlock()
7957

80-
for _, instance := range instances {
81-
hw.instances[instance.GetInstanceMeta().GetInstanceId()] = instance
82-
}
83-
}
84-
85-
func (hw *HealthWatcherService) DeleteHealthWatcher(instances []*mpi.Instance) {
86-
hw.healthWatcherMutex.Lock()
87-
defer hw.healthWatcherMutex.Unlock()
58+
clear(hw.instances)
8859

8960
for _, instance := range instances {
90-
delete(hw.watchers, instance.GetInstanceMeta().GetInstanceId())
91-
delete(hw.instances, instance.GetInstanceMeta().GetInstanceId())
61+
hw.instances[instance.GetInstanceMeta().GetInstanceId()] = instance
9262
}
9363
}
9464

@@ -135,11 +105,11 @@ func (hw *HealthWatcherService) Watch(ctx context.Context, ch chan<- InstanceHea
135105

136106
func (hw *HealthWatcherService) health(ctx context.Context) (updatedStatuses []*mpi.InstanceHealth, isHealthDiff bool,
137107
) {
138-
currentHealth := make(map[string]*mpi.InstanceHealth, len(hw.watchers))
108+
currentHealth := make(map[string]*mpi.InstanceHealth, len(hw.instances))
139109
allStatuses := make([]*mpi.InstanceHealth, 0)
140110

141-
for instanceID, watcher := range hw.watchers {
142-
instanceHealth, err := watcher.Health(ctx, hw.instances[instanceID])
111+
for instanceID := range hw.instances {
112+
instanceHealth, err := hw.watcher.Health(ctx, hw.instances[instanceID])
143113
if instanceHealth == nil {
144114
instanceHealth = &mpi.InstanceHealth{
145115
InstanceId: instanceID,

0 commit comments

Comments
 (0)