@@ -7,13 +7,19 @@ import (
7
7
"context"
8
8
"encoding/json"
9
9
"fmt"
10
+ "io"
10
11
"net"
11
12
"net/http"
13
+ "strings"
14
+ "sync"
15
+ "time"
12
16
17
+ "github.com/Code-Hex/go-infinity-channel"
13
18
"github.com/Code-Hex/vz/v3"
14
19
"github.com/crc-org/vfkit/pkg/config"
15
20
"github.com/oomol-lab/ovm/pkg/cli"
16
21
"github.com/oomol-lab/ovm/pkg/logger"
22
+ "golang.org/x/crypto/ssh"
17
23
"golang.org/x/sync/errgroup"
18
24
)
19
25
@@ -56,6 +62,10 @@ type powerSaveModeBody struct {
56
62
Enable bool `json:"enable"`
57
63
}
58
64
65
+ type execBody struct {
66
+ Command string `json:"command"`
67
+ }
68
+
59
69
func (s * Restful ) mux () * http.ServeMux {
60
70
mux := http .NewServeMux ()
61
71
mux .HandleFunc ("/info" , func (w http.ResponseWriter , r * http.Request ) {
@@ -129,6 +139,71 @@ func (s *Restful) mux() *http.ServeMux {
129
139
130
140
s .powerSaveMode (body .Enable )
131
141
})
142
+ mux .HandleFunc ("/exec" , func (w http.ResponseWriter , r * http.Request ) {
143
+ if r .Method != http .MethodPost {
144
+ http .Error (w , "post only" , http .StatusBadRequest )
145
+ return
146
+ }
147
+
148
+ var body execBody
149
+ if err := json .NewDecoder (r .Body ).Decode (& body ); err != nil {
150
+ s .log .Warnf ("Failed to decode request body: %v" , err )
151
+ http .Error (w , "failed to decode request body" , http .StatusBadRequest )
152
+ return
153
+ }
154
+
155
+ w .Header ().Set ("Content-Type" , "text/event-stream" )
156
+ w .Header ().Set ("Cache-Control" , "no-cache" )
157
+ w .Header ().Set ("Connection" , "keep-alive" )
158
+
159
+ if _ , ok := w .(http.Flusher ); ! ok {
160
+ s .log .Warnf ("Bowser does not support server-sent events" )
161
+ return
162
+ }
163
+
164
+ outCh := infinity .NewChannel [string ]()
165
+ errCh := make (chan string )
166
+ doneCh := make (chan struct {})
167
+
168
+ go func () {
169
+ if err := s .exec (body .Command , outCh , errCh ); err != nil {
170
+ s .log .Warnf ("Failed to execute command: %v" , err )
171
+ }
172
+
173
+ _ , _ = fmt .Fprintf (w , "event: done\n " )
174
+ _ , _ = fmt .Fprintf (w , "data: done\n \n " )
175
+ w .(http.Flusher ).Flush ()
176
+
177
+ doneCh <- struct {}{}
178
+ outCh .Close ()
179
+ close (errCh )
180
+ }()
181
+
182
+ for {
183
+ select {
184
+ case <- doneCh :
185
+ s .log .Warnf ("Command execution finished" )
186
+ return
187
+ case err := <- errCh :
188
+ _ , _ = fmt .Fprintf (w , "event: error\n " )
189
+ _ , _ = fmt .Fprintf (w , "data: %s\n \n " , encodeSSE (err ))
190
+ w .(http.Flusher ).Flush ()
191
+ continue
192
+ case out := <- outCh .Out ():
193
+ _ , _ = fmt .Fprintf (w , "event: out\n " )
194
+ _ , _ = fmt .Fprintf (w , "data: %s\n \n " , encodeSSE (out ))
195
+ w .(http.Flusher ).Flush ()
196
+ continue
197
+ case <- r .Context ().Done ():
198
+ s .log .Warnf ("Client closed connection" )
199
+ return
200
+ case <- time .After (3 * time .Second ):
201
+ _ , _ = fmt .Fprintf (w , ": ping\n \n " )
202
+ w .(http.Flusher ).Flush ()
203
+ continue
204
+ }
205
+ }
206
+ })
132
207
133
208
return mux
134
209
}
@@ -219,3 +294,84 @@ func (s *Restful) powerSaveMode(enable bool) {
219
294
s .log .Info ("request /powerSaveMode" )
220
295
s .opt .PowerSaveMode = enable
221
296
}
297
+
298
+ func (s * Restful ) exec (command string , outCh * infinity.Channel [string ], errCh chan string ) error {
299
+ s .log .Info ("request /exec" )
300
+
301
+ conf := & ssh.ClientConfig {
302
+ User : "root" ,
303
+ HostKeyCallback : ssh .InsecureIgnoreHostKey (),
304
+ Auth : []ssh.AuthMethod {
305
+ ssh .PublicKeys (s .opt .SSHSigner ),
306
+ },
307
+ }
308
+
309
+ conn , err := ssh .Dial ("tcp" , fmt .Sprintf ("127.0.0.1:%d" , s .opt .SSHPort ), conf )
310
+ if err != nil {
311
+ errCh <- fmt .Sprintf ("dial ssh error: %v" , err )
312
+ return fmt .Errorf ("dial ssh error: %w" , err )
313
+ }
314
+ defer conn .Close ()
315
+
316
+ session , err := conn .NewSession ()
317
+ if err != nil {
318
+ errCh <- fmt .Sprintf ("new ssh session error: %v" , err )
319
+ return fmt .Errorf ("new ssh session error: %w" , err )
320
+ }
321
+ defer session .Close ()
322
+
323
+ w := ch2Writer (outCh )
324
+ session .Stdout = w
325
+ stderr := recordWriter (w )
326
+ session .Stderr = stderr
327
+
328
+ if err := session .Run (command ); err != nil {
329
+ newErr := fmt .Errorf ("%s\n %s" , stderr .LastRecord (), err )
330
+ errCh <- fmt .Sprintf (newErr .Error ())
331
+ return fmt .Errorf ("run ssh command error: %w" , newErr )
332
+ }
333
+
334
+ return nil
335
+ }
336
+
337
+ type chWriter struct {
338
+ ch * infinity.Channel [string ]
339
+ mu sync.Mutex
340
+ }
341
+
342
+ func (w * chWriter ) Write (p []byte ) (n int , err error ) {
343
+ w .mu .Lock ()
344
+ defer w .mu .Unlock ()
345
+ w .ch .In () <- string (p )
346
+ return len (p ), nil
347
+ }
348
+
349
+ func ch2Writer (ch * infinity.Channel [string ]) io.Writer {
350
+ return & chWriter {
351
+ ch : ch ,
352
+ }
353
+ }
354
+
355
+ type writer struct {
356
+ w io.Writer
357
+ last []byte
358
+ }
359
+
360
+ func (w * writer ) Write (p []byte ) (n int , err error ) {
361
+ w .last = p
362
+ return w .w .Write (p )
363
+ }
364
+
365
+ func (w * writer ) LastRecord () string {
366
+ return string (w .last )
367
+ }
368
+
369
+ func recordWriter (w io.Writer ) * writer {
370
+ return & writer {
371
+ w : w ,
372
+ }
373
+ }
374
+
375
+ func encodeSSE (str string ) string {
376
+ return strings .ReplaceAll (strings .TrimSpace (str ), "\n " , "\n data: " )
377
+ }
0 commit comments