forked from awslabs/mountpoint-s3-csi-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreator.go
More file actions
241 lines (211 loc) · 8.78 KB
/
creator.go
File metadata and controls
241 lines (211 loc) · 8.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package mppod
import (
"fmt"
"path/filepath"
"github.com/scality/mountpoint-s3-csi-driver/pkg/cluster"
"github.com/scality/mountpoint-s3-csi-driver/pkg/constants"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"github.com/scality/mountpoint-s3-csi-driver/pkg/driver/node/volumecontext"
)
// Label keys set on every spawned Mountpoint Pod, namespaced under the CSI driver name.
const (
	// LabelMountpointVersion carries the Mountpoint version taken from the Creator's Config.
	LabelMountpointVersion = constants.DriverName + "/mountpoint-version"
	// LabelCSIDriverVersion carries the CSI driver version taken from the Creator's Config.
	LabelCSIDriverVersion = constants.DriverName + "/mounted-by-csi-driver-version"
)

// EmptyDirSizeLimit is the size limit, in bytes, of the in-memory emptyDir volume
// shared with each Mountpoint Pod.
const EmptyDirSizeLimit = 10 << 20 // 10MiB
// A ContainerConfig describes the containers placed in spawned Mountpoint Pods.
type ContainerConfig struct {
	// Command is used as the sole element of the "mountpoint" container's command.
	Command string
	// Image is the image of the "mountpoint" container.
	Image string
	// HeadroomImage is the image to use for headroom pods (typically a pause container).
	HeadroomImage string
	// ImagePullPolicy is applied to the "mountpoint" container.
	ImagePullPolicy corev1.PullPolicy
}
// A Config holds the settings a Creator applies to every Mountpoint Pod it builds.
type Config struct {
	// Namespace is set as the namespace of the Mountpoint Pods.
	Namespace string
	// MountpointVersion is recorded under the LabelMountpointVersion label.
	MountpointVersion string
	// PriorityClassName is assigned to the Mountpoint Pods.
	PriorityClassName string
	// PreemptingPriorityClassName is the priority class for pods that can preempt headroom pods.
	PreemptingPriorityClassName string
	// HeadroomPriorityClassName is the priority class for headroom pods (typically low priority).
	HeadroomPriorityClassName string
	// Container configures the containers of the Mountpoint Pods.
	Container ContainerConfig
	// CSIDriverVersion is recorded under the LabelCSIDriverVersion label.
	CSIDriverVersion string
	// ClusterVariant supplies cluster-specific settings, such as the container's RunAsUser.
	ClusterVariant cluster.Variant
}
// A Creator builds specifications of Mountpoint Pods to schedule.
type Creator struct {
	config Config
}

