Skip to content

Commit 719bce9

Browse files
iignatevichiberdinsky-skilld
authored andcommitted
add terminate functionality
1 parent 45f13aa commit 719bce9

File tree

7 files changed

+208
-26
lines changed

7 files changed

+208
-26
lines changed

action.terminate.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
runtime: plugin
2+
action:
3+
title: "Test: Terminate"
4+
description: >-
5+
Removes key-value pair from keyring

plugin.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"os"
99
"path/filepath"
10+
"time"
1011

1112
"github.com/launchrctl/launchr"
1213
"github.com/launchrctl/launchr/pkg/action"
@@ -26,6 +27,9 @@ const (
2627
//go:embed action.yaml
2728
var actionYaml []byte
2829

30+
//go:embed action.terminate.yaml
31+
var actionTerminateYaml []byte
32+
2933
func init() {
3034
launchr.RegisterPlugin(&Plugin{})
3135
}
@@ -105,5 +109,31 @@ func (p *Plugin) DiscoverActions(_ context.Context) ([]*action.Action, error) {
105109

106110
return p.runBackgroundWeb(ctx, webRunFlags, webPidFile)
107111
}))
108-
return []*action.Action{a}, nil
112+
113+
terminateA := action.NewFromYAML("test:terminate", actionTerminateYaml)
114+
terminateA.SetRuntime(action.NewFnRuntime(func(ctx context.Context, a *action.Action) error {
115+
ticker := time.NewTicker(10 * time.Second)
116+
defer ticker.Stop()
117+
118+
for i := 1; i <= 6; i++ {
119+
select {
120+
case <-ctx.Done():
121+
fmt.Println("someFunction: Context cancelled, stopping early")
122+
str := []byte("someFunction: Context cancelled, stopping early")
123+
_, _ = a.Input().Streams().Out().Write(str)
124+
return ctx.Err()
125+
case <-ticker.C:
126+
fmt.Printf("someFunction: Running... %d0 seconds\n", i)
127+
str := []byte(fmt.Sprintf("someFunction: Running... %d0 seconds\n", i))
128+
_, err := a.Input().Streams().Out().Write(str)
129+
if err != nil {
130+
return err
131+
}
132+
}
133+
}
134+
135+
return nil
136+
}))
137+
138+
return []*action.Action{a, terminateA}, nil
109139
}

server/api.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
type launchrServer struct {
2525
actionMngr action.Manager
26+
stateMngr *StateManager
2627
cfg launchr.Config
2728
ctx context.Context
2829
baseURL string
@@ -96,6 +97,31 @@ func (l *launchrServer) GetOneRunningActionByID(w http.ResponseWriter, _ *http.R
9697
})
9798
}
9899

100+
func (l *launchrServer) CancelRunningAction(w http.ResponseWriter, _ *http.Request, id ActionId, runID ActionRunInfoId) {
101+
ri, ok := l.actionMngr.RunInfoByID(runID)
102+
if !ok {
103+
sendError(w, http.StatusNotFound, fmt.Sprintf("action run info with id %q is not found", id))
104+
return
105+
}
106+
107+
if ri.Status != "running" {
108+
sendError(w, http.StatusNotFound, fmt.Sprintf("action %q is not running", id))
109+
return
110+
}
111+
112+
as, ok := l.stateMngr.actionStateByID(runID)
113+
if !ok {
114+
sendError(w, http.StatusNotFound, fmt.Sprintf("action state info with id %q is not found", id))
115+
return
116+
}
117+
118+
// Cancel context
119+
as.cancelSwitch()
120+
121+
w.WriteHeader(http.StatusOK)
122+
_ = json.NewEncoder(w).Encode(struct{}{})
123+
}
124+
99125
func (l *launchrServer) GetRunningActionStreams(w http.ResponseWriter, _ *http.Request, id ActionId, runID ActionRunInfoId, params GetRunningActionStreamsParams) {
100126
ri, ok := l.actionMngr.RunInfoByID(runID)
101127
if !ok {
@@ -377,7 +403,8 @@ func (l *launchrServer) RunAction(w http.ResponseWriter, r *http.Request, id str
377403
return
378404
}
379405

380-
ri, chErr := l.actionMngr.RunBackground(l.ctx, a, runID)
406+
state := l.stateMngr.registerState(runID)
407+
ri, chErr := l.actionMngr.RunBackground(state.context, a, runID)
381408

382409
go func() {
383410
err := <-chErr
@@ -387,6 +414,8 @@ func (l *launchrServer) RunAction(w http.ResponseWriter, r *http.Request, id str
387414
if _, writeErr := streams.Err().Write([]byte(err.Error())); writeErr != nil {
388415
launchr.Log().Error("Failed to write error to stream", "error", writeErr)
389416
}
417+
418+
l.stateMngr.removeActionState(runID)
390419
}
391420
}()
392421

server/openapi.gen.go

Lines changed: 71 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/openapi.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,20 @@ paths:
149149
$ref: '#/components/schemas/ActionRunStreamData'
150150
default:
151151
$ref: '#/components/responses/DefaultError'
152+
/actions/{id}/running/{runId}/cancel:
153+
post:
154+
summary: Cancels running action
155+
description: Cancels running action
156+
operationId: cancelRunningAction
157+
parameters:
158+
- $ref: '#/components/parameters/ActionId'
159+
- $ref: '#/components/parameters/ActionRunInfoId'
160+
responses:
161+
'200':
162+
description: Successful operation
163+
content: {}
164+
default:
165+
$ref: '#/components/responses/DefaultError'
152166
/wizard:
153167
get:
154168
summary: Lists all wizards

server/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func Run(ctx context.Context, app launchr.App, opts *RunOptions) error {
9595
customize: opts.FrontendCustomize,
9696
logsDirPath: opts.LogsDirPath,
9797
app: app,
98+
stateMngr: NewStateManager(),
9899
}
99100
app.GetService(&store.actionMngr)
100101
app.GetService(&store.cfg)

server/state.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
// StateManager is a definition of manager for actions states
9+
type StateManager struct {
10+
// @todo merge state into action manager
11+
actionState map[string]*ActionState
12+
mx sync.Mutex
13+
}
14+
15+
// NewStateManager constructs a new state manager.
16+
func NewStateManager() *StateManager {
17+
return &StateManager{
18+
actionState: make(map[string]*ActionState),
19+
}
20+
}
21+
22+
func (m *StateManager) registerState(id string) *ActionState {
23+
m.mx.Lock()
24+
defer m.mx.Unlock()
25+
26+
ctx, cancel := context.WithCancel(context.Background())
27+
28+
as := &ActionState{
29+
id: id,
30+
context: ctx,
31+
cancelSwitch: cancel,
32+
}
33+
m.actionState[id] = as
34+
35+
return as
36+
}
37+
38+
func (m *StateManager) removeActionState(id string) {
39+
m.mx.Lock()
40+
defer m.mx.Unlock()
41+
delete(m.actionState, id)
42+
}
43+
44+
func (m *StateManager) actionStateByID(id string) (*ActionState, bool) {
45+
m.mx.Lock()
46+
defer m.mx.Unlock()
47+
ri, ok := m.actionState[id]
48+
return ri, ok
49+
}
50+
51+
// ActionState defines running action state
52+
type ActionState struct {
53+
id string
54+
context context.Context
55+
cancelSwitch context.CancelFunc
56+
}

0 commit comments

Comments
 (0)