Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"encoding/base64"
"fmt"
"log"
"net/http"
"os"
"strings"
"os/signal"
"syscall"

"time"

"github.com/labstack/echo/v4"

"github.com/opensandbox/opensandbox/internal/api"
"github.com/opensandbox/opensandbox/internal/auth"
"github.com/opensandbox/opensandbox/internal/billing"
Expand Down Expand Up @@ -384,21 +387,48 @@ func main() {
log.Println("opensandbox: usage reporter started (interval=5m)")
}

// Start NATS sync consumer if both PG and NATS are configured
if opts.Store != nil && cfg.NATSURL != "" {
consumer, err := db.NewSyncConsumer(opts.Store, cfg.NATSURL)
// Cloudflare event forwarder: drains events:{cell_id} Redis stream to the CF
// events-ingest Worker. Only starts when both endpoint and secret are set
// — missing config means running without CF integration (e.g. early dev).
if cfg.CFEventEndpoint != "" && cfg.CFEventSecret != "" && cfg.RedisURL != "" {
forwarder, err := controlplane.NewEventForwarder(
cfg.RedisURL, cfg.CellID, cfg.CFEventEndpoint, cfg.CFEventSecret,
)
if err != nil {
log.Printf("opensandbox: NATS sync consumer not available: %v (continuing without)", err)
log.Printf("opensandbox: event forwarder not available: %v (continuing without)", err)
} else {
if err := consumer.Start(); err != nil {
log.Printf("opensandbox: failed to start NATS sync consumer: %v", err)
} else {
defer consumer.Stop()
log.Println("opensandbox: NATS sync consumer started")
}
forwarder.SetStore(opts.Store)
forwarder.Start()
defer forwarder.Stop()
log.Printf("opensandbox: event forwarder started (cell=%s → %s)", cfg.CellID, cfg.CFEventEndpoint)
}
}

// Cloudflare admin handlers: /admin/halt-org and /admin/resume-org, called
// by the CreditAccount DO (push) and halt_reconciler (pull). Requires the
// Redis worker registry for halt (gRPC client lookup) and resume (worker
// selection for wake).
var adminHandlers *controlplane.AdminHandlers
if cfg.CFAdminSecret != "" && opts.Store != nil && redisRegistry != nil {
adminHandlers = controlplane.NewAdminHandlers(opts.Store, redisRegistry, redisRegistry, cfg.CFAdminSecret)
adminMux := http.NewServeMux()
adminHandlers.Register(adminMux)
server.Echo().Any("/admin/*", echo.WrapHandler(adminMux))
log.Println("opensandbox: admin handlers registered (/admin/halt-org, /admin/resume-org)")
}

// Halt reconciler: 60s pull of /internal/halt-list from the CF api-edge
// Worker, as a safety net for DO push webhooks lost to network partitions.
if cfg.HaltListURL != "" && cfg.CFAdminSecret != "" && adminHandlers != nil {
reconciler := controlplane.NewHaltReconciler(
opts.Store, redisRegistry, redisRegistry,
cfg.HaltListURL, cfg.CFAdminSecret, cfg.CellID, adminHandlers,
)
reconciler.Start()
defer reconciler.Stop()
log.Printf("opensandbox: halt reconciler started (cell=%s → %s)", cfg.CellID, cfg.HaltListURL)
}

// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
Expand Down
21 changes: 7 additions & 14 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,16 @@ func main() {
}
}