// NewCreator returns a Creator that builds Mountpoint Pods according to `config`.
func NewCreator(config Config) *Creator {
	creator := new(Creator)
	creator.config = config
	return creator
}
// Create returns a new Mountpoint Pod spec to schedule for given `pod` and `pv`.
//
// It automatically assigns Mountpoint Pod to `pod`'s node.
// The name of the Mountpoint Pod is consistently generated from `pod` and `pv` using `MountpointPodNameFor` function.
func (c *Creator) Create(pod *corev1.Pod, pv *corev1.PersistentVolume) *corev1.Pod {
node := pod.Spec.NodeName
name := MountpointPodNameFor(string(pod.UID), pv.Name)
var volumeHandle string
if pv.Spec.CSI != nil {
volumeHandle = pv.Spec.CSI.VolumeHandle
}
mpPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: c.config.Namespace,
Labels: map[string]string{
LabelMountpointVersion: c.config.MountpointVersion,
LabelCSIDriverVersion: c.config.CSIDriverVersion,
},
Annotations: map[string]string{
AnnotationVolumeName: pv.Name,
AnnotationVolumeId: volumeHandle,
},
},
Spec: corev1.PodSpec{
// Mountpoint terminates with zero exit code on a successful termination,
// and in turn `/bin/scality-s3-csi-mounter` also exits with Mountpoint process' exit code,
// here `restartPolicy: OnFailure` allows Pod to only restart on non-zero exit codes (i.e. some failures)
// and not successful exists (i.e. zero exit code).
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{{
Name: "mountpoint",
Image: c.config.Container.Image,
ImagePullPolicy: c.config.Container.ImagePullPolicy,
Command: []string{c.config.Container.Command},
SecurityContext: &corev1.SecurityContext{
AllowPrivilegeEscalation: ptr.To(false),
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
},
RunAsUser: c.config.ClusterVariant.MountpointPodUserID(),
RunAsNonRoot: ptr.To(true),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: CommunicationDirName,
MountPath: filepath.Join("/", CommunicationDirName),
},
},
}},
PriorityClassName: c.config.PriorityClassName,
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
// This is to making sure Mountpoint Pod gets scheduled into same node as the Workload Pod
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchFields: []corev1.NodeSelectorRequirement{{
Key: metav1.ObjectNameField,
Operator: corev1.NodeSelectorOpIn,
Values: []string{node},
}},
},
},
},
},
},
Tolerations: []corev1.Toleration{
// Tolerate all taints.
// - "NoScheduled" – If the Workload Pod gets scheduled to a node, Mountpoint Pod should also get
// scheduled into the same node to provide the volume.
// - "NoExecute" – If the Workload Pod tolerates a "NoExecute" taint, Mountpoint Pod should also
// tolerate it to keep running and provide volume for the Workload Pod.
// If the Workload Pod would get descheduled and then the corresponding Mountpoint Pod
// would also get descheduled naturally due to CSI volume lifecycle.
{Operator: corev1.TolerationOpExists},
},
Volumes: []corev1.Volume{
// This emptyDir volume is used for communication between Mountpoint Pod and the CSI Driver Node Pod
{
Name: CommunicationDirName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumMemory,
SizeLimit: resource.NewQuantity(EmptyDirSizeLimit, resource.BinarySI),
},
},
},
},
},
}
volumeAttributes := extractVolumeAttributes(pv)
if saName := volumeAttributes[volumecontext.MountpointPodServiceAccountName]; saName != "" {
mpPod.Spec.ServiceAccountName = saName
}
return mpPod
}
// extractVolumeAttributes returns the CSI volume attributes of `pv`.
//
// The result is never nil. When `pv` has no CSI source or no attributes, a fresh empty map
// is returned. Otherwise the PV's own VolumeAttributes map is returned as-is (not a copy),
// so callers should treat it as read-only.
func extractVolumeAttributes(pv *corev1.PersistentVolume) map[string]string {
	if csi := pv.Spec.CSI; csi != nil && csi.VolumeAttributes != nil {
		return csi.VolumeAttributes
	}
	return map[string]string{}
}
// ExtractVolumeAttributes exposes extractVolumeAttributes to other packages.
// It returns a non-nil map of `pv`'s CSI volume attributes; when present, that map is the
// PV's own (not a copy).
func ExtractVolumeAttributes(pv *corev1.PersistentVolume) map[string]string {
	attrs := extractVolumeAttributes(pv)
	return attrs
}
// configureResourceRequests configures resource requests of the container if its specified in the volume attributes.
func (c *Creator) configureResourceRequests(mpContainer *corev1.Container, volumeAttributes map[string]string) error {
resourceRequestsCpu := volumeAttributes[volumecontext.MountpointContainerResourcesRequestsCpu]
resourceRequestsMemory := volumeAttributes[volumecontext.MountpointContainerResourcesRequestsMemory]
if resourceRequestsCpu != "" || resourceRequestsMemory != "" {
mpContainer.Resources.Requests = make(corev1.ResourceList)
if resourceRequestsCpu != "" {
quantity, err := resource.ParseQuantity(resourceRequestsCpu)
if err != nil {
return failedToParseQuantityError(err, volumecontext.MountpointContainerResourcesRequestsCpu, resourceRequestsCpu)
}
mpContainer.Resources.Requests[corev1.ResourceCPU] = quantity
}
if resourceRequestsMemory != "" {
quantity, err := resource.ParseQuantity(resourceRequestsMemory)
if err != nil {
return failedToParseQuantityError(err, volumecontext.MountpointContainerResourcesRequestsMemory, resourceRequestsMemory)
}
mpContainer.Resources.Requests[corev1.ResourceMemory] = quantity
}
}
return nil
}
// configureResourceLimits configures resource limits of the container if its specified in the volume attributes.
func (c *Creator) configureResourceLimits(mpContainer *corev1.Container, volumeAttributes map[string]string) error {
resourceLimitsCpu := volumeAttributes[volumecontext.MountpointContainerResourcesLimitsCpu]
resourceLimitsMemory := volumeAttributes[volumecontext.MountpointContainerResourcesLimitsMemory]
if resourceLimitsCpu != "" || resourceLimitsMemory != "" {
mpContainer.Resources.Limits = make(corev1.ResourceList)
if resourceLimitsCpu != "" {
quantity, err := resource.ParseQuantity(resourceLimitsCpu)
if err != nil {
return failedToParseQuantityError(err, volumecontext.MountpointContainerResourcesLimitsCpu, resourceLimitsCpu)
}
mpContainer.Resources.Limits[corev1.ResourceCPU] = quantity
}
if resourceLimitsMemory != "" {
quantity, err := resource.ParseQuantity(resourceLimitsMemory)
if err != nil {
return failedToParseQuantityError(err, volumecontext.MountpointContainerResourcesLimitsMemory, resourceLimitsMemory)
}
mpContainer.Resources.Limits[corev1.ResourceMemory] = quantity
}
}
return nil
}
// failedToParseQuantityError creates an error if provided quantity is not parsable.
func failedToParseQuantityError(err error, field, value string) error {
return fmt.Errorf("failed to parse quantity %q for %q: %w", value, field, err)
}