Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/sandbox-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main() {
var extProcMaxConcurrency int
var kubeClientQPS float64
var kubeClientBurst int
var basicQPS int

utilfeature.DefaultMutableFeatureGate.AddFlag(pflag.CommandLine)

Expand All @@ -47,6 +48,7 @@ func main() {
pflag.StringVar(&pprofAddr, "pprof-addr", ":6060", "The address the pprof debug maps to.")

// Register server configuration flags
pflag.IntVar(&basicQPS, "basic-qps", models.DefaultBasicQPS, "Basic QPS for all APIs")
pflag.IntVar(&port, "port", 8080, "The port the server listens on")
pflag.StringVar(&e2bAdminKey, "e2b-admin-key", "", "E2B admin API key (if empty, a random UUID will be generated)")
pflag.BoolVar(&e2bEnableAuth, "e2b-enable-auth", true, "Enable E2B authentication")
Expand Down Expand Up @@ -129,8 +131,8 @@ func main() {
klog.Fatalf("Failed to initialize Kubernetes client: %v", err)
}

sandboxController := e2b.NewController(domain, e2bAdminKey, sysNs, e2bMaxTimeout, maxClaimWorkers, maxCreateQPS, uint32(extProcMaxConcurrency),
port, e2bEnableAuth, clientSet)
sandboxController := e2b.NewController(domain, e2bAdminKey, sysNs, basicQPS, e2bMaxTimeout, maxClaimWorkers,
maxCreateQPS, uint32(extProcMaxConcurrency), port, e2bEnableAuth, clientSet)
if err := sandboxController.Init(); err != nil {
klog.Fatalf("Failed to initialize sandbox controller: %v", err)
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/controller/sandboxclaim/core/common_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/utils"
stateutils "github.com/openkruise/agents/pkg/utils/sandboxutils"
"golang.org/x/time/rate"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -196,12 +195,10 @@ func (c *commonControl) claimSandboxes(ctx context.Context, claim *agentsv1alpha
return 0, fmt.Errorf("failed to build claim options: %w", err)
}

claimLockChannel := make(chan struct{}, batchSize) // set to max batch size, not controlled
limiter := rate.NewLimiter(rate.Inf, batchSize)
// Attempt to claim sandboxes concurrently using DoItSlowly
claimedCount, err := utils.DoItSlowly(batchSize, InitialClaimBatchSize, func() error {
// Pass nil for rand so sandboxcr uses global rand (concurrent-safe).
sbx, metrics, claimErr := sandboxcr.TryClaimSandbox(ctx, opts, &c.pickCache, c.cache, c.sandboxClient, claimLockChannel, limiter)
sbx, metrics, claimErr := sandboxcr.TryClaimSandbox(ctx, opts, &c.pickCache, c.cache, c.sandboxClient)
if claimErr != nil {
log.Error(claimErr, "Failed to claim sandbox")
return claimErr
Expand Down
2 changes: 2 additions & 0 deletions pkg/sandbox-manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func (m *SandboxManager) ClaimSandbox(ctx context.Context, opts infra.ClaimSandb
SandboxCreationResponses.WithLabelValues("failure").Inc()
return nil, errors.NewError(errors.ErrorNotFound, fmt.Sprintf("template %s not found", opts.Template))
}
opts.CreateRateLimiter = m.createRateLimiter
opts.ConcurrencyLimiter = m.claimConcurrencyLimiter
sandbox, metrics, err := m.infra.ClaimSandbox(ctx, opts)
if err != nil {
log.Error(err, "failed to claim sandbox", "metrics", metrics.String())
Expand Down
10 changes: 8 additions & 2 deletions pkg/sandbox-manager/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/utils/limiter"
"golang.org/x/time/rate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

Expand All @@ -22,12 +24,13 @@ const (
)

type SandboxManager struct {
Namespace string

client *clients.ClientSet

infra infra.Infrastructure
proxy *proxy.Server

claimConcurrencyLimiter *limiter.ConcurrencyLimiter
createRateLimiter *rate.Limiter
}

// NewSandboxManager creates a new SandboxManager instance.
Expand All @@ -37,6 +40,9 @@ func NewSandboxManager(client *clients.ClientSet, adapter proxy.RequestAdapter,
m := &SandboxManager{
client: client,
proxy: proxy.NewServer(adapter, opts),
// limiters
claimConcurrencyLimiter: limiter.NewConcurrencyLimiter(opts.MaxClaimWorkers),
createRateLimiter: rate.NewLimiter(rate.Limit(opts.MaxCreateQPS), opts.MaxCreateQPS),
}
var err error
m.infra, err = sandboxcr.NewInfra(client, client.K8sClient, m.proxy, opts)
Expand Down
1 change: 1 addition & 0 deletions pkg/sandbox-manager/infra/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Sandbox interface {
GetRuntimeURL() string
GetAccessToken() string
}

type CacheProvider interface {
GetPersistentVolume(name string) (*corev1.PersistentVolume, error)
GetSecret(namespace, name string) (*corev1.Secret, error)
Expand Down
42 changes: 22 additions & 20 deletions pkg/sandbox-manager/infra/sandboxcr/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func ValidateAndInitClaimOptions(opts infra.ClaimSandboxOptions) (infra.ClaimSan
// the sandbox object should not be used anymore and needs appropriate handling.
//
// ValidateAndInitClaimOptions must be called before this function.
func TryClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions, pickCache *sync.Map, cache *Cache, client *clients.ClientSet,
claimLockChannel chan struct{}, createLimiter *rate.Limiter) (claimed infra.Sandbox, metrics infra.ClaimMetrics, err error) {
func TryClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions, pickCache *sync.Map, cache *Cache, client *clients.ClientSet) (claimed infra.Sandbox, metrics infra.ClaimMetrics, err error) {
ctx = logs.Extend(ctx, "tryClaimId", uuid.NewString()[:8])
log := klog.FromContext(ctx)

Expand All @@ -83,22 +82,23 @@ func TryClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions, pickCa
default:
}

log.Info("waiting for a free claim worker")
startWaiting := time.Now()
freeWorkerOnce := sync.OnceFunc(func() {
<-claimLockChannel // free the worker
})
select {
case <-ctx.Done():
err = fmt.Errorf("context canceled before getting a free claim worker: %v", ctx.Err())
log.Error(ctx.Err(), "failed to get a free claim worker")
return
case claimLockChannel <- struct{}{}:
var freeWorkerOnce func()
if opts.ConcurrencyLimiter != nil {
log.Info("waiting for a free claim worker")
startWaiting := time.Now()
freeWorkerOnce, err = opts.ConcurrencyLimiter.Wait(ctx)
if err != nil {
err = fmt.Errorf("context canceled before getting a free claim worker: %v", ctx.Err())
log.Error(ctx.Err(), "failed to get a free claim worker")
return
}
metrics.Wait = time.Since(startWaiting)
log.Info("got a free claim worker", "cost", metrics.Wait)
}
defer func() {
freeWorkerOnce()
if freeWorkerOnce != nil {
freeWorkerOnce()
}
metrics.LastError = err
log.Info("try claim sandbox result", "metrics", metrics.String())
clearFailedSandbox(ctx, claimed, err, opts.ReserveFailedSandbox)
Expand All @@ -107,7 +107,7 @@ func TryClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions, pickCa
var sbx *Sandbox
var lockType infra.LockType
pickStart := time.Now()
sbx, lockType, err = pickAnAvailableSandbox(ctx, opts, pickCache, cache, client.SandboxClient, createLimiter)
sbx, lockType, err = pickAnAvailableSandbox(ctx, opts, pickCache, cache, client.SandboxClient)
if err != nil {
log.Error(err, "failed to select available sandbox")
return
Expand Down Expand Up @@ -149,7 +149,9 @@ func TryClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions, pickCa
log = log.WithValues("sandbox", klog.KObj(sbx.Sandbox))
log.Info("sandbox locked", "cost", metrics.PickAndLock, "type", metrics.LockType)
claimed = sbx
freeWorkerOnce() // free worker early
if freeWorkerOnce != nil {
freeWorkerOnce() // free worker early
}

// Step 3: Built-in post processes. The locked sandbox must be always returned to be cleared properly.
if lockType == infra.LockTypeCreate || lockType == infra.LockTypeSpeculate || opts.InplaceUpdate != nil {
Expand Down Expand Up @@ -223,7 +225,7 @@ func getPickKey(sbx *v1alpha1.Sandbox) string {
}

func pickAnAvailableSandbox(ctx context.Context, opts infra.ClaimSandboxOptions,
pickCache *sync.Map, cache *Cache, client clients.SandboxClient, limiter *rate.Limiter) (*Sandbox, infra.LockType, error) {
pickCache *sync.Map, cache *Cache, client clients.SandboxClient) (*Sandbox, infra.LockType, error) {
template, cnt := opts.Template, opts.CandidateCounts
ctx = logs.Extend(ctx, "action", "pickAnAvailableSandbox")
log := klog.FromContext(ctx).WithValues("template", template).V(consts.DebugLogLevel)
Expand All @@ -234,7 +236,7 @@ func pickAnAvailableSandbox(ctx context.Context, opts infra.ClaimSandboxOptions,
if len(objects) == 0 {
if opts.CreateOnNoStock {
log.Info("will create a new sandbox", "reason", "NoStock")
return newSandboxFromTemplate(opts, cache, client, limiter)
return newSandboxFromTemplate(opts, cache, client, opts.CreateRateLimiter)
}
return nil, "", NoAvailableError(template, "no stock")
}
Expand Down Expand Up @@ -300,7 +302,7 @@ func pickAnAvailableSandbox(ctx context.Context, opts infra.ClaimSandboxOptions,
// Step 3: create new sandbox
if opts.CreateOnNoStock {
log.Info("will create a new sandbox")
return newSandboxFromTemplate(opts, cache, client, limiter)
return newSandboxFromTemplate(opts, cache, client, opts.CreateRateLimiter)
}
return nil, "", NoAvailableError(template, pickErr.Error())
}
Expand Down Expand Up @@ -349,7 +351,7 @@ func pickFromCandidates(ctx context.Context, candidates []*v1alpha1.Sandbox, pic
var FilteredAnnotationsOnCreation []string

func newSandboxFromTemplate(opts infra.ClaimSandboxOptions, cache *Cache, client clients.SandboxClient, limiter *rate.Limiter) (*Sandbox, infra.LockType, error) {
if !limiter.Allow() {
if limiter != nil && !limiter.Allow() {
return nil, "", NoAvailableError(opts.Template, "sandbox creation is not allowed by rate limiter")
}
sbs, err := cache.GetSandboxSet(opts.Template)
Expand Down
23 changes: 15 additions & 8 deletions pkg/sandbox-manager/infra/sandboxcr/claim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/openkruise/agents/pkg/sandbox-manager/clients"
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/utils/limiter"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -307,17 +308,17 @@ func TestInfra_ClaimSandbox(t *testing.T) {
infraOptions: config.SandboxManagerOptions{
MaxClaimWorkers: 1,
},
preProcess: func(t *testing.T, infra *Infra) {
infra.claimLockChannel <- struct{}{}
},
options: infra.ClaimSandboxOptions{
User: user,
Template: existTemplate,
User: user,
Template: existTemplate,
ConcurrencyLimiter: limiter.NewConcurrencyLimiter(0),
},
expectError: "context canceled before getting a free claim worker: context deadline exceeded",
},
}

globalConcurrencyLimiter := limiter.NewConcurrencyLimiter(2)
globalCreateRateLimiter := rate.NewLimiter(rate.Inf, 100000)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.options.ClaimTimeout = 50 * time.Millisecond
Expand Down Expand Up @@ -379,6 +380,12 @@ func TestInfra_ClaimSandbox(t *testing.T) {
if tt.claimCtx != nil {
claimCtx = tt.claimCtx(t.Context())
}
if tt.options.ConcurrencyLimiter == nil {
tt.options.ConcurrencyLimiter = globalConcurrencyLimiter
}
if tt.options.CreateRateLimiter == nil {
tt.options.CreateRateLimiter = globalCreateRateLimiter
}
sbx, metrics, err := testInfra.ClaimSandbox(claimCtx, tt.options)
if tt.expectError != "" {
require.Error(t, err)
Expand Down Expand Up @@ -590,7 +597,7 @@ func TestClaimSandboxFailed(t *testing.T) {
}
opts, err := ValidateAndInitClaimOptions(tt.options)
require.NoError(t, err)
_, _, err = TryClaimSandbox(ctx, opts, &testInfra.pickCache, testInfra.Cache, client, testInfra.claimLockChannel, testInfra.createLimiter)
_, _, err = TryClaimSandbox(ctx, opts, &testInfra.pickCache, testInfra.Cache, client)
require.Error(t, err)
assert.Contains(t, err.Error(), tt.expectError)
_, err = client.ApiV1alpha1().Sandboxes(sbx.Namespace).Get(t.Context(), name, metav1.GetOptions{})
Expand Down Expand Up @@ -766,7 +773,7 @@ func TestNewSandboxFromTemplate_RateLimitExceeded(t *testing.T) {
utils.InitLogOutput()

// Create a rate limiter with 0 burst to ensure it's always exhausted
limiter := rate.NewLimiter(rate.Limit(1), 0)
l := rate.NewLimiter(rate.Limit(1), 0)

// Create test infrastructure
infraInstance, client := NewTestInfra(t)
Expand Down Expand Up @@ -811,7 +818,7 @@ func TestNewSandboxFromTemplate_RateLimitExceeded(t *testing.T) {
}

// Call the function
sbx, _, err := newSandboxFromTemplate(opts, infraInstance.Cache, infraInstance.Client.SandboxClient, limiter)
sbx, _, err := newSandboxFromTemplate(opts, infraInstance.Cache, infraInstance.Client.SandboxClient, l)

// Assertions
assert.Nil(t, sbx, "sandbox should be nil when rate limited")
Expand Down
9 changes: 2 additions & 7 deletions pkg/sandbox-manager/infra/sandboxcr/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/openkruise/agents/pkg/sandbox-manager/clients"
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/sandbox-manager/consts"
"golang.org/x/time/rate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sinformers "k8s.io/client-go/informers"
Expand All @@ -34,9 +33,7 @@ type Infra struct {
Proxy *proxy.Server

// For claiming sandbox
pickCache sync.Map
claimLockChannel chan struct{}
createLimiter *rate.Limiter
pickCache sync.Map

// Currently, templates stores the mapping of sandboxset name -> number of namespaces. For example,
// if a sandboxset with the same name is created in two different namespaces, the corresponding value would be 2.
Expand Down Expand Up @@ -71,8 +68,6 @@ func NewInfra(client *clients.ClientSet, k8sClient kubernetes.Interface, proxy *
Client: client,
Proxy: proxy,
reconcileRouteStopCh: make(chan struct{}),
claimLockChannel: make(chan struct{}, opts.MaxClaimWorkers),
createLimiter: rate.NewLimiter(rate.Limit(opts.MaxCreateQPS), opts.MaxCreateQPS),
}

cache.AddSandboxEventHandler(k8scache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -128,7 +123,7 @@ func (i *Infra) ClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions
}, func() error {
metrics.Retries++
log.Info("try to claim sandbox", "retries", metrics.Retries)
claimed, tryMetrics, claimErr := TryClaimSandbox(claimCtx, opts, &i.pickCache, i.Cache, i.Client, i.claimLockChannel, i.createLimiter)
claimed, tryMetrics, claimErr := TryClaimSandbox(claimCtx, opts, &i.pickCache, i.Cache, i.Client)
metrics.Total += tryMetrics.Total
metrics.Wait += tryMetrics.Wait
metrics.PickAndLock += tryMetrics.PickAndLock
Expand Down
7 changes: 5 additions & 2 deletions pkg/sandbox-manager/infra/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/utils/limiter"
"golang.org/x/time/rate"
)

type ClaimSandboxOptions struct {
Expand All @@ -17,8 +19,6 @@ type ClaimSandboxOptions struct {
CandidateCounts int `json:"candidateCounts"`
// Lock string used in optimistic lock
LockString string `json:"lockString"`
// PreCheck checks the sandbox before modifying it
PreCheck func(sandbox Sandbox) error `json:"-"`
// Set Modifier to modify the Sandbox before it is updated
Modifier func(sandbox Sandbox) `json:"-"`
// Set ReserveFailedSandbox to true to reserve failed sandboxes
Expand All @@ -38,6 +38,9 @@ type ClaimSandboxOptions struct {
// A creating sandbox lasts for SpeculateCreatingDuration may be picked as a candidate when no available ones in SandboxSets.
// Set to 0 to disable speculation feature
SpeculateCreatingDuration time.Duration `json:"speculateCreatingDuration"`
// Limiters
CreateRateLimiter *rate.Limiter `json:"-"`
ConcurrencyLimiter *limiter.ConcurrencyLimiter `json:"-"`
}

type ClaimMetrics struct {
Expand Down
4 changes: 3 additions & 1 deletion pkg/servers/e2b/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type Controller struct {
port int
maxTimeout int
basicQps int

// manager params
systemNamespace string // the namespace where the sandbox manager is running
Expand All @@ -49,9 +50,10 @@ type Controller struct {
}

// NewController creates a new E2B Controller
func NewController(domain, adminKey string, sysNs string, maxTimeout, maxClaimWorkers, maxCreateQPS int, extProcMaxConcurrency uint32,
func NewController(domain, adminKey string, sysNs string, basicQps, maxTimeout, maxClaimWorkers, maxCreateQPS int, extProcMaxConcurrency uint32,
port int, enableAuth bool, clientSet *clients.ClientSet) *Controller {
sc := &Controller{
basicQps: basicQps,
mux: http.NewServeMux(),
client: clientSet,
domain: domain,
Expand Down
2 changes: 1 addition & 1 deletion pkg/servers/e2b/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Setup(t *testing.T) (*Controller, *clients.ClientSet, func()) {
_, err := clientSet.CoreV1().Secrets(namespace).Create(t.Context(), secret, metav1.CreateOptions{})
assert.NoError(t, err)

controller := NewController("example.com", InitKey, namespace, models.DefaultMaxTimeout, 10,
controller := NewController("example.com", InitKey, namespace, models.DefaultBasicQPS, models.DefaultMaxTimeout, 10,
0, 0, TestServerPort, true, clientSet)
assert.NoError(t, controller.Init())
_, err = controller.Run(namespace, "component=sandbox-manager")
Expand Down
29 changes: 29 additions & 0 deletions pkg/servers/e2b/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package e2b

import (
"context"
"net/http"
"time"

"github.com/openkruise/agents/pkg/sandbox-manager/consts"
"github.com/openkruise/agents/pkg/servers/web"
"golang.org/x/time/rate"
"k8s.io/klog/v2"
)

func (sc *Controller) SimpleRateLimiter(qps int) web.MiddleWare {
limiter := rate.NewLimiter(rate.Limit(qps), qps*2)
return func(ctx context.Context, r *http.Request) (context.Context, *web.ApiError) {
log := klog.FromContext(ctx).WithValues("middleware", "RateLimiter")
start := time.Now()
if err := limiter.Wait(r.Context()); err != nil {
log.Error(err, "request blocked by rate limiter")
return ctx, &web.ApiError{
Code: http.StatusTooManyRequests,
Message: "request blocked by server throttle",
}
}
log.V(consts.DebugLogLevel).Info("request allowed by rate limiter", "latency", time.Since(start))
return ctx, nil
}
}
Loading
Loading