Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,17 +311,31 @@ func main() {
}

if pool != nil {
// Pick the per-provider ranked size list. Empty → scaler defers to
// the pool's single configured default (cfg.AzureVMSize / cfg.EC2InstanceType).
var machineSizes []string
switch {
case len(cfg.AzureVMSizes) > 0 && cfg.AzureSubscriptionID != "":
machineSizes = cfg.AzureVMSizes
case len(cfg.EC2InstanceTypes) > 0 && (cfg.EC2AMI != "" || cfg.EC2SSMParameterName != ""):
machineSizes = cfg.EC2InstanceTypes
}
if len(machineSizes) > 0 {
log.Printf("opensandbox: scaler size fallback ranked: %v", machineSizes)
}

scalerState := controlplane.NewRedisScalerState(redisRegistry.RedisClient())
scaler := controlplane.NewScaler(controlplane.ScalerConfig{
Pool: pool,
Registry: redisRegistry,
Store: opts.Store,
StateStore: scalerState,
WorkerImage: cfg.EC2WorkerImage,
Cooldown: time.Duration(cfg.ScaleCooldownSec) * time.Second,
MinWorkers: cfg.MinWorkersPerRegion,
MaxWorkers: cfg.MaxWorkersPerRegion,
IdleReserve: cfg.IdleReserveWorkers,
Pool: pool,
Registry: redisRegistry,
Store: opts.Store,
StateStore: scalerState,
WorkerImage: cfg.EC2WorkerImage,
Cooldown: time.Duration(cfg.ScaleCooldownSec) * time.Second,
MinWorkers: cfg.MinWorkersPerRegion,
MaxWorkers: cfg.MaxWorkersPerRegion,
IdleReserve: cfg.IdleReserveWorkers,
MachineSizes: machineSizes,
})
defer scaler.Stop()

Expand Down
50 changes: 48 additions & 2 deletions internal/compute/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"log"
"strings"
Expand All @@ -18,6 +19,51 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azsecrets"
)

// azureQuotaCodes are the error-message fragments the Azure ARM API uses for
// quota and SKU-availability rejections. Detected via substring match against
// the wrapped error string because the SDK surfaces these through several
// layers (BeginCreateOrUpdate immediate failure, PollUntilDone async failure,
// nested ResponseError) and pinning to a single concrete type misses cases.
// All of these are recoverable by retrying with a different VM size, so the
// autoscaler treats them as the ErrQuotaExceeded class.
var azureQuotaCodes = []string{
"QuotaExceeded",
"OperationNotAllowed",
"SkuNotAvailable",
"AllocationFailed",
"ZonalAllocationFailed",
"OverconstrainedAllocationRequest",
"exceeding approved quota",
"exceeding approved Total Regional Cores quota",
"exceeding approved Standard",
}

// isAzureQuotaErr reports whether err matches one of the documented quota /
// capacity rejection codes from Azure ARM.
func isAzureQuotaErr(err error) bool {
if err == nil {
return false
}
msg := err.Error()
for _, code := range azureQuotaCodes {
if strings.Contains(msg, code) {
return true
}
}
return false
}

// wrapAzureCreateErr tags createMachine errors with ErrQuotaExceeded when the
// underlying ARM failure was a quota/capacity rejection, so the scaler can
// fall through to the next VM size in its ranked list.
func wrapAzureCreateErr(err error, format string, args ...any) error {
wrapped := fmt.Errorf(format, args...)
if isAzureQuotaErr(err) {
return errors.Join(ErrQuotaExceeded, wrapped)
}
return wrapped
}

