Skip to content

Commit 951f264

Browse files
authored
Add dangling instance check for Windows (#280)
1 parent 4ef2bdb commit 951f264

4 files changed

Lines changed: 302 additions & 85 deletions

File tree

scaler/asg.go

Lines changed: 120 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package scaler
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log"
78
"sort"
@@ -17,6 +18,10 @@ import (
1718
ssmTypes "github.com/aws/aws-sdk-go-v2/service/ssm/types"
1819
)
1920

21+
// ErrWindowsGracefulScaleInNotSupported is returned when attempting graceful scale-in on Windows instances.
22+
// Windows instances don't support SIGTERM-based graceful shutdown; they rely on lifecycle hooks instead.
23+
var ErrWindowsGracefulScaleInNotSupported = errors.New("graceful scale-in not supported on Windows")
24+
2025
const (
2126
activitySucessfulStatusCode = "Successful"
2227
userRequestForChangingDesiredCapacity = "a user request explicitly set group desired capacity changing the desired capacity"
@@ -70,57 +75,79 @@ func (a *ASGDriver) waitForSSMReady(ctx context.Context, instanceID string, time
7075
return fmt.Errorf("timed out waiting for SSM agent to become ready on %s", instanceID)
7176
}
7277

73-
// checkAndTerminate runs the existing SSM check + terminate logic on a slice of instance IDs.
74-
// The number of instances to check is already limited by the caller.
75-
func (a *ASGDriver) checkAndTerminate(
76-
ctx context.Context,
77-
instances []string,
78-
ssmSvc *ssm.Client,
79-
ec2Svc *ec2.Client,
80-
) (terminatedCount int, firstError error) {
81-
// terminatedCount is a named return value, initialized to 0 by default.
82-
83-
for _, instanceID := range instances {
78+
// getASGPlatform detects whether the ASG contains Linux or Windows instances.
79+
// Since each ASG is single-platform, we only need to check one instance.
80+
func (a *ASGDriver) getASGPlatform(ctx context.Context, instances []ec2Types.Instance) string {
81+
for _, instance := range instances {
82+
// The Platform field is only set for Windows instances.
83+
// Use case-insensitive comparison because the EC2 API returns "windows" (lowercase)
84+
// but ec2Types.PlatformValuesWindows is "Windows" (capitalized).
85+
if strings.EqualFold(string(instance.Platform), "windows") {
86+
return "windows"
87+
}
88+
}
89+
return "linux"
90+
}
8491

85-
checkCommand := `
86-
#!/bin/bash
87-
# Check if buildkite-agent service is running or has been marked for termination
88-
# Note: Even after SIGTERM, the service status may still show as "active" and "running"
89-
# until jobs complete, so we primarily rely on the termination marker file.
92+
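// getCheckCommand returns the SSM script used to check whether buildkite-agent is running:
// a PowerShell script when platform is "windows", otherwise a bash script for Linux.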
93+
func (a *ASGDriver) getCheckCommand(platform string) string {
94+
if platform == "windows" {
95+
return `
96+
$AgentStatus = nssm status buildkite-agent 2>&1
97+
if ($AgentStatus -match "SERVICE_RUNNING") {
98+
Write-Output "RUNNING"
99+
} else {
100+
Write-Output "NOT_RUNNING"
101+
}
102+
`
103+
}
90104

105+
// Default to Linux
106+
return `#!/bin/bash
107+
# Linux check command
91108
if [ -f /tmp/buildkite-agent-termination-marker ]; then
92109
echo "MARKER_EXISTS: Instance is already marked for termination"
93-
cat /tmp/buildkite-agent-termination-marker
94110
exit 0
95111
fi
96112
97113
ACTIVE_STATE=$(systemctl show buildkite-agent -p ActiveState | cut -d= -f2)
98-
SUB_STATE=$(systemctl show buildkite-agent -p SubState | cut -d= -f2)
99-
100-
echo "Service status: ActiveState=$ACTIVE_STATE SubState=$SUB_STATE"
101-
102114
case "$ACTIVE_STATE" in
103-
"active")
104-
echo "RUNNING: Service is active ($SUB_STATE)"
105-
exit 0
106-
;;
107-
"activating")
108-
echo "ACTIVATING: Service is starting"
109-
exit 0
110-
;;
111-
*)
112-
# Service is not running
113-
echo "Detailed service information:"
114-
systemctl status buildkite-agent --no-pager || true
115-
echo "NOT_RUNNING: Service is $ACTIVE_STATE/$SUB_STATE"
116-
exit 1
117-
;;
115+
"active"|"activating") echo "RUNNING" ;;
116+
*) echo "NOT_RUNNING: $ACTIVE_STATE" ;;
118117
esac
119118
`
119+
}
120120

121+
// checkAndMarkUnhealthy uses SSM to check if buildkite-agent is running on each instance.
122+
// Instances where the agent service is not running are marked unhealthy via the ASG API,
123+
// which will cause the ASG to terminate and replace them.
124+
//
125+
// This is safe to use without graceful shutdown because:
126+
// - We only mark unhealthy if the check reports the agent service is NOT running
//   (or the check command invocation itself has status Failed)
127+
// - If the agent service is not running, there cannot be any jobs in progress
128+
// - This is different from normal scale-in which uses SIGTERM for graceful shutdown
129+
//
130+
// This function is specifically for "zombie" instances where the EC2 instance is running
131+
// but the buildkite-agent process has died (e.g., due to crashes, OOM, or other failures).
132+
func (a *ASGDriver) checkAndMarkUnhealthy(
133+
ctx context.Context,
134+
instances []string,
135+
ssmSvc *ssm.Client,
136+
asgSvc *autoscaling.Client,
137+
platform string,
138+
) (markedUnhealthyCount int, firstError error) {
139+
checkCommand := a.getCheckCommand(platform)
140+
141+
// Use the appropriate SSM document based on platform
142+
documentName := "AWS-RunShellScript"
143+
if platform == "windows" {
144+
documentName = "AWS-RunPowerShellScript"
145+
}
146+
147+
for _, instanceID := range instances {
121148
checkInput := &ssm.SendCommandInput{
122149
InstanceIds: []string{instanceID},
123-
DocumentName: aws.String("AWS-RunShellScript"),
150+
DocumentName: aws.String(documentName),
124151
Parameters: map[string][]string{
125152
"commands": {checkCommand},
126153
},
@@ -176,10 +203,10 @@ esac
176203
if checkCmdResult.Status == ssmTypes.CommandInvocationStatusFailed ||
177204
(checkCmdResult.Status == ssmTypes.CommandInvocationStatusSuccess && checkCmdResult.StandardOutputContent != nil && strings.Contains(*checkCmdResult.StandardOutputContent, "NOT_RUNNING")) {
178205

179-
// Skip if it's already been marked for termination or is activating (based on script's output)
206+
// Skip if it's already been marked for termination (based on script's output)
180207
if checkCmdResult.StandardOutputContent != nil &&
181-
(strings.Contains(*checkCmdResult.StandardOutputContent, "MARKER_EXISTS") || strings.Contains(*checkCmdResult.StandardOutputContent, "ACTIVATING")) {
182-
log.Printf("[Elastic CI Mode] ℹ️ Instance %s has buildkite-agent in transition state (marker exists or activating), not a dangling instance", instanceID)
208+
strings.Contains(*checkCmdResult.StandardOutputContent, "MARKER_EXISTS") {
209+
log.Printf("[Elastic CI Mode] ℹ️ Instance %s is already marked for termination, skipping", instanceID)
183210
if checkCmdResult.StandardOutputContent != nil {
184211
log.Printf("[Elastic CI Mode] Service status details for %s: %s", instanceID, *checkCmdResult.StandardOutputContent)
185212
}
@@ -191,27 +218,27 @@ esac
191218
log.Printf("[Elastic CI Mode] Service status for %s: %s", instanceID, *checkCmdResult.StandardOutputContent)
192219
}
193220

194-
_, termErr := ec2Svc.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
195-
InstanceIds: []string{instanceID},
221+
// Mark instance as unhealthy so ASG will terminate it
222+
_, err := asgSvc.SetInstanceHealth(ctx, &autoscaling.SetInstanceHealthInput{
223+
InstanceId: aws.String(instanceID),
224+
HealthStatus: aws.String("Unhealthy"),
196225
})
197226

198-
if termErr != nil {
199-
log.Printf("[Elastic CI Mode] Error: Failed to terminate dangling instance %s: %v", instanceID, termErr)
227+
if err != nil {
228+
log.Printf("[Elastic CI Mode] Error: Failed to mark instance %s as unhealthy: %v", instanceID, err)
200229
if firstError == nil {
201-
firstError = fmt.Errorf("TerminateInstances failed for %s: %w", instanceID, termErr)
230+
firstError = fmt.Errorf("SetInstanceHealth failed for %s: %w", instanceID, err)
202231
}
203232
} else {
204-
log.Printf("[Elastic CI Mode] Successfully initiated termination for dangling instance %s via EC2 API", instanceID)
205-
terminatedCount++
233+
log.Printf("[Elastic CI Mode] Successfully marked instance %s as unhealthy", instanceID)
234+
markedUnhealthyCount++
206235
}
207236
} else if checkCmdResult.Status != ssmTypes.CommandInvocationStatusSuccess {
208237
log.Printf("[Elastic CI Mode] Agent status check command for %s did not succeed (status: %s). Output: %s", instanceID, checkCmdResult.Status, aws.ToString(checkCmdResult.StandardOutputContent))
209-
} else {
210-
log.Printf("[Elastic CI Mode] Agent on instance %s appears to be running normally. Status: %s. Output: %s", instanceID, checkCmdResult.Status, aws.ToString(checkCmdResult.StandardOutputContent))
211238
}
212239
}
213240

214-
return terminatedCount, firstError
241+
return markedUnhealthyCount, firstError
215242
}
216243

217244
func (a *ASGDriver) Describe(ctx context.Context) (AutoscaleGroupDetails, error) {
@@ -347,8 +374,20 @@ type dryRunASG struct {
347374
}
348375

349376
func (a *ASGDriver) SendSIGTERMToAgents(ctx context.Context, instanceID string) error {
377+
ec2Client := ec2.NewFromConfig(a.Cfg)
350378
ssmClient := ssm.NewFromConfig(a.Cfg)
351379

380+
// Detect platform - graceful SIGTERM is only supported on Linux
381+
descResp, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
382+
InstanceIds: []string{instanceID},
383+
})
384+
if err == nil && len(descResp.Reservations) > 0 && len(descResp.Reservations[0].Instances) > 0 {
385+
instance := descResp.Reservations[0].Instances[0]
386+
if strings.EqualFold(string(instance.Platform), "windows") {
387+
return ErrWindowsGracefulScaleInNotSupported
388+
}
389+
}
390+
352391
// Wait for SSM agent to be ready before sending command
353392
if err := a.waitForSSMReady(ctx, instanceID, 30*time.Second); err != nil {
354393
log.Printf("SSM agent not ready on instance %s, cannot send SIGTERM: %v", instanceID, err)
@@ -367,7 +406,7 @@ sudo systemctl stop buildkite-agent.service || sudo /opt/buildkite-agent/bin/bui
367406
`
368407
log.Printf("[Elastic CI Mode] Sending SIGTERM to instance %s", instanceID)
369408

370-
_, err := ssmClient.SendCommand(ctx, &ssm.SendCommandInput{
409+
_, err = ssmClient.SendCommand(ctx, &ssm.SendCommandInput{
371410
InstanceIds: []string{instanceID},
372411
DocumentName: aws.String("AWS-RunShellScript"),
373412
Parameters: map[string][]string{"commands": {command}},
@@ -382,18 +421,26 @@ sudo systemctl stop buildkite-agent.service || sudo /opt/buildkite-agent/bin/bui
382421
return nil
383422
}
384423

424+
// CleanupDanglingInstances finds and marks unhealthy any "zombie" instances where the
425+
// buildkite-agent service has stopped running but the EC2 instance is still alive.
426+
//
427+
// This is different from normal scale-in:
428+
// - Normal scale-in: instances are healthy, jobs may be running -> uses SIGTERM for graceful shutdown
429+
// - This function: agent service is stopped, no jobs can be running -> safe to mark as unhealthy
430+
//
431+
// Marking instances unhealthy (via autoscaling:SetInstanceHealth) causes the ASG to terminate
432+
// and replace them according to its configured policies.
385433
func (a *ASGDriver) CleanupDanglingInstances(ctx context.Context, minimumInstanceUptime time.Duration, maxDanglingInstancesToCheck int) error {
386-
log.Printf("[Elastic CI Mode] Starting dangling instance check for ASG %s (min uptime: %s, max check: %d)", a.Name, minimumInstanceUptime, maxDanglingInstancesToCheck)
387434
ec2Client := ec2.NewFromConfig(a.Cfg)
388435
ssmClient := ssm.NewFromConfig(a.Cfg)
436+
asgClient := autoscaling.NewFromConfig(a.Cfg)
389437

390438
asgDetails, err := a.Describe(ctx)
391439
if err != nil {
392440
return fmt.Errorf("failed to describe ASG %s: %w", a.Name, err)
393441
}
394442

395443
if len(asgDetails.InstanceIDs) == 0 {
396-
log.Printf("[Elastic CI Mode] No instances in ASG %s to check.", a.Name)
397444
return nil
398445
}
399446

@@ -417,41 +464,44 @@ func (a *ASGDriver) CleanupDanglingInstances(ctx context.Context, minimumInstanc
417464
}
418465

419466
if len(instancesToConsiderChecking) == 0 {
420-
log.Printf("[Elastic CI Mode] No running instances older than %s in ASG %s to consider for dangling check.", minimumInstanceUptime, a.Name)
421467
return nil
422468
}
423469

470+
// Detect platform from instances (each ASG is single-platform)
471+
platform := a.getASGPlatform(ctx, instancesToConsiderChecking)
472+
424473
// Sort instances by launch time (oldest first) to prioritize checking older ones
425474
sort.SliceStable(instancesToConsiderChecking, func(i, j int) bool {
426475
return instancesToConsiderChecking[i].LaunchTime.Before(*instancesToConsiderChecking[j].LaunchTime)
427476
})
428477

429-
checkedCount := 0
430-
totalActuallyTerminated := 0
478+
totalMarkedUnhealthy := 0
431479
var firstErrorEncountered error
432480

433-
instancesForSSMCheck := make([]string, 0)
434-
for i := 0; i < len(instancesToConsiderChecking) && (maxDanglingInstancesToCheck <= 0 || checkedCount < maxDanglingInstancesToCheck); i++ {
435-
instance := instancesToConsiderChecking[i]
436-
instanceID := *instance.InstanceId
437-
instancesForSSMCheck = append(instancesForSSMCheck, instanceID)
438-
checkedCount++
481+
// Limit the number of instances to check if maxDanglingInstancesToCheck is set
482+
instancesToCheck := instancesToConsiderChecking
483+
if maxDanglingInstancesToCheck > 0 && len(instancesToCheck) > maxDanglingInstancesToCheck {
484+
instancesToCheck = instancesToCheck[:maxDanglingInstancesToCheck]
485+
}
486+
487+
instancesForSSMCheck := make([]string, 0, len(instancesToCheck))
488+
for _, instance := range instancesToCheck {
489+
instancesForSSMCheck = append(instancesForSSMCheck, *instance.InstanceId)
439490
}
440491

441492
if len(instancesForSSMCheck) > 0 {
442-
log.Printf("[Elastic CI Mode] Performing detailed SSM check for %d candidate instance(s): %v", len(instancesForSSMCheck), instancesForSSMCheck)
443-
terminatedInCall, errInCall := a.checkAndTerminate(ctx, instancesForSSMCheck, ssmClient, ec2Client)
444-
totalActuallyTerminated += terminatedInCall
445-
// Only store the first error encountered during the process.
493+
log.Printf("[Elastic CI Mode] Checking %d %s instance(s) for dangling agents: %v", len(instancesForSSMCheck), platform, instancesForSSMCheck)
494+
markedInCall, errInCall := a.checkAndMarkUnhealthy(ctx, instancesForSSMCheck, ssmClient, asgClient, platform)
495+
totalMarkedUnhealthy += markedInCall
446496
if errInCall != nil {
447497
firstErrorEncountered = errInCall
448-
// Log the error but continue, as other instances might have been processed or other calls might succeed.
449-
log.Printf("[Elastic CI Mode] Error during checkAndTerminate call: %v", errInCall)
450498
}
451499
}
452500

453-
log.Printf("[Elastic CI Mode] Dangling instance check complete for ASG %s. Considered: %d, Actually Sent for SSM Check: %d, Terminated this run: %d",
454-
a.Name, len(instancesToConsiderChecking), checkedCount, totalActuallyTerminated)
501+
// Only log summary when we actually marked instances unhealthy
502+
if totalMarkedUnhealthy > 0 {
503+
log.Printf("[Elastic CI Mode] Dangling instance check: marked %d instance(s) as unhealthy", totalMarkedUnhealthy)
504+
}
455505

456506
return firstErrorEncountered
457507
}

0 commit comments

Comments
 (0)