Skip to content

Commit 4d5825c

Browse files
lxs137furykerry
authored andcommitted
feat(sandbox-manager): introduce scoped sandbox-related custom resource informer
Signed-off-by: menya <lxs137@hotmail.com>
1 parent 95ba21c commit 4d5825c

File tree

6 files changed

+228
-7
lines changed

6 files changed

+228
-7
lines changed

cmd/sandbox-manager/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func main() {
3434
var e2bMaxTimeout int
3535
var sysNs string
3636
var peerSelector string
37+
var sandboxNamespace string
38+
var sandboxLabelSelector string
3739
var maxClaimWorkers int
3840
var maxCreateQPS int
3941
var extProcMaxConcurrency int
@@ -54,6 +56,8 @@ func main() {
5456
pflag.IntVar(&e2bMaxTimeout, "e2b-max-timeout", models.DefaultMaxTimeout, "E2B maximum timeout in seconds")
5557
pflag.StringVar(&sysNs, "system-namespace", utils.DefaultSandboxDeployNamespace, "The namespace where the sandbox manager is running (required)")
5658
pflag.StringVar(&peerSelector, "peer-selector", "", "Peer selector for sandbox manager (required)")
59+
pflag.StringVar(&sandboxNamespace, "sandbox-namespace", "", "Namespace to filter sandbox-related custom resources (Sandbox, SandboxSet, Checkpoint, SandboxTemplate). Defaults to all.")
60+
pflag.StringVar(&sandboxLabelSelector, "sandbox-label-selector", "", "Label selector to filter sandbox-related custom resources (Sandbox, SandboxSet, Checkpoint, SandboxTemplate). Defaults to all.")
5761
pflag.IntVar(&maxClaimWorkers, "max-claim-workers", consts.DefaultClaimWorkers, "Maximum number of claim workers (0 uses default)")
5862
pflag.IntVar(&maxCreateQPS, "max-create-qps", consts.DefaultCreateQPS, "Maximum QPS for sandbox creation (0 uses default)")
5963
pflag.IntVar(&extProcMaxConcurrency, "ext-proc-max-concurrency", consts.DefaultExtProcConcurrency, "Maximum concurrency for external processor (0 uses default)")
@@ -129,7 +133,7 @@ func main() {
129133
klog.Fatalf("Failed to initialize Kubernetes client: %v", err)
130134
}
131135

132-
sandboxController := e2b.NewController(domain, e2bAdminKey, sysNs, e2bMaxTimeout, maxClaimWorkers, maxCreateQPS, uint32(extProcMaxConcurrency),
136+
sandboxController := e2b.NewController(domain, e2bAdminKey, sysNs, sandboxNamespace, sandboxLabelSelector, e2bMaxTimeout, maxClaimWorkers, maxCreateQPS, uint32(extProcMaxConcurrency),
133137
port, e2bEnableAuth, clientSet)
134138
if err := sandboxController.Init(); err != nil {
135139
klog.Fatalf("Failed to initialize sandbox controller: %v", err)

pkg/sandbox-manager/config/manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77

88
type SandboxManagerOptions struct {
99
SystemNamespace string
10+
SandboxNamespace string
11+
SandboxLabelSelector string
1012
MaxClaimWorkers int
1113
MaxCreateQPS int
1214
ExtProcMaxConcurrency uint32

pkg/sandbox-manager/infra/sandboxcr/cache.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"golang.org/x/sync/singleflight"
1111
corev1 "k8s.io/api/core/v1"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1213
k8sinformers "k8s.io/client-go/informers"
1314
"k8s.io/client-go/tools/cache"
1415
"k8s.io/klog/v2"
@@ -45,7 +46,16 @@ type Cache struct {
4546

4647
func NewCache(client *clients.ClientSet, opts config.SandboxManagerOptions) (*Cache, error) {
4748
// Create informer factory for custom Sandbox resources
48-
informerFactory := informers.NewSharedInformerFactory(client.SandboxClient, time.Minute*10)
49+
informerOptions := []informers.SharedInformerOption{}
50+
if opts.SandboxNamespace != "" {
51+
informerOptions = append(informerOptions, informers.WithNamespace(opts.SandboxNamespace))
52+
}
53+
if opts.SandboxLabelSelector != "" {
54+
informerOptions = append(informerOptions, informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
55+
lo.LabelSelector = opts.SandboxLabelSelector
56+
}))
57+
}
58+
informerFactory := informers.NewSharedInformerFactoryWithOptions(client.SandboxClient, time.Minute*10, informerOptions...)
4959
sandboxInformer := informerFactory.Api().V1alpha1().Sandboxes().Informer()
5060
sandboxSetInformer := informerFactory.Api().V1alpha1().SandboxSets().Informer()
5161
checkpointInformer := informerFactory.Api().V1alpha1().Checkpoints().Informer()
@@ -87,11 +97,15 @@ func NewCache(client *clients.ClientSet, opts config.SandboxManagerOptions) (*Ca
8797
}
8898

8999
func NewTestCache(t *testing.T) (*Cache, *clients.ClientSet, error) {
90-
t.Helper()
91-
clientSet := clients.NewFakeClientSet(t)
92-
c, err := NewCache(clientSet, config.SandboxManagerOptions{
100+
return NewTestCacheWithOptions(t, config.SandboxManagerOptions{
93101
SystemNamespace: utils.DefaultSandboxDeployNamespace,
94102
})
103+
}
104+
105+
func NewTestCacheWithOptions(t *testing.T, opts config.SandboxManagerOptions) (*Cache, *clients.ClientSet, error) {
106+
t.Helper()
107+
clientSet := clients.NewFakeClientSet(t)
108+
c, err := NewCache(clientSet, opts)
95109
if err != nil {
96110
return nil, nil, err
97111
}

pkg/sandbox-manager/infra/sandboxcr/cache_test.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"time"
77

88
agentsv1alpha1 "github.com/openkruise/agents/api/v1alpha1"
9+
sandboxfake "github.com/openkruise/agents/client/clientset/versioned/fake"
910
"github.com/openkruise/agents/pkg/sandbox-manager/clients"
11+
"github.com/openkruise/agents/pkg/sandbox-manager/config"
1012
constantUtils "github.com/openkruise/agents/pkg/utils"
1113
sandboxManagerUtils "github.com/openkruise/agents/pkg/utils/sandbox-manager"
1214
utils "github.com/openkruise/agents/pkg/utils/sandbox-manager"
@@ -15,6 +17,9 @@ import (
1517
corev1 "k8s.io/api/core/v1"
1618
"k8s.io/apimachinery/pkg/api/resource"
1719
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/labels"
21+
"k8s.io/apimachinery/pkg/runtime"
22+
k8stesting "k8s.io/client-go/testing"
1823
)
1924

2025
func TestCache_WaitForSandboxSatisfied(t *testing.T) {
@@ -439,3 +444,193 @@ func TestCache_GetSecret_FromSync(t *testing.T) {
439444
assert.Equal(t, testSecret.Data, result.Data)
440445
assert.Equal(t, testSecret.Type, result.Type)
441446
}
447+
448+
func TestCache_InformerWithFilter_GetSandboxSet(t *testing.T) {
449+
sandboxManagerUtils.InitLogOutput()
450+
451+
tests := []struct {
452+
name string
453+
opts config.SandboxManagerOptions
454+
seedSandboxSets []*agentsv1alpha1.SandboxSet
455+
wantCachedNames []string // expected SandboxSet names visible in cache (namespace/name)
456+
queryName string // template name to query via GetSandboxSet
457+
wantGetErr bool // whether GetSandboxSet should return an error
458+
wantGetName string // expected name of the returned SandboxSet (if no error)
459+
wantGetNamespace string // expected namespace of the returned SandboxSet (if no error)
460+
}{
461+
{
462+
name: "no filter - all SandboxSets visible",
463+
opts: config.SandboxManagerOptions{},
464+
seedSandboxSets: []*agentsv1alpha1.SandboxSet{
465+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-1", Namespace: "team-a"}},
466+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-2", Namespace: "team-a"}},
467+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-3", Namespace: "team-b"}},
468+
},
469+
wantCachedNames: []string{"team-a/tmpl-1", "team-a/tmpl-2", "team-b/tmpl-3"},
470+
queryName: "tmpl-3",
471+
wantGetErr: false,
472+
wantGetName: "tmpl-3",
473+
wantGetNamespace: "team-b",
474+
},
475+
{
476+
name: "namespace filter with multiple SandboxSets in same namespace",
477+
opts: config.SandboxManagerOptions{
478+
SandboxNamespace: "team-a",
479+
},
480+
seedSandboxSets: []*agentsv1alpha1.SandboxSet{
481+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-1", Namespace: "team-a"}},
482+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-2", Namespace: "team-a"}},
483+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-3", Namespace: "team-b"}},
484+
},
485+
wantCachedNames: []string{"team-a/tmpl-1", "team-a/tmpl-2"},
486+
queryName: "tmpl-2",
487+
wantGetErr: false,
488+
wantGetName: "tmpl-2",
489+
wantGetNamespace: "team-a",
490+
},
491+
{
492+
name: "label selector filter with multiple SandboxSets",
493+
opts: config.SandboxManagerOptions{
494+
SandboxLabelSelector: "env=prod",
495+
},
496+
seedSandboxSets: []*agentsv1alpha1.SandboxSet{
497+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-match-selector", Namespace: "team-a", Labels: map[string]string{"env": "prod"}}},
498+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-not-match-selector", Namespace: "team-a", Labels: map[string]string{"env": "staging"}}},
499+
},
500+
wantCachedNames: []string{"team-a/tmpl-match-selector"},
501+
queryName: "tmpl-match-selector",
502+
wantGetErr: false,
503+
wantGetName: "tmpl-match-selector",
504+
wantGetNamespace: "team-a",
505+
},
506+
{
507+
name: "namespace and label selector filter with multiple SandboxSets",
508+
opts: config.SandboxManagerOptions{
509+
SandboxNamespace: "team-a",
510+
SandboxLabelSelector: "env=prod",
511+
},
512+
seedSandboxSets: []*agentsv1alpha1.SandboxSet{
513+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-match-selector", Namespace: "team-a", Labels: map[string]string{"env": "prod"}}},
514+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-not-match-selector", Namespace: "team-a", Labels: map[string]string{"env": "staging"}}},
515+
{ObjectMeta: metav1.ObjectMeta{Name: "tmpl-match-selector", Namespace: "team-b", Labels: map[string]string{"env": "prod"}}},
516+
},
517+
wantCachedNames: []string{"team-a/tmpl-match-selector"},
518+
queryName: "tmpl-match-selector",
519+
wantGetErr: false,
520+
wantGetName: "tmpl-match-selector",
521+
wantGetNamespace: "team-a",
522+
},
523+
}
524+
525+
for _, tt := range tests {
526+
t.Run(tt.name, func(t *testing.T) {
527+
var (
528+
c *Cache
529+
clientSet *clients.ClientSet
530+
err error
531+
)
532+
533+
if tt.opts.SandboxLabelSelector != "" {
534+
// When a label selector is configured, the fake client does not enforce it
535+
// natively. We work around this by:
536+
// 1. Creating the fake clientSet manually so we can access the underlying
537+
// *sandboxfake.Clientset and inject a PrependReactor.
538+
// 2. Writing seed data into the Tracker BEFORE starting the Cache so that
539+
// the Informer's initial List call goes through our reactor and returns
540+
// only the label-matching objects.
541+
// 3. The reactor parses the LabelSelector from ListOptions and filters the
542+
// objects returned by the Tracker, giving the Informer an accurate view.
543+
clientSet = clients.NewFakeClientSet(t)
544+
fakeClient, ok := clientSet.SandboxClient.(*sandboxfake.Clientset)
545+
require.True(t, ok, "SandboxClient must be *sandboxfake.Clientset")
546+
547+
// Write seed data into the Tracker before the Cache starts.
548+
for _, sbs := range tt.seedSandboxSets {
549+
require.NoError(t,
550+
fakeClient.Tracker().Add(sbs),
551+
"failed to add SandboxSet %s/%s to tracker", sbs.Namespace, sbs.Name,
552+
)
553+
}
554+
555+
// Inject a PrependReactor that enforces label selector filtering on List.
556+
fakeClient.PrependReactor("list", "sandboxsets", func(action k8stesting.Action) (bool, runtime.Object, error) {
557+
listAction, ok := action.(k8stesting.ListAction)
558+
if !ok {
559+
return false, nil, nil
560+
}
561+
// Retrieve all objects from the Tracker for this resource/namespace.
562+
objs, err := fakeClient.Tracker().List(
563+
agentsv1alpha1.SchemeGroupVersion.WithResource("sandboxsets"),
564+
agentsv1alpha1.SchemeGroupVersion.WithKind("SandboxSet"),
565+
listAction.GetNamespace(),
566+
)
567+
if err != nil {
568+
return true, nil, err
569+
}
570+
rawList, ok := objs.(*agentsv1alpha1.SandboxSetList)
571+
if !ok {
572+
return false, nil, nil
573+
}
574+
575+
// Apply label selector filtering when one is present.
576+
restriction := listAction.GetListRestrictions()
577+
selector := restriction.Labels
578+
if selector == nil || selector.Empty() {
579+
return true, rawList, nil
580+
}
581+
filtered := make([]agentsv1alpha1.SandboxSet, 0, len(rawList.Items))
582+
for _, item := range rawList.Items {
583+
if selector.Matches(labels.Set(item.Labels)) {
584+
filtered = append(filtered, item)
585+
}
586+
}
587+
rawList.Items = filtered
588+
return true, rawList, nil
589+
})
590+
591+
// Build and start the Cache with the pre-seeded, reactor-equipped clientSet.
592+
c, err = NewCache(clientSet, tt.opts)
593+
require.NoError(t, err)
594+
require.NoError(t, c.Run(t.Context()))
595+
} else {
596+
c, clientSet, err = NewTestCacheWithOptions(t, tt.opts)
597+
require.NoError(t, err)
598+
599+
// Seed SandboxSets after the Cache is running (standard path).
600+
for _, sbs := range tt.seedSandboxSets {
601+
_, err := clientSet.SandboxClient.ApiV1alpha1().SandboxSets(sbs.Namespace).Create(
602+
t.Context(), sbs, metav1.CreateOptions{},
603+
)
604+
require.NoError(t, err, "failed to create SandboxSet %s/%s", sbs.Namespace, sbs.Name)
605+
}
606+
}
607+
defer c.Stop()
608+
609+
// Wait for informer sync
610+
time.Sleep(300 * time.Millisecond)
611+
612+
// Inspect all SandboxSets in cache
613+
allItems := c.sandboxSetInformer.GetStore().List()
614+
cachedKeys := make([]string, 0, len(allItems))
615+
for _, item := range allItems {
616+
sbs, ok := item.(*agentsv1alpha1.SandboxSet)
617+
require.True(t, ok, "unexpected type in sandboxSetInformer store: %T", item)
618+
cachedKeys = append(cachedKeys, sbs.Namespace+"/"+sbs.Name)
619+
}
620+
assert.ElementsMatch(t, tt.wantCachedNames, cachedKeys,
621+
"cached SandboxSet keys mismatch")
622+
623+
// Query via GetSandboxSet
624+
got, err := c.GetSandboxSet(tt.queryName)
625+
if tt.wantGetErr {
626+
require.Error(t, err)
627+
assert.Nil(t, got)
628+
} else {
629+
require.NoError(t, err)
630+
require.NotNil(t, got)
631+
assert.Equal(t, tt.wantGetName, got.Name)
632+
assert.Equal(t, tt.wantGetNamespace, got.Namespace)
633+
}
634+
})
635+
}
636+
}

pkg/servers/e2b/core.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Controller struct {
3434
maxClaimWorkers int
3535
maxCreateQPS int
3636
extProcMaxConcurrency uint32
37+
sandboxLabelSelector string
38+
sandboxNamespace string
3739

3840
// fields
3941
mux *http.ServeMux
@@ -49,7 +51,7 @@ type Controller struct {
4951
}
5052

5153
// NewController creates a new E2B Controller
52-
func NewController(domain, adminKey string, sysNs string, maxTimeout, maxClaimWorkers, maxCreateQPS int, extProcMaxConcurrency uint32,
54+
func NewController(domain, adminKey string, sysNs, sandboxNamespace, sandboxLabelSelector string, maxTimeout, maxClaimWorkers, maxCreateQPS int, extProcMaxConcurrency uint32,
5355
port int, enableAuth bool, clientSet *clients.ClientSet) *Controller {
5456
sc := &Controller{
5557
mux: http.NewServeMux(),
@@ -59,6 +61,8 @@ func NewController(domain, adminKey string, sysNs string, maxTimeout, maxClaimWo
5961
port: port,
6062
maxTimeout: maxTimeout,
6163
systemNamespace: sysNs, // the namespace where the sandbox manager is running
64+
sandboxNamespace: sandboxNamespace,
65+
sandboxLabelSelector: sandboxLabelSelector,
6266
maxClaimWorkers: maxClaimWorkers,
6367
maxCreateQPS: maxCreateQPS,
6468
extProcMaxConcurrency: extProcMaxConcurrency,
@@ -88,6 +92,8 @@ func (sc *Controller) Init() error {
8892
adapter := adapters.DefaultAdapterFactory(sc.port)
8993
sandboxManager, err := sandbox_manager.NewSandboxManager(sc.client, adapter, config.SandboxManagerOptions{
9094
SystemNamespace: sc.systemNamespace,
95+
SandboxNamespace: sc.sandboxNamespace,
96+
SandboxLabelSelector: sc.sandboxLabelSelector,
9197
MaxClaimWorkers: sc.maxClaimWorkers,
9298
ExtProcMaxConcurrency: sc.extProcMaxConcurrency,
9399
MaxCreateQPS: sc.maxCreateQPS,

pkg/servers/e2b/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func Setup(t *testing.T) (*Controller, *clients.ClientSet, func()) {
8585
_, err := clientSet.CoreV1().Secrets(namespace).Create(t.Context(), secret, metav1.CreateOptions{})
8686
assert.NoError(t, err)
8787

88-
controller := NewController("example.com", InitKey, namespace, models.DefaultMaxTimeout, 10,
88+
controller := NewController("example.com", InitKey, namespace, "", "", models.DefaultMaxTimeout, 10,
8989
0, 0, TestServerPort, true, clientSet)
9090
assert.NoError(t, controller.Init())
9191
_, err = controller.Run(namespace, "component=sandbox-manager")

0 commit comments

Comments
 (0)