Skip to content

Commit b2f30ad

Browse files
committed
WIP Add code path to launch instances using AWS Fleet API
1 parent cf606a1 commit b2f30ad

File tree

6 files changed

+264
-19
lines changed

6 files changed

+264
-19
lines changed

cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go

+22
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,20 @@ type launchTemplate struct {
5959

6060
// mixedInstancesPolicy is the cached view of an ASG's mixed instances policy,
// populated by buildAsgFromAWS from the group's MixedInstancesPolicy.
type mixedInstancesPolicy struct {
6161
// launchTemplate is built from the policy's LaunchTemplateSpecification.
launchTemplate *launchTemplate
62+
// launchTemplateOverrides keeps the policy's launch template overrides exactly
// as returned by the Auto Scaling API; the CreateFleet code path reads the
// per-override instance type and launch template specification from it.
launchTemplateOverrides []*autoscaling.LaunchTemplateOverrides
6263
// instanceTypesOverrides is the result of getInstanceTypes over the same overrides.
instanceTypesOverrides []string
6364
// instanceRequirementsOverrides is the result of getInstanceTypeRequirements
// over the same overrides; it must not be set together with instanceTypesOverrides.
instanceRequirementsOverrides *autoscaling.InstanceRequirements
6465
// NOTE(review): not populated in the visible part of buildAsgFromAWS — confirm where it is set.
instanceRequirements *ec2.InstanceRequirements
6566
}
6667

68+
// instancesDistribution is the cached copy of the InstancesDistribution section
// of an ASG's mixed instances policy. buildAsgFromAWS fills it with
// aws.StringValue / aws.Int64Value, so fields unset on the AWS side hold the
// zero value ("" or 0).
type instancesDistribution struct {
69+
// onDemandAllocationStrategy is passed to CreateFleet as the on-demand allocation strategy.
onDemandAllocationStrategy string
70+
// onDemandBaseCapacity is compared against the ASG's current size when
// splitting a scale-up between on-demand and spot capacity.
onDemandBaseCapacity int
71+
// onDemandPercentageAboveBaseCapacity is the percentage (divided by 100 by
// calculateSpotCapacity) of capacity above the base that is launched on-demand.
onDemandPercentageAboveBaseCapacity int
72+
// spotAllocationStrategy is passed to CreateFleet as the spot allocation strategy.
spotAllocationStrategy string
73+
// spotMaxPrice, when non-empty, is applied as MaxPrice on each fleet launch template override.
spotMaxPrice string
74+
}
75+
6776
type asg struct {
6877
AwsRef
6978

@@ -73,9 +82,11 @@ type asg struct {
7382
lastUpdateTime time.Time
7483

7584
AvailabilityZones []string
85+
SubnetIDs []string
7686
LaunchConfigurationName string
7787
LaunchTemplate *launchTemplate
7888
MixedInstancesPolicy *mixedInstancesPolicy
89+
InstancesDistribution *instancesDistribution
7990
Tags []*autoscaling.TagDescription
8091
}
8192

@@ -545,6 +556,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
545556

546557
curSize: int(aws.Int64Value(g.DesiredCapacity)),
547558
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
559+
SubnetIDs: strings.Split(aws.StringValue(g.VPCZoneIdentifier), ","),
548560
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
549561
Tags: g.Tags,
550562
}
@@ -573,6 +585,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
573585

574586
asg.MixedInstancesPolicy = &mixedInstancesPolicy{
575587
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
588+
launchTemplateOverrides: g.MixedInstancesPolicy.LaunchTemplate.Overrides,
576589
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
577590
instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
578591
}
@@ -586,6 +599,15 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
586599
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
587600
return nil, fmt.Errorf("invalid setup of both instance type and instance requirements overrides configured")
588601
}
602+
603+
asg.InstancesDistribution = &instancesDistribution{
604+
onDemandAllocationStrategy: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.OnDemandAllocationStrategy),
605+
onDemandBaseCapacity: int(aws.Int64Value(g.MixedInstancesPolicy.InstancesDistribution.OnDemandBaseCapacity)),
606+
onDemandPercentageAboveBaseCapacity: int(aws.Int64Value(g.MixedInstancesPolicy.InstancesDistribution.OnDemandPercentageAboveBaseCapacity)),
607+
spotAllocationStrategy: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.SpotAllocationStrategy),
608+
// TODO: support SpotInstancePools?
609+
spotMaxPrice: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.SpotMaxPrice),
610+
}
589611
}
590612

