Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type KubernetesApplicationSpec struct {
// The label will be configured to variant manifests used to distinguish them.
VariantLabel KubernetesVariantLabel `json:"variantLabel"`

// TODO: Define fields for KubernetesApplicationSpec.
}

func (s *KubernetesApplicationSpec) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -201,6 +200,32 @@ type K8sCanaryRolloutStageOptions struct {
// K8sCanaryCleanStageOptions contains all configurable values for a K8S_CANARY_CLEAN stage.
type K8sCanaryCleanStageOptions struct{}

// K8sPrimaryRolloutStageOptions contains all configurable values for a K8S_PRIMARY_ROLLOUT stage.
type K8sPrimaryRolloutStageOptions struct {
// Suffix that should be used when naming the PRIMARY variant's resources.
// Default is "primary".
Suffix string `json:"suffix" default:"primary"`
// Whether the PRIMARY service should be created.
CreateService bool `json:"createService"`
// Whether the PRIMARY variant label should be added to manifests if they were missing.
AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"`
// Whether the resources that are no longer defined in Git should be removed or not.
Prune bool `json:"prune"`
}

func (o *K8sPrimaryRolloutStageOptions) UnmarshalJSON(data []byte) error {
type alias K8sPrimaryRolloutStageOptions
var a alias
if err := json.Unmarshal(data, &a); err != nil {
return err
}
*o = K8sPrimaryRolloutStageOptions(a)
if err := defaults.Set(o); err != nil {
return err
}
return nil
}

// K8sResourcePatch represents a patch operation for a Kubernetes resource.
type K8sResourcePatch struct {
// The target of the patch operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ const (
StageK8sMultiCanaryRollout = "K8S_CANARY_ROLLOUT"
// StageK8sMultiCanaryClean represents the state where all canary resources should be removed.
StageK8sMultiCanaryClean = "K8S_CANARY_CLEAN"
// StageK8sMultiPrimaryRollout represents the state where the new version is promoted as PRIMARY to all targets.
StageK8sMultiPrimaryRollout = "K8S_PRIMARY_ROLLOUT"
)

var allStages = []string{
StageK8sMultiSync,
StageK8sMultiRollback,
StageK8sMultiCanaryRollout,
StageK8sMultiCanaryClean,
StageK8sMultiPrimaryRollout,
}

const (
Expand All @@ -48,6 +51,8 @@ const (
StageDescriptionK8sMultiCanaryRollout = "Rollout the new version as CANARY to all targets"
// StageDescriptionK8sMultiCanaryClean represents the description of the K8sCanaryClean stage.
StageDescriptionK8sMultiCanaryClean = "Remove all canary resources"
// StageDescriptionK8sMultiPrimaryRollout represents the description of the K8sPrimaryRollout stage.
StageDescriptionK8sMultiPrimaryRollout = "Rollout the new version as PRIMARY to all targets"
)

func buildQuickSyncPipeline(autoRollback bool) []sdk.QuickSyncStage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (p *Plugin) ExecuteStage(ctx context.Context, _ *sdk.ConfigNone, dts []*sdk
return &sdk.ExecuteStageResponse{
Status: p.executeK8sMultiCanaryCleanStage(ctx, input, dts),
}, nil
case StageK8sMultiPrimaryRollout:
return &sdk.ExecuteStageResponse{Status: p.executeK8sMultiPrimaryRolloutStage(ctx, input, dts)}, nil
default:
return nil, errors.New("unimplemented or unsupported stage")
}
Expand Down
263 changes: 263 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// Copyright 2025 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deployment

import (
"cmp"
"context"
"encoding/json"
"fmt"
"time"

"golang.org/x/sync/errgroup"

sdk "github.com/pipe-cd/piped-plugin-sdk-go"

kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/toolregistry"
)

func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) sdk.StageStatus {
lp := input.Client.LogPersister()

cfg, err := input.Request.TargetDeploymentSource.AppConfig()
if err != nil {
lp.Errorf("Failed while decoding application config (%v)", err.Error())
return sdk.StageStatusFailure
}

var stageCfg kubeconfig.K8sPrimaryRolloutStageOptions
if len(input.Request.StageConfig) > 0 {
if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil {
lp.Errorf("Failed while unmarshalling stage config (%v)", err)
return sdk.StageStatusFailure
}
}

type targetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
multiTarget *kubeconfig.KubernetesMultiTarget
}

deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig])
targetConfigs := make([]targetConfig, 0, len(dts))

