-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathgenplan.go
More file actions
148 lines (127 loc) · 4.64 KB
/
genplan.go
File metadata and controls
148 lines (127 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc.
package controller
import (
"context"
"fmt"
"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/planner"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)
// plan holds the actions to execute during reconciliation.
//
// A plan is produced by generatePlan, which fills most fields directly from
// the result returned by planner.GeneratePlan.
type plan struct {
// Where to take actions
//
// TemporalNamespace is copied from the resource's
// Spec.WorkerOptions.TemporalNamespace.
TemporalNamespace string
// WorkerDeploymentName is the value computed by
// k8s.ComputeWorkerDeploymentName for the resource.
WorkerDeploymentName string
// Which actions to take
//
// DeleteDeployments is copied from the planner result's DeleteDeployments.
DeleteDeployments []*appsv1.Deployment
// CreateDeployment is only set when the planner result reports
// ShouldCreateDeployment; otherwise it is left nil.
CreateDeployment *appsv1.Deployment
// ScaleDeployments is copied from the planner result's ScaleDeployments.
// NOTE(review): the uint32 value is presumably the desired replica count —
// confirm against the planner package.
// The keys are pointers, so map lookups match on pointer identity rather than
// on the contents of the referenced ObjectReference.
ScaleDeployments map[*corev1.ObjectReference]uint32
// UpdateDeployments is copied from the planner result's UpdateDeployments.
UpdateDeployments []*appsv1.Deployment
// Register new versions as current or with ramp
// (copied from the planner result's VersionConfig).
UpdateVersionConfig *planner.VersionConfig
// Start a workflow
// (one entry per item in the planner result's TestWorkflows).
startTestWorkflows []startWorkflowConfig
// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string
}
// startWorkflowConfig defines a workflow to be started.
//
// generatePlan builds one of these for every entry in the planner result's
// TestWorkflows, copying the four fields below one-to-one.
type startWorkflowConfig struct {
// workflowType is copied from the planner entry's WorkflowType.
workflowType string
// workflowID is copied from the planner entry's WorkflowID.
workflowID string
// buildID is copied from the planner entry's BuildID.
// NOTE(review): presumably the build ID of the version the workflow should
// run against — confirm against the code that starts the workflow.
buildID string
// taskQueue is copied from the planner entry's TaskQueue.
taskQueue string
}
// generatePlan creates a plan for the controller to execute.
//
// It looks up the current Kubernetes deployment state for w, delegates the
// decision making to planner.GeneratePlan, and converts the planner result
// into the controller's own plan type. When the planner reports that a new
// Deployment should be created, the Deployment object is built here via
// r.newDeployment.
//
// Parameters:
//   - ctx: passed to the Kubernetes deployment state lookup.
//   - l: logger used for the manual-strategy notice and handed to the planner.
//   - w: the TemporalWorkerDeployment being reconciled; its Spec and Status
//     are passed to the planner by pointer.
//   - connection: connection spec forwarded to the planner and to
//     r.newDeployment.
//   - temporalState: state forwarded to the planner. It is dereferenced here
//     when the last modifier identity is non-empty and differs from the
//     controller identity, so it must be non-nil in that case.
//
// Returns the populated plan, or a nil plan and a wrapped error if the
// Kubernetes state lookup, the planner, or the Deployment construction fails.
func (r *TemporalWorkerDeploymentReconciler) generatePlan(
	ctx context.Context,
	l logr.Logger,
	w *temporaliov1alpha1.TemporalWorkerDeployment,
	connection temporaliov1alpha1.TemporalConnectionSpec,
	temporalState *temporal.TemporalWorkerState,
) (*plan, error) {
	workerDeploymentName := k8s.ComputeWorkerDeploymentName(w)
	targetBuildID := k8s.ComputeBuildID(w)

	// Fetch Kubernetes deployment state
	k8sState, err := k8s.GetDeploymentState(
		ctx,
		r.Client,
		w.Namespace,
		w.Name,
		workerDeploymentName,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
	}

	// The local is deliberately not named "plan": that would shadow the plan
	// type for the remainder of the function. ScaleDeployments is not
	// pre-allocated because it is always replaced by the planner's map below.
	p := &plan{
		TemporalNamespace:    w.Spec.WorkerOptions.TemporalNamespace,
		WorkerDeploymentName: workerDeploymentName,
	}

	// Check if we need to force manual strategy due to external modification.
	// rolloutStrategy is a copy, so overriding Strategy does not touch w.Spec.
	rolloutStrategy := w.Spec.RolloutStrategy
	if w.Status.LastModifierIdentity != getControllerIdentity() &&
		w.Status.LastModifierIdentity != "" &&
		!temporalState.IgnoreLastModifier {
		l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))
		rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
	}

	// Generate the plan using the planner package
	plannerConfig := &planner.Config{
		RolloutStrategy: rolloutStrategy,
	}
	planResult, err := planner.GeneratePlan(
		l,
		k8sState,
		&w.Status,
		&w.Spec,
		temporalState,
		connection,
		plannerConfig,
		workerDeploymentName,
		r.MaxDeploymentVersionsIneligibleForDeletion,
	)
	if err != nil {
		return nil, fmt.Errorf("error generating plan: %w", err)
	}

	// Convert planner result to controller plan
	p.DeleteDeployments = planResult.DeleteDeployments
	p.ScaleDeployments = planResult.ScaleDeployments
	p.UpdateDeployments = planResult.UpdateDeployments

	// Convert version config
	p.UpdateVersionConfig = planResult.VersionConfig
	p.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds

	// Convert test workflows. The slice is sized up front when there is
	// something to copy and stays nil otherwise.
	if n := len(planResult.TestWorkflows); n > 0 {
		p.startTestWorkflows = make([]startWorkflowConfig, 0, n)
		for _, wf := range planResult.TestWorkflows {
			p.startTestWorkflows = append(p.startTestWorkflows, startWorkflowConfig{
				workflowType: wf.WorkflowType,
				workflowID:   wf.WorkflowID,
				buildID:      wf.BuildID,
				taskQueue:    wf.TaskQueue,
			})
		}
	}

	// Handle deployment creation if needed
	if planResult.ShouldCreateDeployment {
		d, err := r.newDeployment(w, targetBuildID, connection)
		if err != nil {
			// Wrap like the other failure paths so the caller can tell which
			// step failed; %w keeps the underlying error inspectable.
			return nil, fmt.Errorf("unable to create deployment for build ID %q: %w", targetBuildID, err)
		}
		p.CreateDeployment = d
	}

	return p, nil
}
// newDeployment returns the Deployment object for the given build ID of w.
//
// It is a thin wrapper around k8s.NewDeploymentWithControllerRef that supplies
// the reconciler's Scheme; the helper's Deployment and error are returned
// unchanged.
func (r *TemporalWorkerDeploymentReconciler) newDeployment(
	w *temporaliov1alpha1.TemporalWorkerDeployment,
	buildID string,
	connection temporaliov1alpha1.TemporalConnectionSpec,
) (*appsv1.Deployment, error) {
	scheme := r.Scheme
	return k8s.NewDeploymentWithControllerRef(w, buildID, connection, scheme)
}