const (
azureTagRole = "opensandbox-role"
azureTagDraining = "opensandbox-draining"
Expand Down Expand Up @@ -213,12 +259,12 @@ func (p *AzurePool) CreateMachine(ctx context.Context, opts MachineOpts) (*Machi
if err != nil {
log.Printf("azure: VM %s BeginCreateOrUpdate error detail: %+v", vmName, err)
go p.cleanupNIC(nicName, "create failed")
return nil, fmt.Errorf("azure: create VM %s failed: %w", vmName, err)
return nil, wrapAzureCreateErr(err, "azure: create VM %s failed: %w", vmName, err)
}
vmResp, err := vmPoller.PollUntilDone(ctx, nil)
if err != nil {
go p.cleanupNIC(nicName, "poll failed")
return nil, fmt.Errorf("azure: VM %s poll failed: %w", vmName, err)
return nil, wrapAzureCreateErr(err, "azure: VM %s poll failed: %w", vmName, err)
}
log.Printf("azure: VM %s created successfully", vmName)

Expand Down
36 changes: 35 additions & 1 deletion internal/compute/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package compute
import (
"context"
"encoding/base64"
"errors"
"fmt"
"log"
"strings"
Expand All @@ -16,6 +17,39 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ssm"
)

// ec2QuotaCodes are the AWS EC2 error codes that indicate a vCPU quota,
// per-account instance limit, or AZ-level capacity exhaustion. All of these
// are recoverable by retrying with a different instance type, so the
// autoscaler treats them as the ErrQuotaExceeded class.
var ec2QuotaCodes = []string{
"VcpuLimitExceeded",
"InstanceLimitExceeded",
"InsufficientInstanceCapacity",
"MaxSpotInstanceCountExceeded",
"Unsupported", // returned when a region/AZ doesn't offer the requested type
}

func isEC2QuotaErr(err error) bool {
if err == nil {
return false
}
msg := err.Error()
for _, code := range ec2QuotaCodes {
if strings.Contains(msg, code) {
return true
}
}
return false
}

func wrapEC2CreateErr(err error, format string, args ...any) error {
wrapped := fmt.Errorf(format, args...)
if isEC2QuotaErr(err) {
return errors.Join(ErrQuotaExceeded, wrapped)
}
return wrapped
}

const (
tagRole = "opensandbox:role"
tagInstanceType = "opensandbox:instance-type"
Expand Down Expand Up @@ -129,7 +163,7 @@ func (p *EC2Pool) CreateMachine(ctx context.Context, opts MachineOpts) (*Machine

result, err := p.client.RunInstances(ctx, input)
if err != nil {
return nil, fmt.Errorf("ec2: RunInstances failed: %w", err)
return nil, wrapEC2CreateErr(err, "ec2: RunInstances failed: %w", err)
}

if len(result.Instances) == 0 {
Expand Down
13 changes: 12 additions & 1 deletion internal/compute/pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
package compute

import "context"
import (
"context"
"errors"
)

// ErrQuotaExceeded indicates a provider quota or capacity constraint that may
// be resolved by trying a different machine size. Pool implementations wrap
// (via errors.Join or fmt.Errorf %w) when they detect provider-specific quota
// or "no capacity in this SKU/zone" failures. The autoscaler uses errors.Is to
// decide whether to fall through to the next size in a ranked list versus
// abort the launch.
var ErrQuotaExceeded = errors.New("quota or capacity exceeded")

// Machine represents a worker machine in the compute pool.
type Machine struct {
Expand Down
96 changes: 96 additions & 0 deletions internal/compute/quota_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package compute

import (
"errors"
"fmt"
"testing"
)

func TestIsAzureQuotaErr(t *testing.T) {
tests := []struct {
name string
err error
isQuota bool
}{
{"nil", nil, false},
{"unrelated", errors.New("network timeout"), false},
{"QuotaExceeded code", errors.New("ERROR CODE: QuotaExceeded\noperation failed"), true},
{"OperationNotAllowed", errors.New("OperationNotAllowed: regional cores"), true},
{"SkuNotAvailable", errors.New("SkuNotAvailable: Standard_D16ads_v7 in eastus2"), true},
{"AllocationFailed", errors.New("AllocationFailed: capacity unavailable"), true},
{"ZonalAllocationFailed", errors.New("ZonalAllocationFailed in zone 1"), true},
{"OverconstrainedAllocationRequest", errors.New("OverconstrainedAllocationRequest"), true},
{"approved quota text", errors.New("operation results in exceeding approved quota"), true},
{"regional cores phrasing", errors.New("Operation could not be completed as it results in exceeding approved Total Regional Cores quota"), true},
{"wrapped quota", fmt.Errorf("azure: create VM failed: %w", errors.New("QuotaExceeded")), true},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := isAzureQuotaErr(tc.err)
if got != tc.isQuota {
t.Fatalf("isAzureQuotaErr(%v) = %v, want %v", tc.err, got, tc.isQuota)
}
})
}
}

func TestIsEC2QuotaErr(t *testing.T) {
tests := []struct {
name string
err error
isQuota bool
}{
{"nil", nil, false},
{"unrelated", errors.New("connection reset"), false},
{"VcpuLimitExceeded", errors.New("VcpuLimitExceeded: max 32 in region"), true},
{"InstanceLimitExceeded", errors.New("InstanceLimitExceeded"), true},
{"InsufficientInstanceCapacity", errors.New("InsufficientInstanceCapacity in us-east-1a"), true},
{"MaxSpotInstanceCountExceeded", errors.New("MaxSpotInstanceCountExceeded"), true},
{"Unsupported in AZ", errors.New("Unsupported: c7gd.metal not offered in this AZ"), true},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := isEC2QuotaErr(tc.err)
if got != tc.isQuota {
t.Fatalf("isEC2QuotaErr(%v) = %v, want %v", tc.err, got, tc.isQuota)
}
})
}
}

func TestWrapAzureCreateErrTagsQuota(t *testing.T) {
quotaSrc := errors.New("AllocationFailed: no capacity")
wrapped := wrapAzureCreateErr(quotaSrc, "azure: create VM foo failed: %w", quotaSrc)
if !errors.Is(wrapped, ErrQuotaExceeded) {
t.Fatalf("expected wrapped quota error to match ErrQuotaExceeded, got %v", wrapped)
}
// The original error chain must remain reachable so callers can still log
// the underlying provider message.
if !errors.Is(wrapped, quotaSrc) {
t.Fatalf("expected wrapped error to preserve original chain, got %v", wrapped)
}
}

func TestWrapAzureCreateErrPassesThroughNonQuota(t *testing.T) {
src := errors.New("network unreachable")
wrapped := wrapAzureCreateErr(src, "azure: create VM foo failed: %w", src)
if errors.Is(wrapped, ErrQuotaExceeded) {
t.Fatalf("non-quota error should not be tagged as ErrQuotaExceeded: %v", wrapped)
}
if !errors.Is(wrapped, src) {
t.Fatalf("expected wrapped error to preserve original chain, got %v", wrapped)
}
}

func TestWrapEC2CreateErrTagsQuota(t *testing.T) {
src := errors.New("VcpuLimitExceeded: max 32 in us-east-1")
wrapped := wrapEC2CreateErr(src, "ec2: RunInstances failed: %w", src)
if !errors.Is(wrapped, ErrQuotaExceeded) {
t.Fatalf("expected wrapped quota error to match ErrQuotaExceeded, got %v", wrapped)
}
if !errors.Is(wrapped, src) {
t.Fatalf("expected wrapped error to preserve original chain, got %v", wrapped)
}
}
29 changes: 27 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ type Config struct {

// AWS EC2 compute pool (server mode only — for auto-scaling worker machines)
EC2AMI string // Custom AMI for worker instances
EC2InstanceType string // e.g. "c7gd.metal", "r6gd.metal", "r7gd.metal"
EC2InstanceType string // single fallback type; used only when EC2InstanceTypes is empty
EC2InstanceTypes []string // ranked list of instance types tried in order on quota/capacity errors
EC2SubnetID string // VPC subnet for worker instances
EC2SecurityGroupID string // Security group (allow 8080, 9090, 9091)
EC2KeyName string // SSH key pair name (for debugging)
Expand All @@ -84,7 +85,8 @@ type Config struct {
// Azure compute pool (server mode — for auto-scaling worker VMs)
AzureSubscriptionID string // Azure subscription ID
AzureResourceGroup string // Resource group for worker VMs
AzureVMSize string // e.g. "Standard_D16s_v5"
AzureVMSize string // single fallback size; used only when AzureVMSizes is empty
AzureVMSizes []string // ranked list of VM sizes tried in order on quota/capacity errors
AzureImageID string // Custom image ID or URN
AzureSubnetID string // Full resource ID of the VNet subnet
AzureSSHPublicKey string // SSH public key for worker VMs
Expand Down Expand Up @@ -204,6 +206,7 @@ func Load() (*Config, error) {

EC2AMI: os.Getenv("OPENSANDBOX_EC2_AMI"),
EC2InstanceType: envOrDefault("OPENSANDBOX_EC2_INSTANCE_TYPE", "c7gd.metal"),
EC2InstanceTypes: splitCSV(os.Getenv("OPENSANDBOX_EC2_INSTANCE_TYPES")),
EC2SubnetID: os.Getenv("OPENSANDBOX_EC2_SUBNET_ID"),
EC2SecurityGroupID: os.Getenv("OPENSANDBOX_EC2_SECURITY_GROUP_ID"),
EC2KeyName: os.Getenv("OPENSANDBOX_EC2_KEY_NAME"),
Expand All @@ -214,6 +217,7 @@ func Load() (*Config, error) {
AzureSubscriptionID: os.Getenv("OPENSANDBOX_AZURE_SUBSCRIPTION_ID"),
AzureResourceGroup: os.Getenv("OPENSANDBOX_AZURE_RESOURCE_GROUP"),
AzureVMSize: envOrDefault("OPENSANDBOX_AZURE_VM_SIZE", "Standard_D16s_v5"),
AzureVMSizes: splitCSV(os.Getenv("OPENSANDBOX_AZURE_VM_SIZES")),
AzureImageID: os.Getenv("OPENSANDBOX_AZURE_IMAGE_ID"),
AzureSubnetID: os.Getenv("OPENSANDBOX_AZURE_SUBNET_ID"),
AzureSSHPublicKey: os.Getenv("OPENSANDBOX_AZURE_SSH_PUBLIC_KEY"),
Expand Down Expand Up @@ -277,6 +281,27 @@ func envOrDefault(key, fallback string) string {
return fallback
}

// splitCSV parses a comma-separated value into a non-empty trimmed slice.
// Empty input or all-whitespace entries return nil so callers can use len() == 0
// to detect "not configured." Leaves the order intact since rank matters
// for the autoscaler's machine-size fallback list.
func splitCSV(s string) []string {
if s == "" {
return nil
}
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
if trimmed := strings.TrimSpace(p); trimmed != "" {
out = append(out, trimmed)
}
}
if len(out) == 0 {
return nil
}
return out
}

func envOrDefaultInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if n, err := strconv.Atoi(v); err == nil {
Expand Down
Loading