Skip to content

Commit 543838c

Browse files
pastequo and jhernand authored
MGMT-20233: Recompute operator dependencies after discovering the hosts (#7524)
* NO-ISSUE: Recalculate operator dependencies before validations

  Currently, operator dependencies are only calculated when a cluster is created or updated. However, certain dependencies are dynamic and may change when new hosts are added. For example, if a cluster has the OpenShift AI operator installed, it will also require the NVIDIA GPU operator only if there are hosts that have NVIDIA GPUs. To support those dynamic dependencies, this patch modifies the cluster monitor so that it recalculates the operator dependencies before checking validations.

  Signed-off-by: Juan Hernandez <[email protected]>
  (cherry picked from commit 5371230)

* MGMT-20233: Improve recomputation of operator dependencies
  * Run all db changes within a transaction
  * Call to EnsureOperatorPrerequisite

---------

Co-authored-by: Juan Hernandez <[email protected]>
1 parent 54e22e8 commit 543838c

File tree

14 files changed

+572
-98
lines changed

14 files changed

+572
-98
lines changed

cmd/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ func main() {
458458
manifestsGenerator := network.NewManifestsGenerator(manifestsApi, Options.ManifestsGeneratorConfig, db)
459459
clusterApi := cluster.NewManager(Options.ClusterConfig, log.WithField("pkg", "cluster-state"), db,
460460
notificationStream, eventsHandler, uploadClient, hostApi, metricsManager, manifestsGenerator, lead, operatorsManager,
461-
ocmClient, objectHandler, dnsApi, authHandler, manifestsApi, Options.EnableSoftTimeouts)
461+
ocmClient, objectHandler, dnsApi, authHandler, manifestsApi, Options.EnableSoftTimeouts, usageManager)
462462
infraEnvApi := infraenv.NewManager(log.WithField("pkg", "host-state"), db, objectHandler)
463463

464464
clusterEventsUploader := thread.New(

internal/bminventory/inventory.go

+2
Original file line numberDiff line numberDiff line change
@@ -3121,6 +3121,8 @@ func (b *bareMetalInventory) updateClusterCPUFeatureUsage(cpuArchitecture string
31213121
}
31223122
}
31233123

3124+
// This code is very similar to internal/cluster/refresh_status_preprocessor.go:recalculateOperatorDependencies
3125+
// TODO: Refactor this to a common place if possible
31243126
func (b *bareMetalInventory) updateOperatorsData(ctx context.Context, cluster *common.Cluster, params installer.V2UpdateClusterParams, usages map[string]models.Usage, db *gorm.DB, log logrus.FieldLogger) error {
31253127
if params.ClusterUpdateParams.OlmOperators == nil {
31263128
return nil

internal/bminventory/inventory_test.go

+52-29
Large diffs are not rendered by default.

internal/cluster/cluster.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/openshift/assisted-service/internal/operators"
3333
"github.com/openshift/assisted-service/internal/stream"
3434
"github.com/openshift/assisted-service/internal/uploader"
35+
"github.com/openshift/assisted-service/internal/usage"
3536
"github.com/openshift/assisted-service/models"
3637
"github.com/openshift/assisted-service/pkg/auth"
3738
"github.com/openshift/assisted-service/pkg/commonutils"
@@ -176,7 +177,8 @@ type Manager struct {
176177
func NewManager(cfg Config, log logrus.FieldLogger, db *gorm.DB, stream stream.Notifier, eventsHandler eventsapi.Handler,
177178
uploadClient uploader.Client, hostAPI host.API, metricApi metrics.API, manifestsGeneratorAPI network.ManifestsGeneratorAPI,
178179
leaderElector leader.Leader, operatorsApi operators.API, ocmClient *ocm.Client, objectHandler s3wrapper.API,
179-
dnsApi dns.DNSApi, authHandler auth.Authenticator, manifestApi manifestsapi.ManifestsAPI, softTimeoutsEnabled bool) *Manager {
180+
dnsApi dns.DNSApi, authHandler auth.Authenticator, manifestApi manifestsapi.ManifestsAPI, softTimeoutsEnabled bool,
181+
usageApi usage.API) *Manager {
180182
th := &transitionHandler{
181183
log: log,
182184
db: db,
@@ -198,7 +200,7 @@ func NewManager(cfg Config, log logrus.FieldLogger, db *gorm.DB, stream stream.N
198200
sm: NewClusterStateMachine(th),
199201
metricAPI: metricApi,
200202
manifestsGeneratorAPI: manifestsGeneratorAPI,
201-
rp: newRefreshPreprocessor(log, hostAPI, operatorsApi),
203+
rp: newRefreshPreprocessor(log, hostAPI, operatorsApi, usageApi, eventsHandler),
202204
hostAPI: hostAPI,
203205
leaderElector: leaderElector,
204206
prevMonitorInvokedAt: time.Now(),

internal/cluster/cluster_test.go

+81-32
Large diffs are not rendered by default.

internal/cluster/progress_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ var _ = Describe("Progress bar test", func() {
5050
mockOperatorApi = operators.NewMockAPI(ctrl)
5151
mockDnsApi = dns.NewMockDNSApi(ctrl)
5252
clusterApi = NewManager(getDefaultConfig(), common.GetTestLog().WithField("pkg", "cluster-monitor"), db, commontesting.GetDummyNotificationStream(ctrl),
53-
mockEvents, nil, mockHostAPI, mockMetric, nil, nil, mockOperatorApi, nil, nil, mockDnsApi, nil, nil, false)
53+
mockEvents, nil, mockHostAPI, mockMetric, nil, nil, mockOperatorApi, nil, nil, mockDnsApi, nil, nil, false, nil)
54+
55+
mockOperatorApi.EXPECT().ResolveDependencies(gomock.Any(), gomock.Any()).DoAndReturn(
56+
func(_ *common.Cluster, previousOperators []*models.MonitoredOperator) ([]*models.MonitoredOperator, error) {
57+
return previousOperators, nil
58+
},
59+
).AnyTimes()
5460
})
5561

5662
AfterEach(func() {

internal/cluster/refresh_status_preprocessor.go

+207-11
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,25 @@ package cluster
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
7+
"reflect"
68
"sort"
79
"strings"
810

11+
"github.com/dustin/go-humanize/english"
912
"github.com/go-openapi/swag"
1013
"github.com/openshift/assisted-service/internal/common"
14+
eventsapi "github.com/openshift/assisted-service/internal/events/api"
1115
"github.com/openshift/assisted-service/internal/host"
1216
"github.com/openshift/assisted-service/internal/operators"
1317
"github.com/openshift/assisted-service/internal/operators/api"
18+
operatorcommon "github.com/openshift/assisted-service/internal/operators/common"
19+
"github.com/openshift/assisted-service/internal/usage"
1420
"github.com/openshift/assisted-service/models"
15-
"github.com/pkg/errors"
1621
"github.com/sirupsen/logrus"
1722
"github.com/thoas/go-funk"
23+
"gorm.io/gorm"
1824
)
1925

2026
type ValidationResult struct {
@@ -30,23 +36,28 @@ type stringer interface {
3036
}
3137

3238
type refreshPreprocessor struct {
33-
log logrus.FieldLogger
34-
validations []validation
35-
conditions []condition
36-
operatorsAPI operators.API
39+
log logrus.FieldLogger
40+
validations []validation
41+
conditions []condition
42+
operatorsAPI operators.API
43+
usageAPI usage.API
44+
eventsHandler eventsapi.Handler
3745
}
3846

39-
func newRefreshPreprocessor(log logrus.FieldLogger, hostAPI host.API, operatorsAPI operators.API) *refreshPreprocessor {
47+
func newRefreshPreprocessor(log logrus.FieldLogger, hostAPI host.API, operatorsAPI operators.API, usageAPI usage.API,
48+
eventsHandler eventsapi.Handler) *refreshPreprocessor {
4049
v := clusterValidator{
4150
log: log,
4251
hostAPI: hostAPI,
4352
}
4453

4554
return &refreshPreprocessor{
46-
log: log,
47-
validations: newValidations(&v),
48-
conditions: newConditions(&v),
49-
operatorsAPI: operatorsAPI,
55+
log: log,
56+
validations: newValidations(&v),
57+
conditions: newConditions(&v),
58+
operatorsAPI: operatorsAPI,
59+
usageAPI: usageAPI,
60+
eventsHandler: eventsHandler,
5061
}
5162
}
5263

@@ -61,7 +72,7 @@ func (r *refreshPreprocessor) preprocess(ctx context.Context, c *clusterPreproce
6172
if c.cluster != nil {
6273
ignoredValidations, err = common.DeserializeJSONList(c.cluster.IgnoredClusterValidations)
6374
if err != nil {
64-
return nil, nil, errors.Wrap(err, fmt.Sprintf("Unable to deserialize ignored cluster validations for cluster %s", string(*c.cluster.ID)))
75+
return nil, nil, fmt.Errorf("unable to deserialize ignored cluster validations for cluster %s: %w", c.cluster.ID.String(), err)
6576
}
6677
}
6778

@@ -84,6 +95,17 @@ func (r *refreshPreprocessor) preprocess(ctx context.Context, c *clusterPreproce
8495
Message: message,
8596
})
8697
}
98+
99+
// Before validating the operators we need to recalculate the dependencies because changes in the hosts may
100+
// imply changes in the dependencies between operators. For example, if the OpenShift AI operator is enabled and
101+
// a new host with an NVIDIA GPU has been discovered, then the NVIDIA GPU operator will need to be added as a
102+
// dependency, and then we will need to validate that secure boot is disabled.
103+
err = r.recalculateOperatorDependencies(ctx, c)
104+
if err != nil {
105+
err = fmt.Errorf("failed to recalculate operator dependencies for cluster '%s': %w", c.clusterId, err)
106+
return nil, nil, err
107+
}
108+
87109
// Validate operators
88110
results, err := r.operatorsAPI.ValidateCluster(ctx, c.cluster)
89111
if err != nil {
@@ -124,6 +146,180 @@ func (r *refreshPreprocessor) preprocess(ctx context.Context, c *clusterPreproce
124146
return stateMachineInput, validationsOutput, nil
125147
}
126148

149+
// recalculateOperatorDependencies calculates the operator dependencies and updates the database and the passed cluster
150+
// accordingly.
151+
func (r *refreshPreprocessor) recalculateOperatorDependencies(ctx context.Context, c *clusterPreprocessContext) error {
152+
// Calculate and save the operators that have been added, updated or deleted:
153+
operatorsBeforeResolve := c.cluster.MonitoredOperators
154+
155+
operatorsAfterResolve, err := r.operatorsAPI.ResolveDependencies(c.cluster, c.cluster.MonitoredOperators)
156+
if err != nil {
157+
return fmt.Errorf("failed to resolve operator dependencies: %w", err)
158+
}
159+
160+
var addedOperators, updatedOperators, deletedOperators []*models.MonitoredOperator
161+
162+
for _, operatorAfterResolve := range operatorsAfterResolve {
163+
if operatorAfterResolve.ClusterID == "" {
164+
operatorAfterResolve.ClusterID = c.clusterId
165+
}
166+
167+
operatorBeforeResolve := operatorcommon.GetOperator(operatorsBeforeResolve, operatorAfterResolve.Name)
168+
if operatorBeforeResolve != nil {
169+
if !reflect.DeepEqual(operatorAfterResolve, operatorBeforeResolve) {
170+
updatedOperators = append(updatedOperators, operatorAfterResolve)
171+
}
172+
} else {
173+
addedOperators = append(addedOperators, operatorAfterResolve)
174+
}
175+
}
176+
177+
for _, operatorBeforeResolve := range operatorsBeforeResolve {
178+
if !operatorcommon.HasOperator(operatorsAfterResolve, operatorBeforeResolve.Name) {
179+
deletedOperators = append(deletedOperators, operatorBeforeResolve)
180+
}
181+
}
182+
183+
// If nothing changed, nothing needs to be done
184+
if len(addedOperators) == 0 && len(deletedOperators) == 0 && len(updatedOperators) == 0 {
185+
return nil
186+
}
187+
188+
// Validate with cluster CPU architecture
189+
err = r.operatorsAPI.EnsureOperatorPrerequisite(c.cluster, c.cluster.OpenshiftVersion, c.cluster.CPUArchitecture, operatorsAfterResolve)
190+
if err != nil {
191+
return fmt.Errorf("failed to validate operator prerequisite: %w", err)
192+
}
193+
194+
c.cluster.MonitoredOperators = operatorsAfterResolve
195+
196+
err = c.db.Transaction(func(tx *gorm.DB) error {
197+
for _, addedOperator := range addedOperators {
198+
err = tx.Save(addedOperator).Error
199+
if err != nil {
200+
return fmt.Errorf("failed to add operator '%s': %w", addedOperator.Name, err)
201+
}
202+
}
203+
204+
for _, updatedOperator := range updatedOperators {
205+
err = tx.Save(updatedOperator).Error
206+
if err != nil {
207+
return fmt.Errorf("failed to update operator '%s': %w", updatedOperator.Name, err)
208+
}
209+
}
210+
211+
for _, deletedOperator := range deletedOperators {
212+
err = tx.Delete(deletedOperator).Error
213+
if err != nil {
214+
return fmt.Errorf("failed to delete operator '%s': %w", deletedOperator.Name, err)
215+
}
216+
}
217+
218+
// If any operator has been added or deleted then we need to update the corresponding feature usage
219+
if len(addedOperators) > 0 || len(deletedOperators) > 0 {
220+
err = r.recalculateOperatorFeatureUsage(c, tx, addedOperators, deletedOperators)
221+
if err != nil {
222+
return fmt.Errorf("failed to recalculate operator feature usage: %w", err)
223+
}
224+
}
225+
226+
return nil
227+
})
228+
229+
if err != nil {
230+
return fmt.Errorf("transaction to update monitored operators, the associated usage and reset roles failed: %w", err)
231+
}
232+
233+
// If everything went smoothly, notify about the change
234+
if len(addedOperators) > 0 || len(deletedOperators) > 0 {
235+
err = r.notifyOperatorFeatureUsageChange(ctx, c, addedOperators, deletedOperators)
236+
if err != nil {
237+
return err
238+
}
239+
}
240+
241+
return nil
242+
}
243+
244+
func (r *refreshPreprocessor) recalculateOperatorFeatureUsage(c *clusterPreprocessContext, db *gorm.DB,
245+
addedOperators, deletedOperators []*models.MonitoredOperator) error {
246+
if r.usageAPI == nil {
247+
return nil
248+
}
249+
250+
usages, err := usage.Unmarshal(c.cluster.FeatureUsage)
251+
if err != nil {
252+
return fmt.Errorf("failed to read feature usage: %w", err)
253+
}
254+
255+
for _, addedOperator := range addedOperators {
256+
featureName := strings.ToUpper(addedOperator.Name)
257+
r.usageAPI.Add(usages, featureName, nil)
258+
}
259+
260+
for _, deletedOperator := range deletedOperators {
261+
featureName := strings.ToUpper(deletedOperator.Name)
262+
r.usageAPI.Remove(usages, featureName)
263+
}
264+
265+
data, err := json.Marshal(usages)
266+
if err != nil {
267+
return fmt.Errorf("failed to write feature usage: %w", err)
268+
}
269+
270+
c.cluster.FeatureUsage = string(data)
271+
r.usageAPI.Save(db, c.clusterId, usages)
272+
273+
return nil
274+
}
275+
276+
func (r refreshPreprocessor) notifyOperatorFeatureUsageChange(ctx context.Context, c *clusterPreprocessContext,
277+
addedOperators, deletedOperators []*models.MonitoredOperator) error {
278+
if r.eventsHandler == nil {
279+
return nil
280+
}
281+
if len(addedOperators) > 0 {
282+
r.notifyAddedOperatorFeatures(ctx, c, addedOperators)
283+
}
284+
if len(deletedOperators) > 0 {
285+
r.notifyDeletedOperatorFeatures(ctx, c, deletedOperators)
286+
}
287+
return nil
288+
}
289+
290+
func (r *refreshPreprocessor) notifyAddedOperatorFeatures(ctx context.Context, c *clusterPreprocessContext,
291+
operators []*models.MonitoredOperator) {
292+
featureList := r.calculateOperatorFeatureList(operators)
293+
var message string
294+
if len(operators) == 1 {
295+
message = fmt.Sprintf("Cluster %s: added operator feature %s", c.clusterId, featureList)
296+
} else {
297+
message = fmt.Sprintf("Cluster %s: added operator features %s", c.clusterId, featureList)
298+
}
299+
r.eventsHandler.NotifyInternalEvent(ctx, &c.clusterId, nil, nil, message)
300+
}
301+
302+
func (r *refreshPreprocessor) notifyDeletedOperatorFeatures(ctx context.Context, c *clusterPreprocessContext,
303+
operators []*models.MonitoredOperator) {
304+
featureList := r.calculateOperatorFeatureList(operators)
305+
var message string
306+
if len(operators) == 1 {
307+
message = fmt.Sprintf("Cluster %s: deleted operator feature %s", c.clusterId, featureList)
308+
} else {
309+
message = fmt.Sprintf("Cluster %s: deleted operator features %s", c.clusterId, featureList)
310+
}
311+
r.eventsHandler.NotifyInternalEvent(ctx, &c.clusterId, nil, nil, message)
312+
}
313+
314+
func (r *refreshPreprocessor) calculateOperatorFeatureList(operators []*models.MonitoredOperator) string {
315+
featureNames := make([]string, len(operators))
316+
for i, operator := range operators {
317+
featureNames[i] = strings.ToUpper(operator.Name)
318+
}
319+
sort.Strings(featureNames)
320+
return english.WordSeries(featureNames, "and")
321+
}
322+
127323
// sortByValidationResultID sorts results by models.ClusterValidationID
128324
func sortByValidationResultID(validationResults []ValidationResult) {
129325
sort.SliceStable(validationResults, func(i, j int) bool {

0 commit comments

Comments
 (0)