Skip to content

Commit c83d29a

Browse files
committed
Initial implementation of replacement on Spot termination
1 parent 4f45670 commit c83d29a

5 files changed

+140
-29
lines changed

core/instance_actions.go

+16-12
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ func (i *instance) handleInstanceStates() (bool, error) {
3232
return false, nil
3333
}
3434

35-
// returns an instance ID or error
3635
func (i *instance) launchSpotReplacement() (*string, error) {
36+
return i.launchReplacement("spot")
37+
}
38+
39+
func (i *instance) launchReplacement(replacementLifecycle string) (*string, error) {
3740

38-
ltData, err := i.createLaunchTemplateData()
41+
ltData, err := i.createLaunchTemplateData(replacementLifecycle)
3942

4043
if err != nil {
4144
log.Println("failed to create LaunchTemplate data,", err.Error())
@@ -61,7 +64,7 @@ func (i *instance) launchSpotReplacement() (*string, error) {
6164
return nil, err
6265
}
6366

64-
cfi := i.createFleetInput(lt, instanceTypes)
67+
cfi := i.createFleetInput(lt, instanceTypes, replacementLifecycle)
6568

6669
resp, err := i.region.services.ec2.CreateFleet(cfi)
6770

@@ -78,7 +81,8 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
7881

7982
odInstance, err := i.getSwapCandidate()
8083
if err != nil {
81-
log.Printf("Couldn't find suitable OnDemand swap candidate: %s", err.Error())
84+
log.Printf("Couldn't find suitable OnDemand swap candidate, terminating Spot instance: %s", *i.InstanceId)
85+
i.terminate()
8286
return nil, err
8387
}
8488

@@ -95,7 +99,7 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
9599
defer asg.setAutoScalingMaxSize(maxSize)
96100
}
97101

98-
log.Printf("Attaching spot instance %s to the group %s",
102+
log.Printf("Attaching %s instance %s to the group %s", *i.InstanceLifecycle,
99103
*i.InstanceId, asg.name)
100104
err = asg.attachSpotInstance(*i.InstanceId, true)
101105

@@ -106,12 +110,12 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
106110
return nil, fmt.Errorf("couldn't attach spot instance %s ", *i.InstanceId)
107111
}
108112

109-
log.Printf("Terminating on-demand instance %s from the group %s",
113+
log.Printf("Terminating instance %s from the group %s",
110114
*odInstance.InstanceId, asg.name)
111115
if err := asg.terminateInstanceInAutoScalingGroup(odInstance.Instance.InstanceId, true, true); err != nil {
112-
log.Printf("On-demand instance %s couldn't be terminated, re-trying...",
116+
log.Printf("instance %s couldn't be terminated, re-trying...",
113117
*odInstance.InstanceId)
114-
return nil, fmt.Errorf("couldn't terminate on-demand instance %s",
118+
return nil, fmt.Errorf("couldn't terminate instance %s",
115119
*odInstance.InstanceId)
116120
}
117121

@@ -121,23 +125,23 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
121125
func (i *instance) getSwapCandidate() (*instance, error) {
122126
odInstanceID := i.getReplacementTargetInstanceID()
123127
if odInstanceID == nil {
124-
log.Println("Couldn't find target on-demand instance of", *i.InstanceId)
128+
log.Println("Couldn't find target instance of", *i.InstanceId)
125129
return nil, fmt.Errorf("couldn't find target instance for %s", *i.InstanceId)
126130
}
127131

128132
if err := i.region.scanInstance(odInstanceID); err != nil {
129-
log.Printf("Couldn't describe the target on-demand instance %s", *odInstanceID)
133+
log.Printf("Couldn't describe the target instance %s", *odInstanceID)
130134
return nil, fmt.Errorf("target instance %s couldn't be described", *odInstanceID)
131135
}
132136

133137
odInstance := i.region.instances.get(*odInstanceID)
134138
if odInstance == nil {
135-
log.Printf("Target on-demand instance %s couldn't be found", *odInstanceID)
139+
log.Printf("Target instance %s couldn't be found", *odInstanceID)
136140
return nil, fmt.Errorf("target instance %s is missing", *odInstanceID)
137141
}
138142

139143
if !odInstance.shouldBeReplacedWithSpot() {
140-
log.Printf("Target on-demand instance %s shouldn't be replaced", *odInstanceID)
144+
log.Printf("Target instance %s shouldn't be replaced", *odInstanceID)
141145
i.terminate()
142146
return nil, fmt.Errorf("target instance %s should not be replaced with spot",
143147
*odInstanceID)

core/instance_conversion.go

+25-8
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,13 @@ func (i *instance) processImageBlockDevices(rii *ec2.RequestLaunchTemplateData)
403403
rii.BlockDeviceMappings = i.convertImageBlockDeviceMappings(resp.Images[0].BlockDeviceMappings)
404404
}
405405

406-
func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, error) {
407-
i.price = i.typeInfo.pricing.onDemand * i.asg.config.OnDemandPriceMultiplier
406+
func (i *instance) createLaunchTemplateData(instanceLifecycle string) (*ec2.RequestLaunchTemplateData, error) {
407+
odPrice := i.typeInfo.pricing.onDemand
408+
mp := 1.0
409+
if i.asg != nil && i.asg.config.OnDemandPriceMultiplier != 0 {
410+
mp = i.asg.config.OnDemandPriceMultiplier
411+
}
412+
i.price = odPrice * mp
408413

409414
placement := ec2.LaunchTemplatePlacementRequest(*i.Placement)
410415

@@ -430,11 +435,14 @@ func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, e
430435

431436
ltData.EbsOptimized = i.EbsOptimized
432437

433-
ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{
434-
MarketType: aws.String(Spot),
435-
SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{
436-
MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)),
437-
},
438+
if instanceLifecycle == "spot" {
439+
440+
ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{
441+
MarketType: aws.String(Spot),
442+
SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{
443+
MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)),
444+
},
445+
}
438446
}
439447

440448
ltData.Placement = &placement
@@ -467,7 +475,7 @@ func (i *instance) createFleetLaunchTemplate(ltData *ec2.RequestLaunchTemplateDa
467475
return &ltName, err
468476
}
469477

470-
func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec2.CreateFleetInput {
478+
func (i *instance) createFleetInput(ltName *string, instanceTypes []*string, lifeCycle string) *ec2.CreateFleetInput {
471479

472480
var overrides []*ec2.FleetLaunchTemplateOverridesRequest
473481

@@ -494,13 +502,22 @@ func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec
494502
SpotOptions: &ec2.SpotOptionsRequest{
495503
AllocationStrategy: aws.String(i.asg.config.SpotAllocationStrategy),
496504
},
505+
OnDemandOptions: &ec2.OnDemandOptionsRequest{
506+
AllocationStrategy: aws.String("prioritized"),
507+
},
497508
Type: aws.String("instant"),
498509
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
499510
SpotTargetCapacity: aws.Int64(1),
500511
TotalTargetCapacity: aws.Int64(1),
501512
DefaultTargetCapacityType: aws.String("spot"),
502513
},
503514
}
515+
if lifeCycle != "spot" {
516+
log.Printf("Overriding default capacity type to ondemand\n")
517+
retval.TargetCapacitySpecification.DefaultTargetCapacityType = aws.String("on-demand")
518+
retval.TargetCapacitySpecification.SpotTargetCapacity = aws.Int64(0)
519+
retval.TargetCapacitySpecification.OnDemandTargetCapacity = aws.Int64(1)
520+
}
504521
return retval
505522
}
506523

core/instance_conversion_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,7 @@ func Test_instance_createLaunchTemplateData(t *testing.T) {
10531053
for _, tt := range tests {
10541054
t.Run(tt.name, func(t *testing.T) {
10551055

1056-
got, _ := tt.inst.createLaunchTemplateData()
1056+
got, _ := tt.inst.createLaunchTemplateData("spot")
10571057

10581058
// make sure the lists of tags are sorted, otherwise the comparison fails
10591059
sort.Slice(got.TagSpecifications[0].Tags, func(i, j int) bool {
@@ -1586,6 +1586,9 @@ func Test_instance_createFleetInput(t *testing.T) {
15861586
},
15871587
},
15881588
},
1589+
OnDemandOptions: &ec2.OnDemandOptionsRequest{
1590+
AllocationStrategy: aws.String("prioritized"),
1591+
},
15891592
SpotOptions: &ec2.SpotOptionsRequest{
15901593
AllocationStrategy: aws.String("capacity-optimized-prioritized"),
15911594
},
@@ -1634,6 +1637,9 @@ func Test_instance_createFleetInput(t *testing.T) {
16341637
},
16351638
},
16361639
},
1640+
OnDemandOptions: &ec2.OnDemandOptionsRequest{
1641+
AllocationStrategy: aws.String("prioritized"),
1642+
},
16371643
SpotOptions: &ec2.SpotOptionsRequest{
16381644
AllocationStrategy: aws.String("capacity-optimized"),
16391645
},
@@ -1650,7 +1656,7 @@ func Test_instance_createFleetInput(t *testing.T) {
16501656
for _, tt := range tests {
16511657
t.Run(tt.name, func(t *testing.T) {
16521658

1653-
got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes)
1659+
got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes, "spot")
16541660

16551661
if !reflect.DeepEqual(got, tt.want) {
16561662
t.Errorf("instance.createFleetInput() = %v, want %v", got, tt.want)

core/instance_queries.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ func (i *instance) canTerminate() bool {
9797
func (i *instance) shouldBeReplacedWithSpot() bool {
9898
protT, _ := i.isProtectedFromTermination()
9999
return i.belongsToEnabledASG() &&
100-
i.asgNeedsReplacement() &&
101-
!i.isSpot() &&
100+
(i.isSpot() || i.asgNeedsReplacement()) &&
102101
!i.isProtectedFromScaleIn() &&
103102
!protT
104103
}
@@ -121,7 +120,7 @@ func (i *instance) belongsToEnabledASG() bool {
121120
asg.loadLaunchTemplate()
122121
i.asg = &asg
123122
i.price = i.typeInfo.pricing.onDemand / i.region.conf.OnDemandPriceMultiplier * i.asg.config.OnDemandPriceMultiplier
124-
log.Printf("%s instace %s belongs to enabled ASG %s", i.region.name,
123+
log.Printf("%s instance %s belongs to enabled ASG %s", i.region.name,
125124
*i.InstanceId, i.asg.name)
126125
return true
127126
}

core/main.go

+89-4
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,15 @@ func (a *AutoSpotting) processEventInstance(eventType string, region string, ins
260260
spotTermination := newSpotTermination(region)
261261

262262
if spotTermination.IsInAutoSpottingASG(instanceID, a.config.TagFilteringMode, a.config.FilterByTags) {
263-
err := spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType)
263+
newInstance, err := a.replaceTerminatingSpotInstance(*instanceID, region)
264+
if err == nil {
265+
log.Printf("Launched replacement instance for instance %s: %s\n", *instanceID, *newInstance)
266+
return nil
267+
}
268+
269+
log.Printf("Error launching replacement instance for instance %s: %s, continued to handle its Spot termination\n", *instanceID, err.Error())
270+
271+
err = spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType)
264272
if err != nil {
265273
log.Printf("Error executing spot termination/rebalance action: %s\n", err.Error())
266274
return err
@@ -438,10 +446,13 @@ func (a *AutoSpotting) handleNewInstanceLaunch(regionName string, instanceID str
438446
}
439447

440448
// Try OnDemand
441-
if err := a.handleNewOnDemandInstanceLaunch(r, i); err != nil {
449+
if err := a.handleNewOnDemandInstanceLaunch(r, i); !i.isSpot() && err != nil {
450+
log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId)
442451
return err
443452
}
444453

454+
log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId)
455+
445456
// Try Spot
446457
// in case we're not triggered by SQS event we do nothing, onDemand event already manage launched spot instance
447458
if len(a.config.sqsReceiptHandle) > 0 {
@@ -511,9 +522,9 @@ func (a *AutoSpotting) handleNewOnDemandInstanceLaunch(r *region, i *instance) e
511522
}
512523

513524
} else {
514-
log.Printf("%s skipping instance %s: either doesn't belong to an "+
525+
log.Printf("%s skipping %s instance %s: either doesn't belong to an "+
515526
"enabled ASG or should not be replaced with spot, ",
516-
i.region.name, *i.InstanceId)
527+
i.region.name, *i.InstanceLifecycle, *i.InstanceId)
517528
debug.Printf("%#v", i)
518529
}
519530
return nil
@@ -551,3 +562,77 @@ func (a *AutoSpotting) handleNewSpotInstanceLaunch(r *region, i *instance) error
551562
}
552563
return nil
553564
}
565+
566+
func (a *AutoSpotting) replaceTerminatingSpotInstance(instanceID, regionName string) (*string, error) {
567+
r := &region{name: regionName, conf: a.config, services: connections{}}
568+
569+
if !r.enabled() {
570+
return nil, fmt.Errorf("region %s is not enabled", regionName)
571+
}
572+
573+
r.services.connect(regionName, a.config.MainRegion)
574+
r.setupAsgFilters()
575+
r.scanForEnabledAutoScalingGroups()
576+
577+
log.Println("Scanning full instance information in", r.name)
578+
r.determineInstanceTypeInformation(r.conf)
579+
580+
if err := r.scanInstance(aws.String(instanceID)); err != nil {
581+
log.Printf("%s Couldn't scan instance %s: %s", regionName,
582+
instanceID, err.Error())
583+
return nil, err
584+
}
585+
586+
i := r.instances.get(instanceID)
587+
if i == nil {
588+
log.Printf("%s Instance %s is missing, skipping...",
589+
regionName, instanceID)
590+
return nil, errors.New("instance missing")
591+
}
592+
log.Printf("%s Found instance %s in state %s",
593+
i.region.name, *i.InstanceId, *i.State.Name)
594+
595+
if *i.State.Name != "running" {
596+
log.Printf("%s Instance %s is not in the running state",
597+
i.region.name, *i.InstanceId)
598+
return nil, errors.New("instance not in running state")
599+
}
600+
601+
asgName := i.getReplacementTargetASGName()
602+
603+
if asgName == nil {
604+
log.Printf("Missing the ASG name tag\n")
605+
return nil, errors.New("missing ASG name tag")
606+
}
607+
608+
i.asg = i.region.findEnabledASGByName(*asgName)
609+
i.asg.scanInstances()
610+
i.asg.loadDefaultConfig()
611+
i.asg.loadConfigFromTags()
612+
i.asg.loadLaunchConfiguration()
613+
i.asg.loadLaunchTemplate()
614+
615+
newInstanceID, err := i.launchSpotReplacement()
616+
if err != nil {
617+
fmt.Printf("Spot Instance launch failed while replacing %s, error: %s, falling back to on-demand\n", *i.InstanceId, err.Error())
618+
619+
newInstanceID, err = i.launchReplacement("on-demand")
620+
if err != nil {
621+
fmt.Printf("Instance launch failed while replacing %s, error: %s\n", *i.InstanceId, err.Error())
622+
return nil, err
623+
}
624+
}
625+
626+
i.region.scanInstances()
627+
newInstance := i.region.instances.get(*newInstanceID)
628+
629+
newInstance.swapWithGroupMember(i.asg)
630+
631+
if err = i.asg.waitForInstanceStatus(newInstanceID, "InService", 5); err != nil {
632+
log.Printf("Instance %s is still not InService, trying to terminate it.",
633+
*newInstanceID)
634+
newInstance.terminate()
635+
}
636+
637+
return newInstanceID, nil
638+
}

0 commit comments

Comments
 (0)