Skip to content

Commit 611c6bf

Browse files
authored
namespace-scope datastore keys and improve obj updates (#957)
Signed-off-by: Braulio Dumba <Braulio.Dumba@ibm.com>
1 parent e0ed956 commit 611c6bf

6 files changed

Lines changed: 83 additions & 42 deletions

File tree

internal/collector/source/registry.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,26 @@ func (r *SourceRegistry) List() []string {
6262
}
6363
return names
6464
}
65+
66+
// Unregister removes a source from the registry.
67+
// This operation is idempotent - it succeeds even if the source is not registered.
68+
// This makes deletion paths clean and idempotent, especially useful for reconcile
69+
// retries and race conditions where PoolDelete may be called multiple times.
70+
func (r *SourceRegistry) Unregister(name string) error {
71+
r.mu.Lock()
72+
defer r.mu.Unlock()
73+
74+
// Idempotent: no-op if source doesn't exist
75+
delete(r.sources, name)
76+
return nil
77+
}
78+
79+
// Update replaces an existing source or registers a new one.
80+
// Unlike Register, this method does not return an error if the source already exists.
81+
func (r *SourceRegistry) Update(name string, source MetricsSource) error {
82+
r.mu.Lock()
83+
defer r.mu.Unlock()
84+
85+
r.sources[name] = source
86+
return nil
87+
}