for _, target := range dts {
deployTargetMap[target.Name] = target
}

// If no multi-targets are specified, roll out primary to all deploy targets.
if len(cfg.Spec.Input.MultiTargets) == 0 {
for _, dt := range dts {
targetConfigs = append(targetConfigs, targetConfig{
deployTarget: dt,
multiTarget: nil,
})
}
} else {
for _, multiTarget := range cfg.Spec.Input.MultiTargets {
dt, ok := deployTargetMap[multiTarget.Target.Name]
if !ok {
lp.Infof("Ignore multi target '%s': not matched any deployTarget", multiTarget.Target.Name)
continue
}
targetConfigs = append(targetConfigs, targetConfig{
deployTarget: dt,
multiTarget: &multiTarget,
})
}
}

eg, ctx := errgroup.WithContext(ctx)
for _, tc := range targetConfigs {
eg.Go(func() error {
lp.Infof("Start primary rollout for target %s", tc.deployTarget.Name)
status := p.primaryRollout(ctx, input, tc.deployTarget, tc.multiTarget, stageCfg)
if status == sdk.StageStatusFailure {
return fmt.Errorf("failed to primary rollout for target %s", tc.deployTarget.Name)
}
return nil
})
}

if err := eg.Wait(); err != nil {
lp.Errorf("Failed while rolling out primary (%v)", err)
return sdk.StageStatusFailure
}

return sdk.StageStatusSuccess
}

func (p *Plugin) primaryRollout(
ctx context.Context,
input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec],
dt *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig],
multiTarget *kubeconfig.KubernetesMultiTarget,
stageCfg kubeconfig.K8sPrimaryRolloutStageOptions,
) sdk.StageStatus {
lp := input.Client.LogPersister()

cfg, err := input.Request.TargetDeploymentSource.AppConfig()
if err != nil {
lp.Errorf("Failed while loading application config (%v)", err)
return sdk.StageStatusFailure
}

var (
appCfg = cfg.Spec
variantLabel = appCfg.VariantLabel.Key
primaryVariant = appCfg.VariantLabel.PrimaryValue
)

toolRegistry := toolregistry.NewRegistry(input.Client.ToolRegistry())
loader := provider.NewLoader(toolRegistry)

lp.Infof("Loading manifests at commit %s for handling", input.Request.TargetDeploymentSource.CommitHash)
manifests, err := p.loadManifests(ctx, &input.Request.Deployment, cfg.Spec, &input.Request.TargetDeploymentSource, loader, input.Logger, multiTarget)
if err != nil {
lp.Errorf("Failed while loading manifests (%v)", err)
return sdk.StageStatusFailure
}
lp.Successf("Successfully loaded %d manifests", len(manifests))

if len(manifests) == 0 {
lp.Error("This application has no Kubernetes manifests to handle")
return sdk.StageStatusFailure
}

// Generate the manifests for applying.
lp.Info("Start generating manifests for PRIMARY variant")
primaryManifests, err := generatePrimaryManifests(appCfg, manifests, stageCfg, variantLabel, primaryVariant)
if err != nil {
lp.Errorf("Unable to generate manifests for PRIMARY variant (%v)", err)
return sdk.StageStatusFailure
}
lp.Successf("Successfully generated %d manifests for PRIMARY variant", len(primaryManifests))

addVariantLabelsAndAnnotations(primaryManifests, variantLabel, primaryVariant)

if err := annotateConfigHash(primaryManifests); err != nil {
lp.Errorf("Unable to set %q annotation into the workload manifest (%v)", provider.AnnotationConfigHash, err)
return sdk.StageStatusFailure
}

deployTargetConfig := dt.Config

// Resolve kubectl version: multiTarget > spec > deployTarget
kubectlVersion := cmp.Or(appCfg.Input.KubectlVersion, deployTargetConfig.KubectlVersion)
if multiTarget != nil {
kubectlVersion = cmp.Or(multiTarget.KubectlVersion, kubectlVersion)
}

kubectlPath, err := toolRegistry.Kubectl(ctx, kubectlVersion)
if err != nil {
lp.Errorf("Failed while getting kubectl tool (%v)", err)
return sdk.StageStatusFailure
}

kubectl := provider.NewKubectl(kubectlPath)
applier := provider.NewApplier(kubectl, appCfg.Input, deployTargetConfig, input.Logger)

lp.Info("Start rolling out PRIMARY variant...")
if err := applyManifests(ctx, applier, primaryManifests, appCfg.Input.Namespace, lp); err != nil {
lp.Errorf("Failed while applying manifests (%v)", err)
return sdk.StageStatusFailure
}

if !stageCfg.Prune {
lp.Info("Resource GC was skipped because prune was not configured")
return sdk.StageStatusSuccess
}

// Wait for all applied manifests to be stable.
// In theory, we don't need to wait for them to be stable before going to the next step
// but waiting for a while reduces the number of Kubernetes changes in a short time.
lp.Info("Waiting for the applied manifests to be stable")
select {
case <-time.After(15 * time.Second):
break
case <-ctx.Done():
break
}

// Find the running resources that are not defined in Git.
lp.Info("Start finding all running PRIMARY resources but no longer defined in Git")
namespacedLiveResources, clusterScopedLiveResources, err := provider.GetLiveResources(ctx, kubectl, deployTargetConfig.KubeConfigPath, input.Request.Deployment.ApplicationID, fmt.Sprintf("%s=%s", variantLabel, primaryVariant))
if err != nil {
lp.Errorf("Failed while getting live resources (%v)", err)
return sdk.StageStatusFailure
}

if len(namespacedLiveResources)+len(clusterScopedLiveResources) == 0 {
lp.Info("There is no data about live resource so no resource will be removed")
return sdk.StageStatusSuccess
}

lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterScopedLiveResources))

