diff --git a/.gitignore b/.gitignore index ef16e98ddb..9b05e2006a 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,7 @@ junit/ # Buildchain artifacts /_build/ -vagrant_config.rb \ No newline at end of file +vagrant_config.rb + +# Binaries downloaded by the upgrade-operator-sdk.py script +/.tmp/ \ No newline at end of file diff --git a/BUMPING.md b/BUMPING.md index aa9f542b2b..ea1abf1d5d 100644 --- a/BUMPING.md +++ b/BUMPING.md @@ -128,16 +128,97 @@ A few tips to bump image versions and SHAs: This guide is applied for both `metalk8s-operator` and `storage-operator`. - - check [documentation](https://sdk.operatorframework.io/docs/upgrading-sdk-version/$version) - for important changes and apply them. - - bump version in Makefile. - - if necessary, bump go version in pre_merge github action. - - if necessary, bump go version in Dockerfile. - - if necessary, bump go dependencies versions. - - in the root of each operator, run `go mod tidy`. - - run `make metalk8s` - - check a diff between the two latest versions of this [test project](https://github.com/operator-framework/operator-sdk/tree/master/testdata/go/v4/memcached-operator) - - the diff in this repo and the test project should be more or less the same +### Prerequisites + +- `go`, `curl`, and `patch` in `PATH`. +- `pyyaml` Python package: `pip install pyyaml` +- `GITHUB_TOKEN` (optional): raises the GitHub API rate limit from 60 to 5000 + req/hour. Set via `export GITHUB_TOKEN=`. 
+ +### Updating the versions + +Target versions are pinned in `scripts/upgrade-operator-sdk//config.yaml`: + +```yaml +operator_sdk_version: v1.42.1 # target operator-sdk release +go_toolchain: go1.24.13 # pin Go toolchain (for GOTOOLCHAIN) +k8s_libs: v0.33.10 # pin k8s.io libs version +``` + +After scaffolding, the script detects the latest available versions (operator-sdk +from GitHub, Go and k8s.io patches from go.dev / module proxy) and compares with +the pinned values: + +- **No pin** in YAML: the detected version is used and auto-pinned in the file. +- **Pin matches detected**: all good, no action. +- **Pin is older** than detected: warning printed with the newer version available. + The pinned value is still used. Update the YAML manually when ready. +- **Pin is newer** than detected (unusual): warning, the detected value is used. + +This is CI-friendly: zero interactive input during reconciliation. + +### Running the upgrade + +The script processes one operator at a time: + +```bash +python3 scripts/upgrade-operator-sdk/upgrade.py \ + --operator-dir operator \ + scripts/upgrade-operator-sdk/operator + +python3 scripts/upgrade-operator-sdk/upgrade.py \ + --operator-dir storage-operator \ + scripts/upgrade-operator-sdk/storage-operator +``` + +Options: + +``` +--operator-dir Path to the operator project directory (required) +--skip-backup Reuse an existing .bak directory (no new backup) +--clean-tools Remove tool cache after upgrade +--yes, -y Skip the confirmation prompt +``` + +### YAML config files + +Each operator has a config directory at `scripts/upgrade-operator-sdk//` containing +`config.yaml` and a `patches/` subdirectory. The config fields are: + +- **Versions**: `operator_sdk_version`, `go_toolchain` (optional pin), `k8s_libs` (optional pin) +- **Scaffold**: `repo`, `domain`, `apis` (with `group`, `version`, `kind`, `namespaced`). The operator name is derived from the config directory name. 
+- **Raw copy**: `raw_copy` -- directories or files copied as-is from backup (purely custom code with no scaffold equivalent: `pkg/`, `version/`, `config/metalk8s/`, `salt/`, individual test/helper files) +- **Post-processing**: `extra_commands` + +### Patch files + +All customizations to scaffold-generated files are stored as GNU unified diff +files in the `patches/` subdirectory. This includes: + +- **Dockerfile** and **Makefile** customizations +- **CRD type definitions** (`*_types.go`) +- **Controller implementations** (`*_controller.go`) +- **Scaffold test stubs** (`*_controller_test.go`) -- neutralized when incompatible with the delegation pattern + +The script applies them with `patch -p1` after scaffolding. If a patch does not +apply cleanly, look for `.rej` files and resolve manually. + +Patch files use `__PLACEHOLDER__` tokens for runtime values: + +| Placeholder | Replaced with | Source | +| ----------------- | ---------------------------- | ---------- | +| `__GOTOOLCHAIN__` | Detected/pinned Go toolchain | `Makefile` | + +New `.patch` files in the patches directory are automatically picked up. + +### What to review after the upgrade + +1. `git diff` to review all changes +2. `cd && make test` to run tests +3. Check `config/crd/bases/` for correct CRD scopes +4. Check `config/rbac/role.yaml` for RBAC completeness +5. Check `deploy/manifests.yaml` for correct Jinja templates +6. Remove backup: `rm -rf .bak/` ## Calico diff --git a/tools/upgrade-operator-sdk/operator/config.yaml b/tools/upgrade-operator-sdk/operator/config.yaml new file mode 100644 index 0000000000..d684c4f8a5 --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/config.yaml @@ -0,0 +1,38 @@ +repo: github.com/scality/metalk8s/operator +domain: metalk8s.scality.com + +operator_sdk_version: v1.42.1 + +# Optional: pin versions. If absent, the script detects the latest +# patch from the scaffold's go.mod and auto-pins them here. 
+go_toolchain: go1.24.13 +k8s_libs: v0.33.10 + +apis: + - version: v1alpha1 + kind: ClusterConfig + namespaced: false + - version: v1alpha1 + kind: VirtualIPPool + namespaced: true + +# Directories/files copied as-is from backup (purely custom, no scaffold equivalent). +raw_copy: + - pkg + - version + - config/metalk8s + - api/v1alpha1/conditions.go + - api/v1alpha1/conditions_test.go + - api/v1alpha1/clusterconfig_types_test.go + - api/v1alpha1/virtualippool_types_test.go + - api/v1alpha1/v1alpha1_suite_test.go + +# Files/dirs to delete from the scaffold (not needed or incompatible). +delete: + - .devcontainer + - .github + - internal/controller/clusterconfig_controller_test.go + - internal/controller/virtualippool_controller_test.go + +extra_commands: + - ["make", "metalk8s"] diff --git a/tools/upgrade-operator-sdk/operator/patches/Dockerfile.patch b/tools/upgrade-operator-sdk/operator/patches/Dockerfile.patch new file mode 100644 index 0000000000..bb01219219 --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/patches/Dockerfile.patch @@ -0,0 +1,65 @@ +--- a/Dockerfile ++++ b/Dockerfile +@@ -15,13 +15,20 @@ + COPY cmd/main.go cmd/main.go + COPY api/ api/ + COPY internal/ internal/ ++COPY pkg/ pkg/ ++COPY version/ version/ ++ ++# Version of the project, e.g. `git describe --always --long --dirty --broken` ++ARG METALK8S_VERSION + + # Build + # the GOARCH has not a default value to allow the binary be built according to the host where the command + # was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO + # the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, + # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. 
+-RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go ++RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager \ ++ -ldflags "-X 'github.com/scality/metalk8s/operator/version.Version=${METALK8S_VERSION}'" \ ++ cmd/main.go + + # Use distroless as minimal base image to package the manager binary + # Refer to https://github.com/GoogleContainerTools/distroless for more details +@@ -31,3 +38,40 @@ + USER 65532:65532 + + ENTRYPOINT ["/manager"] ++ ++# Timestamp of the build, formatted as RFC3339 ++ARG BUILD_DATE ++# Git revision o the tree at build time ++ARG VCS_REF ++# Version of the image ++ARG VERSION ++# Version of the project, e.g. `git describe --always --long --dirty --broken` ++ARG METALK8S_VERSION ++ ++# These contain BUILD_DATE so should come 'late' for layer caching ++LABEL maintainer="squad-metalk8s@scality.com" \ ++ # http://label-schema.org/rc1/ ++ org.label-schema.build-date="$BUILD_DATE" \ ++ org.label-schema.name="metalk8s-operator" \ ++ org.label-schema.description="Kubernetes Operator for managing MetalK8s cluster config" \ ++ org.label-schema.url="https://github.com/scality/metalk8s/" \ ++ org.label-schema.vcs-url="https://github.com/scality/metalk8s.git" \ ++ org.label-schema.vcs-ref="$VCS_REF" \ ++ org.label-schema.vendor="Scality" \ ++ org.label-schema.version="$VERSION" \ ++ org.label-schema.schema-version="1.0" \ ++ # https://github.com/opencontainers/image-spec/blob/master/annotations.md ++ org.opencontainers.image.created="$BUILD_DATE" \ ++ org.opencontainers.image.authors="squad-metalk8s@scality.com" \ ++ org.opencontainers.image.url="https://github.com/scality/metalk8s/" \ ++ org.opencontainers.image.source="https://github.com/scality/metalk8s.git" \ ++ org.opencontainers.image.version="$VERSION" \ ++ org.opencontainers.image.revision="$VCS_REF" \ ++ org.opencontainers.image.vendor="Scality" \ ++ org.opencontainers.image.title="metalk8s-operator" \ ++ 
org.opencontainers.image.description="Kubernetes Operator for managing MetalK8s cluster config" \ ++ # https://docs.openshift.org/latest/creating_images/metadata.html ++ io.openshift.tags="metalk8s,operator" \ ++ io.k8s.description="Kubernetes Operator for managing MetalK8s cluster config" \ ++ # Various ++ com.scality.metalk8s.version="$METALK8S_VERSION" diff --git a/tools/upgrade-operator-sdk/operator/patches/Makefile.patch b/tools/upgrade-operator-sdk/operator/patches/Makefile.patch new file mode 100644 index 0000000000..6d2abfdbec --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/patches/Makefile.patch @@ -0,0 +1,15 @@ +--- a/Makefile ++++ b/Makefile +@@ -3,3 +3,12 @@ + .PHONY: catalog-push + catalog-push: ## Push a catalog image. + $(MAKE) docker-push IMG=$(CATALOG_IMG) ++ ++# Force Go toolchain version ++export GOTOOLCHAIN = __GOTOOLCHAIN__ ++ ++.PHONY: metalk8s ++metalk8s: manifests kustomize ## Generate MetalK8s resulting manifests ++ mkdir -p deploy ++ $(KUSTOMIZE) build config/metalk8s | \ ++ sed 's/BUILD_IMAGE_CLUSTER_OPERATOR:latest/{{ build_image_name("metalk8s-operator") }}/' > deploy/manifests.yaml diff --git a/tools/upgrade-operator-sdk/operator/patches/clusterconfig_controller.patch b/tools/upgrade-operator-sdk/operator/patches/clusterconfig_controller.patch new file mode 100644 index 0000000000..8776986650 --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/patches/clusterconfig_controller.patch @@ -0,0 +1,50 @@ +--- a/internal/controller/clusterconfig_controller.go ++++ b/internal/controller/clusterconfig_controller.go +@@ -17,14 +17,10 @@ + package controller + + import ( +- "context" +- ++ "github.com/scality/metalk8s/operator/pkg/controller/clusterconfig" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +- logf "sigs.k8s.io/controller-runtime/pkg/log" +- +- metalk8sscalitycomv1alpha1 "github.com/scality/metalk8s/operator/api/v1alpha1" + ) + + // 
ClusterConfigReconciler reconciles a ClusterConfig object +@@ -37,27 +33,11 @@ + // +kubebuilder:rbac:groups=metalk8s.scality.com,resources=clusterconfigs/status,verbs=get;update;patch + // +kubebuilder:rbac:groups=metalk8s.scality.com,resources=clusterconfigs/finalizers,verbs=update + +-// Reconcile is part of the main kubernetes reconciliation loop which aims to +-// move the current state of the cluster closer to the desired state. +-// TODO(user): Modify the Reconcile function to compare the state specified by +-// the ClusterConfig object against the actual cluster state, and then +-// perform operations to make the cluster state reflect the state specified by +-// the user. +-// +-// For more details, check Reconcile and its Result here: +-// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile +-func (r *ClusterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +- _ = logf.FromContext(ctx) +- +- // TODO(user): your logic here +- +- return ctrl.Result{}, nil +-} ++// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;create;update;patch;delete ++// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch ++// +kubebuilder:rbac:groups=metalk8s.scality.com,resources=virtualippools,verbs=get;list;watch;create;update;patch;delete + + // SetupWithManager sets up the controller with the Manager. + func (r *ClusterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { +- return ctrl.NewControllerManagedBy(mgr). +- For(&metalk8sscalitycomv1alpha1.ClusterConfig{}). +- Named("clusterconfig"). 
+- Complete(r) ++ return clusterconfig.Add(mgr) + } diff --git a/tools/upgrade-operator-sdk/operator/patches/clusterconfig_types.patch b/tools/upgrade-operator-sdk/operator/patches/clusterconfig_types.patch new file mode 100644 index 0000000000..71f085a58d --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/patches/clusterconfig_types.patch @@ -0,0 +1,182 @@ +--- a/api/v1alpha1/clusterconfig_types.go ++++ b/api/v1alpha1/clusterconfig_types.go +@@ -1,5 +1,5 @@ + /* +-Copyright 2026. ++Copyright 2022. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +@@ -17,32 +17,102 @@ + package v1alpha1 + + import ( ++ "sync" ++ + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ) + ++const ( ++ wPVIPConfiguredConditionName = "WorkloadPlaneVirtualIPPool" + configuredConditionName ++ wPVIPReadyConditionName = "WorkloadPlaneVirtualIPPool" + readyConditionName ++ ++ cPIngressConfiguredConditionName = "ControlPlaneIngress" + configuredConditionName ++ cPIngressVIPConfiguredConditionName = "ControlPlaneIngressVirtualIP" + configuredConditionName ++ cPIngressVIPReadyConditionName = "ControlPlaneIngressVirtualIP" + readyConditionName ++) ++ + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! + // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +-// ClusterConfigSpec defines the desired state of ClusterConfig. 
++type ManagedVirtualIPSource struct { ++ // and will be used to reach the Ingress ++ // A Virtual IP address that will be managed by the Operator ++ Address IPAddress `json:"address"` ++} ++ ++type ExternalIPSource struct { ++ // The IP address used to reach the Ingress ++ Address IPAddress `json:"address"` ++} ++ ++type ControlPlaneIngressSource struct { ++ ManagedVirtualIP *ManagedVirtualIPSource `json:"managedVirtualIP,omitempty"` ++ ExternalIP *ExternalIPSource `json:"externalIP,omitempty"` ++} ++ ++type ControlPlaneIngressSpec struct { ++ ControlPlaneIngressSource `json:",inline"` ++} ++ ++type ControlPlaneSpec struct { ++ // Information about the Control Plane Ingress ++ Ingress ControlPlaneIngressSpec `json:"ingress,omitempty"` ++} ++ ++type WorkloadPlaneSpec struct { ++ // Information about Virtual IP Pools ++ // +optional ++ VirtualIPPools map[string]VirtualIPPoolSpec `json:"virtualIPPools,omitempty"` ++} ++ ++// ClusterConfigSpec defines the desired state of ClusterConfig + type ClusterConfigSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + +- // Foo is an example field of ClusterConfig. Edit clusterconfig_types.go to remove/update +- Foo string `json:"foo,omitempty"` ++ // Information about the Control Plane. ++ // +optional ++ ControlPlane ControlPlaneSpec `json:"controlPlane,omitempty"` ++ ++ // Information about the Workload Plane. 
++ // +optional ++ WorkloadPlane WorkloadPlaneSpec `json:"workloadPlane,omitempty"` ++} ++ ++type ControlPlaneIngressStatus struct { ++ // The IP address where the Ingress is exposed ++ IP IPAddress `json:"ip,omitempty"` ++ // The full endpoint URL to reach the Ingress ++ Endpoint string `json:"endpoint,omitempty"` ++} ++ ++type ControlPlaneStatus struct { ++ // Information about the Control Plane Ingress ++ Ingress ControlPlaneIngressStatus `json:"ingress,omitempty"` + } + +-// ClusterConfigStatus defines the observed state of ClusterConfig. ++// ClusterConfigStatus defines the observed state of ClusterConfig + type ClusterConfigStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file ++ ++ // List of conditions for the ClusterConfig ++ // +patchMergeKey=type ++ // +patchStrategy=merge ++ // +listType=map ++ // +listMapKey=type ++ Conditions []Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"` ++ ++ // Control Plane Information ++ ControlPlane ControlPlaneStatus `json:"controlPlane,omitempty"` + } + +-// +kubebuilder:object:root=true +-// +kubebuilder:subresource:status +-// +kubebuilder:resource:scope=Cluster ++//+kubebuilder:object:root=true ++//+kubebuilder:subresource:status ++//+kubebuilder:resource:scope=Cluster,shortName=cc ++//+kubebuilder:printcolumn:name="Control-Plane-Url",type="string",JSONPath=".status.controlPlane.ingress.endpoint",description="The URL to reach the Control Plane Ingress" + +-// ClusterConfig is the Schema for the clusterconfigs API. 
++// ClusterConfig is the Schema for the clusterconfigs API + type ClusterConfig struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` +@@ -51,9 +121,59 @@ + Status ClusterConfigStatus `json:"status,omitempty"` + } + +-// +kubebuilder:object:root=true ++// The ClusterConfig is managed by several SubReconciler in parallel ++// that may Set and Get conditions, so ensure there is always only one ++// accessing it at once ++// NOTE: We can have only one ClusterConfig so keep the mutex has a simple variable here ++var mu sync.Mutex ++ ++// Set a condition on ClusterConfig ++func (v *ClusterConfig) SetCondition(kind string, status metav1.ConditionStatus, reason string, message string) { ++ mu.Lock() ++ defer mu.Unlock() ++ setCondition(v.Generation, &v.Status.Conditions, kind, status, reason, message) ++} ++ ++// Get a condition from ClusterConfig ++func (v *ClusterConfig) GetCondition(kind string) *Condition { ++ mu.Lock() ++ defer mu.Unlock() ++ return getCondition(v.Status.Conditions, kind) ++} ++ ++// Set Ready Condition ++func (v *ClusterConfig) SetReadyCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(readyConditionName, status, reason, message) ++} ++ ++// Set WorkloadPlaneVirtualIPPool Configured Condition ++func (v *ClusterConfig) SetWPVIPConfiguredCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(wPVIPConfiguredConditionName, status, reason, message) ++} ++ ++// Set WorkloadPlaneVirtualIPPool Ready Condition ++func (v *ClusterConfig) SetWPVIPReadyCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(wPVIPReadyConditionName, status, reason, message) ++} ++ ++// Set ControlPlaneIngressConfigured Condition ++func (v *ClusterConfig) SetCPIngressConfiguredCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(cPIngressConfiguredConditionName, status, reason, message) ++} 
++ ++// Set ControlPlaneIngressVirtualIP Configured Condition ++func (v *ClusterConfig) SetCPIngressVIPConfiguredCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(cPIngressVIPConfiguredConditionName, status, reason, message) ++} ++ ++// Set ControlPlaneIngressVirtualIP Ready Condition ++func (v *ClusterConfig) SetCPIngressVIPReadyCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(cPIngressVIPReadyConditionName, status, reason, message) ++} ++ ++//+kubebuilder:object:root=true + +-// ClusterConfigList contains a list of ClusterConfig. ++// ClusterConfigList contains a list of ClusterConfig + type ClusterConfigList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` diff --git a/tools/upgrade-operator-sdk/operator/patches/virtualippool_controller.patch b/tools/upgrade-operator-sdk/operator/patches/virtualippool_controller.patch new file mode 100644 index 0000000000..b251e1c1ee --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/patches/virtualippool_controller.patch @@ -0,0 +1,52 @@ +--- a/internal/controller/virtualippool_controller.go ++++ b/internal/controller/virtualippool_controller.go +@@ -17,14 +17,10 @@ + package controller + + import ( +- "context" +- ++ "github.com/scality/metalk8s/operator/pkg/controller/virtualippool" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +- logf "sigs.k8s.io/controller-runtime/pkg/log" +- +- metalk8sscalitycomv1alpha1 "github.com/scality/metalk8s/operator/api/v1alpha1" + ) + + // VirtualIPPoolReconciler reconciles a VirtualIPPool object +@@ -37,27 +33,12 @@ + // +kubebuilder:rbac:groups=metalk8s.scality.com,resources=virtualippools/status,verbs=get;update;patch + // +kubebuilder:rbac:groups=metalk8s.scality.com,resources=virtualippools/finalizers,verbs=update + ++// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch 
++// +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch ++// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete ++// +kubebuilder:rbac:groups=apps,resources=daemonsets,verbs=get;list;watch;create;update;patch;delete ++ +-// Reconcile is part of the main kubernetes reconciliation loop which aims to +-// move the current state of the cluster closer to the desired state. +-// TODO(user): Modify the Reconcile function to compare the state specified by +-// the VirtualIPPool object against the actual cluster state, and then +-// perform operations to make the cluster state reflect the state specified by +-// the user. +-// +-// For more details, check Reconcile and its Result here: +-// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile +-func (r *VirtualIPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +- _ = logf.FromContext(ctx) +- +- // TODO(user): your logic here +- +- return ctrl.Result{}, nil +-} +- + // SetupWithManager sets up the controller with the Manager. + func (r *VirtualIPPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { +- return ctrl.NewControllerManagedBy(mgr). +- For(&metalk8sscalitycomv1alpha1.VirtualIPPool{}). +- Named("virtualippool"). +- Complete(r) ++ return virtualippool.Add(mgr) + } diff --git a/tools/upgrade-operator-sdk/operator/patches/virtualippool_types.patch b/tools/upgrade-operator-sdk/operator/patches/virtualippool_types.patch new file mode 100644 index 0000000000..e1eec06bea --- /dev/null +++ b/tools/upgrade-operator-sdk/operator/patches/virtualippool_types.patch @@ -0,0 +1,169 @@ +--- a/api/v1alpha1/virtualippool_types.go ++++ b/api/v1alpha1/virtualippool_types.go +@@ -1,5 +1,5 @@ + /* +-Copyright 2026. ++Copyright 2022. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+@@ -17,31 +17,90 @@ + package v1alpha1 + + import ( ++ appsv1 "k8s.io/api/apps/v1" ++ corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ) + + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! + // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +-// VirtualIPPoolSpec defines the desired state of VirtualIPPool. ++type SpreadConstraintSpec struct { ++ // Topology label to use to spread the Virtual IPs ++ TopologyKey string `json:"topologyKey"` ++} ++ ++type HttpGetSpec struct { ++ // The IP to do the HTTP request ++ // (default to keepalived Pod IP) ++ // +optional ++ IP IPAddress `json:"host,omitempty"` ++ // The scheme to use for the HTTP request ++ // (default to HTTPS) ++ // +optional ++ // +kubebuilder:default="HTTPS" ++ // +kubebuilder:validation:Enum={"HTTP", "HTTPS"} ++ Scheme string `json:"scheme"` ++ // The port to do the HTTP request ++ // +optional ++ // +kubebuilder:default=443 ++ Port int `json:"port"` ++ // Path for the HTTP request ++ // +optional ++ Path string `json:"path"` ++} ++ ++type HealthcheckSpec struct { ++ // Simple HTTP Get check ++ HttpGet HttpGetSpec `json:"httpGet,omitempty"` ++} ++ ++// +kubebuilder:validation:Format=ipv4 ++type IPAddress string ++ ++// VirtualIPPoolSpec defines the desired state of VirtualIPPool + type VirtualIPPoolSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + +- // Foo is an example field of VirtualIPPool. 
Edit virtualippool_types.go to remove/update +- Foo string `json:"foo,omitempty"` ++ // Node Selector to deploy the Virtual IPs manager ++ // +optional ++ NodeSelector map[string]string `json:"nodeSelector,omitempty"` ++ // Tolerations to deploy the Virtual IPs manager ++ // +optional ++ Tolerations []corev1.Toleration `json:"tolerations,omitempty"` ++ // Spread constraints for the Virtual IPs ++ // NOTE: Not supported yet ++ // // +optional ++ // SpreadConstraints []SpreadConstraintSpec `json:"spreadConstraints,omitempty"` ++ ++ // Virtual IP addresses to use ++ // +kubebuilder:validation:MinItems=1 ++ Addresses []IPAddress `json:"addresses"` ++ ++ // The local health check to run to ensure the Virtual IP can sit on ++ // this specific node ++ Healthcheck *HealthcheckSpec `json:"healthcheck,omitempty"` + } + +-// VirtualIPPoolStatus defines the observed state of VirtualIPPool. ++// VirtualIPPoolStatus defines the observed state of VirtualIPPool + type VirtualIPPoolStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file ++ ++ // List of conditions for the VirtualIPPool ++ // +patchMergeKey=type ++ // +patchStrategy=merge ++ // +listType=map ++ // +listMapKey=type ++ Conditions []Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"` + } + +-// +kubebuilder:object:root=true +-// +kubebuilder:subresource:status ++//+kubebuilder:object:root=true ++//+kubebuilder:subresource:status ++//+kubebuilder:resource:shortName=vipp + +-// VirtualIPPool is the Schema for the virtualippools API. 
++// VirtualIPPool is the Schema for the virtualippools API + type VirtualIPPool struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` +@@ -50,9 +109,59 @@ + Status VirtualIPPoolStatus `json:"status,omitempty"` + } + +-// +kubebuilder:object:root=true ++// Compute the ConfigMap name for a pool ++func (v *VirtualIPPool) GetConfigMap() *corev1.ConfigMap { ++ return &corev1.ConfigMap{ ++ ObjectMeta: metav1.ObjectMeta{ ++ Name: v.GetName(), ++ Namespace: v.GetNamespace(), ++ }, ++ } ++} ++ ++// Compute the DaemonSet name for a pool ++func (v *VirtualIPPool) GetDaemonSet() *appsv1.DaemonSet { ++ return &appsv1.DaemonSet{ ++ ObjectMeta: metav1.ObjectMeta{ ++ Name: v.GetName(), ++ Namespace: v.GetNamespace(), ++ }, ++ } ++} ++ ++// Set a condition on VirtualIPPool ++func (v *VirtualIPPool) SetCondition(kind string, status metav1.ConditionStatus, reason string, message string) { ++ setCondition(v.Generation, &v.Status.Conditions, kind, status, reason, message) ++} ++ ++// Get a condition from VirtualIPPool ++func (v *VirtualIPPool) GetCondition(kind string) *Condition { ++ return getCondition(v.Status.Conditions, kind) ++} ++ ++// Set Configured Condition ++func (v *VirtualIPPool) SetConfiguredCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(configuredConditionName, status, reason, message) ++} ++ ++// Set Available Condition ++func (v *VirtualIPPool) SetAvailableCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(availableConditionName, status, reason, message) ++} ++ ++// Set Ready Condition ++func (v *VirtualIPPool) SetReadyCondition(status metav1.ConditionStatus, reason string, message string) { ++ v.SetCondition(readyConditionName, status, reason, message) ++} ++ ++// Get Ready Condition ++func (v *VirtualIPPool) GetReadyCondition() *Condition { ++ return v.GetCondition(readyConditionName) ++} ++ ++//+kubebuilder:object:root=true + +-// 
VirtualIPPoolList contains a list of VirtualIPPool. ++// VirtualIPPoolList contains a list of VirtualIPPool + type VirtualIPPoolList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` diff --git a/tools/upgrade-operator-sdk/pyproject.toml b/tools/upgrade-operator-sdk/pyproject.toml new file mode 100644 index 0000000000..5116f496c8 --- /dev/null +++ b/tools/upgrade-operator-sdk/pyproject.toml @@ -0,0 +1,39 @@ +# Linting and formatting configuration for scripts/upgrade-operator-sdk/upgrade.py +# Run from the scripts/ directory: +# python3 -m black upgrade-operator-sdk/upgrade.py +# python3 -m ruff check upgrade-operator-sdk/upgrade.py +# python3 -m mypy upgrade-operator-sdk/upgrade.py + +[tool.black] +line-length = 88 +target-version = ["py310"] + +[tool.ruff] +line-length = 88 +target-version = "py310" + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes (undefined names, unused imports, …) + "I", # isort (import ordering) + "N", # pep8-naming conventions + "UP", # pyupgrade (modernise Python syntax) + "B", # flake8-bugbear (likely bugs and design issues) + "C4", # flake8-comprehensions (better list/dict/set comprehensions) + "SIM", # flake8-simplify (simplifiable code patterns) + "RET", # flake8-return (return statement issues) + "PTH", # flake8-use-pathlib (prefer pathlib over os.path) + "TRY", # tryceratops (exception handling anti-patterns) +] +ignore = [ + "RET504", # allow x = ...; return x (readability) + "TRY003", # allow long messages in raise/die() calls + "TRY300", # allow return inside try block +] + +[tool.mypy] +strict = true +ignore_missing_imports = true +python_version = "3.10" diff --git a/tools/upgrade-operator-sdk/storage-operator/config.yaml b/tools/upgrade-operator-sdk/storage-operator/config.yaml new file mode 100644 index 0000000000..537cd84199 --- /dev/null +++ b/tools/upgrade-operator-sdk/storage-operator/config.yaml @@ -0,0 +1,32 @@ +repo: 
github.com/scality/metalk8s/storage-operator +domain: metalk8s.scality.com + +operator_sdk_version: v1.42.1 + +# Optional: pin versions. If absent, the script detects the latest +# patch from the scaffold's go.mod and auto-pins them here. +go_toolchain: go1.24.13 +k8s_libs: v0.33.10 + +apis: + - group: storage + version: v1alpha1 + kind: Volume + namespaced: false + +# Directories/files copied as-is from backup (purely custom, no scaffold equivalent). +raw_copy: + - config/metalk8s + - salt + - api/v1alpha1/volume_types_test.go + - internal/controller/slice.go + - internal/controller/volume_controller_test.go + +# Files/dirs to delete from the scaffold (not needed or incompatible). +delete: + - .devcontainer + - .github + - internal/controller/volume_controller_test.go + +extra_commands: + - ["make", "metalk8s"] diff --git a/tools/upgrade-operator-sdk/storage-operator/patches/Dockerfile.patch b/tools/upgrade-operator-sdk/storage-operator/patches/Dockerfile.patch new file mode 100644 index 0000000000..f1851655f1 --- /dev/null +++ b/tools/upgrade-operator-sdk/storage-operator/patches/Dockerfile.patch @@ -0,0 +1,51 @@ +--- a/Dockerfile ++++ b/Dockerfile +@@ -15,6 +15,7 @@ + COPY cmd/main.go cmd/main.go + COPY api/ api/ + COPY internal/ internal/ ++COPY salt/ salt/ + + # Build + # the GOARCH has not a default value to allow the binary be built according to the host where the command +@@ -31,3 +32,40 @@ + USER 65532:65532 + + ENTRYPOINT ["/manager"] ++ ++# Timestamp of the build, formatted as RFC3339 ++ARG BUILD_DATE ++# Git revision o the tree at build time ++ARG VCS_REF ++# Version of the image ++ARG VERSION ++# Version of the project, e.g. 
`git describe --always --long --dirty --broken` ++ARG METALK8S_VERSION ++ ++# These contain BUILD_DATE so should come 'late' for layer caching ++LABEL maintainer="squad-metalk8s@scality.com" \ ++ # http://label-schema.org/rc1/ ++ org.label-schema.build-date="$BUILD_DATE" \ ++ org.label-schema.name="storage-operator" \ ++ org.label-schema.description="Kubernetes Operator for managing PersistentVolumes in MetalK8s" \ ++ org.label-schema.url="https://github.com/scality/metalk8s/" \ ++ org.label-schema.vcs-url="https://github.com/scality/metalk8s.git" \ ++ org.label-schema.vcs-ref="$VCS_REF" \ ++ org.label-schema.vendor="Scality" \ ++ org.label-schema.version="$VERSION" \ ++ org.label-schema.schema-version="1.0" \ ++ # https://github.com/opencontainers/image-spec/blob/master/annotations.md ++ org.opencontainers.image.created="$BUILD_DATE" \ ++ org.opencontainers.image.authors="squad-metalk8s@scality.com" \ ++ org.opencontainers.image.url="https://github.com/scality/metalk8s/" \ ++ org.opencontainers.image.source="https://github.com/scality/metalk8s.git" \ ++ org.opencontainers.image.version="$VERSION" \ ++ org.opencontainers.image.revision="$VCS_REF" \ ++ org.opencontainers.image.vendor="Scality" \ ++ org.opencontainers.image.title="storage-operator" \ ++ org.opencontainers.image.description="Kubernetes Operator for managing PersistentVolumes in MetalK8s" \ ++ # https://docs.openshift.org/latest/creating_images/metadata.html ++ io.openshift.tags="metalk8s,storage,operator" \ ++ io.k8s.description="Kubernetes Operator for managing PersistentVolumes in MetalK8s" \ ++ # Various ++ com.scality.metalk8s.version="$METALK8S_VERSION" diff --git a/tools/upgrade-operator-sdk/storage-operator/patches/Makefile.patch b/tools/upgrade-operator-sdk/storage-operator/patches/Makefile.patch new file mode 100644 index 0000000000..f98333af31 --- /dev/null +++ b/tools/upgrade-operator-sdk/storage-operator/patches/Makefile.patch @@ -0,0 +1,15 @@ +--- a/Makefile ++++ b/Makefile +@@ -3,3 +3,12 
@@ + .PHONY: catalog-push + catalog-push: ## Push a catalog image. + $(MAKE) docker-push IMG=$(CATALOG_IMG) ++ ++# Force Go toolchain version ++export GOTOOLCHAIN = __GOTOOLCHAIN__ ++ ++.PHONY: metalk8s ++metalk8s: manifests kustomize ## Generate MetalK8s resulting manifests ++ mkdir -p deploy ++ $(KUSTOMIZE) build config/metalk8s | \ ++ sed 's/BUILD_IMAGE_CLUSTER_OPERATOR:latest/{{ build_image_name("storage-operator") }}/' > deploy/manifests.yaml diff --git a/tools/upgrade-operator-sdk/storage-operator/patches/volume_controller.patch b/tools/upgrade-operator-sdk/storage-operator/patches/volume_controller.patch new file mode 100644 index 0000000000..34565a22d8 --- /dev/null +++ b/tools/upgrade-operator-sdk/storage-operator/patches/volume_controller.patch @@ -0,0 +1,1146 @@ +--- a/internal/controller/volume_controller.go ++++ b/internal/controller/volume_controller.go +@@ -1,5 +1,5 @@ + /* +-Copyright 2026. ++Copyright 2021. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+@@ -18,46 +18,1121 @@ + + import ( + "context" ++ "fmt" ++ "io/ioutil" ++ "strconv" ++ "time" + ++ errorsng "github.com/pkg/errors" ++ corev1 "k8s.io/api/core/v1" ++ storagev1 "k8s.io/api/storage/v1" ++ "k8s.io/apimachinery/pkg/api/errors" ++ "k8s.io/apimachinery/pkg/api/resource" ++ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ++ "k8s.io/apimachinery/pkg/types" ++ "k8s.io/client-go/rest" ++ "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ++ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + logf "sigs.k8s.io/controller-runtime/pkg/log" ++ "sigs.k8s.io/controller-runtime/pkg/reconcile" + + storagev1alpha1 "github.com/scality/metalk8s/storage-operator/api/v1alpha1" ++ "github.com/scality/metalk8s/storage-operator/salt" + ) + ++/* Explanations/schemas about volume lifecycle/reconciliation workflow {{{ ++ ++=================================== ++= Reconciliation loop (top level) = ++=================================== ++ ++When receiving a request, the first thing we do is to fetch the targeted Volume. ++If it doesn't exist, which happens when a volume is `Terminating` and has no ++finalizer, then we're done: nothing more to do. ++ ++If the volume does exist, we have to check its semantic validity (this task is ++usually done by an Admission Controller but it may not be always up and running, ++so we should have a check here). ++ ++Once pre-checks are done, we will fall in one of four cases: ++- the volume is marked for deletion: we have to try to delete the volume ++ (details are given in the "Finalize Volume" section below). ++- the volume is stuck in an unrecoverable (automatically at least) error state: ++ we can't do anything here: the request is considered done and won't be ++ rescheduled. 
++- the volume doesn't have a backing PersistentVolume (e.g: newly created ++ volume): we have to "deploy" the volume (details are given in the "Deploy ++ Volume" section below) ++- the backing PersistentVolume exists: let's check its status to update the ++ volume's status accordingly. ++ ++ ----- ------ ++(START) +-------------------------->( STOP ) ++ ----- | ------ ++ | | ^ ++ | | | ++ | | | ++ | +------------+ | ++ | +------------------------------>|DoNotRequeue|<-------------+ | ++ | | +------------+ | | ++ | |N ^ | | ++ v | |Y |Y | +++-------+ Y +------+ Y +-----------+ N +-------+ N +-----+ Y +---------+ | ++|Exists?|-->|Valid?|-->|Terminating?|-->|Failed?|-->|HasPv|-->|PvHealthy| | +++-------+ +------+ +-----------+ +-------+ +-----+ +---------+ | ++ |N |Y |N |N | ++ | v v | | ++ | +--------------+ +------------+ | | ++ | |FinalizeVolume| |DeployVolume| | | ++ | +--------------+ +------------+ | | ++ | v | ++ | +---------+ +-------+ ++ +---------------------------------------->|SetFailed|->|Requeue| ++ +---------+ +-------+ ++ ++ ++ ++================= ++= Deploy Volume = ++================= ++ ++To "deploy" a volume, we need to prepare its storage (using Salt) and create a ++backing PersistentVolume. ++ ++If we have no value for `Job`, that means nothing has started, thus we set a ++finalizer on ourself and then start the volume preparation using an asynchronous ++Salt call (which gives us a job ID) before rescheduling the request to monitor ++the evolution of the job. ++ ++If we do have a job ID, then something is in progress and we monitor it until ++it's over. ++If it has ended with an error, we move the volume into a failed state. ++ ++Otherwise we make another asynchronous Salt call to get information on the ++backing storage device (the polling is done exactly as described above). 
++ ++If we successfully retrieve the device information, we proceed with the ++PersistentVolume creation, taking care of putting a finalizer on the ++PersistentVolume (so that its lifetime is tied to ours) and setting ourself as ++the owner of the PersistentVolume. ++ ++Once we have successfuly created the PersistentVolume, we can move into the ++`Available` state and reschedule the request (the next iteration will check the ++health of the PersistentVolume we just created). ++ ++ +------------------+ +------------------+ +----------+ ++ +-->|SetVolumeFinalizer|-->|SpawnPrepareVolume|-->|SetPending| ++ | +------------------+ +------------------+ +----------+ ++ | NO | ++ | v ++ ----- +----+ DONE +--------+ +------------+ +-------+ ------ ++(START)-->|Job?|------>|CreatePV|-->|SetAvailable|--------->|Requeue|-->( STOP ) ++ ----- +----+ +--------+ +------------+ +-------+ ------ ++ | YES ^ ++ v | ++ +-----------+ Job Failed +---------+ | ++ | |-------------->|SetFailed|------------------>+ ++ | | +---------+ | ++ | | | ++ | | Unknown Job +--------+ | ++ |PollSaltJob|-------------->|UnsetJob|------------------->+ ++ | | +--------+ | ++ | | | ++ | | Job Succeed +--------+ | ++ | |-------------->|Job=DONE|------------------->+ ++ +-----------+ +--------+ | ++ | Job in progress | ++ | | ++ +----------------------------------------------------+ ++ ++================ ++= Steady state = ++================ ++ ++Once the volume is deployed, we update, with a synchronous Salt call, the ++`deviceName` status field at each reconciliation loop iteration. This field ++contains the name of the underlying block device (as found under `/dev`). ++ ++=================== ++= Finalize Volume = ++=================== ++ ++`Pending` volumes cannot be deleted (because we don't know where we are in the ++creation process), so we reschedule the request until the volume becomes either ++`Failed` or `Available`. 
++ ++For volumes with no backing PersistentVolume we directly go reclaim the storage ++on the node and upon completion we remove our finalizer to let Kubernetes delete ++us. ++ ++If we do have a backing PersistentVolume, we delete it (if it's not already in a ++terminating state) and watch for the moment when it becomes unused (this is done ++by rescheduling). Once the backing PersistentVolume becomes unused, we go ++reclaim its storage and remove the finalizers to let the object be deleted. ++ ++ ----- ------ ++ (START) ( STOP ) ++ ----- ------ ++ | ^ ++ | | ++ v | +++--------+ YES +-------+ ++|Pending?|------------------------------------------------------->|Requeue| +++--------+ +-------+ ++ | NO ^ ++ v | +++--------+ YES +----------------+ NO +--------+ | ++| HasPv? |----------->|IsPvTerminating?|---->|DeletePV|------------->| +++--------+ +----------------+ +--------+ | ++ | NO | YES | ++ | v | ++ | YES +-----------+ NO | ++ |<-----------------|IsPvUnused?|-------------------------------->| ++ | +-----------+ | ++ | | ++ | +--------------------+ +--------------+ | ++ | +---->|SpawnUnprepareVolume|-->|SetTerminating|---------->| ++ | | +--------------------+ +--------------+ | ++ | | NO | ++ | | | ++ | +----+ DONE +-----------------+ +---------------------+ | ++ +-->|Job?|------>|RemovePvFinalizer|-->|RemoveVolumeFinalizer|-->| ++ +----+ +-----------------+ +---------------------+ | ++ | YES | ++ v | ++ +-----------+ Job Failed +---------+ | ++ | |--------------------->|SetFailed|----------------->| ++ | | +---------+ | ++ | | | ++ | | Unknown Job +--------+ | ++ |PollSaltJob|--------------------->|UnsetJob|------------------>| ++ | | +--------+ | ++ | | | ++ | | Job Succeed +--------+ | ++ | |--------------------->|Job=DONE|------------------>| ++ +-----------+ +--------+ | ++ | Job in progress | ++ | | ++ +----------------------------------------------------------+ ++ ++}}} */ ++ ++const VOLUME_PROTECTION = "storage.metalk8s.scality.com/volume-protection" 
++const JOB_DONE_MARKER = "DONE" ++ ++var log = logf.Log.WithName("volume-controller") ++ ++type deviceInfo struct { ++ size int64 // Size of the device (in bytes) ++ path string // Reliable path to the device. ++} ++ + // VolumeReconciler reconciles a Volume object + type VolumeReconciler struct { + client.Client +- Scheme *runtime.Scheme ++ Scheme *runtime.Scheme ++ recorder record.EventRecorder ++ salt *salt.Client ++ devices map[string]deviceInfo ++} ++ ++// Trace a state transition, using logging and Kubernetes events. ++func (self *VolumeReconciler) traceStateTransition( ++ volume *storagev1alpha1.Volume, oldPhase storagev1alpha1.VolumePhase, ++) { ++ newPhase := volume.ComputePhase() ++ ++ // Nothing to trace if there is no transition. ++ if newPhase == oldPhase { ++ return ++ } ++ ++ reqLogger := log.WithValues("Volume.Name", volume.Name) ++ ++ self.recorder.Eventf( ++ volume, corev1.EventTypeNormal, "StateTransition", ++ "volume phase transition from '%s' to '%s'", ++ oldPhase, newPhase, ++ ) ++ reqLogger.Info( ++ "volume phase transition: requeue", ++ "Volume.OldPhase", oldPhase, ++ "Volume.NewPhase", newPhase, ++ ) ++} ++ ++// Commit the Volume Status update. ++func (self *VolumeReconciler) updateVolumeStatus( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ oldPhase storagev1alpha1.VolumePhase, ++) (reconcile.Result, error) { ++ reqLogger := log.WithValues("Volume.Name", volume.Name) ++ ++ if err := self.Client.Status().Update(ctx, volume); err != nil { ++ reqLogger.Error(err, "cannot update Volume status: requeue") ++ return delayedRequeue(err) ++ } ++ ++ self.traceStateTransition(volume, oldPhase) ++ // Status updated: reschedule to move forward. ++ return requeue(nil) ++} ++ ++// Put the volume into Failed state. 
++func (self *VolumeReconciler) setFailedVolumeStatus( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ pv *corev1.PersistentVolume, ++ reason storagev1alpha1.ConditionReason, ++ format string, ++ args ...interface{}, ++) (reconcile.Result, error) { ++ reqLogger := log.WithValues("Volume.Name", volume.Name) ++ oldPhase := volume.ComputePhase() ++ ++ volume.SetFailedStatus(reason, format, args...) ++ if _, err := self.updateVolumeStatus(ctx, volume, oldPhase); err != nil { ++ return delayedRequeue(err) ++ } ++ // If a PV is provided, move it to Failed state as well. ++ if pv != nil { ++ pv.Status = corev1.PersistentVolumeStatus{ ++ Phase: corev1.VolumeFailed, ++ Message: "the owning volume failed", ++ Reason: "OwnerFailed", ++ } ++ if err := self.Client.Status().Update(ctx, pv); err != nil { ++ reqLogger.Error( ++ err, "cannot update PersistentVolume status: requeue", ++ "PersistentVolume.Name", pv.Name, ++ ) ++ return delayedRequeue(err) ++ } ++ } ++ return requeue(nil) ++} ++ ++// Put the volume into Pending state. ++func (self *VolumeReconciler) setPendingVolumeStatus( ++ ctx context.Context, volume *storagev1alpha1.Volume, job string, ++) (reconcile.Result, error) { ++ oldPhase := volume.ComputePhase() ++ ++ volume.SetPendingStatus(job) ++ if _, err := self.updateVolumeStatus(ctx, volume, oldPhase); err != nil { ++ return delayedRequeue(err) ++ } ++ return requeue(nil) ++} ++ ++// Put the volume into Available state. ++func (self *VolumeReconciler) setAvailableVolumeStatus( ++ ctx context.Context, volume *storagev1alpha1.Volume, ++) (reconcile.Result, error) { ++ oldPhase := volume.ComputePhase() ++ ++ volume.SetAvailableStatus() ++ if _, err := self.updateVolumeStatus(ctx, volume, oldPhase); err != nil { ++ return delayedRequeue(err) ++ } ++ return requeue(nil) ++} ++ ++// Put the volume into Terminating state. 
++func (self *VolumeReconciler) setTerminatingVolumeStatus(
++	ctx context.Context, volume *storagev1alpha1.Volume, job string,
++) (reconcile.Result, error) {
++	oldPhase := volume.ComputePhase()
++
++	volume.SetTerminatingStatus(job)
++	if _, err := self.updateVolumeStatus(ctx, volume, oldPhase); err != nil {
++		return delayedRequeue(err)
++	}
++	return requeue(nil)
++}
++
++// Add the volume-protection on the volume (if not already present).
++func (self *VolumeReconciler) addVolumeFinalizer(
++	ctx context.Context, volume *storagev1alpha1.Volume,
++) error {
++	finalizers := volume.GetFinalizers()
++	volume.SetFinalizers(SliceAppendUnique(finalizers, VOLUME_PROTECTION))
++	return self.Client.Update(ctx, volume)
++}
++
++// Remove the volume-protection on the volume (if present).
++func (self *VolumeReconciler) removeVolumeFinalizer(
++	ctx context.Context, volume *storagev1alpha1.Volume,
++) error {
++	finalizers := volume.GetFinalizers()
++	volume.SetFinalizers(SliceRemoveValue(finalizers, VOLUME_PROTECTION))
++	return self.Client.Update(ctx, volume)
++}
++
++// Get the PersistentVolume associated to the given volume.
++//
++// Return `nil` if no such volume exists.
++func (self *VolumeReconciler) getPersistentVolume(
++	ctx context.Context, volume *storagev1alpha1.Volume,
++) (*corev1.PersistentVolume, error) {
++	pv := &corev1.PersistentVolume{}
++	key := types.NamespacedName{Namespace: "", Name: volume.Name}
++
++	if err := self.Client.Get(ctx, key, pv); err != nil {
++		if errors.IsNotFound(err) {
++			return nil, nil
++		}
++		return nil, err
++	}
++	if !metav1.IsControlledBy(pv, volume) {
++		return nil, fmt.Errorf(
++			"name conflict: PersistentVolume %s not owned by Volume %s",
++			pv.Name, volume.Name,
++		)
++	}
++
++	return pv, nil
++}
++
++// Get the storage class identified by the given name.
++func (self *VolumeReconciler) getStorageClass( ++ ctx context.Context, name string, ++) (*storagev1.StorageClass, error) { ++ sc := &storagev1.StorageClass{} ++ key := types.NamespacedName{Namespace: "", Name: name} ++ ++ if err := self.Client.Get(ctx, key, sc); err != nil { ++ return nil, err ++ } ++ return sc, nil ++} ++ ++// Remove the volume-protection on the PV (if not already present). ++func (self *VolumeReconciler) removePvFinalizer( ++ ctx context.Context, pv *corev1.PersistentVolume, ++) error { ++ finalizers := pv.GetFinalizers() ++ pv.SetFinalizers(SliceRemoveValue(finalizers, VOLUME_PROTECTION)) ++ return self.Client.Update(ctx, pv) + } + +-// +kubebuilder:rbac:groups=storage.metalk8s.scality.com,resources=volumes,verbs=get;list;watch;create;update;patch;delete +-// +kubebuilder:rbac:groups=storage.metalk8s.scality.com,resources=volumes/status,verbs=get;update;patch +-// +kubebuilder:rbac:groups=storage.metalk8s.scality.com,resources=volumes/finalizers,verbs=update ++type stateSetter func( ++ context.Context, *storagev1alpha1.Volume, string, ++) (reconcile.Result, error) ++ ++type jobSuccessCallback func(map[string]interface{}) (reconcile.Result, error) ++ ++// Poll a Salt state job. 
++func (self *VolumeReconciler) pollSaltJob( ++ ctx context.Context, ++ stepName string, ++ volume *storagev1alpha1.Volume, ++ pv *corev1.PersistentVolume, ++ setState stateSetter, ++ reason storagev1alpha1.ConditionReason, ++ onSuccess jobSuccessCallback, ++) (reconcile.Result, error) { ++ nodeName := string(volume.Spec.NodeName) ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, "Volume.NodeName", nodeName, ++ ) ++ ++ job, err := salt.JobFromString(volume.Status.Job) ++ if err != nil { ++ reqLogger.Error(err, "cannot parse Salt job from Volume status") ++ return self.setFailedVolumeStatus( ++ ctx, volume, pv, reason, "cannot parse Salt job from Volume status", ++ ) ++ } ++ if result, err := self.salt.PollJob(ctx, job, nodeName); err != nil { ++ reqLogger.Error( ++ err, fmt.Sprintf("failed to poll Salt job '%s' status", job.Name), ++ ) ++ // This one is not retryable. ++ if failure, ok := err.(*salt.AsyncJobFailed); ok { ++ self.recorder.Eventf( ++ volume, corev1.EventTypeWarning, "SaltCall", ++ "step '%s' failed", stepName, ++ ) ++ return self.setFailedVolumeStatus( ++ ctx, volume, pv, reason, ++ "Salt job '%s' failed with: %s", job.Name, failure.Error(), ++ ) ++ } ++ // Job salt not found or failed to run, let's retry. ++ job.ID = "" ++ return setState(ctx, volume, job.String()) ++ } else { ++ if result == nil { ++ reqLogger.Info( ++ fmt.Sprintf("Salt job '%s' still in progress", job.Name), ++ ) ++ return delayedRequeue(nil) ++ } ++ self.recorder.Eventf( ++ volume, corev1.EventTypeNormal, "SaltCall", ++ "step '%s' succeeded", stepName, ++ ) ++ return onSuccess(result) ++ } ++} ++ ++// Return the saltenv to use on the given node. 
++func (self *VolumeReconciler) fetchSaltEnv( ++ ctx context.Context, nodeName string, ++) (string, error) { ++ node := &corev1.Node{} ++ key := types.NamespacedName{Namespace: "", Name: nodeName} ++ ++ if err := self.Client.Get(ctx, key, node); err != nil { ++ return "", err ++ } ++ versionKey := "metalk8s.scality.com/version" ++ if version, found := node.Labels[versionKey]; found { ++ return fmt.Sprintf("metalk8s-%s", version), nil ++ } ++ return "", fmt.Errorf("label %s not found on node %s", versionKey, nodeName) ++} ++ ++// Controller RBAC settings ++ ++// - Volume custom resources ++//+kubebuilder:rbac:groups=storage.metalk8s.scality.com,resources=volumes,verbs=get;list;watch;create;update;patch;delete ++//+kubebuilder:rbac:groups=storage.metalk8s.scality.com,resources=volumes/status,verbs=get;update;patch ++//+kubebuilder:rbac:groups=storage.metalk8s.scality.com,resources=volumes/finalizers,verbs=update ++ ++// - Transition events ++//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch ++ ++// - Owned PersistentVolumes ++//+kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=create;delete;get;list;patch;update;watch ++ ++// - Read referenced StorageClasses ++//+kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch ++ ++// - Read Node's MetalK8s version ++//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch + + // Reconcile is part of the main kubernetes reconciliation loop which aims to + // move the current state of the cluster closer to the desired state. +-// TODO(user): Modify the Reconcile function to compare the state specified by +-// the Volume object against the actual cluster state, and then +-// perform operations to make the cluster state reflect the state specified by +-// the user. 
++func (self *VolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { ++ reqLogger := log.WithValues("Request.Name", req.Name) ++ reqLogger.Info("reconciling volume: START") ++ defer reqLogger.Info("reconciling volume: STOP") ++ ++ // Fetch the requested Volume object. ++ // ++ // The reconciliation request can be triggered by either a Volume or a ++ // PersistentVolume owned by a Volume (we're watching both), but because the ++ // lifetime of a Volume always span over the whole lifetime of the backing ++ // PersistentVolume (and they have the same name) it is safe to always ++ // lookup a Volume here. ++ volume := &storagev1alpha1.Volume{} ++ err := self.Client.Get(ctx, req.NamespacedName, volume) ++ if err != nil { ++ if errors.IsNotFound(err) { ++ // Volume not found: ++ // => all the finalizers have been removed & Volume has been deleted ++ // => there is nothing left to do ++ reqLogger.Info("volume already deleted: nothing to do") ++ return endReconciliation() ++ } ++ reqLogger.Error(err, "cannot read Volume: requeue") ++ return delayedRequeue(err) ++ } ++ if err := volume.IsValid(); err != nil { ++ return self.setFailedVolumeStatus( ++ ctx, volume, nil, storagev1alpha1.ReasonInternalError, ++ "invalid volume: %s", err.Error(), ++ ) ++ } ++ saltenv, err := self.fetchSaltEnv(ctx, string(volume.Spec.NodeName)) ++ if err != nil { ++ reqLogger.Error(err, "cannot compute saltenv") ++ return delayedRequeue(err) ++ } ++ // Check if the volume is marked for deletion (i.e., deletion tstamp is set). ++ if !volume.GetDeletionTimestamp().IsZero() { ++ // Pending volume: can do nothing but wait for stabilization. ++ if volume.ComputePhase() == storagev1alpha1.VolumePending { ++ reqLogger.Info("pending volume cannot be finalized: requeue") ++ // Do not return here! We need to re-enter deployVolume to keep ++ // polling the Salt job and make progress. 
++ } else { ++ return self.finalizeVolume(ctx, volume, saltenv) ++ } ++ } ++ // Skip volume stuck waiting for deletion or a manual fix. ++ if condition := volume.IsInUnrecoverableFailedState(); condition != nil { ++ reqLogger.Info( ++ "volume stuck in error state: do nothing", ++ "Error.Code", condition.Reason, ++ "Error.Message", condition.Message, ++ ) ++ return endReconciliation() ++ } ++ // Check if a PV already exists for this volume. ++ pv, err := self.getPersistentVolume(ctx, volume) ++ if err != nil { ++ reqLogger.Error( ++ err, "error while looking for backing PersistentVolume: requeue", ++ "PersistentVolume.Name", volume.Name, ++ ) ++ return delayedRequeue(err) ++ } ++ // PV doesn't exist: deploy the volume to create it. ++ if pv == nil { ++ return self.deployVolume(ctx, volume, saltenv) ++ } ++ // Else, check its health. ++ if pv.Status.Phase == corev1.VolumeFailed { ++ _, err := self.setFailedVolumeStatus( ++ ctx, volume, nil, storagev1alpha1.ReasonUnavailableError, ++ "backing PersistentVolume is in a failed state (%s): %s", ++ pv.Status.Reason, pv.Status.Message, ++ ) ++ return delayedRequeue(err) ++ } ++ if _, err = self.setAvailableVolumeStatus(ctx, volume); err != nil { ++ return delayedRequeue(err) ++ } ++ reqLogger.Info("backing PersistentVolume is healthy") ++ return self.refreshDeviceName(ctx, volume, pv) ++} ++ ++// Deploy a volume (i.e prepare the storage and create a PV). ++func (self *VolumeReconciler) deployVolume( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ saltenv string, ++) (reconcile.Result, error) { ++ nodeName := string(volume.Spec.NodeName) ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, "Volume.NodeName", nodeName, ++ ) ++ job, err := salt.JobFromString(volume.Status.Job) ++ if err != nil { ++ reqLogger.Error(err, "cannot parse Salt job from Volume status") ++ return requeue(err) ++ } ++ ++ switch job.Name { ++ // Since it's the first step, the name can be unset the very first time. 
++	case "", "PrepareVolume":
++		return self.prepareStorage(ctx, volume, saltenv, job)
++	case "GetDeviceInfo":
++		return self.getStorageSize(ctx, volume, job)
++	default:
++		// Shouldn't happen, except if someone somehow tampered with our status field…
++		return self.setFailedVolumeStatus(
++			ctx, volume, nil, storagev1alpha1.ReasonCreationError,
++			"Tampered Salt job handle: invalid name (%s)", job.Name,
++		)
++	}
++}
++
++// Finalize a volume marked for deletion.
++func (self *VolumeReconciler) finalizeVolume(
++	ctx context.Context,
++	volume *storagev1alpha1.Volume,
++	saltenv string,
++) (reconcile.Result, error) {
++	reqLogger := log.WithValues("Volume.Name", volume.Name)
++	// Check if a PV is associated to the volume.
++	pv, err := self.getPersistentVolume(ctx, volume)
++	if err != nil {
++		reqLogger.Error(
++			err, "error while looking for backing PersistentVolume: requeue",
++			"PersistentVolume.Name", volume.Name,
++		)
++		return delayedRequeue(err)
++	}
++
++	// If we have a backing PV we delete it (the finalizer will keep it alive).
++	if pv != nil && pv.GetDeletionTimestamp().IsZero() {
++		if err := self.Client.Delete(ctx, pv); err != nil {
++			reqLogger.Error(
++				err, "cannot delete PersistentVolume: requeue",
++				"PersistentVolume.Name", volume.Name,
++			)
++			return delayedRequeue(err)
++		}
++		reqLogger.Info(
++			"deleting backing PersistentVolume",
++			"PersistentVolume.Name", pv.Name,
++		)
++		self.recorder.Event(
++			volume, corev1.EventTypeNormal, "PvDeletion",
++			"backing PersistentVolume deleted",
++		)
++		return requeue(nil)
++	}
++
++	// If we don't have a PV or it's only used by us we can reclaim the storage.
++	if pv == nil || isPersistentVolumeUnused(pv) {
++		return self.reclaimStorage(ctx, volume, pv, saltenv)
++	}
++
++	// PersistentVolume still in use: wait before reclaiming the storage.
++ return delayedRequeue(nil) ++} ++ ++func (self *VolumeReconciler) refreshDeviceName( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ pv *corev1.PersistentVolume, ++) (reconcile.Result, error) { ++ nodeName := string(volume.Spec.NodeName) ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, "Volume.NodeName", nodeName, ++ ) ++ ++ if pv.Spec.PersistentVolumeSource.Local == nil { ++ reqLogger.Info("skipping volume: not a local storage") ++ return endReconciliation() ++ } ++ path := pv.Spec.PersistentVolumeSource.Local.Path ++ ++ name, err := self.salt.GetDeviceName(ctx, nodeName, volume.Name, path) ++ if err != nil { ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "SaltCall", ++ "device path resolution failed", ++ ) ++ reqLogger.Error(err, "cannot get device name from Salt response") ++ return delayedRequeue(err) ++ } ++ if volume.Status.DeviceName != name { ++ volume.Status.DeviceName = name ++ reqLogger.Info("update device name", "Volume.DeviceName", name) ++ return self.setAvailableVolumeStatus(ctx, volume) ++ } ++ ++ return endReconciliation() ++} ++ ++func (self *VolumeReconciler) prepareStorage( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ saltenv string, ++ job *salt.JobHandle, ++) (reconcile.Result, error) { ++ nodeName := string(volume.Spec.NodeName) ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, "Volume.NodeName", nodeName, ++ ) ++ ++ switch job.ID { ++ case "": // No job in progress: call Salt to prepare the volume. ++ // Set volume-protection finalizer on the volume. 
++ if err := self.addVolumeFinalizer(ctx, volume); err != nil { ++ reqLogger.Error(err, "cannot set volume-protection: requeue") ++ return delayedRequeue(err) ++ } ++ job, err := self.salt.PrepareVolume(ctx, nodeName, volume.Name, saltenv) ++ if err != nil { ++ reqLogger.Error(err, "failed to run PrepareVolume") ++ return delayedRequeue(err) ++ } else { ++ reqLogger.Info("start to prepare the volume") ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "SaltCall", ++ "volume provisioning step 1/2 started", ++ ) ++ return self.setPendingVolumeStatus(ctx, volume, job.String()) ++ } ++ case JOB_DONE_MARKER: // Storage is ready, let's get its information. ++ job.Name = "GetDeviceInfo" ++ job.ID = "" ++ return self.getStorageSize(ctx, volume, job) ++ default: // PrepareVolume in progress: poll its state. ++ return self.pollSaltJob( ++ ctx, "volume provisioning (1/2)", volume, nil, ++ self.setPendingVolumeStatus, ++ storagev1alpha1.ReasonCreationError, ++ func(_ map[string]interface{}) (reconcile.Result, error) { ++ job.ID = JOB_DONE_MARKER ++ return self.setPendingVolumeStatus(ctx, volume, job.String()) ++ }, ++ ) ++ } ++} ++ ++func (self *VolumeReconciler) getStorageSize( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ job *salt.JobHandle, ++) (reconcile.Result, error) { ++ nodeName := string(volume.Spec.NodeName) ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, "Volume.NodeName", nodeName, ++ ) ++ ++ switch job.ID { ++ case "": // No job in progress: call Salt to get the volume information. 
++ job, err := self.salt.GetDeviceInfo(ctx, nodeName, volume.Name) ++ if err != nil { ++ reqLogger.Error(err, "failed to run GetDeviceInfo") ++ return delayedRequeue(err) ++ } else { ++ reqLogger.Info("try to retrieve the volume information") ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "SaltCall", ++ "volume provisioning step 2/2 started", ++ ) ++ return self.setPendingVolumeStatus(ctx, volume, job.String()) ++ } ++ case JOB_DONE_MARKER: // We have everything we need: let's create the PV! ++ return self.createPersistentVolume(ctx, volume) ++ default: // GetDeviceInfo in progress: poll its state. ++ return self.pollSaltJob( ++ ctx, "volume provisioning (2/2)", volume, nil, ++ self.setPendingVolumeStatus, ++ storagev1alpha1.ReasonCreationError, ++ func(result map[string]interface{}) (reconcile.Result, error) { ++ info, err := parseDeviceInfo(result) ++ if err != nil { ++ reqLogger.Error(err, "cannot get device info from Salt response") ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "SaltCall", ++ "volume provisioning step 2/2 failed", ++ ) ++ return self.setFailedVolumeStatus( ++ ctx, volume, nil, storagev1alpha1.ReasonCreationError, ++ "Salt job '%s' failed with: %s", job.Name, err.Error(), ++ ) ++ } ++ self.devices[volume.Name] = *info ++ job.ID = JOB_DONE_MARKER ++ return self.setPendingVolumeStatus(ctx, volume, job.String()) ++ }, ++ ) ++ } ++} ++ ++// Create a PersistentVolume in the Kubernetes API server. ++func (self *VolumeReconciler) createPersistentVolume( ++ ctx context.Context, volume *storagev1alpha1.Volume, ++) (reconcile.Result, error) { ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, ++ "Volume.NodeName", string(volume.Spec.NodeName), ++ ) ++ ++ // Fetch referenced storage class. 
++ scName := volume.Spec.StorageClassName ++ sc, err := self.getStorageClass(ctx, scName) ++ if err != nil { ++ errmsg := fmt.Sprintf("cannot get StorageClass '%s'", scName) ++ reqLogger.Error(err, errmsg, "StorageClass.Name", scName) ++ return delayedRequeue(err) ++ } ++ deviceInfo, found := self.devices[volume.Name] ++ if !found { ++ reqLogger.Error(err, "no device info") ++ // Reschedule a call to `metalk8s_volumes.device_info`. ++ job := salt.JobHandle{Name: "GetDeviceInfo", ID: ""} ++ return self.setPendingVolumeStatus(ctx, volume, job.String()) ++ } ++ // Create the PersistentVolume object. ++ pv, err := newPersistentVolume(volume, sc, deviceInfo) ++ if err != nil { ++ reqLogger.Error( ++ err, "cannot create the PersistentVolume object: requeue", ++ "PersistentVolume.Name", volume.Name, ++ ) ++ return delayedRequeue(err) ++ } ++ // Set Volume instance as the owner and controller. ++ err = controllerutil.SetControllerReference(volume, pv, self.Scheme) ++ if err != nil { ++ reqLogger.Error( ++ err, "cannot become owner of the PersistentVolume: requeue", ++ "PersistentVolume.Name", volume.Name, ++ ) ++ return delayedRequeue(err) ++ } ++ // Create the PV! ++ if err := self.Client.Create(ctx, pv); err != nil { ++ reqLogger.Error( ++ err, "cannot create PersistentVolume: requeue", ++ "PersistentVolume.Name", volume.Name, ++ ) ++ return delayedRequeue(err) ++ } ++ reqLogger.Info( ++ "creating a new PersistentVolume", "PersistentVolume.Name", pv.Name, ++ ) ++ ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "PvCreation", ++ "backing PersistentVolume created", ++ ) ++ return self.setAvailableVolumeStatus(ctx, volume) ++} ++ ++// Build a PersistentVolume from a Volume object. 
++// ++// Arguments ++// ++// volume: a Volume object ++// storageClass: a StorageClass object ++// deviceInfo: the device information + // +-// For more details, check Reconcile and its Result here: +-// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile +-func (r *VolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +- _ = logf.FromContext(ctx) ++// Returns ++// ++// The PersistentVolume representing the given Volume. ++func newPersistentVolume( ++ volume *storagev1alpha1.Volume, ++ storageClass *storagev1.StorageClass, ++ deviceInfo deviceInfo, ++) (*corev1.PersistentVolume, error) { ++ volumeSize := *resource.NewQuantity(deviceInfo.size, resource.BinarySI) ++ // We must have `fsType` as parameter, otherwise we can't create our PV. ++ scName := volume.Spec.StorageClassName ++ fsType, found := storageClass.Parameters["fsType"] ++ if !found { ++ return nil, fmt.Errorf( ++ "missing field 'parameters.fsType' in StorageClass '%s'", scName, ++ ) ++ } ++ ++ pv := corev1.PersistentVolume{ ++ ObjectMeta: volume.Spec.Template.Metadata, ++ Spec: volume.Spec.Template.Spec, ++ } ++ pv.ObjectMeta.Name = volume.Name ++ pv.ObjectMeta.Finalizers = append( ++ pv.ObjectMeta.Finalizers, VOLUME_PROTECTION, ++ ) ++ pv.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{ ++ corev1.ReadWriteOnce, ++ } ++ pv.Spec.Capacity = map[corev1.ResourceName]resource.Quantity{ ++ corev1.ResourceStorage: volumeSize, ++ } ++ pv.Spec.MountOptions = storageClass.MountOptions ++ pv.Spec.VolumeMode = &volume.Spec.Mode ++ pv.Spec.PersistentVolumeSource = corev1.PersistentVolumeSource{ ++ Local: &corev1.LocalVolumeSource{ ++ Path: deviceInfo.path, ++ FSType: &fsType, ++ }, ++ } ++ pv.Spec.PersistentVolumeReclaimPolicy = "Retain" ++ pv.Spec.StorageClassName = volume.Spec.StorageClassName ++ pv.Spec.NodeAffinity = nodeAffinity(volume.Spec.NodeName) ++ ++ return &pv, nil ++} + +- // TODO(user): your logic here ++func nodeAffinity(node 
types.NodeName) *corev1.VolumeNodeAffinity { ++ selector := corev1.NodeSelector{ ++ NodeSelectorTerms: []corev1.NodeSelectorTerm{ ++ { ++ MatchExpressions: []corev1.NodeSelectorRequirement{ ++ { ++ Key: "kubernetes.io/hostname", ++ Operator: corev1.NodeSelectorOpIn, ++ Values: []string{string(node)}, ++ }, ++ }, ++ }, ++ }, ++ } ++ affinity := corev1.VolumeNodeAffinity{ ++ Required: &selector, ++ } ++ return &affinity ++} ++ ++// Check if a PersistentVolume is only used by us. ++func isPersistentVolumeUnused(pv *corev1.PersistentVolume) bool { ++ reqLogger := log.WithValues("PersistentVolume.Name", pv.Name) + ++ switch pv.Status.Phase { ++ case corev1.VolumeBound: ++ reqLogger.Info( ++ "backing PersistentVolume is bound: cannot delete volume", ++ ) ++ return false ++ case corev1.VolumePending: ++ reqLogger.Info( ++ "backing PersistentVolume is pending: waiting for stabilization", ++ ) ++ return false ++ case corev1.VolumeAvailable, corev1.VolumeReleased, corev1.VolumeFailed: ++ reqLogger.Info("the backing PersistentVolume is in a removable state") ++ finalizers := pv.GetFinalizers() ++ if len(finalizers) == 1 && finalizers[0] == VOLUME_PROTECTION { ++ reqLogger.Info("the backing PersistentVolume is unused") ++ return true ++ } ++ return false ++ default: ++ phase := pv.Status.Phase ++ errmsg := fmt.Sprintf( ++ "unexpected PersistentVolume status (%+v): do nothing", phase, ++ ) ++ reqLogger.Info(errmsg, "PersistentVolume.Status", phase) ++ return false ++ } ++} ++ ++// Destroy the given PersistentVolume. 
++func (self *VolumeReconciler) reclaimStorage( ++ ctx context.Context, ++ volume *storagev1alpha1.Volume, ++ pv *corev1.PersistentVolume, ++ saltenv string, ++) (reconcile.Result, error) { ++ nodeName := string(volume.Spec.NodeName) ++ reqLogger := log.WithValues( ++ "Volume.Name", volume.Name, "Volume.NodeName", nodeName, ++ ) ++ job, err := salt.JobFromString(volume.Status.Job) ++ if err != nil { ++ reqLogger.Error(err, "cannot parse Salt job from Volume status") ++ return requeue(err) ++ } ++ jobId := job.ID ++ ++ // Ignore existing Job ID in Failed case (no jobs are running), JID only here ++ // for debug (which is now useless as we're going to delete the Volume). ++ if volume.ComputePhase() == storagev1alpha1.VolumeFailed { ++ jobId = "" ++ } ++ ++ switch jobId { ++ case "": // No job in progress: call Salt to unprepare the volume. ++ job, err := self.salt.UnprepareVolume( ++ ctx, nodeName, volume.Name, saltenv, ++ ) ++ if err != nil { ++ reqLogger.Error(err, "failed to run UnprepareVolume") ++ return delayedRequeue(err) ++ } else { ++ reqLogger.Info("start to unprepare the volume") ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "SaltCall", ++ "volume finalization started", ++ ) ++ return self.setTerminatingVolumeStatus(ctx, volume, job.String()) ++ } ++ case JOB_DONE_MARKER: // Salt job is done, now let's remove the finalizers. 
++ if pv != nil { ++ if err := self.removePvFinalizer(ctx, pv); err != nil { ++ reqLogger.Error(err, "cannot remove PersistentVolume finalizer") ++ return delayedRequeue(err) ++ } ++ reqLogger.Info("PersistentVolume finalizer removed") ++ self.recorder.Event( ++ volume, corev1.EventTypeNormal, "VolumeFinalization", ++ "storage reclaimed", ++ ) ++ } ++ if err := self.removeVolumeFinalizer(ctx, volume); err != nil { ++ reqLogger.Error(err, "cannot remove Volume finalizer") ++ return delayedRequeue(err) ++ } ++ reqLogger.Info("volume finalizer removed") ++ return endReconciliation() ++ default: // UnprepareVolume in progress: poll its state. ++ return self.pollSaltJob( ++ ctx, "volume finalization", volume, pv, ++ self.setTerminatingVolumeStatus, ++ storagev1alpha1.ReasonDestructionError, ++ func(_ map[string]interface{}) (reconcile.Result, error) { ++ job.ID = JOB_DONE_MARKER ++ return self.setTerminatingVolumeStatus(ctx, volume, job.String()) ++ }, ++ ) ++ } ++} ++ ++// Trigger a reschedule after a short delay. ++func delayedRequeue(err error) (reconcile.Result, error) { ++ delay := 10 * time.Second ++ return reconcile.Result{Requeue: err == nil, RequeueAfter: delay}, err ++} ++ ++// Trigger a reschedule as soon as possible. ++func requeue(err error) (reconcile.Result, error) { ++ return reconcile.Result{Requeue: err == nil}, err ++} ++ ++// Don't trigger a reschedule, we're done. ++func endReconciliation() (reconcile.Result, error) { ++ return reconcile.Result{}, nil ++} ++ ++// Return the credential to use to authenticate with Salt API. 
++func getAuthCredential(config *rest.Config) *salt.Credential { ++ if config.BearerToken == "" { ++ panic("must use a BearerToken for SaltAPI authentication") ++ } ++ log.Info("using ServiceAccount bearer token") ++ return salt.NewCredential( ++ // FIXME: this should depend on the actual SA used ++ "system:serviceaccount:kube-system:storage-operator-controller-manager", ++ config.BearerToken, ++ salt.Bearer, ++ ) ++} ++ ++// Extract the device info from a Salt result. ++func parseDeviceInfo(result map[string]interface{}) (*deviceInfo, error) { ++ size_str, ok := result["size"].(string) ++ if !ok { ++ return nil, fmt.Errorf( ++ "cannot find a string value for key 'size' in %v", result, ++ ) ++ } ++ path, ok := result["path"].(string) ++ if !ok { ++ return nil, fmt.Errorf( ++ "cannot find a string value for key 'path' in %v", result, ++ ) ++ } ++ ++ if size, err := strconv.ParseInt(size_str, 10, 64); err != nil { ++ return nil, errorsng.Wrapf( ++ err, "cannot parse device size (%s)", size_str, ++ ) ++ } else { ++ return &deviceInfo{size, path}, nil ++ } + } + + // SetupWithManager sets up the controller with the Manager. + func (r *VolumeReconciler) SetupWithManager(mgr ctrl.Manager) error { ++ config := mgr.GetConfig() ++ caCertData := config.CAData ++ if len(caCertData) == 0 { ++ log.Info("CAData is empty, fallbacking on CAFile") ++ cert, err := ioutil.ReadFile(config.CAFile) ++ if err != nil { ++ return errorsng.Wrapf( ++ err, "cannot read CA cert file (%s)", config.CAFile, ++ ) ++ } ++ caCertData = cert ++ } ++ saltClient, err := salt.NewClient(getAuthCredential(config), caCertData) ++ if err != nil { ++ return err ++ } ++ ++ r.recorder = mgr.GetEventRecorderFor("volume-controller") ++ r.salt = saltClient ++ r.devices = make(map[string]deviceInfo) ++ + return ctrl.NewControllerManagedBy(mgr). + For(&storagev1alpha1.Volume{}). +- Named("volume"). ++ Owns(&corev1.PersistentVolume{}). 
+ Complete(r) + } diff --git a/tools/upgrade-operator-sdk/storage-operator/patches/volume_types.patch b/tools/upgrade-operator-sdk/storage-operator/patches/volume_types.patch new file mode 100644 index 0000000000..9efe3fa1b3 --- /dev/null +++ b/tools/upgrade-operator-sdk/storage-operator/patches/volume_types.patch @@ -0,0 +1,344 @@ +--- a/api/v1alpha1/volume_types.go ++++ b/api/v1alpha1/volume_types.go +@@ -1,5 +1,5 @@ + /* +-Copyright 2026. ++Copyright 2021. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +@@ -17,32 +17,152 @@ + package v1alpha1 + + import ( ++ "errors" ++ "fmt" ++ ++ corev1 "k8s.io/api/core/v1" ++ "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ++ "k8s.io/apimachinery/pkg/types" + ) + + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! + // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +-// VolumeSpec defines the desired state of Volume. ++type SparseLoopDeviceVolumeSource struct { ++ // Size of the generated sparse file backing the PersistentVolume. ++ Size resource.Quantity `json:"size"` ++} ++ ++type RawBlockDeviceVolumeSource struct { ++ // Path of the block device on the node to back the PersistentVolume. 
++ DevicePath string `json:"devicePath"` ++} ++ ++type LVMLVSource struct { ++ // Name of the LVM VolumeGroup on the node to create the LVM LogicalVolume to back ++ // the PersistentVolume ++ VGName string `json:"vgName"` ++ // Size of the created LVM LogicalVolume backing the PersistentVolume ++ Size resource.Quantity `json:"size"` ++} ++ ++type VolumeSource struct { ++ SparseLoopDevice *SparseLoopDeviceVolumeSource `json:"sparseLoopDevice,omitempty"` ++ RawBlockDevice *RawBlockDeviceVolumeSource `json:"rawBlockDevice,omitempty"` ++ LVMLogicalVolume *LVMLVSource `json:"lvmLogicalVolume,omitempty"` ++} ++ ++// VolumeSpec defines the desired state of Volume + type VolumeSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + +- // Foo is an example field of Volume. Edit volume_types.go to remove/update +- Foo string `json:"foo,omitempty"` ++ // Name of the node on which the volume is available. ++ NodeName types.NodeName `json:"nodeName"` ++ ++ // Name of the StorageClass that gets assigned to the volume. Also, any ++ // mount options are copied from the StorageClass to the ++ // PersistentVolume if present. ++ StorageClassName string `json:"storageClassName"` ++ ++ // How the volume is intended to be consumed, either Block or Filesystem ++ // (default is Filesystem). ++ // +optional ++ // +kubebuilder:validation:Enum=Filesystem;Block ++ Mode corev1.PersistentVolumeMode `json:"mode,omitempty"` ++ ++ // Template for the underlying PersistentVolume. ++ // +optional ++ Template PersistentVolumeTemplateSpec `json:"template,omitempty"` ++ ++ VolumeSource `json:",inline"` ++} ++ ++// Describes the PersistentVolume that will be created to back the Volume. ++type PersistentVolumeTemplateSpec struct { ++ // Standard object's metadata. 
++ // +optional ++ // +kubebuilder:pruning:PreserveUnknownFields ++ Metadata metav1.ObjectMeta `json:"metadata,omitempty"` ++ // Specification of the Persistent Volume. ++ // +optional ++ Spec corev1.PersistentVolumeSpec `json:"spec,omitempty"` ++} ++ ++type VolumePhase string ++ ++// TODO: kept for temporary compatibility, to be removed. ++// "Enum" representing the phase of a volume. ++const ( ++ VolumeFailed VolumePhase = "Failed" ++ VolumePending VolumePhase = "Pending" ++ VolumeAvailable VolumePhase = "Available" ++ VolumeTerminating VolumePhase = "Terminating" ++) ++ ++type ConditionReason string ++ ++// TODO: replace those by more fine-grained ones. ++// "Enum" representing the error codes of the Failed state. ++const ( ++ ReasonPending ConditionReason = "Pending" ++ ReasonTerminating ConditionReason = "Terminating" ++ ++ ReasonInternalError ConditionReason = "InternalError" ++ ReasonCreationError ConditionReason = "CreationError" ++ ReasonDestructionError ConditionReason = "DestructionError" ++ ReasonUnavailableError ConditionReason = "UnavailableError" ++) ++ ++type VolumeConditionType string ++ ++const ( ++ // VolumeReady means Volume is ready to be used. ++ VolumeReady VolumeConditionType = "Ready" ++) ++ ++type VolumeCondition struct { ++ // Type of volume condition. ++ // +kubebuilder:validation:Enum=Ready ++ Type VolumeConditionType `json:"type"` ++ // Status of the condition, one of True, False, Unknown. ++ // +kubebuilder:validation:Enum=True;False;Unknown ++ Status corev1.ConditionStatus `json:"status"` ++ // Last time the condition was updated (optional). ++ LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` ++ // Last time the condition transited from one status to another (optional). ++ LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` ++ // Unique, one-word, CamelCase reason for the condition's last transition. 
++ // +kubebuilder:validation:Enum=Pending;Terminating;InternalError;CreationError;DestructionError;UnavailableError ++ Reason ConditionReason `json:"reason,omitempty"` ++ // Human readable message indicating details about last transition. ++ Message string `json:"message,omitempty"` + } + +-// VolumeStatus defines the observed state of Volume. ++// VolumeStatus defines the observed state of Volume + type VolumeStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file ++ ++ // List of conditions through which the Volume has or has not passed. ++ // +listType=map ++ // +listMapKey=type ++ Conditions []VolumeCondition `json:"conditions,omitempty"` ++ ++ // Job in progress ++ Job string `json:"job,omitempty"` ++ // Name of the underlying block device. ++ DeviceName string `json:"deviceName,omitempty"` + } + +-// +kubebuilder:object:root=true +-// +kubebuilder:subresource:status +-// +kubebuilder:resource:scope=Cluster ++//+kubebuilder:object:root=true ++//+kubebuilder:subresource:status ++//+kubebuilder:resource:scope=Cluster ++//+kubebuilder:printcolumn:name="Node",type="string",JSONPath=".spec.nodeName",description="The node on which the volume is available" ++//+kubebuilder:printcolumn:name="StorageClass",type="string",JSONPath=".spec.storageClassName",description="The storage class of the volume" + +-// Volume is the Schema for the volumes API. ++// Volume is the Schema for the volumes API + type Volume struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` +@@ -51,9 +171,171 @@ + Status VolumeStatus `json:"status,omitempty"` + } + +-// +kubebuilder:object:root=true ++// Set a condition for the volume. ++// ++// If a condition of this type already exists it is updated, otherwise a new ++// condition is added. 
++// ++// Arguments ++// ++// kind: type of condition ++// status: status of the condition ++// reason: one-word, CamelCase reason for the transition (optional) ++// message: details about the transition (optional) ++func (self *Volume) SetCondition( ++ kind VolumeConditionType, ++ status corev1.ConditionStatus, ++ reason ConditionReason, ++ message string, ++) { ++ now := metav1.Now() ++ condition := VolumeCondition{ ++ Type: kind, ++ Status: status, ++ LastUpdateTime: now, ++ LastTransitionTime: now, ++ Reason: reason, ++ Message: message, ++ } ++ ++ for idx, cond := range self.Status.Conditions { ++ if cond.Type == kind { ++ // Don't update timestamps if status hasn't changed. ++ if cond.Status == condition.Status { ++ condition.LastTransitionTime = cond.LastTransitionTime ++ condition.LastUpdateTime = cond.LastUpdateTime ++ } ++ self.Status.Conditions[idx] = condition ++ return ++ } ++ } ++ self.Status.Conditions = append(self.Status.Conditions, condition) ++} ++ ++// Get the condition identified by `kind` for the volume. ++// ++// Return `nil` if no such condition exists on the volume. ++func (self *Volume) GetCondition(kind VolumeConditionType) *VolumeCondition { ++ for _, cond := range self.Status.Conditions { ++ if cond.Type == kind { ++ return &cond ++ } ++ } ++ return nil ++} ++ ++// Return the volume phase, computed from the Ready condition. ++func (self *Volume) ComputePhase() VolumePhase { ++ if ready := self.GetCondition(VolumeReady); ready != nil { ++ switch ready.Status { ++ case corev1.ConditionTrue: ++ return VolumeAvailable ++ case corev1.ConditionFalse: ++ return VolumeFailed ++ case corev1.ConditionUnknown: ++ if ready.Reason == ReasonPending { ++ return VolumePending ++ } ++ return VolumeTerminating ++ } ++ } ++ return "" ++} ++ ++// Update the volume status to Failed phase. ++// ++// Arguments ++// ++// reason: the error code that triggered failure. 
++// format: the string format for the error message ++// args: values used in the error message ++func (self *Volume) SetFailedStatus( ++ reason ConditionReason, format string, args ...interface{}, ++) { ++ message := fmt.Sprintf(format, args...) ++ ++ // Don't overwrite `Job`: having JID around can help for debug. ++ self.SetCondition(VolumeReady, corev1.ConditionFalse, reason, message) ++} ++ ++// Update the volume status to Pending phase. ++// ++// Arguments ++// ++// job: job in progress ++func (self *Volume) SetPendingStatus(job string) { ++ self.SetCondition(VolumeReady, corev1.ConditionUnknown, ReasonPending, "") ++ self.Status.Job = job ++} ++ ++// Update the volume status to Available phase. ++func (self *Volume) SetAvailableStatus() { ++ self.SetCondition(VolumeReady, corev1.ConditionTrue, "", "") ++ self.Status.Job = "" ++} ++ ++// Update the volume status to Terminating phase. ++// ++// Arguments ++// ++// job: job in progress ++func (self *Volume) SetTerminatingStatus(job string) { ++ self.SetCondition(VolumeReady, corev1.ConditionUnknown, ReasonTerminating, "") ++ self.Status.Job = job ++} ++ ++// Check if a volume is valid. ++func (self *Volume) IsValid() error { ++ // Check if a type is specified. ++ if self.Spec.SparseLoopDevice == nil && ++ self.Spec.RawBlockDevice == nil && ++ self.Spec.LVMLogicalVolume == nil { ++ return errors.New("volume type not found in Volume Spec") ++ } ++ // Check if the size is strictly positive. 
++ if self.Spec.SparseLoopDevice != nil { ++ if self.Spec.SparseLoopDevice.Size.Sign() <= 0 { ++ return fmt.Errorf( ++ "invalid SparseLoopDevice size (should be greater than 0): %s", ++ self.Spec.SparseLoopDevice.Size.String(), ++ ) ++ } ++ } else if self.Spec.LVMLogicalVolume != nil { ++ if self.Spec.LVMLogicalVolume.Size.Sign() <= 0 { ++ return fmt.Errorf( ++ "invalid LVM LogicalVolume size (should be greater than 0): %s", ++ self.Spec.LVMLogicalVolume.Size.String(), ++ ) ++ } ++ } ++ ++ // Default to Filesystem when mode is not specified. ++ if self.Spec.Mode == "" { ++ self.Spec.Mode = corev1.PersistentVolumeFilesystem ++ } ++ ++ return nil ++} ++ ++// Check if a volume is in an unrecoverable state. ++func (self *Volume) IsInUnrecoverableFailedState() *VolumeCondition { ++ // Only `UnavailableError` is recoverable. ++ if ready := self.GetCondition(VolumeReady); ready != nil { ++ if ready.Status == corev1.ConditionFalse && ++ ready.Reason != ReasonUnavailableError { ++ return ready ++ } ++ } ++ return nil ++} ++ ++func (self *Volume) IsFormatted() bool { ++ return self.Spec.Mode == corev1.PersistentVolumeFilesystem ++} ++ ++//+kubebuilder:object:root=true + +-// VolumeList contains a list of Volume. ++// VolumeList contains a list of Volume + type VolumeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` diff --git a/tools/upgrade-operator-sdk/upgrade.py b/tools/upgrade-operator-sdk/upgrade.py new file mode 100755 index 0000000000..ec2801e419 --- /dev/null +++ b/tools/upgrade-operator-sdk/upgrade.py @@ -0,0 +1,797 @@ +#!/usr/bin/env python3 +"""Automates the upgrade of operator-sdk based projects. + +Scaffolds a fresh project, detects the latest Go and k8s.io patch +versions, restores custom code from a backup, applies GNU patch files, +and runs the build pipeline. 
+ +Usage: + python3 scripts/upgrade-operator-sdk/upgrade.py \\ + --operator-dir [OPTIONS] + +Examples: + python3 scripts/upgrade-operator-sdk/upgrade.py \\ + --operator-dir operator \\ + scripts/upgrade-operator-sdk/operator + +Options: + --operator-dir Path to the operator project directory (required) + --skip-backup Skip the backup step (assumes .bak already exists) + --clean-tools Remove tool cache after the upgrade (forces re-download) + --yes, -y Skip the confirmation prompt + -h, --help Show this help message + +Environment variables: + GITHUB_TOKEN Optional; raises GitHub API rate limit from 60 + to 5000 req/hour for operator-sdk release checks. + +Requires: go, curl, patch, pyyaml (pip install pyyaml) +""" + +import argparse +import json +import os +import re +import shutil +import subprocess +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any, Final, NoReturn + +try: + import yaml +except ImportError: + print("pyyaml is required: pip install pyyaml", file=sys.stderr) + sys.exit(1) + +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- +TOOLS_BIN: Final = Path.home() / ".cache" / "upgrade-operator-sdk" / "bin" +_SDK_BIN: Final = TOOLS_BIN / "operator-sdk" + +_ENCODING: Final = "utf-8" + +# --------------------------------------------------------------------------- +# URLs & HTTP +# --------------------------------------------------------------------------- +_URL_OPERATOR_SDK_DOWNLOAD: Final = ( + "https://github.com/operator-framework/operator-sdk" + "/releases/download/{version}/operator-sdk_{goos}_{goarch}" +) +_GITHUB_RELEASES_URL: Final = "https://api.github.com/repos/{repo}/releases/latest" +_GITHUB_REPO_OPERATOR_SDK: Final = "operator-framework/operator-sdk" +_URL_GO_RELEASES: Final = "https://go.dev/dl/?mode=json&include=all" +_URL_GO_MODULE_VERSIONS: Final = 
"https://proxy.golang.org/{module}/@v/list" + +_HTTP_RETRIES: Final = 3 + +# k8s.io libraries bumped together (lock-step releases). +_K8S_LIBS: Final = ("k8s.io/api", "k8s.io/apimachinery", "k8s.io/client-go") +# We only query one module on the proxy — all three share the same version. +_K8S_LIB_MODULE: Final = _K8S_LIBS[0] + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- +_GREEN: Final = "\033[32m" +_YELLOW: Final = "\033[33m" +_RED: Final = "\033[31m" +_BLUE_BOLD: Final = "\033[1;34m" +_BOLD: Final = "\033[1m" +_RESET: Final = "\033[0m" + + +def log_info(msg: str) -> None: + print(f"{_GREEN}[INFO]{_RESET} {msg}") + + +def log_warn(msg: str) -> None: + print(f"{_YELLOW}[WARN]{_RESET} {msg}", file=sys.stderr) + + +def log_error(msg: str) -> None: + print(f"{_RED}[ERROR]{_RESET} {msg}", file=sys.stderr) + + +def log_step(msg: str) -> None: + print(f"\n{_BLUE_BOLD}==>{_RESET} {_BOLD}{msg}{_RESET}") + + +def die(msg: str) -> NoReturn: + log_error(msg) + sys.exit(1) + + +# --------------------------------------------------------------------------- +# HTTP helpers +# --------------------------------------------------------------------------- + + +def _http_get(url: str, *, headers: dict[str, str] | None = None) -> bytes: + req = urllib.request.Request(url, headers=headers or {}) + for attempt in range(_HTTP_RETRIES): + try: + with urllib.request.urlopen(req, timeout=30) as resp: + data: bytes = resp.read() + return data + except urllib.error.HTTPError: + raise + except urllib.error.URLError as exc: + if attempt < _HTTP_RETRIES - 1: + delay = 2 ** (attempt + 1) + log_warn( + f"Request to {url} failed ({exc.reason}), " + f"retrying in {delay}s..." 
+ ) + time.sleep(delay) + else: + raise + raise RuntimeError("_http_get: unreachable") + + +def _fetch_json(url: str) -> Any: + try: + return json.loads(_http_get(url)) + except urllib.error.HTTPError as e: + die(f"HTTP {e.code} for {url}") + except urllib.error.URLError as e: + die(f"Failed to fetch {url}: {e.reason}") + + +def _fetch_text(url: str) -> str: + try: + return _http_get(url).decode(_ENCODING) + except urllib.error.HTTPError as e: + die(f"HTTP {e.code} for {url}") + except urllib.error.URLError as e: + die(f"Failed to fetch {url}: {e.reason}") + + +def _github_headers() -> dict[str, str]: + """Return GitHub API headers. Uses GITHUB_TOKEN if set (raises rate limit).""" + headers: dict[str, str] = {} + token = os.environ.get("GITHUB_TOKEN") + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + +def _fetch_latest_github_release(repo: str) -> str: + """Return the latest release tag, or empty string on failure.""" + try: + data = json.loads( + _http_get( + _GITHUB_RELEASES_URL.format(repo=repo), + headers=_github_headers(), + ) + ) + return str(data["tag_name"]) + except (urllib.error.HTTPError, urllib.error.URLError) as e: + log_warn(f"Could not fetch latest release for {repo}: {e}") + return "" + + +# --------------------------------------------------------------------------- +# Config loading +# --------------------------------------------------------------------------- + + +def load_config(config_dir: str, operator_dir: str) -> dict[str, Any]: + """Load and validate the operator config from a directory.""" + d = Path(config_dir).resolve() + config_file = d / "config.yaml" + if not config_file.exists(): + die(f"Config file not found: {config_file}") + with config_file.open(encoding=_ENCODING) as f: + cfg: dict[str, Any] = yaml.safe_load(f) + + for key in ("repo", "domain", "apis", "operator_sdk_version"): + if key not in cfg: + die(f"Missing required key {key!r} in {config_file}") + + op = Path(operator_dir).resolve() + 
cfg["name"] = d.name + cfg["config_file"] = config_file + cfg["operator_dir"] = op + cfg["patches_dir"] = d / "patches" + cfg["backup_dir"] = op.parent / f"{op.name}.bak" + cfg.setdefault("raw_copy", []) + cfg.setdefault("delete", []) + cfg.setdefault("extra_commands", []) + + return cfg + + +# --------------------------------------------------------------------------- +# Version detection from scaffold go.mod +# --------------------------------------------------------------------------- + + +def _latest_go_patch(go_major_minor: str) -> str: + """Query go.dev for the latest stable patch of *go_major_minor*.""" + releases = _fetch_json(_URL_GO_RELEASES) + prefix = f"go{go_major_minor}." + return next( + ( + r["version"] + for r in releases + if r["version"].startswith(prefix) and r.get("stable") + ), + f"go{go_major_minor}.0", + ) + + +def _latest_k8s_patch(base_version: str) -> str: + """Query Go module proxy for the latest stable k8s.io patch.""" + m = re.match(r"(v\d+\.\d+)\.", base_version) + if not m: + return base_version + prefix = m.group(1) + "." + content = _fetch_text(_URL_GO_MODULE_VERSIONS.format(module=_K8S_LIB_MODULE)) + # Filter stable releases only (skip pre-releases containing "-"). + candidates = [ + v.strip() + for v in content.splitlines() + if v.strip().startswith(prefix) and "-" not in v.strip() + ] + if not candidates: + return base_version + + def _patch_num(v: str) -> int: + try: + return int(v.rsplit(".", 1)[-1]) + except ValueError: + return -1 + + return max(candidates, key=_patch_num) + + +def detect_latest_patches(cfg: dict[str, Any]) -> dict[str, str]: + """Read the scaffold go.mod and resolve latest patch versions. + + Also checks the latest operator-sdk release on GitHub. 
+ """ + log_step("Detecting latest available versions") + detected: dict[str, str] = {} + + # operator-sdk: latest GitHub release + log_info("Querying GitHub for latest operator-sdk release...") + latest_sdk = _fetch_latest_github_release(_GITHUB_REPO_OPERATOR_SDK) + if latest_sdk: + detected["operator_sdk_version"] = latest_sdk + log_info(f" operator-sdk: {latest_sdk}") + + # Go and k8s.io: from scaffold go.mod + gomod_path = cfg["operator_dir"] / "go.mod" + if not gomod_path.exists(): + die(f"go.mod not found at {gomod_path}") + gomod = gomod_path.read_text(encoding=_ENCODING) + + m_go = re.search(r"^go\s+(\d+\.\d+)", gomod, re.MULTILINE) + if not m_go: + die("Failed to parse Go version from scaffold go.mod") + go_major_minor = m_go.group(1) + + log_info(f"Scaffold Go version: {go_major_minor}") + log_info("Querying go.dev for latest patch...") + go_toolchain = _latest_go_patch(go_major_minor) + detected["go_toolchain"] = go_toolchain + log_info(f" Go toolchain: {go_toolchain}") + + m_k8s = re.search(r"k8s\.io/api\s+(v\S+)", gomod) + if m_k8s: + k8s_base = m_k8s.group(1) + log_info(f"Scaffold k8s.io/api: {k8s_base}") + log_info("Querying module proxy for latest patch...") + k8s_libs = _latest_k8s_patch(k8s_base) + detected["k8s_libs"] = k8s_libs + log_info(f" k8s.io libs: {k8s_libs}") + + return detected + + +def reconcile_versions( + cfg: dict[str, Any], + detected: dict[str, str], +) -> None: + """Compare detected versions with YAML pins. + + - No pin: use detected, log info. + - Pin < detected: warn (newer available), keep pinned. + - Pin == detected: all good. + - Pin > detected: warn (unusual), use detected. + + Never modifies the YAML file. Zero interactive input -- CI-safe. 
 """ + log_step("Reconciling versions") + + for key in ("operator_sdk_version", "go_toolchain", "k8s_libs"): + found = detected.get(key, "") + if not found: + continue + pinned = cfg.get(key, "") + + if not pinned: + log_info(f" {key}: {found} (detected, not pinned)") + cfg[key] = found + elif found == pinned: + log_info(f" {key}: {pinned} (up to date)") + elif found > pinned: # NOTE(review): plain string compare — misorders two-digit parts (v1.9 vs v1.10) + log_warn(f" {key}: pinned {pinned}, newer {found} available") + cfg[key] = pinned + else: + log_warn(f" {key}: pinned {pinned} > detected {found}, using detected") + cfg[key] = found + + +def confirm_upgrade(cfg: dict[str, Any]) -> None: + """Print config summary and ask the user to confirm.""" + print() + print(f"{_BOLD}The following upgrade will be performed:{_RESET}") + print() + print(f" operator-sdk {cfg['operator_sdk_version']}") + print() + print(f" Target: {cfg['name']}") + print(f" Operator dir: {cfg['operator_dir']}") + print() + answer = input(f"{_BOLD}Proceed? [y/N] {_RESET}").strip().lower() + if answer not in ("y", "yes"): + log_info("Aborted by user.") + sys.exit(0) + + +# --------------------------------------------------------------------------- +# Process execution +# --------------------------------------------------------------------------- + + +def _tool_env(cfg: dict[str, Any]) -> dict[str, str]: + env: dict[str, str] = { + "PATH": f"{TOOLS_BIN}:{os.environ.get('PATH', '')}", + } + # Set after reconcile_versions() resolves the latest patch. 
+ if cfg.get("go_toolchain"): + env["GOTOOLCHAIN"] = cfg["go_toolchain"] + return env + + +def run( + cmd: list[str], + cfg: dict[str, Any], + *, + cwd: Path | None = None, + check: bool = True, + capture: bool = False, +) -> subprocess.CompletedProcess[Any]: + merged_env = {**os.environ, **_tool_env(cfg)} + kwargs: dict[str, Any] = {"cwd": cwd, "env": merged_env} + if capture: + kwargs["capture_output"] = True + kwargs["text"] = True + return subprocess.run(cmd, check=check, **kwargs) + + +# =================================================================== +# Phase 0 — Install tools +# =================================================================== + + +def _check_prerequisites() -> None: + missing = [tool for tool in ("go", "curl", "patch") if shutil.which(tool) is None] + if missing: + die(f"Required tools not found in PATH: {', '.join(missing)}") + + +def install_operator_sdk(cfg: dict[str, Any]) -> None: + version = cfg["operator_sdk_version"] + log_step(f"Installing operator-sdk {version}") + TOOLS_BIN.mkdir(parents=True, exist_ok=True) + + if _SDK_BIN.exists(): + result = run([str(_SDK_BIN), "version"], cfg, capture=True, check=False) + if version.lstrip("v") in result.stdout: + log_info("Already installed") + return + + goos = run(["go", "env", "GOOS"], cfg, capture=True).stdout.strip() + goarch = run(["go", "env", "GOARCH"], cfg, capture=True).stdout.strip() + url = _URL_OPERATOR_SDK_DOWNLOAD.format(version=version, goos=goos, goarch=goarch) + log_info(f"Downloading for {goos}/{goarch}...") + run(["curl", "-sSLo", str(_SDK_BIN), url], cfg) + _SDK_BIN.chmod(0o755) + ver = ( + run([str(_SDK_BIN), "version"], cfg, capture=True).stdout.strip().split("\n")[0] + ) + log_info(f"Installed: {ver}") + + +# =================================================================== +# Phase 1 — Backup +# =================================================================== + + +def backup_operator(cfg: dict[str, Any]) -> None: + op_dir: Path = cfg["operator_dir"] + bak: 
Path = cfg["backup_dir"] + log_step(f"Phase 1: Backing up {cfg['name']}") + + if bak.exists(): + log_warn(f"Removing existing backup {bak}") + shutil.rmtree(bak) + if not op_dir.exists(): + die(f"{op_dir} does not exist") + + op_dir.rename(bak) + log_info(f"{op_dir} -> {bak}") + + +# =================================================================== +# Phase 2 — Scaffold fresh project +# =================================================================== + + +def scaffold_project(cfg: dict[str, Any]) -> None: + op_dir: Path = cfg["operator_dir"] + sdk = str(_SDK_BIN) + log_step(f"Phase 2: Scaffolding {cfg['name']}") + + op_dir.mkdir(parents=True, exist_ok=True) + + run( + [ + sdk, + "init", + "--domain", + cfg["domain"], + "--repo", + cfg["repo"], + "--project-name", + cfg["name"], + ], + cfg, + cwd=op_dir, + ) + + for api in cfg["apis"]: + _create_api(op_dir, sdk, api, cfg) + + _delete_scaffold_files(cfg) + + log_info("Scaffold complete") + + +def _delete_scaffold_files(cfg: dict[str, Any]) -> None: + """Remove files/dirs listed in the 'delete' config field.""" + op_dir: Path = cfg["operator_dir"] + for rel_path in cfg.get("delete", []): + target = op_dir / rel_path + if not target.exists(): + continue + if target.is_dir(): + shutil.rmtree(target) + else: + target.unlink() + log_info(f" Deleted {rel_path}") + + +def _create_api( + op_dir: Path, sdk: str, api: dict[str, Any], cfg: dict[str, Any] +) -> None: + group = api.get("group", "") + version = api["version"] + kind = api["kind"] + tail = ["--version", version, "--kind", kind, "--resource", "--controller"] + + if not api.get("namespaced", True): + tail.append("--namespaced=false") + + cmd = [sdk, "create", "api"] + if group: + cmd += ["--group", group] + cmd += tail + + result = run(cmd, cfg, cwd=op_dir, check=False) + if result.returncode != 0: + die(f"Failed to create API {kind} (exit {result.returncode})") + + log_info(f"Created {kind} API (group={group!r})") + + +# 
# ===================================================================
# Phase 3 — Restore custom code from backup
# ===================================================================


def restore_backup(cfg: dict[str, Any]) -> None:
    """Copy raw_copy entries from backup into the scaffold output.

    Entries are directories or files that are purely custom (not
    generated by operator-sdk). The scaffold version is replaced
    entirely for directories.

    For files: if the file already exists in the scaffold, an error is
    raised with a diff. For directories: the scaffold directory is
    replaced entirely.
    """
    op_dir: Path = cfg["operator_dir"]
    bak: Path = cfg["backup_dir"]
    log_step(f"Phase 3: Restoring custom code for {cfg['name']}")

    count = 0  # number of files restored (for the summary line)
    for rel_path in cfg["raw_copy"]:
        src = bak / rel_path
        dst = op_dir / rel_path

        if not src.exists():
            die(f" {rel_path} not found in backup at {src}")

        if src.is_dir():
            if dst.exists():
                # Compare scaffold vs backup; skip if identical, error if different.
                # Plain subprocess.run (not the run() helper): diff's exit code
                # is the comparison result, and no tool env is needed.
                result = subprocess.run(
                    ["diff", "-rq", str(dst), str(src)],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    n = sum(1 for _ in dst.rglob("*") if _.is_file())
                    log_info(f" {rel_path} ({n} files, identical to scaffold)")
                    count += n
                    continue
                log_error(f" {rel_path} exists in scaffold with different content")
                # Show the full diff on the console to help manual resolution.
                subprocess.run(
                    ["diff", "-ru", str(dst), str(src)],
                    capture_output=False,
                )
                die(
                    f"Conflict in {rel_path}. Update the directory in "
                    ".bak/ then re-run with --skip-backup."
                )
            shutil.copytree(src, dst)
            n = sum(1 for _ in dst.rglob("*") if _.is_file())
            log_info(f" {rel_path} ({n} files)")
            count += n
        else:
            if dst.exists():
                # File-level conflict: print a unified diff, then abort.
                log_error(f" {rel_path} exists in both scaffold and backup")
                result = subprocess.run(
                    ["diff", "-u", str(dst), str(src)],
                    capture_output=True,
                    text=True,
                )
                if result.stdout:
                    print(result.stdout)
                die(
                    f"Conflict: {rel_path}. Update the file in .bak/ "
                    "then re-run with --skip-backup."
                )
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src, dst)
            log_info(f" {rel_path}")
            count += 1

    log_info(f"Custom code restored: {count} file(s)")


# ===================================================================
# Phase 4 — Apply patches and placeholder substitutions
# ===================================================================


def adapt_project(cfg: dict[str, Any]) -> None:
    """Apply patch files and substitute placeholders."""
    _apply_patches(cfg)
    _substitute_placeholders(cfg)


def _apply_patches(cfg: dict[str, Any]) -> None:
    """Apply every ``*.patch`` file from the config's patches/ directory.

    Patches are applied in sorted filename order with ``patch -p1
    --forward``; a patch that fails to apply is reported as a warning
    (not fatal) so the operator can resolve it manually.
    """
    patch_dir: Path = cfg["patches_dir"]
    op_dir: Path = cfg["operator_dir"]

    if not patch_dir.is_dir():
        log_warn(f"No patch directory found at {patch_dir}")
        return

    for patch_file in sorted(patch_dir.glob("*.patch")):
        log_info(f"Applying {patch_file.name}...")
        result = subprocess.run(
            [
                "patch",
                "-p1",
                "--forward",
                "--no-backup-if-mismatch",
                "-i",
                str(patch_file),
            ],
            cwd=op_dir,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # NOTE(review): only stdout is surfaced here; patch may also
            # write diagnostics to stderr — confirm whether stderr should
            # be included in the warning.
            log_warn(
                f"{patch_file.name} did not apply cleanly "
                f"(exit {result.returncode}); resolve manually:\n"
                f" {result.stdout.strip()}"
            )
        else:
            log_info(f" {patch_file.name} applied")


def _substitute_placeholders(cfg: dict[str, Any]) -> None:
    """Replace placeholder tokens left by patches with pinned values.

    Currently only handles __GOTOOLCHAIN__ in the Makefile, and only
    when a go_toolchain pin is configured.
    """
    op_dir: Path = cfg["operator_dir"]

    makefile = op_dir / "Makefile"
    if makefile.exists():
        text = makefile.read_text(encoding=_ENCODING)
        if cfg.get("go_toolchain"):
            text = text.replace("__GOTOOLCHAIN__", cfg["go_toolchain"])
        makefile.write_text(text, encoding=_ENCODING)

    log_info("Placeholders substituted")


# ===================================================================
# Phase 5 — Generate
# ===================================================================


def generate(cfg: dict[str, Any]) -> None:
    """Regenerate and verify the upgraded project (Phase 5).

    Optionally pins the k8s.io library versions, then runs the standard
    ``go mod tidy`` / ``make manifests generate`` / ``make fmt vet``
    sequence plus any extra_commands from the config. Each step raises
    (via run's check=True) on failure.
    """
    op_dir: Path = cfg["operator_dir"]
    log_step(f"Phase 5: Generate & verify {cfg['name']}")

    # Remove scaffold-generated bin/ to force fresh tool downloads
    # via the Makefile (kustomize, controller-gen, etc.).
    bin_dir = op_dir / "bin"
    if bin_dir.exists():
        shutil.rmtree(bin_dir)

    if cfg.get("k8s_libs"):
        k8s_get = [f"{lib}@{cfg['k8s_libs']}" for lib in _K8S_LIBS]
        log_info(f"Pinning k8s.io libs to {cfg['k8s_libs']}...")
        run(["go", "get", *k8s_get], cfg, cwd=op_dir)

    steps: list[tuple[str, list[str]]] = [
        ("go mod tidy...", ["go", "mod", "tidy"]),
        ("make manifests generate...", ["make", "manifests", "generate"]),
        ("make fmt vet...", ["make", "fmt", "vet"]),
    ]
    for msg, cmd in steps:
        log_info(msg)
        run(cmd, cfg, cwd=op_dir)

    for extra in cfg.get("extra_commands", []):
        log_info(f"Running {' '.join(extra)}...")
        run(extra, cfg, cwd=op_dir)

    log_info(f"Build succeeded for {cfg['name']}")


# ===================================================================
# Cleanup
# ===================================================================


def _clean_tools() -> None:
    """Remove the tool cache."""
    if TOOLS_BIN.exists():
        log_step(f"Cleaning tool cache ({TOOLS_BIN})")
        shutil.rmtree(TOOLS_BIN)
        log_info(f"Removed {TOOLS_BIN}")
    else:
        log_info("Tool cache already empty")


# ===================================================================
# Recovery
# ===================================================================


def _log_recovery_hint(cfg: dict[str, Any]) -> None:
    """Print manual-recovery instructions after a failed/interrupted run.

    Purely informational: tells the user where the backup lives and the
    exact shell command to restore the original operator directory.
    """
    op_dir: Path = cfg["operator_dir"]
    bak: Path = cfg["backup_dir"]
    log_error(f"Processing of '{cfg['name']}' was interrupted or failed")
    if bak.exists():
        log_warn(f"Backup preserved at: {bak}")
    if op_dir.exists() and bak.exists():
        log_warn("To restore the original state:")
        log_warn(f" rm -rf {op_dir} && mv {bak} {op_dir}")
    elif bak.exists():
        log_warn(f"To restore: mv {bak} {op_dir}")


# ===================================================================
# Main
# ===================================================================
# ===================================================================


def main() -> None:
    """CLI entry point: run the full upgrade pipeline for one operator.

    Phases: check prerequisites, load config, confirm, install the SDK,
    back up (or reuse a backup with --skip-backup), scaffold, reconcile
    versions, restore custom code, apply patches, generate/verify. On
    any failure the backup is preserved and recovery hints are printed.
    """
    parser = argparse.ArgumentParser(
        description="Upgrade an operator-sdk project by scaffolding "
        "fresh and applying patches from a config directory.",
    )
    parser.add_argument(
        "config_dir",
        help="Path to config directory containing config.yaml " "and patches/",
    )
    parser.add_argument(
        "--operator-dir",
        required=True,
        help="Path to the operator project directory",
    )
    parser.add_argument(
        "--skip-backup",
        action="store_true",
        help="Skip backup (assumes .bak exists)",
    )
    parser.add_argument(
        "--clean-tools",
        action="store_true",
        help="Remove tool cache after upgrade",
    )
    parser.add_argument(
        "--yes",
        "-y",
        action="store_true",
        help="Skip the confirmation prompt",
    )
    args = parser.parse_args()

    _check_prerequisites()

    cfg = load_config(args.config_dir, args.operator_dir)

    if not args.yes:
        confirm_upgrade(cfg)

    log_step(f"Operator SDK Upgrade -> {cfg['operator_sdk_version']}")

    install_operator_sdk(cfg)

    op_dir: Path = cfg["operator_dir"]
    bak: Path = cfg["backup_dir"]

    if not args.skip_backup:
        backup_operator(cfg)
    else:
        log_info("Skipping backup (--skip-backup)")
        if not bak.exists():
            die(
                f"{bak} does not exist; cannot use --skip-backup "
                "without an existing backup directory"
            )
        # With --skip-backup the previous (failed) scaffold output is
        # discarded so the pipeline starts from a clean directory.
        if op_dir.exists():
            shutil.rmtree(op_dir)

    try:
        scaffold_project(cfg)  # Phase 2
        detected = detect_latest_patches(cfg)  # Phase 2b
        reconcile_versions(cfg, detected)  # compare & pin
        log_info(
            f"Using: Go {cfg.get('go_toolchain', 'scaffold')}"
            f", k8s.io {cfg.get('k8s_libs', 'scaffold')}"
        )
        restore_backup(cfg)  # Phase 3
        adapt_project(cfg)  # Phase 4
        generate(cfg)  # Phase 5
    except BaseException:
        # BaseException so KeyboardInterrupt/SystemExit also get the
        # recovery hint before propagating.
        _log_recovery_hint(cfg)
        raise

    if args.clean_tools:
        _clean_tools()

    log_step("Upgrade complete!")
    print()
    log_info(f"Backup preserved at: {bak}/")
    print()
    log_info("Recommended next steps:")
    log_info(" 1. git diff")
    # NOTE(review): this hint uses cfg['name'] (derived from the config
    # directory name) as the directory to cd into; if --operator-dir
    # differs from the project name the hint is wrong — confirm whether
    # cfg['operator_dir'] should be used here instead.
    log_info(f" 2. cd {cfg['name']} && make test")


if __name__ == "__main__":
    main()