-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathapi.go
More file actions
137 lines (122 loc) · 5.44 KB
/
api.go
File metadata and controls
137 lines (122 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package sandbox_manager
import (
"context"
"fmt"
"time"
"github.com/openkruise/agents/api/v1alpha1"
"github.com/openkruise/agents/pkg/sandbox-manager/errors"
"github.com/openkruise/agents/pkg/sandbox-manager/infra"
"k8s.io/klog/v2"
)
// ClaimSandbox asks the infra layer to lock a sandbox created from
// opts.Template and assign it to the current caller.
//
// The manager's create rate limiter and claim concurrency limiter are injected
// into opts before delegating. Every call increments SandboxCreationResponses
// with either "success" or "failure"; SandboxCreationLatency is observed only
// for successful claims. After a successful claim the route is pushed to peers
// on a best-effort basis: a sync failure is logged but does not fail the call.
//
// Errors: ErrorNotFound if the template is unknown, ErrorInternal if the infra
// claim fails.
func (m *SandboxManager) ClaimSandbox(ctx context.Context, opts infra.ClaimSandboxOptions) (infra.Sandbox, error) {
	logger := klog.FromContext(ctx)

	if !m.infra.HasTemplate(opts.Template) {
		// Failures are counted here, in the API layer.
		SandboxCreationResponses.WithLabelValues("failure").Inc()
		return nil, errors.NewError(errors.ErrorNotFound, fmt.Sprintf("template %s not found", opts.Template))
	}

	opts.CreateRateLimiter, opts.ConcurrencyLimiter = m.createRateLimiter, m.claimConcurrencyLimiter

	claimed, claimMetrics, claimErr := m.infra.ClaimSandbox(ctx, opts)
	if claimErr != nil {
		logger.Error(claimErr, "failed to claim sandbox", "metrics", claimMetrics.String())
		SandboxCreationResponses.WithLabelValues("failure").Inc()
		return nil, errors.NewError(errors.ErrorInternal, fmt.Sprintf("failed to claim sandbox: %v", claimErr))
	}

	SandboxCreationResponses.WithLabelValues("success").Inc()
	// Latency is only recorded when the claim succeeded.
	SandboxCreationLatency.Observe(float64(claimMetrics.Total.Milliseconds()))

	state, reason := claimed.GetState()
	logger.Info("sandbox claimed", "sandbox", klog.KObj(claimed), "metrics", claimMetrics.String(), "state", state, "reason", reason)

	// No refresh: the sandbox was just claimed, so its state is already current.
	if syncErr := m.syncRoute(ctx, claimed, false); syncErr != nil {
		logger.Error(syncErr, "failed to sync route with peers after claim")
	}
	return claimed, nil
}
// GetClaimedSandbox returns the sandbox identified by sandboxID, provided it
// has been claimed by user and is currently running or paused.
//
// Errors:
//   - ErrorNotFound when the infra layer cannot return the sandbox, or when it
//     is still Available or Creating (i.e. not claimed yet).
//   - ErrorNotAllowed when the sandbox's route owner is not user.
//   - ErrorBadRequest when the sandbox belongs to user but is neither Running
//     nor Paused.
func (m *SandboxManager) GetClaimedSandbox(ctx context.Context, user, sandboxID string) (infra.Sandbox, error) {
	logger := klog.FromContext(ctx).WithValues("sandboxID", sandboxID)
	logger.Info("try to get claimed sandbox")

	notFound := func() error {
		return errors.NewError(errors.ErrorNotFound, fmt.Sprintf("sandbox %s not found", sandboxID))
	}

	sandbox, err := m.infra.GetClaimedSandbox(ctx, sandboxID)
	if err != nil {
		logger.Error(err, "failed to get sandbox from cache")
		return nil, notFound()
	}

	state, reason := sandbox.GetState()
	// Cases are checked top to bottom, so ownership is only inspected for
	// sandboxes that are past the Available/Creating stage.
	switch {
	case state == v1alpha1.SandboxStateAvailable, state == v1alpha1.SandboxStateCreating:
		// An unclaimed sandbox is reported as not found.
		logger.Error(nil, "sandbox is not claimed", "state", state, "reason", reason)
		return nil, notFound()
	case sandbox.GetRoute().Owner != user:
		logger.Error(nil, "sandbox is not owned by user")
		return nil, errors.NewError(errors.ErrorNotAllowed, fmt.Sprintf("sandbox %s is not owned", sandboxID))
	case state != v1alpha1.SandboxStatePaused && state != v1alpha1.SandboxStateRunning:
		logger.Error(nil, "sandbox is not healthy", "state", state, "reason", reason)
		return nil, errors.NewError(errors.ErrorBadRequest, fmt.Sprintf("sandbox %s is not healthy (state %s, reason %s)", sandboxID, state, reason))
	}
	return sandbox, nil
}
// ListSandboxes returns the sandboxes selected by the infra layer for user.
// limit and filter are passed straight through to infra.SelectSandboxes.
//
// Any selection error is reported as an ErrorNotFound error.
// NOTE(review): ErrorNotFound is surprising for a generic listing failure;
// confirm this is intended rather than ErrorInternal.
func (m *SandboxManager) ListSandboxes(user string, limit int, filter func(infra.Sandbox) bool) ([]infra.Sandbox, error) {
	selected, err := m.infra.SelectSandboxes(user, limit, filter)
	if err == nil {
		return selected, nil
	}
	return nil, errors.NewError(errors.ErrorNotFound, fmt.Sprintf("failed to list sandboxes: %v", err))
}
// GetOwnerOfSandbox reports the owner recorded in the proxy's route for
// sandboxID. The boolean is false when the proxy has no route for the
// sandbox; in that case the returned owner is always "".
func (m *SandboxManager) GetOwnerOfSandbox(sandboxID string) (string, bool) {
	route, ok := m.proxy.LoadRoute(sandboxID)
	if !ok {
		// Don't read from route on a miss: if LoadRoute returns a pointer it
		// may be nil, and dereferencing it would panic.
		return "", false
	}
	return route.Owner, true
}
// syncRoute stores the sandbox's current route in the local proxy and then
// pushes it to peers.
//
// When refresh is true the sandbox is first refreshed in place. A refresh
// failure is only logged, and the possibly stale route is still synced. The
// returned error, if any, is the one from SyncRouteWithPeers.
func (m *SandboxManager) syncRoute(ctx context.Context, sbx infra.Sandbox, refresh bool) error {
	logger := klog.FromContext(ctx).WithValues("sandbox", klog.KObj(sbx))

	if refresh {
		// Best effort: the existing route might still be valid, so carry on.
		if refreshErr := sbx.InplaceRefresh(ctx, false); refreshErr != nil {
			logger.Error(refreshErr, "failed to refresh sandbox, route sync may use stale state")
		}
	}

	began := time.Now()
	route := sbx.GetRoute()
	m.proxy.SetRoute(ctx, route)
	if err := m.proxy.SyncRouteWithPeers(route); err != nil {
		logger.Error(err, "failed to sync route with peers")
		return err
	}
	logger.Info("route synced with peers", "cost", time.Since(began), "route", route)
	return nil
}
// PauseSandbox pauses sbx with opts. On success it refreshes the sandbox and
// re-syncs its route with peers.
//
// Only a pause failure is returned, unwrapped. A route-sync failure is logged
// and otherwise ignored, so the call still reports success.
func (m *SandboxManager) PauseSandbox(ctx context.Context, sbx infra.Sandbox, opts infra.PauseOptions) error {
	logger := klog.FromContext(ctx).WithValues("sandbox", klog.KObj(sbx))

	err := sbx.Pause(ctx, opts)
	if err != nil {
		logger.Error(err, "failed to pause sandbox")
		return err
	}

	// The pause already took effect; a failed route sync doesn't undo that.
	if syncErr := m.syncRoute(ctx, sbx, true); syncErr != nil {
		logger.Error(syncErr, "failed to sync route with peers after pause")
	}
	return nil
}
// ResumeSandbox resumes sbx. On success it refreshes the sandbox and re-syncs
// its route with peers.
//
// Only a resume failure is returned, unwrapped. A route-sync failure is logged
// and otherwise ignored, so the call still reports success.
func (m *SandboxManager) ResumeSandbox(ctx context.Context, sbx infra.Sandbox) error {
	logger := klog.FromContext(ctx).WithValues("sandbox", klog.KObj(sbx))

	err := sbx.Resume(ctx)
	if err != nil {
		logger.Error(err, "failed to resume sandbox")
		return err
	}

	// The resume already took effect; a failed route sync doesn't undo that.
	if syncErr := m.syncRoute(ctx, sbx, true); syncErr != nil {
		logger.Error(syncErr, "failed to sync route with peers after resume")
	}
	return nil
}