removeKeys := provider.FindRemoveResources(primaryManifests, namespacedLiveResources, clusterScopedLiveResources)
if len(removeKeys) == 0 {
lp.Info("There are no live resources should be removed")
return sdk.StageStatusSuccess
}

lp.Infof("Start pruning %d resources", len(removeKeys))
deletedCount := deleteResources(ctx, lp, applier, removeKeys)
lp.Successf("Successfully deleted %d resources", deletedCount)

return sdk.StageStatusSuccess
}

// generatePrimaryManifests generates manifests for the PRIMARY variant.
// It deep-copies the input manifests, adds the variant label to workload selectors
// if requested, and generates a variant Service manifest if requested.
func generatePrimaryManifests(appCfg *kubeconfig.KubernetesApplicationSpec, manifests []provider.Manifest, stageCfg kubeconfig.K8sPrimaryRolloutStageOptions, variantLabel, variant string) ([]provider.Manifest, error) {
suffix := variant
if stageCfg.Suffix != "" {
suffix = stageCfg.Suffix
}

primaryManifests := provider.DeepCopyManifests(manifests)

// Add the variant label to workload selectors if requested.
if stageCfg.AddVariantLabelToSelector {
workloads := findWorkloadManifests(primaryManifests, nil)
for _, m := range workloads {
if err := ensureVariantSelectorInWorkload(m, variantLabel, variant); err != nil {
return nil, fmt.Errorf("unable to check/set %q in selector of workload %s (%w)", variantLabel+": "+variant, m.Key().ReadableString(), err)
}
}
}

// Generate Service manifests for the PRIMARY variant if requested.
if stageCfg.CreateService {
serviceName := appCfg.Service.Name
services := findManifests(provider.KindService, serviceName, primaryManifests)
if len(services) == 0 {
return nil, fmt.Errorf("unable to find any service for PRIMARY variant")
}
// Deep-copy the services to avoid mutating the shared primaryManifests slice entries.
services = provider.DeepCopyManifests(services)

generatedServices, err := generateVariantServiceManifests(services, variantLabel, variant, suffix)
if err != nil {
return nil, fmt.Errorf("failed to generate service manifests: %w", err)
}
primaryManifests = append(primaryManifests, generatedServices...)
}

return primaryManifests, nil
}
Loading
Loading