@@ -21,10 +21,12 @@ package aws
21
21
import (
22
22
"errors"
23
23
"fmt"
24
+ "math"
24
25
"math/rand"
25
26
"regexp"
26
27
"strconv"
27
28
"strings"
29
+ "sync"
28
30
"time"
29
31
30
32
apiv1 "k8s.io/api/core/v1"
@@ -34,6 +36,7 @@ import (
34
36
35
37
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
36
38
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
39
+ "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws/awserr"
37
40
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
38
41
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
39
42
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/eks"
@@ -46,6 +49,7 @@ const (
46
49
operationPollInterval = 100 * time .Millisecond
47
50
maxRecordsReturnedByAPI = 100
48
51
maxAsgNamesPerDescribe = 100
52
+ maxAttachInstanceCount = 20
49
53
refreshInterval = 1 * time .Minute
50
54
autoDiscovererTypeASG = "asg"
51
55
asgAutoDiscovererKeyTag = "tag"
@@ -156,6 +160,206 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
156
160
return m .asgCache .SetAsgSize (asg , size )
157
161
}
158
162
163
+ // LaunchAndAttach launches a fleet of instances and attaches them to the ASG
164
+ func (m * AwsManager ) LaunchAndAttach (asg * asg , size int ) error {
165
+ // TODO: needs locking
166
+ // TODO: needs to inform asgCache to increment its size
167
+
168
+ spotCapacity , onDemandCapacity := m .calculateSpotCapacity (asg , size )
169
+ tags := m .getInstanceTags (asg )
170
+ launchTemplateConfigs , err := m .getFleetLaunchTemplateConfigs (asg )
171
+ if err != nil {
172
+ return fmt .Errorf ("getting launch template configs, %w" , err )
173
+ }
174
+
175
+ // Call Fleet API to immediately trigger EC2 instance launch
176
+ params := & ec2.CreateFleetInput {
177
+ Type : aws .String (ec2 .FleetTypeInstant ),
178
+ LaunchTemplateConfigs : launchTemplateConfigs ,
179
+ TargetCapacitySpecification : & ec2.TargetCapacitySpecificationRequest {
180
+ OnDemandTargetCapacity : aws .Int64 (int64 (onDemandCapacity )),
181
+ SpotTargetCapacity : aws .Int64 (int64 (spotCapacity )),
182
+ TotalTargetCapacity : aws .Int64 (int64 (size )),
183
+ DefaultTargetCapacityType : aws .String (ec2 .DefaultTargetCapacityTypeOnDemand ), // TODO: what should this default be, does it matter?
184
+ // TODO: support attribute-based instance type capacity selection
185
+ },
186
+ TagSpecifications : []* ec2.TagSpecification {
187
+ {ResourceType : aws .String (ec2 .ResourceTypeInstance ), Tags : tags },
188
+ {ResourceType : aws .String (ec2 .ResourceTypeVolume ), Tags : tags },
189
+ {ResourceType : aws .String (ec2 .ResourceTypeFleet ), Tags : tags },
190
+ },
191
+ SpotOptions : & ec2.SpotOptionsRequest {
192
+ AllocationStrategy : aws .String (asg .InstancesDistribution .spotAllocationStrategy ),
193
+ },
194
+ OnDemandOptions : & ec2.OnDemandOptionsRequest {
195
+ AllocationStrategy : aws .String (asg .InstancesDistribution .onDemandAllocationStrategy ),
196
+ },
197
+ }
198
+ fleetOutput , err := m .awsService .CreateFleet (params )
199
+ if err != nil {
200
+ return fmt .Errorf ("creating fleet, %w" , err )
201
+ }
202
+
203
+ // extract created instance IDs
204
+ var instanceIDs []* string
205
+ for _ , instance := range fleetOutput .Instances {
206
+ instanceIDs = append (instanceIDs , instance .InstanceIds ... )
207
+ }
208
+
209
+ // Attach the instances to the ASG in groups of 20
210
+ var wg sync.WaitGroup
211
+ var attachErrs []error
212
+ for i := 0 ; i < len (instanceIDs ); i += maxAttachInstanceCount {
213
+ end := i + maxAttachInstanceCount
214
+ if end > len (instanceIDs ) {
215
+ end = len (instanceIDs )
216
+ }
217
+ wg .Add (1 )
218
+
219
+ go func (instanceIDs []* string ) {
220
+ defer wg .Done ()
221
+
222
+ params := & autoscaling.AttachInstancesInput {
223
+ InstanceIds : instanceIDs ,
224
+ AutoScalingGroupName : aws .String (asg .Name ),
225
+ }
226
+
227
+ // TODO: add a timeout to this loop
228
+ for {
229
+ _ , err := m .awsService .AttachInstances (params )
230
+ if err != nil {
231
+ // retry on pending instances ValidationError
232
+ var aerr awserr.Error
233
+ if errors .As (err , & aerr ) && aerr .Code () == "ValidationError" && strings .Contains (aerr .Message (), "pending" ) {
234
+ time .Sleep (operationPollInterval )
235
+ continue
236
+ }
237
+
238
+ // otherwise add to attachErrs which get raised at the end
239
+ attachErrs = append (attachErrs , err )
240
+ }
241
+ break
242
+ }
243
+
244
+ }(instanceIDs [i :end ])
245
+ }
246
+ wg .Wait ()
247
+
248
+ // Return any errors that occurred during instance attachment
249
+ // TODO: terminate instances that failed to attach and/or fail back to ASG SetDesiredCapacity
250
+ if len (attachErrs ) > 0 {
251
+ return fmt .Errorf ("attaching instances to ASG %q: %+v" , asg .Name , attachErrs )
252
+ }
253
+
254
+ // Calculate how many instances failed to launch, fallback to incrementing ASG's SetDesiredCapacity
255
+ failedLaunchCount := len (fleetOutput .Errors ) - size
256
+ if failedLaunchCount > 0 {
257
+ klog .Warningf ("failed to launch %d instances for %s via CreateFleet call - falling back to SetDesiredCapacity: %+v" ,
258
+ failedLaunchCount , asg .Name , fleetOutput .Errors )
259
+ return m .SetAsgSize (asg , asg .curSize + failedLaunchCount )
260
+ }
261
+
262
+ return nil
263
+ }
264
+
265
+ func (m * AwsManager ) getInstanceTags (asg * asg ) []* ec2.Tag {
266
+ tags := make ([]* ec2.Tag , 0 , len (asg .Tags ))
267
+ for i := range asg .Tags {
268
+ if asg .Tags [i ].PropagateAtLaunch != nil && * asg .Tags [i ].PropagateAtLaunch {
269
+ key := asg .Tags [i ].Key
270
+ if str := aws .StringValue (key ); strings .HasPrefix (str , "aws:" ) {
271
+ key = aws .String (fmt .Sprintf ("reserved:%s" , str ))
272
+ }
273
+ tags = append (tags , & ec2.Tag {Key : key , Value : asg .Tags [i ].Value })
274
+ }
275
+ }
276
+ return tags
277
+ }
278
+
279
+ func (m * AwsManager ) calculateSpotCapacity (asg * asg , size int ) (spotCapacity int , onDemandCapacity int ) {
280
+ for size > 0 {
281
+ if asg .curSize < asg .InstancesDistribution .onDemandBaseCapacity {
282
+ onDemandCapacity ++
283
+ size --
284
+ } else {
285
+ // TODO: should this consider the current ratio of spot/on-demand instances?
286
+ onDemand := int (math .Floor (float64 (size ) * float64 (asg .InstancesDistribution .onDemandPercentageAboveBaseCapacity ) / 100 ))
287
+ onDemandCapacity += onDemand
288
+ spotCapacity += size - onDemand
289
+ size = 0
290
+ }
291
+ }
292
+ return
293
+ }
294
+
295
+ func (m * AwsManager ) getFleetLaunchTemplateConfigs (asg * asg ) ([]* ec2.FleetLaunchTemplateConfigRequest , error ) {
296
+ var launchTemplateConfigs []* ec2.FleetLaunchTemplateConfigRequest
297
+
298
+ subnetIDOverrides := make ([]* ec2.FleetLaunchTemplateOverridesRequest , len (asg .SubnetIDs ))
299
+ for i , subnetID := range asg .SubnetIDs {
300
+ subnetIDOverrides [i ] = & ec2.FleetLaunchTemplateOverridesRequest {
301
+ SubnetId : aws .String (subnetID ),
302
+ }
303
+ }
304
+
305
+ var defaultLaunchTemplateSpecification * ec2.FleetLaunchTemplateSpecificationRequest = nil
306
+ if asg .LaunchTemplate != nil {
307
+ defaultLaunchTemplateSpecification = & ec2.FleetLaunchTemplateSpecificationRequest {
308
+ LaunchTemplateName : aws .String (asg .LaunchTemplate .name ),
309
+ Version : aws .String (asg .LaunchTemplate .version ),
310
+ }
311
+ }
312
+
313
+ if asg .MixedInstancesPolicy != nil {
314
+ defaultLaunchTemplateSpecification = & ec2.FleetLaunchTemplateSpecificationRequest {
315
+ LaunchTemplateName : aws .String (asg .MixedInstancesPolicy .launchTemplate .name ),
316
+ Version : aws .String (asg .MixedInstancesPolicy .launchTemplate .version ),
317
+ }
318
+
319
+ for i := range asg .MixedInstancesPolicy .launchTemplateOverrides {
320
+ lto := asg .MixedInstancesPolicy .launchTemplateOverrides [i ]
321
+ launchTemplateSpecification := defaultLaunchTemplateSpecification
322
+ if lto .LaunchTemplateSpecification != nil {
323
+ launchTemplateSpecification = & ec2.FleetLaunchTemplateSpecificationRequest {
324
+ LaunchTemplateName : lto .LaunchTemplateSpecification .LaunchTemplateName ,
325
+ Version : lto .LaunchTemplateSpecification .Version ,
326
+ }
327
+ }
328
+
329
+ overrides := make ([]* ec2.FleetLaunchTemplateOverridesRequest , len (subnetIDOverrides ))
330
+ for i := range subnetIDOverrides {
331
+ overrides [i ] = & ec2.FleetLaunchTemplateOverridesRequest {
332
+ SubnetId : subnetIDOverrides [i ].SubnetId ,
333
+
334
+ InstanceType : lto .InstanceType ,
335
+ // TODO: support weighted capacity and instance requirements
336
+ }
337
+ if asg .InstancesDistribution .spotMaxPrice != "" {
338
+ overrides [i ].MaxPrice = aws .String (asg .InstancesDistribution .spotMaxPrice )
339
+ }
340
+ }
341
+
342
+ launchTemplateConfigs = append (launchTemplateConfigs , & ec2.FleetLaunchTemplateConfigRequest {
343
+ LaunchTemplateSpecification : launchTemplateSpecification ,
344
+ Overrides : overrides ,
345
+ })
346
+ }
347
+ }
348
+
349
+ if len (launchTemplateConfigs ) == 0 {
350
+ if defaultLaunchTemplateSpecification == nil {
351
+ return nil , fmt .Errorf ("cannot find LaunchTemplate for ASG %q" , asg .Name )
352
+ }
353
+
354
+ launchTemplateConfigs = append (launchTemplateConfigs , & ec2.FleetLaunchTemplateConfigRequest {
355
+ LaunchTemplateSpecification : defaultLaunchTemplateSpecification ,
356
+ Overrides : subnetIDOverrides ,
357
+ })
358
+ }
359
+
360
+ return launchTemplateConfigs , nil
361
+ }
362
+
159
363
// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
160
364
func (m * AwsManager ) DeleteInstances (instances []* AwsInstanceRef ) error {
161
365
if err := m .asgCache .DeleteInstances (instances ); err != nil {
0 commit comments