Skip to content

Commit 7954c07

Browse files
authored
Merge pull request #361 from diggerhq/error-state-cleanup
serial migrations, in flight guard, prevent zombie ticks
2 parents 76da539 + 213fc5f commit 7954c07

6 files changed

Lines changed: 395 additions & 162 deletions

File tree

cmd/server/main.go

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -183,7 +183,10 @@ func main() {
183183
log.Printf("opensandbox: sandbox domain configured (%s)", cfg.SandboxDomain)
184184
}
185185

186-
// Initialize Redis worker registry in server mode
186+
// Initialize Redis worker registry in server mode. The scaler reference
187+
// is hoisted to outer scope so the API server can be wired with it later
188+
// for the migrate endpoint (see server.SetMigrator below).
189+
var scalerOrchestrator *controlplane.Scaler
187190
var redisRegistry *controlplane.RedisWorkerRegistry
188191
if cfg.Mode == "server" && cfg.RedisURL != "" {
189192
var err error
@@ -500,6 +503,11 @@ func main() {
500503
}
501504

502505
scalerState := controlplane.NewRedisScalerState(redisRegistry.RedisClient())
506+
// Note: this scaler is also exposed via scalerOrchestrator so the
507+
// API server can wire it (server.SetMigrator) below. POST
508+
// /api/sandboxes/:id/migrate then flows through the same
509+
// per-target-serialized, in-flight-tracked, abort-cleaned-up
510+
// code path as the scaler's drain.
503511
scaler := controlplane.NewScaler(controlplane.ScalerConfig{
504512
Pool: pool,
505513
Registry: redisRegistry,
@@ -520,6 +528,7 @@ func main() {
520528
CellID: cfg.CellID,
521529
})
522530
defer scaler.Stop()
531+
scalerOrchestrator = scaler
523532

524533
// Leader election: only the leader runs the scaler. The
525534
// per-sandbox autoscaler (created later) consults this same
@@ -626,6 +635,14 @@ func main() {
626635
// Token never leaves this process; the UI proxies its queries through
627636
// /api/sandboxes/:id/logs. Empty token disables the endpoint (503).
628637
server.SetAxiomQueryConfig(cfg.AxiomQueryToken, cfg.AxiomDataset)
638+
639+
// Wire the scaler as the API's migration orchestrator. nil-safe — when
640+
// the scaler isn't running (combined mode, no pool) the migrate endpoint
641+
// returns 503 instead of routing through the old duplicated inline path.
642+
if scalerOrchestrator != nil {
643+
server.SetMigrator(scalerOrchestrator)
644+
log.Printf("opensandbox: migrate API wired to scaler orchestrator (shared per-target serialization)")
645+
}
629646
if cfg.AxiomQueryToken != "" {
630647
log.Printf("opensandbox: sandbox session logs read API enabled (dataset=%s)", cfg.AxiomDataset)
631648
}

internal/api/router.go

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,26 @@ type Server struct {
7878
// single-PG mode or tests); resolveSecretStoreInto + template lookup fall
7979
// back to s.store in that case.
8080
edge *edgeclient.Client
81+
82+
// migrator delegates live-migration to the scaler's orchestrator so the
83+
// API path (POST /api/sandboxes/:id/migrate) shares the per-target
84+
// serialization + abort-on-failure cleanup the scaler uses for drains.
85+
// nil disables the migrate endpoint (returns 503).
86+
migrator MigrationOrchestrator
87+
}
88+
89+
// MigrationOrchestrator is the slice of the scaler the API needs to delegate
90+
// migration requests to. Defined as an interface here (rather than importing
91+
// the concrete *controlplane.Scaler) so the API package stays test-friendly
92+
// and the dependency direction is API → interface ← controlplane.
93+
type MigrationOrchestrator interface {
94+
LiveMigrateSandbox(ctx context.Context, sandboxID, sourceWorkerID, targetWorkerID string) error
95+
}
96+
97+
// SetMigrator wires the scaler-backed migration orchestrator. Called from
98+
// cmd/server/main.go after the scaler is constructed.
99+
func (s *Server) SetMigrator(m MigrationOrchestrator) {
100+
s.migrator = m
81101
}
82102

83103
// SetEdgeClient wires the api-edge HTTP client. Caller is responsible for

internal/api/sandbox.go

Lines changed: 19 additions & 128 deletions
Original file line number | Diff line number | Diff line change
@@ -1059,6 +1059,14 @@ func (s *Server) setTimeout(c echo.Context) error {
10591059

10601060
// migrateSandbox performs live migration of a sandbox to a different worker.
10611061
// POST /api/sandboxes/:id/migrate {"targetWorker": "w-azure-osb-worker-xxx"}
1062+
//
1063+
// Delegates to the scaler's LiveMigrateSandbox so the user-driven API path
1064+
// shares the same per-target serialization, in-flight tracking, abort-on-
1065+
// failure cleanup, and source/target consistency guarantees as the scaler's
1066+
// drain path. Previously this handler had its own inline copy of the
1067+
// migration sequence that bypassed all those protections, meaning a user
1068+
// firing parallel /migrate calls at a single target could OOM-kill the
1069+
// target worker the same way the parallel-drain cascade did.
10621070
func (s *Server) migrateSandbox(c echo.Context) error {
10631071
id := c.Param("id")
10641072
ctx := c.Request().Context()
@@ -1073,6 +1081,9 @@ func (s *Server) migrateSandbox(c echo.Context) error {
10731081
if s.workerRegistry == nil || s.store == nil {
10741082
return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "migration requires server mode with worker registry"})
10751083
}
1084+
if s.migrator == nil {
1085+
return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "migration orchestrator not configured (scaler missing)"})
1086+
}
10761087

10771088
// Look up sandbox to find source worker
10781089
session, err := s.store.GetSandboxSession(ctx, id)
@@ -1083,137 +1094,16 @@ func (s *Server) migrateSandbox(c echo.Context) error {
10831094
return c.JSON(http.StatusBadRequest, map[string]string{"error": "sandbox must be running to migrate"})
10841095
}
10851096

1086-
// Mark as migrating — blocks exec/proxy routing until migration completes
1087-
migrationDone := false
1088-
if s.store != nil {
1089-
if err := s.store.SetMigrating(ctx, id, req.TargetWorker); err != nil {
1090-
log.Printf("migrate %s: failed to set migrating state: %v", id, err)
1091-
}
1092-
// Guarantee we revert on failure
1093-
defer func() {
1094-
if !migrationDone && s.store != nil {
1095-
s.store.FailMigration(ctx, id)
1096-
}
1097-
}()
1098-
}
1099-
1100-
// Get source and target worker gRPC clients
1101-
sourceClient, err := s.workerRegistry.GetWorkerClient(session.WorkerID)
1102-
if err != nil {
1103-
return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "source worker unreachable: " + err.Error()})
1104-
}
1105-
targetClient, err := s.workerRegistry.GetWorkerClient(req.TargetWorker)
1106-
if err != nil {
1107-
return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "target worker unreachable: " + err.Error()})
1108-
}
1109-
11101097
t0 := time.Now()
1111-
1112-
// Step 1: Pre-copy drives to S3 (thin overlay, never flatten).
1113-
preCopyCtx, preCopyCancel := context.WithTimeout(ctx, 10*time.Minute)
1114-
defer preCopyCancel()
1115-
preCopyResp, err := sourceClient.PreCopyDrives(preCopyCtx, &pb.PreCopyDrivesRequest{
1116-
SandboxId: id,
1117-
})
1118-
if err != nil {
1119-
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "pre-copy drives: " + err.Error()})
1120-
}
1121-
1122-
if preCopyResp.GoldenVersion == "" {
1123-
return c.JSON(http.StatusBadRequest, map[string]string{"error": "source sandbox has no goldenVersion — cannot live migrate safely; use hibernate/wake instead"})
1124-
}
1125-
1126-
log.Printf("migrate %s: drives pre-copied to S3 (%dms, golden=%s)", id, time.Since(t0).Milliseconds(), preCopyResp.GoldenVersion)
1127-
1128-
// Step 2: Prepare target (downloads thin overlay, rebases if needed, starts QEMU -incoming).
1129-
// CPU and memory come from the source worker — must match exactly for QEMU migration.
1130-
cpuCount := preCopyResp.BaseCpuCount
1131-
memoryMB := preCopyResp.BaseMemoryMb
1132-
if cpuCount == 0 {
1133-
cpuCount = 2
1134-
}
1135-
if memoryMB == 0 {
1136-
memoryMB = 1024
1137-
}
1138-
1139-
prepCtx, prepCancel := context.WithTimeout(ctx, 10*time.Minute)
1140-
defer prepCancel()
1141-
prepResp, err := targetClient.PrepareMigrationIncoming(prepCtx, &pb.PrepareMigrationIncomingRequest{
1142-
SandboxId: id,
1143-
CpuCount: cpuCount,
1144-
MemoryMb: memoryMB,
1145-
GuestPort: 80,
1146-
Template: session.Template,
1147-
RootfsS3Key: preCopyResp.RootfsKey,
1148-
WorkspaceS3Key: preCopyResp.WorkspaceKey,
1149-
OverlayMode: true,
1150-
SourceGoldenVersion: preCopyResp.GoldenVersion,
1151-
// Carry the secrets-proxy session from source → target. Without
1152-
// this the destination has no substitution map and outbound HTTPS
1153-
// from the migrated VM would leak `osb_sealed_xxx` env vars
1154-
// verbatim to upstream services. Empty when no secret store.
1155-
SealedTokens: preCopyResp.SealedTokens,
1156-
EgressAllowlist: preCopyResp.EgressAllowlist,
1157-
TokenHosts: preCopyResp.TokenHosts,
1158-
SealedNames: preCopyResp.SealedNames,
1159-
})
1160-
if err != nil {
1161-
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "prepare target: " + err.Error()})
1162-
}
1163-
1164-
log.Printf("migrate %s: target prepared at %s (host port %d, secrets=%d)", id, prepResp.IncomingAddr, prepResp.HostPort, len(preCopyResp.SealedTokens))
1165-
1166-
// Step 3: Live migrate from source to target
1167-
migrateCtx, migrateCancel := context.WithTimeout(ctx, 5*time.Minute)
1168-
defer migrateCancel()
1169-
_, err = sourceClient.LiveMigrate(migrateCtx, &pb.LiveMigrateRequest{
1170-
SandboxId: id,
1171-
IncomingAddr: prepResp.IncomingAddr,
1172-
})
1173-
if err != nil {
1174-
cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 10*time.Second)
1175-
targetClient.DestroySandbox(cleanCtx, &pb.DestroySandboxRequest{SandboxId: id})
1176-
cleanCancel()
1177-
log.Printf("migrate %s: live migrate failed, cleaned up target on %s: %v", id, req.TargetWorker, err)
1178-
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "live migrate: " + err.Error()})
1179-
}
1180-
1181-
log.Printf("migrate %s: QMP migration complete (%dms)", id, time.Since(t0).Milliseconds())
1182-
1183-
// Step 4: Complete migration on target (reconnect agent, patch network)
1184-
completeCtx, completeCancel := context.WithTimeout(ctx, 30*time.Second)
1185-
defer completeCancel()
1186-
_, err = targetClient.CompleteMigrationIncoming(completeCtx, &pb.CompleteMigrationIncomingRequest{
1187-
SandboxId: id,
1188-
})
1189-
if err != nil {
1190-
cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 10*time.Second)
1191-
targetClient.DestroySandbox(cleanCtx, &pb.DestroySandboxRequest{SandboxId: id})
1192-
cleanCancel()
1193-
log.Printf("migrate %s: complete failed, cleaned up target on %s: %v", id, req.TargetWorker, err)
1194-
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "complete migration: " + err.Error()})
1195-
}
1196-
1197-
// Step 5: Complete migration — update DB status and worker_id atomically.
1198-
// Use background context — the request context may be close to expiry for large migrations.
1199-
if s.store != nil {
1200-
completeDBCtx, completeDBCancel := context.WithTimeout(context.Background(), 10*time.Second)
1201-
if err := s.store.CompleteMigration(completeDBCtx, id, req.TargetWorker); err != nil {
1202-
log.Printf("migrate %s: WARNING: CompleteMigration DB update failed: %v", id, err)
1203-
}
1204-
completeDBCancel()
1205-
// Mirror to D1 sandboxes_index so the dashboard's cross-cell view and
1206-
// the proxyToCellSDK routing reflect the new worker immediately. The
1207-
// new worker's `created` event would eventually sync it, but emitting
1208-
// here closes the window where stale routing would re-target the
1209-
// vanished source worker. Reads org from the existing session.
1210-
if session, err := s.store.GetSandboxSession(context.Background(), id); err == nil {
1211-
s.publishSandboxLifecycleEvent(context.Background(), "migrated", id, session.OrgID, req.TargetWorker, "user_migrate")
1212-
}
1098+
if err := s.migrator.LiveMigrateSandbox(ctx, id, session.WorkerID, req.TargetWorker); err != nil {
1099+
// LiveMigrateSandbox sets cell-PG state on failure (FailMigration /
1100+
// FailMigrationPostQMP) and emits the relevant stopped/migrated D1
1101+
// event itself. Don't double-flip here — just surface the error.
1102+
log.Printf("migrate %s: LiveMigrateSandbox failed: %v", id, err)
1103+
return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
12131104
}
1214-
migrationDone = true
12151105

1216-
// Invalidate proxy route cache so next request routes to the new worker
1106+
// Invalidate proxy route cache so next request routes to the new worker.
12171107
if s.sandboxAPIProxy != nil {
12181108
s.sandboxAPIProxy.InvalidateRouteCache(id)
12191109
}
@@ -1230,6 +1120,7 @@ func (s *Server) migrateSandbox(c echo.Context) error {
12301120
})
12311121
}
12321122

1123+
12331124
// effectivePlan returns the org's billing plan for billing gates, resolving it
12341125
// from the most authoritative source available. Plan is a GLOBAL signal: it
12351126
// changes via the edge (Stripe → DO mark-pro → D1), and no single cell has the

0 commit comments

Comments
 (0)