591613
return asg, nil

cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go

+20-11
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,17 @@ var (
5353

5454
// awsCloudProvider implements CloudProvider interface.
5555
type awsCloudProvider struct {
56-
awsManager *AwsManager
57-
resourceLimiter *cloudprovider.ResourceLimiter
56+
awsManager *AwsManager
57+
resourceLimiter *cloudprovider.ResourceLimiter
58+
useCreateFleetAndAttachAPI bool
5859
}
5960

6061
// BuildAwsCloudProvider builds CloudProvider implementation for AWS.
61-
func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) {
62+
func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter, useCreateFleetAndAttachAPI bool) (cloudprovider.CloudProvider, error) {
6263
aws := &awsCloudProvider{
63-
awsManager: awsManager,
64-
resourceLimiter: resourceLimiter,
64+
awsManager: awsManager,
65+
resourceLimiter: resourceLimiter,
66+
useCreateFleetAndAttachAPI: useCreateFleetAndAttachAPI,
6567
}
6668
return aws, nil
6769
}
@@ -99,8 +101,9 @@ func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
99101
ngs := make([]cloudprovider.NodeGroup, 0, len(asgs))
100102
for _, asg := range asgs {
101103
ngs = append(ngs, &AwsNodeGroup{
102-
asg: asg,
103-
awsManager: aws.awsManager,
104+
asg: asg,
105+
awsManager: aws.awsManager,
106+
useCreateFleetAndAttachAPI: aws.useCreateFleetAndAttachAPI,
104107
})
105108
}
106109

@@ -217,8 +220,9 @@ func AwsRefFromProviderId(id string) (*AwsInstanceRef, error) {
217220

218221
// AwsNodeGroup implements NodeGroup interface.
219222
type AwsNodeGroup struct {
220-
awsManager *AwsManager
221-
asg *asg
223+
awsManager *AwsManager
224+
asg *asg
225+
useCreateFleetAndAttachAPI bool
222226
}
223227

224228
// MaxSize returns maximum size of the node group.
@@ -277,7 +281,12 @@ func (ng *AwsNodeGroup) IncreaseSize(delta int) error {
277281
if size+delta > ng.asg.maxSize {
278282
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.asg.maxSize)
279283
}
280-
return ng.awsManager.SetAsgSize(ng.asg, size+delta)
284+
285+
if ng.useCreateFleetAndAttachAPI {
286+
return ng.awsManager.LaunchAndAttach(ng.asg, delta)
287+
} else {
288+
return ng.awsManager.SetAsgSize(ng.asg, size+delta)
289+
}
281290
}
282291

283292
// AtomicIncreaseSize is not implemented.
@@ -460,7 +469,7 @@ func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
460469
klog.Fatalf("Failed to create AWS Manager: %v", err)
461470
}
462471

463-
provider, err := BuildAwsCloudProvider(manager, rl)
472+
provider, err := BuildAwsCloudProvider(manager, rl, opts.AWSUseCreateFleetAndAttachAPI)
464473
if err != nil {
465474
klog.Fatalf("Failed to create AWS cloud provider: %v", err)
466475
}

cluster-autoscaler/cloudprovider/aws/aws_manager.go