// NATS
if cfg.NATSURL != "" {
pub, err := worker.NewEventPublisher(cfg.NATSURL, cfg.Region, cfg.WorkerID, sandboxDBMgr)
// Redis Streams event publisher: XADDs to events:{cell_id}. Replaces NATS.
// Heartbeat already handled by redis_heartbeat above.
if cfg.RedisURL != "" {
pub, err := worker.NewRedisEventPublisher(cfg.RedisURL, cfg.CellID, cfg.WorkerID, sandboxDBMgr, store)
if err != nil {
log.Printf("opensandbox-worker: NATS not available: %v (continuing without event sync)", err)
log.Printf("opensandbox-worker: Redis event publisher not available: %v (continuing without event sync)", err)
} else {
pub.Start()
if qemuMgr != nil {
pub.SetGoldenVersion(qemuMgr.GoldenVersion())
}
pub.StartHeartbeat(func() (int, int, float64, float64, float64) {
count, _ := mgr.Count(context.Background())
cpuPct, memPct, diskPct := worker.SystemStats()
return cfg.MaxCapacity, count, cpuPct, memPct, diskPct
})
defer pub.Stop()
log.Println("opensandbox-worker: NATS event publisher started")
log.Printf("opensandbox-worker: Redis event publisher started (stream=events:%s)", cfg.CellID)
}
}

Expand All @@ -442,7 +435,7 @@ func main() {

// Usage collector for billing (samples cgroup stats every 60s, flushes to DB every 5 min)
if store != nil {
usageCollector := worker.NewUsageCollector(mgr, store, segmentClient)
usageCollector := worker.NewUsageCollector(mgr, store, segmentClient, sandboxDBMgr)
usageCollector.Start()
defer usageCollector.Stop()
}
Expand Down
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/klauspost/compress v1.18.0
github.com/labstack/echo/v4 v4.12.0
github.com/mattn/go-sqlite3 v1.14.24
github.com/nats-io/nats.go v1.48.0
github.com/prometheus/client_golang v1.20.5
github.com/redis/go-redis/v9 v9.18.0
github.com/segmentio/analytics-go/v3 v3.3.0
Expand Down Expand Up @@ -71,8 +70,6 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBW
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
25 changes: 19 additions & 6 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@ type Config struct {
// Auth
JWTSecret string // Shared secret for sandbox-scoped JWTs

// NATS
NATSURL string // NATS server URL

// Worker identity
Region string // Region identifier (e.g., "iad", "ams")
// Cell/worker identity
Region string // Cloud region for compute pools (AWS/Azure/S3 region, e.g. "us-east-1")
CellID string // Full deployment identifier, e.g. "dev-cell-a", "aws-useast1-cell-a".
// Drives Redis stream key (events:{cell_id}), event envelope, CF routing.
WorkerID string // Unique worker ID (e.g., "w-iad-1")
HTTPAddr string // Public HTTP address for direct SDK access

// Cloudflare event forwarding (events-ingest Worker)
CFEventEndpoint string // e.g. "https://events.opencomputer.workers.dev/ingest"
CFEventSecret string // HMAC secret shared with the events-ingest Worker

// Cloudflare admin callbacks (halt/resume + halt-list reconciler)
CFAdminSecret string // HMAC secret shared with api-edge Worker for /admin/* and /internal/halt-list
HaltListURL string // e.g. "https://api.opencomputer.workers.dev/internal/halt-list"

// WorkOS
WorkOSAPIKey string
WorkOSClientID string
Expand Down Expand Up @@ -141,11 +148,17 @@ func Load() (*Config, error) {
DatabaseURL: envOrDefault("OPENSANDBOX_DATABASE_URL", os.Getenv("DATABASE_URL")),
DataDir: envOrDefault("OPENSANDBOX_DATA_DIR", "/data/sandboxes"),
JWTSecret: os.Getenv("OPENSANDBOX_JWT_SECRET"),
NATSURL: envOrDefault("OPENSANDBOX_NATS_URL", "nats://localhost:4222"),
Region: envOrDefault("OPENSANDBOX_REGION", "local"),
WorkerID: envOrDefault("OPENSANDBOX_WORKER_ID", "w-local-1"),
HTTPAddr: envOrDefault("OPENSANDBOX_HTTP_ADDR", "http://localhost:8080"),

CFEventEndpoint: os.Getenv("OPENSANDBOX_CF_EVENT_ENDPOINT"),
CFEventSecret: os.Getenv("OPENSANDBOX_CF_EVENT_SECRET"),
CFAdminSecret: os.Getenv("OPENSANDBOX_CF_ADMIN_SECRET"),
HaltListURL: os.Getenv("OPENSANDBOX_HALT_LIST_URL"),

CellID: envOrDefault("OPENSANDBOX_CELL_ID", envOrDefault("OPENSANDBOX_REGION", "dev-cell-a")),

WorkOSAPIKey: os.Getenv("WORKOS_API_KEY"),
WorkOSClientID: os.Getenv("WORKOS_CLIENT_ID"),
WorkOSRedirectURI: envOrDefault("WORKOS_REDIRECT_URI", "http://localhost:8080/auth/callback"),
Expand Down
Loading