@@ -21,10 +21,12 @@ package aws
21
21
import (
22
22
"errors"
23
23
"fmt"
24
+ "math"
24
25
"math/rand"
25
26
"regexp"
26
27
"strconv"
27
28
"strings"
29
+ "sync"
28
30
"time"
29
31
30
32
apiv1 "k8s.io/api/core/v1"
@@ -34,6 +36,7 @@ import (
34
36
35
37
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
36
38
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
39
+ "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws/awserr"
37
40
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
38
41
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
39
42
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/eks"
@@ -46,6 +49,7 @@ const (
46
49
operationPollInterval = 100 * time .Millisecond
47
50
maxRecordsReturnedByAPI = 100
48
51
maxAsgNamesPerDescribe = 100
52
+ maxAttachInstanceCount = 20
49
53
refreshInterval = 1 * time .Minute
50
54
autoDiscovererTypeASG = "asg"
51
55
asgAutoDiscovererKeyTag = "tag"
@@ -156,6 +160,204 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
156
160
return m .asgCache .SetAsgSize (asg , size )
157
161
}
158
162
163
+ // LaunchAndAttach launches a fleet of instances and attaches them to the ASG
164
+ func (m * AwsManager ) LaunchAndAttach (asg * asg , size int ) error {
165
+ // TODO: needs locking
166
+ // TODO: needs to inform asgCache to increment its size
167
+
168
+ spotCapacity , onDemandCapacity := m .calculateSpotCapacity (asg , size )
169
+ tags := m .getInstanceTags (asg )
170
+ launchTemplateConfigs , err := m .getFleetLaunchTemplateConfigs (asg )
171
+ if err != nil {
172
+ return fmt .Errorf ("getting launch template configs, %w" , err )
173
+ }
174
+
175
+ // Call Fleet API to immediately trigger EC2 instance launch
176
+ params := & ec2.CreateFleetInput {
177
+ Type : aws .String (ec2 .FleetTypeInstant ),
178
+ LaunchTemplateConfigs : launchTemplateConfigs ,
179
+ TargetCapacitySpecification : & ec2.TargetCapacitySpecificationRequest {
180
+ OnDemandTargetCapacity : aws .Int64 (int64 (onDemandCapacity )),
181
+ SpotTargetCapacity : aws .Int64 (int64 (spotCapacity )),
182
+ TotalTargetCapacity : aws .Int64 (int64 (size )),
183
+ DefaultTargetCapacityType : aws .String (ec2 .DefaultTargetCapacityTypeOnDemand ), // TODO: what should this default be, does it matter?
184
+ // TODO: support attribute-based instance type capacity selection
185
+ },
186
+ TagSpecifications : []* ec2.TagSpecification {
187
+ {ResourceType : aws .String (ec2 .ResourceTypeInstance ), Tags : tags },
188
+ {ResourceType : aws .String (ec2 .ResourceTypeVolume ), Tags : tags },
189
+ {ResourceType : aws .String (ec2 .ResourceTypeFleet ), Tags : tags },
190
+ },
191
+ SpotOptions : & ec2.SpotOptionsRequest {
192
+ AllocationStrategy : aws .String (asg .InstancesDistribution .spotAllocationStrategy ),
193
+ },
194
+ OnDemandOptions : & ec2.OnDemandOptionsRequest {
195
+ AllocationStrategy : aws .String (asg .InstancesDistribution .onDemandAllocationStrategy ),
196
+ },
197
+ }
198
+ fleetOutput , err := m .awsService .CreateFleet (params )
199
+ if err != nil {
200
+ return fmt .Errorf ("creating fleet, %w" , err )
201
+ }
202
+
203
+ // extract created instance IDs
204
+ var instanceIDs []* string
205
+ for _ , instance := range fleetOutput .Instances {
206
+ instanceIDs = append (instanceIDs , instance .InstanceIds ... )
207
+ }
208
+
209
+ // Attach the instances to the ASG in groups of 20
210
+ var wg sync.WaitGroup
211
+ var attachErrs []error
212
+ for i := 0 ; i < len (instanceIDs ); i += maxAttachInstanceCount {
213
+ end := i + maxAttachInstanceCount
214
+ if end > len (instanceIDs ) {
215
+ end = len (instanceIDs )
216
+ }
217
+ wg .Add (1 )
218
+
219
+ go func (instanceIDs []* string ) {
220
+ defer wg .Done ()
221
+
222
+ params := & autoscaling.AttachInstancesInput {
223
+ InstanceIds : instanceIDs ,
224
+ AutoScalingGroupName : aws .String (asg .Name ),
225
+ }
226
+
227
+ // TODO: add a timeout to this loop
228
+ for {
229
+ _ , err := m .awsService .AttachInstances (params )
230
+ if err != nil {
231
+ // retry on pending instances ValidationError
232
+ var aerr awserr.Error
233
+ if errors .As (err , & aerr ) && aerr .Code () == "ValidationError" && strings .Contains (aerr .Message (), "pending" ) {
234
+ time .Sleep (operationPollInterval )
235
+ continue
236
+ }
237
+
238
+ // otherwise add to attachErrs which get raised at the end
239
+ attachErrs = append (attachErrs , err )
240
+ }
241
+ break
242
+ }
243
+
244
+ }(instanceIDs [i :end ])
245
+ }
246
+ wg .Wait ()
247
+
248
+ // Return any errors that occurred during instance attachment
249
+ // TODO: terminate instances that failed to attach and/or fail back to ASG SetDesiredCapacity
250
+ return fmt .Errorf ("attaching instances to ASG %q: %+v" , asg .Name , attachErrs )
251
+
252
+ // Calculate how many instances failed to launch, fallback to incrementing ASG's SetDesiredCapacity
253
+ failedLaunchCount := len (fleetOutput .Errors ) - size
254
+ if failedLaunchCount > 0 {
255
+ klog .Warningf ("failed to launch %d instances for %s via CreateFleet call - falling back to SetDesiredCapacity: %+v" ,
256
+ failedLaunchCount , asg .Name , fleetOutput .Errors )
257
+ return m .SetAsgSize (asg , asg .curSize + failedLaunchCount )
258
+ }
259
+
260
+ return nil
261
+ }
262
+
263
+ func (m * AwsManager ) getInstanceTags (asg * asg ) []* ec2.Tag {
264
+ tags := make ([]* ec2.Tag , 0 , len (asg .Tags ))
265
+ for i := range asg .Tags {
266
+ if asg .Tags [i ].PropagateAtLaunch != nil && * asg .Tags [i ].PropagateAtLaunch {
267
+ key := asg .Tags [i ].Key
268
+ if str := aws .StringValue (key ); strings .HasPrefix (str , "aws:" ) {
269
+ key = aws .String (fmt .Sprintf ("reserved:%s" , str ))
270
+ }
271
+ tags = append (tags , & ec2.Tag {Key : key , Value : asg .Tags [i ].Value })
272
+ }
273
+ }
274
+ return tags
275
+ }
276
+
277
+ func (m * AwsManager ) calculateSpotCapacity (asg * asg , size int ) (spotCapacity int , onDemandCapacity int ) {
278
+ for size > 0 {
279
+ if asg .curSize < asg .InstancesDistribution .onDemandBaseCapacity {
280
+ onDemandCapacity ++
281
+ size --
282
+ } else {
283
+ // TODO: should this consider the current ratio of spot/on-demand instances?
284
+ onDemand := int (math .Floor (float64 (size ) * float64 (asg .InstancesDistribution .onDemandPercentageAboveBaseCapacity ) / 100 ))
285
+ onDemandCapacity += onDemand
286
+ spotCapacity += size - onDemand
287
+ size = 0
288
+ }
289
+ }
290
+ return
291
+ }
292
+
293
+ func (m * AwsManager ) getFleetLaunchTemplateConfigs (asg * asg ) ([]* ec2.FleetLaunchTemplateConfigRequest , error ) {
294
+ var launchTemplateConfigs []* ec2.FleetLaunchTemplateConfigRequest
295
+
296
+ subnetIDOverrides := make ([]* ec2.FleetLaunchTemplateOverridesRequest , len (asg .SubnetIDs ))
297
+ for i , subnetID := range asg .SubnetIDs {
298
+ subnetIDOverrides [i ] = & ec2.FleetLaunchTemplateOverridesRequest {
299
+ SubnetId : aws .String (subnetID ),
300
+ }
301
+ }
302
+
303
+ var defaultLaunchTemplateSpecification * ec2.FleetLaunchTemplateSpecificationRequest = nil
304
+ if asg .LaunchTemplate != nil {
305
+ defaultLaunchTemplateSpecification = & ec2.FleetLaunchTemplateSpecificationRequest {
306
+ LaunchTemplateName : aws .String (asg .LaunchTemplate .name ),
307
+ Version : aws .String (asg .LaunchTemplate .version ),
308
+ }
309
+ }
310
+
311
+ if asg .MixedInstancesPolicy != nil {
312
+ defaultLaunchTemplateSpecification = & ec2.FleetLaunchTemplateSpecificationRequest {
313
+ LaunchTemplateName : aws .String (asg .MixedInstancesPolicy .launchTemplate .name ),
314
+ Version : aws .String (asg .MixedInstancesPolicy .launchTemplate .version ),
315
+ }
316
+
317
+ for i := range asg .MixedInstancesPolicy .launchTemplateOverrides {
318
+ lto := asg .MixedInstancesPolicy .launchTemplateOverrides [i ]
319
+ launchTemplateSpecification := defaultLaunchTemplateSpecification
320
+ if lto .LaunchTemplateSpecification != nil {
321
+ launchTemplateSpecification = & ec2.FleetLaunchTemplateSpecificationRequest {
322
+ LaunchTemplateName : lto .LaunchTemplateSpecification .LaunchTemplateName ,
323
+ Version : lto .LaunchTemplateSpecification .Version ,
324
+ }
325
+ }
326
+
327
+ overrides := make ([]* ec2.FleetLaunchTemplateOverridesRequest , len (subnetIDOverrides ))
328
+ for i := range subnetIDOverrides {
329
+ overrides [i ] = & ec2.FleetLaunchTemplateOverridesRequest {
330
+ SubnetId : subnetIDOverrides [i ].SubnetId ,
331
+
332
+ InstanceType : lto .InstanceType ,
333
+ // TODO: support weighted capacity and instance requirements
334
+ }
335
+ if asg .InstancesDistribution .spotMaxPrice != "" {
336
+ overrides [i ].MaxPrice = aws .String (asg .InstancesDistribution .spotMaxPrice )
337
+ }
338
+ }
339
+
340
+ launchTemplateConfigs = append (launchTemplateConfigs , & ec2.FleetLaunchTemplateConfigRequest {
341
+ LaunchTemplateSpecification : launchTemplateSpecification ,
342
+ Overrides : overrides ,
343
+ })
344
+ }
345
+ }
346
+
347
+ if len (launchTemplateConfigs ) == 0 {
348
+ if defaultLaunchTemplateSpecification == nil {
349
+ return nil , fmt .Errorf ("cannot find LaunchTemplate for ASG %q" , asg .Name )
350
+ }
351
+
352
+ launchTemplateConfigs = append (launchTemplateConfigs , & ec2.FleetLaunchTemplateConfigRequest {
353
+ LaunchTemplateSpecification : defaultLaunchTemplateSpecification ,
354
+ Overrides : subnetIDOverrides ,
355
+ })
356
+ }
357
+
358
+ return launchTemplateConfigs , nil
359
+ }
360
+
159
361
// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
160
362
func (m * AwsManager ) DeleteInstances (instances []* AwsInstanceRef ) error {
161
363
if err := m .asgCache .DeleteInstances (instances ); err != nil {
0 commit comments