+202
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ package aws
2121
import (
2222
"errors"
2323
"fmt"
24+
"math"
2425
"math/rand"
2526
"regexp"
2627
"strconv"
2728
"strings"
29+
"sync"
2830
"time"
2931

3032
apiv1 "k8s.io/api/core/v1"
@@ -34,6 +36,7 @@ import (
3436

3537
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
3638
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
39+
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws/awserr"
3740
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
3841
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
3942
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/eks"
@@ -46,6 +49,7 @@ const (
4649
operationPollInterval = 100 * time.Millisecond
4750
maxRecordsReturnedByAPI = 100
4851
maxAsgNamesPerDescribe = 100
52+
maxAttachInstanceCount = 20
4953
refreshInterval = 1 * time.Minute
5054
autoDiscovererTypeASG = "asg"
5155
asgAutoDiscovererKeyTag = "tag"
@@ -156,6 +160,204 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
156160
return m.asgCache.SetAsgSize(asg, size)
157161
}
158162

163+
// LaunchAndAttach launches a fleet of instances and attaches them to the ASG
164+
func (m *AwsManager) LaunchAndAttach(asg *asg, size int) error {
165+
// TODO: needs locking
166+
// TODO: needs to inform asgCache to increment its size
167+
168+
spotCapacity, onDemandCapacity := m.calculateSpotCapacity(asg, size)
169+
tags := m.getInstanceTags(asg)
170+
launchTemplateConfigs, err := m.getFleetLaunchTemplateConfigs(asg)
171+
if err != nil {
172+
return fmt.Errorf("getting launch template configs, %w", err)
173+
}
174+
175+
// Call Fleet API to immediately trigger EC2 instance launch
176+
params := &ec2.CreateFleetInput{
177+
Type: aws.String(ec2.FleetTypeInstant),
178+
LaunchTemplateConfigs: launchTemplateConfigs,
179+
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
180+
OnDemandTargetCapacity: aws.Int64(int64(onDemandCapacity)),
181+
SpotTargetCapacity: aws.Int64(int64(spotCapacity)),
182+
TotalTargetCapacity: aws.Int64(int64(size)),
183+
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO: what should this default be, does it matter?
184+
// TODO: support attribute-based instance type capacity selection
185+
},
186+
TagSpecifications: []*ec2.TagSpecification{
187+
{ResourceType: aws.String(ec2.ResourceTypeInstance), Tags: tags},
188+
{ResourceType: aws.String(ec2.ResourceTypeVolume), Tags: tags},
189+
{ResourceType: aws.String(ec2.ResourceTypeFleet), Tags: tags},
190+
},
191+
SpotOptions: &ec2.SpotOptionsRequest{
192+
AllocationStrategy: aws.String(asg.InstancesDistribution.spotAllocationStrategy),
193+
},
194+
OnDemandOptions: &ec2.OnDemandOptionsRequest{
195+
AllocationStrategy: aws.String(asg.InstancesDistribution.onDemandAllocationStrategy),
196+
},
197+
}
198+
fleetOutput, err := m.awsService.CreateFleet(params)
199+
if err != nil {
200+
return fmt.Errorf("creating fleet, %w", err)
201+
}
202+
203+
// extract created instance IDs
204+
var instanceIDs []*string
205+
for _, instance := range fleetOutput.Instances {
206+
instanceIDs = append(instanceIDs, instance.InstanceIds...)
207+
}
208+
209+
// Attach the instances to the ASG in groups of 20
210+
var wg sync.WaitGroup
211+
var attachErrs []error
212+
for i := 0; i < len(instanceIDs); i += maxAttachInstanceCount {
213+
end := i + maxAttachInstanceCount
214+
if end > len(instanceIDs) {
215+
end = len(instanceIDs)
216+
}
217+
wg.Add(1)
218+
219+
go func(instanceIDs []*string) {
220+
defer wg.Done()
221+
222+
params := &autoscaling.AttachInstancesInput{
223+
InstanceIds: instanceIDs,
224+
AutoScalingGroupName: aws.String(asg.Name),
225+
}
226+
227+
// TODO: add a timeout to this loop
228+
for {
229+
_, err := m.awsService.AttachInstances(params)
230+
if err != nil {
231+
// retry on pending instances ValidationError
232+
var aerr awserr.Error
233+
if errors.As(err, &aerr) && aerr.Code() == "ValidationError" && strings.Contains(aerr.Message(), "pending") {
234+
time.Sleep(operationPollInterval)
235+
continue
236+
}
237+
238+
// otherwise add to attachErrs which get raised at the end
239+
attachErrs = append(attachErrs, err)
240+
}
241+
break
242+
}
243+
244+
}(instanceIDs[i:end])
245+
}
246+
wg.Wait()
247+
248+
// Return any errors that occurred during instance attachment
249+
// TODO: terminate instances that failed to attach and/or fail back to ASG SetDesiredCapacity
250+
return fmt.Errorf("attaching instances to ASG %q: %+v", asg.Name, attachErrs)
251+
252+
// Calculate how many instances failed to launch, fallback to incrementing ASG's SetDesiredCapacity
253+
failedLaunchCount := len(fleetOutput.Errors) - size
254+
if failedLaunchCount > 0 {
255+
klog.Warningf("failed to launch %d instances for %s via CreateFleet call - falling back to SetDesiredCapacity: %+v",
256+
failedLaunchCount, asg.Name, fleetOutput.Errors)
257+
return m.SetAsgSize(asg, asg.curSize+failedLaunchCount)
258+
}
259+
260+
return nil
261+
}
262+
263+
func (m *AwsManager) getInstanceTags(asg *asg) []*ec2.Tag {
264+
tags := make([]*ec2.Tag, 0, len(asg.Tags))
265+
for i := range asg.Tags {
266+
if asg.Tags[i].PropagateAtLaunch != nil && *asg.Tags[i].PropagateAtLaunch {
267+
key := asg.Tags[i].Key
268+
if str := aws.StringValue(key); strings.HasPrefix(str, "aws:") {
269+
key = aws.String(fmt.Sprintf("reserved:%s", str))
270+
}
271+
tags = append(tags, &ec2.Tag{Key: key, Value: asg.Tags[i].Value})
272+
}
273+
}
274+
return tags
275+
}
276+
277+
func (m *AwsManager) calculateSpotCapacity(asg *asg, size int) (spotCapacity int, onDemandCapacity int) {
278+
for size > 0 {
279+
if asg.curSize < asg.InstancesDistribution.onDemandBaseCapacity {
280+
onDemandCapacity++
281+
size--
282+
} else {
283+
// TODO: should this consider the current ratio of spot/on-demand instances?
284+
onDemand := int(math.Floor(float64(size) * float64(asg.InstancesDistribution.onDemandPercentageAboveBaseCapacity) / 100))
285+
onDemandCapacity += onDemand
286+
spotCapacity += size - onDemand
287+
size = 0
288+
}
289+
}
290+
return
291+
}
292+
293+
func (m *AwsManager) getFleetLaunchTemplateConfigs(asg *asg) ([]*ec2.FleetLaunchTemplateConfigRequest, error) {
294+
var launchTemplateConfigs []*ec2.FleetLaunchTemplateConfigRequest
295+
296+
subnetIDOverrides := make([]*ec2.FleetLaunchTemplateOverridesRequest, len(asg.SubnetIDs))
297+
for i, subnetID := range asg.SubnetIDs {
298+
subnetIDOverrides[i] = &ec2.FleetLaunchTemplateOverridesRequest{
299+
SubnetId: aws.String(subnetID),
300+
}
301+
}
302+
303+
var defaultLaunchTemplateSpecification *ec2.FleetLaunchTemplateSpecificationRequest = nil
304+
if asg.LaunchTemplate != nil {
305+
defaultLaunchTemplateSpecification = &ec2.FleetLaunchTemplateSpecificationRequest{
306+
LaunchTemplateName: aws.String(asg.LaunchTemplate.name),
307+
Version: aws.String(asg.LaunchTemplate.version),
308+
}
309+
}
310+
311+
if asg.MixedInstancesPolicy != nil {
312+
defaultLaunchTemplateSpecification = &ec2.FleetLaunchTemplateSpecificationRequest{
313+
LaunchTemplateName: aws.String(asg.MixedInstancesPolicy.launchTemplate.name),
314+
Version: aws.String(asg.MixedInstancesPolicy.launchTemplate.version),
315+
}
316+
317+
for i := range asg.MixedInstancesPolicy.launchTemplateOverrides {
318+
lto := asg.MixedInstancesPolicy.launchTemplateOverrides[i]
319+
launchTemplateSpecification := defaultLaunchTemplateSpecification
320+
if lto.LaunchTemplateSpecification != nil {
321+
launchTemplateSpecification = &ec2.FleetLaunchTemplateSpecificationRequest{
322+
LaunchTemplateName: lto.LaunchTemplateSpecification.LaunchTemplateName,
323+
Version: lto.LaunchTemplateSpecification.Version,
324+
}
325+
}
326+
327+
overrides := make([]*ec2.FleetLaunchTemplateOverridesRequest, len(subnetIDOverrides))
328+
for i := range subnetIDOverrides {
329+
overrides[i] = &ec2.FleetLaunchTemplateOverridesRequest{
330+
SubnetId: subnetIDOverrides[i].SubnetId,
331+
332+
InstanceType: lto.InstanceType,
333+
// TODO: support weighted capacity and instance requirements
334+
}
335+
if asg.InstancesDistribution.spotMaxPrice != "" {
336+
overrides[i].MaxPrice = aws.String(asg.InstancesDistribution.spotMaxPrice)
337+
}
338+
}
339+
340+
launchTemplateConfigs = append(launchTemplateConfigs, &ec2.FleetLaunchTemplateConfigRequest{
341+
LaunchTemplateSpecification: launchTemplateSpecification,
342+
Overrides: overrides,
343+
})
344+
}
345+
}
346+
347+
if len(launchTemplateConfigs) == 0 {
348+
if defaultLaunchTemplateSpecification == nil {
349+
return nil, fmt.Errorf("cannot find LaunchTemplate for ASG %q", asg.Name)
350+
}
351+
352+
launchTemplateConfigs = append(launchTemplateConfigs, &ec2.FleetLaunchTemplateConfigRequest{
353+
LaunchTemplateSpecification: defaultLaunchTemplateSpecification,
354+
Overrides: subnetIDOverrides,
355+
})
356+
}
357+
358+
return launchTemplateConfigs, nil
359+
}
360+
159361
// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
160362
func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error {
161363
if err := m.asgCache.DeleteInstances(instances); err != nil {

cluster-autoscaler/cloudprovider/aws/aws_wrapper.go

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
// autoScalingI is the interface abstracting specific API calls of the auto-scaling service provided by AWS SDK for use in CA
3333
type autoScalingI interface {
34+
AttachInstances(input *autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error)
3435
DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
3536
DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error)
3637
DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error)
@@ -40,6 +41,7 @@ type autoScalingI interface {
4041

4142
// ec2I is the interface abstracting specific API calls of the EC2 service provided by AWS SDK for use in CA
4243
type ec2I interface {
44+
CreateFleet(input *ec2.CreateFleetInput) (*ec2.CreateFleetOutput, error)
4345
DescribeImages(input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error)
4446
DescribeLaunchTemplateVersions(input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error)
4547
GetInstanceTypesFromInstanceRequirementsPages(input *ec2.GetInstanceTypesFromInstanceRequirementsInput, fn func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool) error

cluster-autoscaler/config/autoscaling_options.go

+3
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ type AutoscalingOptions struct {
231231
BalancingLabels []string
232232
// AWSUseStaticInstanceList tells if AWS cloud provider use static instance type list or dynamically fetch from remote APIs.
233233
AWSUseStaticInstanceList bool
234+
// AWSUseCreateFleetAndAttachAPI tells the AWS cloud provider to increase the size of ASGs by launching instances directly
235+
// via the CreateFleet API and attaching them to the ASG, rather than by raising the ASG's desired capacity; this speeds up scale-up.
236+
AWSUseCreateFleetAndAttachAPI bool
234237
// GCEOptions contain autoscaling options specific to GCE cloud provider.
235238
GCEOptions GCEOptions
236239
// KubeClientOpts specify options for kube client

0 commit comments

Comments
 (0)