-
Notifications
You must be signed in to change notification settings - Fork 860
Expand file tree
/
Copy pathserver.go
More file actions
326 lines (275 loc) · 9.03 KB
/
server.go
File metadata and controls
326 lines (275 loc) · 9.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// SPDX-FileCopyrightText: Copyright The Lima Authors
// SPDX-License-Identifier: Apache-2.0
package server
import (
"bufio"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/lima-vm/lima/v2/pkg/driver"
pb "github.com/lima-vm/lima/v2/pkg/driver/external"
"github.com/lima-vm/lima/v2/pkg/driver/external/client"
"github.com/lima-vm/lima/v2/pkg/limatype"
"github.com/lima-vm/lima/v2/pkg/limatype/dirnames"
"github.com/lima-vm/lima/v2/pkg/limatype/filenames"
"github.com/lima-vm/lima/v2/pkg/registry"
)
// DriverServer is the gRPC service implementation that Serve registers with
// the gRPC server. It embeds pb.UnimplementedDriverServer and carries the
// driver and logger handed to it by Serve.
type DriverServer struct {
	pb.UnimplementedDriverServer
	// driver is the driver.Driver that was passed to Serve.
	driver driver.Driver
	// logger is the logrus logger created by Serve.
	logger *logrus.Logger
}
// listenerTracker wraps a net.Listener and records when the first connection
// has been accepted.
type listenerTracker struct {
	net.Listener
	// connected is closed by Accept after the first successful accept.
	connected chan struct{}
	// once ensures connected is closed at most one time.
	once sync.Once
}
// Accept delegates to the wrapped listener. The first time a connection is
// accepted without error, the connected channel is closed so that waiters
// learn a client has arrived; later successful accepts leave it untouched.
func (t *listenerTracker) Accept() (net.Conn, error) {
	conn, acceptErr := t.Listener.Accept()
	if acceptErr != nil {
		return conn, acceptErr
	}
	t.once.Do(func() {
		close(t.connected)
	})
	return conn, nil
}
// Serve is the entry point an external driver binary calls from its main().
//
// It parses the process command-line flags and then does one of three things:
//
//   - with -pre-driver-action, runs handlePreConfiguredDriverAction and returns;
//   - with -inspect-status, runs handleInspectStatus and returns;
//   - otherwise, listens on a Unix socket under os.TempDir(), prints
//     {"socketPath": "..."} as a single JSON line on stdout, and serves the
//     driver over gRPC until shutdown.
//
// Shutdown is triggered by SIGINT/SIGTERM, by cancellation of ctx, by no client
// connecting within 60 seconds of startup, or by the gRPC server returning from
// Serve for any reason. On shutdown the server is stopped gracefully and the
// socket file is removed.
//
// Fatal setup errors terminate the process via logger.Fatalf.
func Serve(ctx context.Context, driver driver.Driver) {
	preConfiguredDriverAction := flag.Bool("pre-driver-action", false, "Run pre-driver action before starting the gRPC server")
	inspectStatus := flag.Bool("inspect-status", false, "Inspect status of the driver")
	flag.Parse() //nolint:revive // Serve is intended to be called from external driver's main()

	if *preConfiguredDriverAction {
		handlePreConfiguredDriverAction(ctx, driver)
		return
	}
	if *inspectStatus {
		handleInspectStatus(driver)
		return
	}

	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)

	driverInfo := driver.Info()
	socketPath := filepath.Join(os.TempDir(), fmt.Sprintf("lima-driver-%s-%d.sock", driverInfo.Name, os.Getpid()))
	defer func() {
		if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
			logger.Warnf("Failed to remove socket file: %v", err)
		}
	}()

	// Clear a stale socket left behind by an earlier process with the same PID.
	if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
		logger.Fatalf("Failed to remove existing socket file: %v", err)
	}

	var lc net.ListenConfig
	listener, err := lc.Listen(ctx, "unix", socketPath)
	if err != nil {
		logger.Fatalf("Failed to listen on Unix socket: %v", err)
	}
	defer listener.Close()

	tListener := &listenerTracker{
		Listener:  listener,
		connected: make(chan struct{}),
	}

	// The parent process reads this line from stdout to learn where to connect.
	output := map[string]string{"socketPath": socketPath}
	if err := json.NewEncoder(os.Stdout).Encode(output); err != nil {
		logger.Fatalf("Failed to encode socket path as JSON: %v", err)
	}

	kaProps := keepalive.ServerParameters{
		Time:    10 * time.Second,
		Timeout: 30 * time.Second,
	}
	kaPolicy := keepalive.EnforcementPolicy{
		MinTime:             10 * time.Second,
		PermitWithoutStream: true,
	}

	server := grpc.NewServer(
		grpc.KeepaliveParams(kaProps),
		grpc.KeepaliveEnforcementPolicy(kaPolicy),
	)
	pb.RegisterDriverServer(server, &DriverServer{
		driver: driver,
		logger: logger,
	})

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	// shutdownCh is closed exactly once, by whichever shutdown trigger fires first.
	shutdownCh := make(chan struct{})
	var closeOnce sync.Once
	closeShutdown := func() { closeOnce.Do(func() { close(shutdownCh) }) }

	// Shutdown on signal or context cancellation. The shutdownCh case lets this
	// goroutine exit when another trigger wins.
	go func() {
		select {
		case <-sigs:
			logger.Info("Received shutdown signal, stopping server...")
		case <-ctx.Done():
			logger.Info("Context canceled, stopping server...")
		case <-shutdownCh:
			return
		}
		closeShutdown()
	}()

	// Startup watchdog: give up if nobody connects within 60 seconds.
	go func() {
		timer := time.NewTimer(60 * time.Second)
		defer timer.Stop()

		select {
		case <-tListener.connected:
			logger.Debug("Client connected; disabling 60s startup shutdown")
			return
		case <-timer.C:
			logger.Info("No client connected within 60 seconds, shutting down server...")
			closeShutdown()
		case <-shutdownCh:
			return
		}
	}()

	go func() {
		// Once Serve returns the process has nothing left to do. Without this,
		// a serve failure would leave the main goroutine blocked on shutdownCh.
		defer closeShutdown()

		logger.Infof("Starting external driver server for %s", driverInfo.Name)
		logger.Infof("Server starting on Unix socket: %s", socketPath)
		if err := server.Serve(tListener); err != nil {
			if errors.Is(err, grpc.ErrServerStopped) {
				logger.Errorf("Server stopped: %v", err)
			} else {
				logger.Errorf("Failed to serve: %v", err)
			}
		}
	}()

	<-shutdownCh
	server.GracefulStop()
}
// handleInspectStatus implements the -inspect-status mode of Serve.
//
// It reads one JSON value from stdin that decodes into a []byte (i.e. a
// base64 JSON string) holding a serialized limatype.Instance, asks the driver
// for that instance's status, stores the status on the instance, and writes
// the re-serialized instance to stdout in the same []byte-in-JSON framing.
//
// On any failure a diagnostic is written to stderr and the function returns
// without writing a response, so a caller never receives a result computed
// from a half-decoded instance.
func handleInspectStatus(driver driver.Driver) {
	var payload []byte
	if err := json.NewDecoder(os.Stdin).Decode(&payload); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to decode instance payload from stdin: %v\n", err)
		return
	}

	var inst limatype.Instance
	if err := inst.UnmarshalJSON(payload); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to unmarshal instance: %v\n", err)
		return
	}

	inst.Status = driver.InspectStatus(context.Background(), &inst)

	resp, err := inst.MarshalJSON()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to marshal instance response: %v\n", err)
		return
	}

	if err := json.NewEncoder(os.Stdout).Encode(resp); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to encode instance response: %v\n", err)
	}
}
// handlePreConfiguredDriverAction implements the -pre-driver-action mode of
// Serve.
//
// It decodes a limatype.PreConfiguredDriverPayload from stdin, lets the driver
// fill in payload.Config via FillConfig (passing payload.FilePath), and writes
// the resulting config as JSON to stdout.
//
// On any failure a diagnostic is written to stderr and the function returns
// without writing a config, so a caller never mistakes an unfilled or
// partially filled config for a successful result.
func handlePreConfiguredDriverAction(ctx context.Context, driver driver.Driver) {
	var payload limatype.PreConfiguredDriverPayload
	if err := json.NewDecoder(os.Stdin).Decode(&payload); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to decode pre-configured driver payload from stdin: %v\n", err)
		return
	}

	config := &payload.Config
	if err := driver.FillConfig(ctx, config, payload.FilePath); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to fill config: %v\n", err)
		return
	}

	if err := json.NewEncoder(os.Stdout).Encode(*config); err != nil {
		fmt.Fprintf(os.Stderr, "Error encoding response: %v\n", err)
	}
}
// Start launches the external driver binary at extDriver.Path for the instance
// named instName and connects a client to it.
//
// The child's stderr is appended to the external-driver log file inside the
// instance directory. The first line the child prints on stdout must be a JSON
// object containing a non-empty "socketPath" entry; a driver client is then
// created for that socket.
//
// On success extDriver's InstanceName, SocketPath, Command, Client, Ctx and
// CancelFunc fields are populated and nil is returned. On failure an error is
// returned; if the child had already been started it is killed and reaped.
// An empty instName is rejected before anything is launched.
func Start(extDriver *registry.ExternalDriver, instName string) error {
	extDriver.Logger.Debugf("Starting external driver at %s", extDriver.Path)
	if instName == "" {
		return errors.New("instance name cannot be empty")
	}
	extDriver.InstanceName = instName

	ctx, cancel := context.WithCancel(context.Background())
	cmd := exec.CommandContext(ctx, extDriver.Path)

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		cancel()
		return fmt.Errorf("failed to create stdout pipe for external driver: %w", err)
	}

	instanceDir, err := dirnames.InstanceDir(extDriver.InstanceName)
	if err != nil {
		cancel()
		return fmt.Errorf("failed to determine instance directory: %w", err)
	}
	logPath := filepath.Join(instanceDir, filenames.ExternalDriverStderrLog)
	logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		cancel()
		return fmt.Errorf("failed to open external driver log file: %w", err)
	}
	// The child receives its own copy of the descriptor in cmd.Start, so the
	// parent's copy is only needed until this function returns.
	defer logFile.Close()
	cmd.Stderr = logFile

	if err := cmd.Start(); err != nil {
		cancel()
		return fmt.Errorf("failed to start external driver: %w", err)
	}

	driverLogger := extDriver.Logger.WithField("driver", extDriver.Name)

	// abort tears down a child that was started but cannot be used: it cancels
	// the command context, kills the process and reaps it so no zombie remains.
	// killFailure is the log prefix used when the kill itself fails.
	abort := func(killFailure string) {
		cancel()
		if err := cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
			driverLogger.Errorf("%s: %v", killFailure, err)
			return
		}
		// The exit status of a process we just killed carries no information.
		_ = cmd.Wait()
	}

	scanner := bufio.NewScanner(stdout)
	if !scanner.Scan() {
		// Capture the read error before abort closes the pipe.
		readErr := scanner.Err()
		abort("Failed to kill external driver process")
		if readErr != nil {
			return fmt.Errorf("failed to read socket path from driver: %w", readErr)
		}
		return errors.New("failed to read socket path from driver")
	}

	var output map[string]string
	if err := json.Unmarshal(scanner.Bytes(), &output); err != nil {
		abort("Failed to kill external driver process")
		return fmt.Errorf("failed to parse socket path JSON: %w", err)
	}
	socketPath := output["socketPath"]
	if socketPath == "" {
		abort("Failed to kill external driver process")
		return errors.New("external driver did not report a socket path")
	}
	extDriver.SocketPath = socketPath

	driverClient, err := client.NewDriverClient(extDriver.SocketPath, extDriver.Logger)
	if err != nil {
		abort("Failed to kill external driver process after client creation failure")
		return fmt.Errorf("failed to create driver client: %w", err)
	}

	extDriver.Command = cmd
	extDriver.Client = driverClient
	extDriver.Ctx = ctx
	extDriver.CancelFunc = cancel

	driverLogger.Debugf("External driver %s started successfully", extDriver.Name)
	return nil
}
// Stop shuts down a running external driver and clears its runtime state.
//
// It returns an error if extDriver.Command is nil (the driver is not running).
// Otherwise it sends SIGTERM to the driver process, calls extDriver.CancelFunc
// if set, removes the driver's socket file, and resets the Command, Client,
// Ctx and CancelFunc fields. Failures while signalling or removing the socket
// are logged rather than returned, so Stop always finishes its cleanup.
func Stop(extDriver *registry.ExternalDriver) error {
	if extDriver.Command == nil {
		return fmt.Errorf("external driver %s is not running", extDriver.Name)
	}

	extDriver.Logger.Debugf("Stopping external driver %s", extDriver.Name)

	// Send SIGTERM before cancelling: when CancelFunc is the cancel function of
	// the context given to exec.CommandContext (as Start sets it up), cancelling
	// kills the process outright, so the graceful request has to go first.
	// Process is nil for a command that was never started; guard against it.
	if proc := extDriver.Command.Process; proc != nil {
		if err := proc.Signal(syscall.SIGTERM); err != nil && !errors.Is(err, os.ErrProcessDone) {
			extDriver.Logger.Errorf("Failed to kill external driver process: %v", err)
		}
	}
	if extDriver.CancelFunc != nil {
		extDriver.CancelFunc()
	}

	// Remove the socket here in case the driver did not get to do so itself.
	if err := os.Remove(extDriver.SocketPath); err != nil && !os.IsNotExist(err) {
		extDriver.Logger.Warnf("Failed to remove socket file: %v", err)
	}

	extDriver.Command = nil
	extDriver.Client = nil
	extDriver.Ctx = nil
	extDriver.CancelFunc = nil

	extDriver.Logger.Debugf("External driver %s stopped successfully", extDriver.Name)
	return nil
}
// StopAllExternalDrivers walks registry.ExternalDrivers, stops every entry that
// has a started process, and removes each entry from the registry whether or
// not it was running. Stop failures are logged and do not interrupt the sweep.
func StopAllExternalDrivers() {
	for name, ext := range registry.ExternalDrivers {
		running := ext.Command != nil && ext.Command.Process != nil
		if running {
			switch err := Stop(ext); {
			case err != nil:
				logrus.Errorf("Failed to stop external driver %s: %v", name, err)
			default:
				logrus.Debugf("External driver %s stopped successfully", name)
			}
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(registry.ExternalDrivers, name)
	}
}