-
Notifications
You must be signed in to change notification settings - Fork 142
Expand file tree
/
Copy pathpool.go
More file actions
177 lines (154 loc) · 5.23 KB
/
pool.go
File metadata and controls
177 lines (154 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package scheduler
import (
	"context"
	"errors"
	"fmt"
	"math"
	"strconv"
	"strings"
	"time"

	"github.com/beam-cloud/beta9/pkg/network"
	"github.com/beam-cloud/beta9/pkg/repository"
	"github.com/beam-cloud/beta9/pkg/types"
	"github.com/google/uuid"
	"k8s.io/apimachinery/pkg/api/resource"
)
// Package-level constants for worker pools: label keys/values, volume names,
// default container paths, and monitoring intervals.
const (
	// Label keys and values under the run.beam.cloud/ prefix, plus the job
	// name prefix used for workers.
	Beta9WorkerLabelKey string = "run.beam.cloud/role"
	Beta9WorkerLabelValue string = "worker"
	Beta9WorkerJobPrefix string = "worker"
	Beta9MachineLabelIDKey string = "run.beam.cloud/machine-id"
	Beta9WorkerLabelIDKey string = "run.beam.cloud/worker-id"
	Beta9WorkerLabelPoolNameKey string = "run.beam.cloud/worker-pool-name"
	// Keys in the prometheus.io/ namespace.
	// NOTE(review): presumably used as scrape annotations — confirm at the usage sites.
	PrometheusPortKey string = "prometheus.io/port"
	PrometheusScrapeKey string = "prometheus.io/scrape"
	// Names of the volumes referenced by worker definitions.
	tmpVolumeName string = "beta9-tmp"
	logVolumeName string = "beta9-logs"
	imagesVolumeName string = "beta9-images"
	storageVolumeName string = "beta9-storage"
	checkpointVolumeName string = "beta9-checkpoints"
	devicePluginVolumeName string = "kubelet-device-plugins"
	// Default filesystem paths, container name, and entrypoint for a worker.
	defaultDevicePluginPath string = "/var/lib/kubelet/device-plugins"
	defaultContainerName string = "worker"
	defaultWorkerEntrypoint string = "/usr/local/bin/worker"
	defaultWorkerLogPath string = "/var/log/worker"
	defaultImagesPath string = "/images"
	defaultCheckpointPath string = "/checkpoints"
	defaultStoragePath string = "/storage"
	// defaultSharedMemoryPct is the fraction (0.5 == 50%) that
	// calculateMemoryQuantity falls back to when the configured percentage
	// string cannot be parsed.
	defaultSharedMemoryPct float32 = 0.5
	// Intervals for pool monitoring and pool health checks.
	// NOTE(review): the consumers of these values are outside this file — confirm.
	poolMonitoringInterval = 1 * time.Second
	poolHealthCheckInterval = 10 * time.Second
)
// WorkerPoolController describes the operations available on a single worker
// pool. Implementations live outside this file.
type WorkerPoolController interface {
	// AddWorker takes a cpu, memory, and GPU count and returns a worker or an
	// error. NOTE(review): the units of cpu and memory are not visible here —
	// confirm against the implementations.
	AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
	// AddWorkerToMachine is the variant of AddWorker that additionally takes a
	// GPU type and a machine ID.
	AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
	// Name returns the pool name; freePoolCapacity uses it to look up the
	// pool's workers in the worker repository.
	Name() string
	// FreeCapacity returns a WorkerPoolCapacity for the pool, or an error.
	FreeCapacity() (*WorkerPoolCapacity, error)
	// Context returns the context.Context associated with the controller.
	Context() context.Context
	// IsPreemptable reports a boolean preemptability flag for the pool.
	IsPreemptable() bool
	// State returns the pool's types.WorkerPoolState, or an error.
	State() (*types.WorkerPoolState, error)
	// RequiresPoolSelector reports whether the pool requires a pool selector.
	// NOTE(review): selector semantics are defined by the callers — confirm.
	RequiresPoolSelector() bool
	// Mode returns the pool's types.PoolMode.
	Mode() types.PoolMode
	// ContainerRuntime returns the container runtime as a string.
	ContainerRuntime() string
}
// WorkerPoolConfig holds default resource requests for workers in a pool.
// It is a different type from types.WorkerPoolConfig, which MonitorPoolSize
// accepts.
type WorkerPoolConfig struct {
	// DefaultWorkerCpuRequest is the default CPU request for a worker.
	// NOTE(review): units are not visible in this file — confirm.
	DefaultWorkerCpuRequest int64
	// DefaultWorkerMemoryRequest is the default memory request for a worker.
	// NOTE(review): units are not visible in this file — confirm.
	DefaultWorkerMemoryRequest int64
}
// WorkerPoolCapacity is an aggregate of the resources reported by the workers
// of a pool. freePoolCapacity fills the Free* fields from workers that are
// neither disabled nor pending, and the Pending* fields from pending workers.
type WorkerPoolCapacity struct {
	// FreeCpu is the summed FreeCpu of non-pending, non-disabled workers.
	FreeCpu int64
	// FreeMemory is the summed FreeMemory of non-pending, non-disabled workers.
	FreeMemory int64
	// FreeGpu is the summed FreeGpuCount of those workers that have a GPU type
	// set and still have free CPU and memory.
	FreeGpu uint
	// PendingCpu is the summed FreeCpu of pending workers.
	PendingCpu int64
	// PendingMemory is the summed FreeMemory of pending workers.
	PendingMemory int64
	// PendingGpu is the summed TotalGpuCount of pending workers that have a
	// GPU type set and report free CPU and memory.
	PendingGpu uint
}
// WorkerPoolControllerOptions bundles the dependencies and settings handed to
// a worker pool controller.
// NOTE(review): the constructors that consume it are outside this file.
type WorkerPoolControllerOptions struct {
	// Name is the pool name.
	Name string
	// Context is the context supplied to the controller.
	Context context.Context
	// Config is the application configuration.
	Config types.AppConfig
	// Repository handles for the backend, worker, worker pool, and container
	// stores.
	BackendRepo repository.BackendRepository
	WorkerRepo repository.WorkerRepository
	WorkerPoolRepo repository.WorkerPoolRepository
	ContainerRepo repository.ContainerRepository
	// ProviderName is a pointer to the machine provider.
	// NOTE(review): presumably nil when no provider is involved — confirm.
	ProviderName *types.MachineProvider
	// ProviderRepo and EventRepo are the provider and event stores.
	ProviderRepo repository.ProviderRepository
	EventRepo repository.EventRepository
	// Tailscale is the network.Tailscale handle.
	Tailscale *network.Tailscale
}
// GenerateWorkerId returns a short worker identifier: the first eight
// characters of the string form of a newly created UUID.
func GenerateWorkerId() string {
	const idLength = 8

	full := uuid.New().String()
	return full[:idLength]
}
// MonitorPoolSize constructs a worker pool sizer for wpc and launches its
// Start method on a new goroutine.
//
// It returns the constructor's error if the sizer cannot be created, and nil
// once the goroutine has been launched. The goroutine is not tracked by this
// function.
func MonitorPoolSize(
	wpc WorkerPoolController,
	workerPoolConfig *types.WorkerPoolConfig,
	workerRepo repository.WorkerRepository,
	workerPoolRepo repository.WorkerPoolRepository,
	providerRepo repository.ProviderRepository,
) error {
	sizer, createErr := NewWorkerPoolSizer(wpc, workerPoolConfig, workerRepo, workerPoolRepo, providerRepo)
	if createErr != nil {
		return createErr
	}

	// Fire-and-forget: the sizer runs in the background from here on.
	go sizer.Start()

	return nil
}
// MonitorPoolHealth builds a pool health monitor from opts and launches its
// Start method on a new goroutine. It always returns nil.
//
// Only the fields listed below are forwarded to the monitor; they are copied
// into a fresh options value rather than passing opts through directly.
func MonitorPoolHealth(opts PoolHealthMonitorOptions) error {
	var forwarded PoolHealthMonitorOptions
	forwarded.Controller = opts.Controller
	forwarded.WorkerPoolConfig = opts.WorkerPoolConfig
	forwarded.WorkerConfig = opts.WorkerConfig
	forwarded.WorkerRepo = opts.WorkerRepo
	forwarded.ProviderRepo = opts.ProviderRepo
	forwarded.WorkerPoolRepo = opts.WorkerPoolRepo
	forwarded.ContainerRepo = opts.ContainerRepo
	forwarded.EventRepo = opts.EventRepo

	monitor := NewPoolHealthMonitor(forwarded)

	// Fire-and-forget: the monitor runs in the background from here on.
	go monitor.Start()

	return nil
}
// freePoolCapacity sums the resources reported by every worker in the pool
// named by wpc.Name().
//
// Disabled workers are ignored. Pending workers contribute to the Pending*
// totals (using their total GPU count); all other workers contribute to the
// Free* totals (using their free GPU count). GPUs are only counted for a
// worker that has a GPU type set and reports both free CPU and free memory.
//
// It returns the repository error unchanged if the worker list cannot be
// fetched.
func freePoolCapacity(workerRepo repository.WorkerRepository, wpc WorkerPoolController) (*WorkerPoolCapacity, error) {
	poolWorkers, err := workerRepo.GetAllWorkersInPool(wpc.Name())
	if err != nil {
		return nil, err
	}

	totals := new(WorkerPoolCapacity)
	for _, w := range poolWorkers {
		if w.Status == types.WorkerStatusDisabled {
			continue
		}

		// A worker's GPUs only count when it is a GPU worker that still has
		// CPU and memory to go with them.
		countGpu := w.Gpu != "" && w.FreeCpu > 0 && w.FreeMemory > 0

		if w.Status == types.WorkerStatusPending {
			totals.PendingCpu += w.FreeCpu
			totals.PendingMemory += w.FreeMemory
			if countGpu {
				totals.PendingGpu += uint(w.TotalGpuCount)
			}
			continue
		}

		totals.FreeCpu += w.FreeCpu
		totals.FreeMemory += w.FreeMemory
		if countGpu {
			totals.FreeGpu += uint(w.FreeGpuCount)
		}
	}

	return totals, nil
}
// calculateMemoryQuantity returns percentStr percent of memoryTotal as a
// resource.Quantity expressed with the "Mi" suffix.
//
// If percentStr cannot be parsed by parseMemoryPercentage, the fraction
// defaultSharedMemoryPct is used instead. The product is truncated to an
// integer before being formatted.
func calculateMemoryQuantity(percentStr string, memoryTotal int64) resource.Quantity {
	fraction := defaultSharedMemoryPct
	if parsed, err := parseMemoryPercentage(percentStr); err == nil {
		fraction = parsed
	}

	scaled := int64(float32(memoryTotal) * fraction)
	formatted := fmt.Sprintf("%dMi", scaled)

	return resource.MustParse(formatted)
}
// parseMemoryPercentage converts a percentage string such as "50%" or "50"
// into a fraction (0.5).
//
// A single trailing "%" is optional. The value must be a finite number
// greater than zero; otherwise an error is returned and the fraction is 0.
// Parse failures wrap the underlying strconv error.
func parseMemoryPercentage(percentStr string) (float32, error) {
	ps := strings.TrimSuffix(percentStr, "%")

	percent, err := strconv.ParseFloat(ps, 32)
	if err != nil {
		return 0, fmt.Errorf("parsing memory percentage %q: %w", percentStr, err)
	}

	// strconv.ParseFloat accepts "NaN" and "Inf" without error, and NaN slips
	// past the <= 0 check below because every comparison with NaN is false.
	// Reject both so callers never multiply a size by a non-finite fraction.
	if math.IsNaN(percent) || math.IsInf(percent, 0) {
		return 0, errors.New("percent must be a finite number")
	}

	if percent <= 0 {
		return 0, errors.New("percent must be greater than 0")
	}

	return float32(percent) / 100, nil
}