internal/controller/inferencepool_reconciler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
6060
if err := c.Get(ctx, req.NamespacedName, obj); err != nil {
6161
if errors.IsNotFound(err) {
6262
logger.Info("InferencePool not found. Removing pool from datastore")
63-
c.Datastore.PoolDelete(req.Name)
63+
c.Datastore.PoolDelete(req.Namespace + "/" + req.Name)
6464
return ctrl.Result{}, nil
6565
}
6666
return ctrl.Result{}, fmt.Errorf("unable to get InferencePool - %w", err)
@@ -69,7 +69,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
6969
// 3. Perform common checks using the client.Object interface.
7070
if !obj.GetDeletionTimestamp().IsZero() {
7171
logger.Info("InferencePool is marked for deletion. Removing pool from datastore")
72-
c.Datastore.PoolDelete(obj.GetName()) // remove the pool from the datastore
72+
c.Datastore.PoolDelete(obj.GetNamespace() + "/" + obj.GetName()) // remove the pool from the datastore
7373
return ctrl.Result{}, nil
7474
}
7575

internal/controller/inferencepool_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func TestInferencePoolReconcile(t *testing.T) {
157157
}
158158

159159
func diffStore(store datastore.Datastore, ep *poolutil.EndpointPool) string {
160-
gotPool, _ := store.PoolGet(ep.Name)
160+
gotPool, _ := store.PoolGet(ep.Namespace + "/" + ep.Name)
161161
if diff := cmp.Diff(ep, gotPool); diff != "" {
162162
return "inferencePool:" + diff
163163
}

internal/datastore/datastore.go

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type Datastore interface {
7575
PoolGet(name string) (*poolutil.EndpointPool, error)
7676
PoolGetMetricsSource(name string) source.MetricsSource
7777
PoolList() []*poolutil.EndpointPool
78-
PoolGetFromLabels(labels map[string]string) (*poolutil.EndpointPool, error)
78+
PoolGetFromLabels(namespace string, labels map[string]string) (*poolutil.EndpointPool, error)
7979
PoolDelete(name string)
8080

8181
// Clears the store state, happens when the pool gets deleted.
@@ -117,38 +117,37 @@ func (ds *datastore) PoolSet(ctx context.Context, client client.Client, pool *po
117117
return errPoolIsNull
118118
}
119119

120-
if ds.registry.Get(pool.Name) == nil {
121-
// Create pod source using the EPP metrics token read by getEPPMetricsToken()
122-
// from the mounted token file at /var/run/secrets/epp-metrics/token. This token
123-
// is mounted from the epp-metrics service account with minimal privileges and is
124-
// used for authenticating with EPP pods when scraping metrics.
125-
podConfig := pod.PodScrapingSourceConfig{
126-
ServiceName: pool.EndpointPicker.ServiceName,
127-
ServiceNamespace: pool.EndpointPicker.Namespace,
128-
MetricsPort: pool.EndpointPicker.MetricsPortNumber,
129-
BearerToken: getEPPMetricsToken(),
130-
}
120+
namespacedName := pool.Namespace + "/" + pool.Name
121+
122+
// Create or update pod source using the EPP metrics token read by getEPPMetricsToken()
123+
// from the mounted token file at /var/run/secrets/epp-metrics/token. This token
124+
// is mounted from the epp-metrics service account with minimal privileges and is
125+
// used for authenticating with EPP pods when scraping metrics.
126+
podConfig := pod.PodScrapingSourceConfig{
127+
ServiceName: pool.EndpointPicker.ServiceName,
128+
ServiceNamespace: pool.EndpointPicker.Namespace,
129+
MetricsPort: pool.EndpointPicker.MetricsPortNumber,
130+
BearerToken: getEPPMetricsToken(),
131+
}
131132

132-
podSource, err := pod.NewPodScrapingSource(ctx, client, podConfig)
133-
if err != nil {
134-
return err
135-
}
133+
podSource, err := pod.NewPodScrapingSource(ctx, client, podConfig)
134+
if err != nil {
135+
return err
136+
}
136137

137-
// Register in registry
138-
// TODO: We need to be able to update or delete a pod source object in the registry at internal/collector/source/registry.go
139-
if err := ds.registry.Register(pool.Name, podSource); err != nil {
140-
return err
141-
}
138+
// Update or register in registry (idempotent operation)
139+
if err := ds.registry.Update(namespacedName, podSource); err != nil {
140+
return err
142141
}
143142

144-
// Store in the datastore
145-
ds.pools.Store(pool.Name, pool)
143+
// Store in the datastore with namespace-scoped key
144+
ds.pools.Store(pool.Namespace+"/"+pool.Name, pool)
146145
return nil
147146
}
148147

149-
func (ds *datastore) PoolGet(name string) (*poolutil.EndpointPool, error) {
148+
func (ds *datastore) PoolGet(namespacedName string) (*poolutil.EndpointPool, error) {
150149

151-
pool, exist := ds.pools.Load(name)
150+
pool, exist := ds.pools.Load(namespacedName)
152151
if !exist {
153152
return nil, errPoolNotSynced
154153
}
@@ -157,22 +156,27 @@ func (ds *datastore) PoolGet(name string) (*poolutil.EndpointPool, error) {
157156
return epp, nil
158157
}
159158

160-
func (ds *datastore) PoolGetMetricsSource(name string) source.MetricsSource {
161-
source := ds.registry.Get(name)
159+
func (ds *datastore) PoolGetMetricsSource(namespacedName string) source.MetricsSource {
160+
source := ds.registry.Get(namespacedName)
162161
return source
163162
}
164163

165-
func (ds *datastore) PoolGetFromLabels(labels map[string]string) (*poolutil.EndpointPool, error) {
164+
func (ds *datastore) PoolGetFromLabels(namespace string, labels map[string]string) (*poolutil.EndpointPool, error) {
166165
exist := false
167166
var ep *poolutil.EndpointPool
168167

169168
ds.pools.Range(func(k, v any) bool {
170169
ep = v.(*poolutil.EndpointPool)
171170

171+
// Filter by namespace first to avoid cross-namespace matches
172+
if ep.Namespace != namespace {
173+
return true // Continue iteration
174+
}
175+
172176
found := poolutil.IsSubset(ep.Selector, labels)
173177
if found {
174178
exist = true
175-
return false
179+
return false // Stop iteration - found a match
176180
}
177181
return true
178182
})
@@ -193,8 +197,12 @@ func (ds *datastore) PoolList() []*poolutil.EndpointPool {
193197
return res
194198
}
195199

196-
func (ds *datastore) PoolDelete(name string) {
197-
ds.pools.Delete(name)
200+
func (ds *datastore) PoolDelete(namespacedName string) {
201+
ds.pools.Delete(namespacedName)
202+
// Clean up the metrics source from the registry
203+
if err := ds.registry.Unregister(namespacedName); err != nil {
204+
ctrl.Log.V(logging.DEBUG).Info("Failed to unregister metrics source", "namespacedName", namespacedName, "error", err)
205+
}
198206
}
199207

200208
func (ds *datastore) Clear() {

internal/datastore/datastore_test.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ func TestDatastore(t *testing.T) {
106106
t.Errorf("Unexpected InferencePoolToEndpointPool error: %v", err)
107107
}
108108

109-
gotPoolMatch, err := ds.PoolGetFromLabels(tt.labels)
109+
// Pass the namespace from the wantPool to match the pool in the same namespace
110+
gotPoolMatch, err := ds.PoolGetFromLabels(tt.wantPool.Namespace, tt.labels)
110111
if err != nil {
111112
t.Errorf("Unexpected PoolGetFromLabels error: %v", err)
112113
}
@@ -123,7 +124,7 @@ func TestDatastore(t *testing.T) {
123124
t.Errorf("Unexpected InferencePoolToEndpointPool error: %v", err)
124125
}
125126

126-
gotPool, err := ds.PoolGet(ep.Name)
127+
gotPool, err := ds.PoolGet(ep.Namespace + "/" + ep.Name)
127128
if err != nil {
128129
t.Errorf("failed to add endpoint into the datastore: %v", err)
129130
}
@@ -132,10 +133,19 @@ func TestDatastore(t *testing.T) {
132133
t.Errorf("Unexpected pool diff (+got/-want): %s", diff)
133134
}
134135

136+
// Verify metrics source exists before deletion
137+
namespacedName := ep.Namespace + "/" + ep.Name
138+
metricsSource := ds.PoolGetMetricsSource(namespacedName)
139+
assert.NotNil(t, metricsSource, "Metrics source should exist in registry before deletion")
140+
135141
// Test Delete & PoolList
136-
ds.PoolDelete(ep.Name)
142+
ds.PoolDelete(namespacedName)
137143
assert.Equal(t, len(ds.PoolList()), tt.clearDeleteResultLen, "Pools map should have the expected length after item deleted")
138144

145+
// Verify metrics source is cleaned up from registry after deletion
146+
metricsSourceAfterDelete := ds.PoolGetMetricsSource(namespacedName)
147+
assert.Nil(t, metricsSourceAfterDelete, "Metrics source should be removed from registry after pool deletion")
148+
139149
if err := ds.PoolSet(ctx, fakeClient, ep); err != nil {
140150
t.Errorf("failed to add endpoint into the datastore: %v", err)
141151
}

internal/engines/scalefromzero/engine.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,20 +229,20 @@ func (e *Engine) processInactiveVariant(ctx context.Context, deployments map[str
229229
return nil
230230
}
231231

232-
// Find target EPP for metrics collection
233-
pool, err := e.Datastore.PoolGetFromLabels(labels)
232+
// Find target EPP for metrics collection in the same namespace as the VA
233+
pool, err := e.Datastore.PoolGetFromLabels(va.Namespace, labels)
234234
if err != nil {
235-
logger.Error(err, "Error finding target EPP", "variant", va.Name, "target VA model", va.Spec.ModelID)
235+
logger.Error(err, "Error finding target EPP", "variant", va.Name, "namespace", va.Namespace, "target VA model", va.Spec.ModelID)
236236
return err
237237
}
238238

239239
// Use EPP source from registry
240-
eppSource := e.Datastore.PoolGetMetricsSource(pool.Name)
240+
eppSource := e.Datastore.PoolGetMetricsSource(pool.Namespace + "/" + pool.Name)
241241
if eppSource == nil {
242242
logger.Info("Scale-from-zero: skipping VA, EPP metrics source not found in datastore",
243243
"va", va.Name,
244244
"namespace", va.Namespace,
245-
"pool", pool.Name)
245+
"pool", pool.Namespace+"/"+pool.Name)
246246
return errors.New("endpointpicker metrics source not found in datastore")
247247
}
248248

0 commit comments

Comments
 (0)