-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathcore.go
More file actions
159 lines (143 loc) · 4.63 KB
/
core.go
File metadata and controls
159 lines (143 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package e2b
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/sandbox-manager/consts"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"github.com/openkruise/agents/pkg/agent-runtime/storages"
sandbox_manager "github.com/openkruise/agents/pkg/sandbox-manager"
"github.com/openkruise/agents/pkg/sandbox-manager/clients"
"github.com/openkruise/agents/pkg/sandbox-manager/infra"
"github.com/openkruise/agents/pkg/sandbox-manager/logs"
"github.com/openkruise/agents/pkg/servers/e2b/adapters"
"github.com/openkruise/agents/pkg/servers/e2b/keys"
)
// Controller handles sandbox-related operations.
// It bundles the HTTP server and its mux, the sandbox manager, and the
// optional API key storage, together with the settings they are built from.
type Controller struct {
// port is used for the HTTP server listen address and is also passed to
// the adapter factory in Init.
port int
// maxTimeout is stored as given to NewController.
// NOTE(review): its unit and consumers are not visible in this section - confirm.
maxTimeout int
// manager params
// These values are forwarded unchanged into the sandbox manager options in Init.
systemNamespace string // the namespace where the sandbox manager is running
maxClaimWorkers int
maxCreateQPS int
extProcMaxConcurrency uint32
sandboxLabelSelector string
sandboxNamespace string
// fields
// mux is installed as the HTTP server's handler.
mux *http.ServeMux
// server is created in NewController and started/shut down by Run.
server *http.Server
// stop receives SIGINT/SIGTERM once Run has been called; a non-nil value
// is what Run uses to detect that the controller was already started.
stop chan os.Signal
// client is the client set supplied to NewController.
client *clients.ClientSet
// cache is taken from the sandbox manager's infra in Init; it stays nil
// when the manager reports no infra.
cache infra.CacheProvider
// storageRegistry is created in Init.
storageRegistry storages.VolumeMountProviderRegistry
// clientConfig is copied from the client set's Config in NewController.
clientConfig *rest.Config
// domain is stored as given to NewController.
domain string
// manager is created in Init and started/stopped by Run.
manager *sandbox_manager.SandboxManager
// keys is nil unless NewController was called with enableAuth set.
keys *keys.SecretKeyStorage
}
// NewController assembles an E2B Controller from the supplied settings.
//
// The HTTP server is configured to listen on ":<port>" with the controller's
// mux as handler and a 5 second read-header timeout; it is not started here.
// The remaining settings are only recorded on the controller. When enableAuth
// is true a key storage is attached, scoped to sysNs and using adminKey and
// the client set's K8s client; otherwise the controller's key storage is nil.
//
// clientSet is dereferenced unconditionally and therefore must not be nil.
func NewController(domain, adminKey string, sysNs, sandboxNamespace, sandboxLabelSelector string, maxTimeout, maxClaimWorkers, maxCreateQPS int, extProcMaxConcurrency uint32,
	port int, enableAuth bool, clientSet *clients.ClientSet) *Controller {
	router := http.NewServeMux()

	ctrl := &Controller{
		port:       port,
		maxTimeout: maxTimeout,

		// Settings handed to the sandbox manager later on.
		systemNamespace:       sysNs,
		sandboxNamespace:      sandboxNamespace,
		sandboxLabelSelector:  sandboxLabelSelector,
		maxClaimWorkers:       maxClaimWorkers,
		maxCreateQPS:          maxCreateQPS,
		extProcMaxConcurrency: extProcMaxConcurrency,

		// Runtime collaborators.
		mux: router,
		server: &http.Server{
			Addr:              fmt.Sprintf(":%d", port),
			Handler:           router,
			ReadHeaderTimeout: 5 * time.Second,
		},
		client:       clientSet,
		clientConfig: clientSet.Config,
		domain:       domain,
	}

	// Without authentication there is no key storage to attach.
	if !enableAuth {
		return ctrl
	}

	ctrl.keys = &keys.SecretKeyStorage{
		Namespace: sysNs,
		AdminKey:  adminKey,
		Client:    clientSet.K8sClient,
		Stop:      make(chan struct{}),
	}
	return ctrl
}
// Init prepares the controller for Run.
//
// It creates the sandbox manager from the controller's stored settings,
// picks up the manager's cache when the manager exposes an infra, creates the
// storage registry and registers the HTTP routes. If a key storage is
// configured it is initialised last and its error is returned.
//
// An error from creating the sandbox manager is returned unchanged and leaves
// the controller unmodified.
func (sc *Controller) Init() error {
	ctx := logs.NewContext()
	klog.FromContext(ctx).Info("init controller")

	opts := config.SandboxManagerOptions{
		SystemNamespace:       sc.systemNamespace,
		SandboxNamespace:      sc.sandboxNamespace,
		SandboxLabelSelector:  sc.sandboxLabelSelector,
		MaxClaimWorkers:       sc.maxClaimWorkers,
		ExtProcMaxConcurrency: sc.extProcMaxConcurrency,
		MaxCreateQPS:          sc.maxCreateQPS,
	}
	mgr, err := sandbox_manager.NewSandboxManager(sc.client, adapters.DefaultAdapterFactory(sc.port), opts)
	if err != nil {
		return err
	}

	// The cache is only available when the manager reports an infra.
	if inf := mgr.GetInfra(); inf != nil {
		sc.cache = inf.GetCache()
	}
	sc.manager = mgr
	sc.storageRegistry = storages.NewStorageProvider()
	sc.registerRoutes()

	if sc.keys != nil {
		return sc.keys.Init(ctx)
	}
	return nil
}
// Run starts the sandbox manager and the HTTP server and installs a
// SIGINT/SIGTERM handler that shuts both down.
//
// sysNs and peerSelector are passed through to the sandbox manager's Run.
//
// On success it returns a context that is cancelled once the shutdown handler
// has finished: the manager is stopped first, then the HTTP server is shut
// down with a deadline of consts.ShutdownTimeout. If a key storage is
// configured, its Run method is called before returning.
//
// Errors:
//   - "controller already started" when Run has already been called.
//   - a wrapped error when the sandbox manager fails to start. In that case
//     the signal registration and the context are released and the controller
//     is marked as not started, so the caller decides whether to exit or
//     retry (previously this path terminated the process via klog.Fatalf).
//
// A failure of the HTTP listener happens asynchronously and still terminates
// the process, because it can no longer be reported through the return value.
func (sc *Controller) Run(sysNs, peerSelector string) (context.Context, error) {
	if sc.stop != nil {
		return nil, errors.New("controller already started")
	}

	ctx, cancel := context.WithCancel(logs.NewContext())

	// Channel to listen for interrupt signal
	stop := make(chan os.Signal, 1)
	sc.stop = stop
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

	if err := sc.manager.Run(ctx, sysNs, peerSelector); err != nil {
		// Undo the partial start-up so nothing leaks and the error reaches
		// the caller instead of killing the process from library code.
		signal.Stop(stop)
		sc.stop = nil
		cancel()
		return nil, fmt.Errorf("starting sandbox manager: %w", err)
	}

	// Run HTTP server in a goroutine
	go func() {
		klog.InfoS("Starting Server", "address", sc.server.Addr)
		// http.ErrServerClosed is the normal result of Shutdown, not a failure.
		if err := sc.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			klog.Fatalf("HTTP server failed to start: %v", err)
		}
	}()

	// stopper
	go func() {
		<-stop

		// Shutdown server gracefully
		klog.InfoS("Shutting down server...")
		// Cancel the returned context only after the whole shutdown sequence ran.
		defer cancel()
		sc.manager.Stop()

		// Shutdown HTTP server with timeout
		shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), consts.ShutdownTimeout)
		defer shutdownCancel()
		if err := sc.server.Shutdown(shutdownCtx); err != nil {
			klog.ErrorS(err, "HTTP server forced to shutdown")
		}
		klog.InfoS("Server exited")
	}()

	if sc.keys != nil {
		sc.keys.Run()
	}
	return ctx, nil
}