Skip to content

MGMT-20233: Recompute operator dependencies after discovering the hosts #7524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func main() {
manifestsGenerator := network.NewManifestsGenerator(manifestsApi, Options.ManifestsGeneratorConfig, db)
clusterApi := cluster.NewManager(Options.ClusterConfig, log.WithField("pkg", "cluster-state"), db,
notificationStream, eventsHandler, uploadClient, hostApi, metricsManager, manifestsGenerator, lead, operatorsManager,
ocmClient, objectHandler, dnsApi, authHandler, manifestsApi, Options.EnableSoftTimeouts)
ocmClient, objectHandler, dnsApi, authHandler, manifestsApi, Options.EnableSoftTimeouts, usageManager)
infraEnvApi := infraenv.NewManager(log.WithField("pkg", "host-state"), db, objectHandler)

clusterEventsUploader := thread.New(
Expand Down
2 changes: 2 additions & 0 deletions internal/bminventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3121,6 +3121,8 @@ func (b *bareMetalInventory) updateClusterCPUFeatureUsage(cpuArchitecture string
}
}

// This code is very similar to internal/cluster/refresh_status_preprocessor.go:recalculateOperatorDependencies
// TODO: Refactor this to a common place if possible
func (b *bareMetalInventory) updateOperatorsData(ctx context.Context, cluster *common.Cluster, params installer.V2UpdateClusterParams, usages map[string]models.Usage, db *gorm.DB, log logrus.FieldLogger) error {
if params.ClusterUpdateParams.OlmOperators == nil {
return nil
Expand Down
81 changes: 52 additions & 29 deletions internal/bminventory/inventory_test.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/openshift/assisted-service/internal/operators"
"github.com/openshift/assisted-service/internal/stream"
"github.com/openshift/assisted-service/internal/uploader"
"github.com/openshift/assisted-service/internal/usage"
"github.com/openshift/assisted-service/models"
"github.com/openshift/assisted-service/pkg/auth"
"github.com/openshift/assisted-service/pkg/commonutils"
Expand Down Expand Up @@ -176,7 +177,8 @@ type Manager struct {
func NewManager(cfg Config, log logrus.FieldLogger, db *gorm.DB, stream stream.Notifier, eventsHandler eventsapi.Handler,
uploadClient uploader.Client, hostAPI host.API, metricApi metrics.API, manifestsGeneratorAPI network.ManifestsGeneratorAPI,
leaderElector leader.Leader, operatorsApi operators.API, ocmClient *ocm.Client, objectHandler s3wrapper.API,
dnsApi dns.DNSApi, authHandler auth.Authenticator, manifestApi manifestsapi.ManifestsAPI, softTimeoutsEnabled bool) *Manager {
dnsApi dns.DNSApi, authHandler auth.Authenticator, manifestApi manifestsapi.ManifestsAPI, softTimeoutsEnabled bool,
usageApi usage.API) *Manager {
th := &transitionHandler{
log: log,
db: db,
Expand All @@ -198,7 +200,7 @@ func NewManager(cfg Config, log logrus.FieldLogger, db *gorm.DB, stream stream.N
sm: NewClusterStateMachine(th),
metricAPI: metricApi,
manifestsGeneratorAPI: manifestsGeneratorAPI,
rp: newRefreshPreprocessor(log, hostAPI, operatorsApi),
rp: newRefreshPreprocessor(log, hostAPI, operatorsApi, usageApi, eventsHandler),
hostAPI: hostAPI,
leaderElector: leaderElector,
prevMonitorInvokedAt: time.Now(),
Expand Down
113 changes: 81 additions & 32 deletions internal/cluster/cluster_test.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion internal/cluster/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ var _ = Describe("Progress bar test", func() {
mockOperatorApi = operators.NewMockAPI(ctrl)
mockDnsApi = dns.NewMockDNSApi(ctrl)
clusterApi = NewManager(getDefaultConfig(), common.GetTestLog().WithField("pkg", "cluster-monitor"), db, commontesting.GetDummyNotificationStream(ctrl),
mockEvents, nil, mockHostAPI, mockMetric, nil, nil, mockOperatorApi, nil, nil, mockDnsApi, nil, nil, false)
mockEvents, nil, mockHostAPI, mockMetric, nil, nil, mockOperatorApi, nil, nil, mockDnsApi, nil, nil, false, nil)

mockOperatorApi.EXPECT().ResolveDependencies(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ *common.Cluster, previousOperators []*models.MonitoredOperator) ([]*models.MonitoredOperator, error) {
return previousOperators, nil
},
).AnyTimes()
})

AfterEach(func() {
Expand Down
218 changes: 207 additions & 11 deletions internal/cluster/refresh_status_preprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ package cluster

import (
"context"
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"

"github.com/dustin/go-humanize/english"
"github.com/go-openapi/swag"
"github.com/openshift/assisted-service/internal/common"
eventsapi "github.com/openshift/assisted-service/internal/events/api"
"github.com/openshift/assisted-service/internal/host"
"github.com/openshift/assisted-service/internal/operators"
"github.com/openshift/assisted-service/internal/operators/api"
operatorcommon "github.com/openshift/assisted-service/internal/operators/common"
"github.com/openshift/assisted-service/internal/usage"
"github.com/openshift/assisted-service/models"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/thoas/go-funk"
"gorm.io/gorm"
)

type ValidationResult struct {
Expand All @@ -30,23 +36,28 @@ type stringer interface {
}

type refreshPreprocessor struct {
log logrus.FieldLogger
validations []validation
conditions []condition
operatorsAPI operators.API
log logrus.FieldLogger
validations []validation
conditions []condition
operatorsAPI operators.API
usageAPI usage.API
eventsHandler eventsapi.Handler
}

func newRefreshPreprocessor(log logrus.FieldLogger, hostAPI host.API, operatorsAPI operators.API) *refreshPreprocessor {
func newRefreshPreprocessor(log logrus.FieldLogger, hostAPI host.API, operatorsAPI operators.API, usageAPI usage.API,
eventsHandler eventsapi.Handler) *refreshPreprocessor {
v := clusterValidator{
log: log,
hostAPI: hostAPI,
}

return &refreshPreprocessor{
log: log,
validations: newValidations(&v),
conditions: newConditions(&v),
operatorsAPI: operatorsAPI,
log: log,
validations: newValidations(&v),
conditions: newConditions(&v),
operatorsAPI: operatorsAPI,
usageAPI: usageAPI,
eventsHandler: eventsHandler,
}
}

Expand All @@ -61,7 +72,7 @@ func (r *refreshPreprocessor) preprocess(ctx context.Context, c *clusterPreproce
if c.cluster != nil {
ignoredValidations, err = common.DeserializeJSONList(c.cluster.IgnoredClusterValidations)
if err != nil {
return nil, nil, errors.Wrap(err, fmt.Sprintf("Unable to deserialize ignored cluster validations for cluster %s", string(*c.cluster.ID)))
return nil, nil, fmt.Errorf("unable to deserialize ignored cluster validations for cluster %s: %w", c.cluster.ID.String(), err)
}
}

Expand All @@ -84,6 +95,17 @@ func (r *refreshPreprocessor) preprocess(ctx context.Context, c *clusterPreproce
Message: message,
})
}

// Before validating the operators we need to recalculate the dependencies because changes in the hosts may
// imply changes in the dependencies between operators. For example, if the OpenShift AI operator is enabled and
// a new host with an NVIDIA GPU has been discovered, then the NVIDIA GPU operator will need to be added as a
// dependency, and then we will need to validate that secure boot is disabled.
err = r.recalculateOperatorDependencies(ctx, c)
if err != nil {
err = fmt.Errorf("failed to recalculate operator dependencies for cluster '%s': %w", c.clusterId, err)
return nil, nil, err
}

// Validate operators
results, err := r.operatorsAPI.ValidateCluster(ctx, c.cluster)
if err != nil {
Expand Down Expand Up @@ -124,6 +146,180 @@ func (r *refreshPreprocessor) preprocess(ctx context.Context, c *clusterPreproce
return stateMachineInput, validationsOutput, nil
}

// recalculateOperatorDependencies calculates the operator dependencies and updates the database and the passed cluster
// accordingly.
func (r *refreshPreprocessor) recalculateOperatorDependencies(ctx context.Context, c *clusterPreprocessContext) error {
// Calculate and save the operators that have been added, updated or deleted:
operatorsBeforeResolve := c.cluster.MonitoredOperators

operatorsAfterResolve, err := r.operatorsAPI.ResolveDependencies(c.cluster, c.cluster.MonitoredOperators)
if err != nil {
return fmt.Errorf("failed to resolve operator dependencies: %w", err)
}

var addedOperators, updatedOperators, deletedOperators []*models.MonitoredOperator

for _, operatorAfterResolve := range operatorsAfterResolve {
if operatorAfterResolve.ClusterID == "" {
operatorAfterResolve.ClusterID = c.clusterId
}

operatorBeforeResolve := operatorcommon.GetOperator(operatorsBeforeResolve, operatorAfterResolve.Name)
if operatorBeforeResolve != nil {
if !reflect.DeepEqual(operatorAfterResolve, operatorBeforeResolve) {
updatedOperators = append(updatedOperators, operatorAfterResolve)
}
} else {
addedOperators = append(addedOperators, operatorAfterResolve)
}
}

for _, operatorBeforeResolve := range operatorsBeforeResolve {
if !operatorcommon.HasOperator(operatorsAfterResolve, operatorBeforeResolve.Name) {
deletedOperators = append(deletedOperators, operatorBeforeResolve)
}
}

// If nothing changed, nothing needs to be done
if len(addedOperators) == 0 && len(deletedOperators) == 0 && len(updatedOperators) == 0 {
return nil
}

// Validate with cluster CPU architecture
err = r.operatorsAPI.EnsureOperatorPrerequisite(c.cluster, c.cluster.OpenshiftVersion, c.cluster.CPUArchitecture, operatorsAfterResolve)
if err != nil {
return fmt.Errorf("failed to validate operator prerequisite: %w", err)
}

c.cluster.MonitoredOperators = operatorsAfterResolve

err = c.db.Transaction(func(tx *gorm.DB) error {
for _, addedOperator := range addedOperators {
err = tx.Save(addedOperator).Error
if err != nil {
return fmt.Errorf("failed to add operator '%s': %w", addedOperator.Name, err)
}
}

for _, updatedOperator := range updatedOperators {
err = tx.Save(updatedOperator).Error
if err != nil {
return fmt.Errorf("failed to update operator '%s': %w", updatedOperator.Name, err)
}
}

for _, deletedOperator := range deletedOperators {
err = tx.Delete(deletedOperator).Error
if err != nil {
return fmt.Errorf("failed to delete operator '%s': %w", deletedOperator.Name, err)
}
}

// If any operator has been added or deleted then we need to update the corresponding feature usage
if len(addedOperators) > 0 || len(deletedOperators) > 0 {
err = r.recalculateOperatorFeatureUsage(c, tx, addedOperators, deletedOperators)
if err != nil {
return fmt.Errorf("failed to recalculate operator feature usage: %w", err)
}
}

return nil
})

if err != nil {
return fmt.Errorf("transaction to update monitored operators, the associated usage and reset roles failed: %w", err)
}

// If everything went smoothly, notify about the change
if len(addedOperators) > 0 || len(deletedOperators) > 0 {
err = r.notifyOperatorFeatureUsageChange(ctx, c, addedOperators, deletedOperators)
if err != nil {
return err
}
}

return nil
}

func (r *refreshPreprocessor) recalculateOperatorFeatureUsage(c *clusterPreprocessContext, db *gorm.DB,
addedOperators, deletedOperators []*models.MonitoredOperator) error {
if r.usageAPI == nil {
return nil
}

usages, err := usage.Unmarshal(c.cluster.FeatureUsage)
if err != nil {
return fmt.Errorf("failed to read feature usage: %w", err)
}

for _, addedOperator := range addedOperators {
featureName := strings.ToUpper(addedOperator.Name)
r.usageAPI.Add(usages, featureName, nil)
}

for _, deletedOperator := range deletedOperators {
featureName := strings.ToUpper(deletedOperator.Name)
r.usageAPI.Remove(usages, featureName)
}

data, err := json.Marshal(usages)
if err != nil {
return fmt.Errorf("failed to write feature usage: %w", err)
}

c.cluster.FeatureUsage = string(data)
r.usageAPI.Save(db, c.clusterId, usages)

return nil
}

func (r refreshPreprocessor) notifyOperatorFeatureUsageChange(ctx context.Context, c *clusterPreprocessContext,
addedOperators, deletedOperators []*models.MonitoredOperator) error {
if r.eventsHandler == nil {
return nil
}
if len(addedOperators) > 0 {
r.notifyAddedOperatorFeatures(ctx, c, addedOperators)
}
if len(deletedOperators) > 0 {
r.notifyDeletedOperatorFeatures(ctx, c, deletedOperators)
}
return nil
}

func (r *refreshPreprocessor) notifyAddedOperatorFeatures(ctx context.Context, c *clusterPreprocessContext,
operators []*models.MonitoredOperator) {
featureList := r.calculateOperatorFeatureList(operators)
var message string
if len(operators) == 1 {
message = fmt.Sprintf("Cluster %s: added operator feature %s", c.clusterId, featureList)
} else {
message = fmt.Sprintf("Cluster %s: added operator features %s", c.clusterId, featureList)
}
r.eventsHandler.NotifyInternalEvent(ctx, &c.clusterId, nil, nil, message)
}

func (r *refreshPreprocessor) notifyDeletedOperatorFeatures(ctx context.Context, c *clusterPreprocessContext,
operators []*models.MonitoredOperator) {
featureList := r.calculateOperatorFeatureList(operators)
var message string
if len(operators) == 1 {
message = fmt.Sprintf("Cluster %s: deleted operator feature %s", c.clusterId, featureList)
} else {
message = fmt.Sprintf("Cluster %s: deleted operator features %s", c.clusterId, featureList)
}
r.eventsHandler.NotifyInternalEvent(ctx, &c.clusterId, nil, nil, message)
}

func (r *refreshPreprocessor) calculateOperatorFeatureList(operators []*models.MonitoredOperator) string {
featureNames := make([]string, len(operators))
for i, operator := range operators {
featureNames[i] = strings.ToUpper(operator.Name)
}
sort.Strings(featureNames)
return english.WordSeries(featureNames, "and")
}

// sortByValidationResultID sorts results by models.ClusterValidationID
func sortByValidationResultID(validationResults []ValidationResult) {
sort.SliceStable(validationResults, func(i, j int) bool {
Expand Down
Loading