Skip to content

Commit 87ea81c

Browse files
Pior Bastidapior
Pior Bastida
authored andcommitted
WIP
1 parent 419d9e4 commit 87ea81c

File tree

5 files changed

+152
-58
lines changed

5 files changed

+152
-58
lines changed

component.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ type component struct {
1515
services *group
1616
}
1717

18+
// Component manages two groups of Runnable, processes and services.
19+
// Services are stopped after processes.
20+
// The intended purposes is to manage the simplest form of runnable dependencies.
1821
func Component(processes ...Runnable) *component {
1922
return &component{
2023
processes: &group{timeout: 30 * time.Second, runnables: processes},
@@ -31,31 +34,31 @@ func (c *component) WithShutdownTimeout(process, service time.Duration) *compone
3134

3235
// Add registers runnables as process. Processes will be shutdown before services.
3336
func (c *component) Add(runner ...Runnable) {
34-
c.processes.runnables = append(c.processes.runnables, runner...)
37+
c.processes.add(runner...)
3538
}
3639

3740
// Add registers runnables as services. Services will be shutdown after processes.
3841
func (c *component) AddService(service ...Runnable) {
39-
c.services.runnables = append(c.services.runnables, service...)
42+
c.services.add(service...)
4043
}
4144

4245
func (c *component) Run(ctx context.Context) error {
43-
ctxValues := ContextValues(ctx)
46+
ctxNoCancel := context.WithoutCancel(ctx)
4447

4548
// Starting
4649

4750
Log(c, "starting services")
48-
c.services.start(ctxValues)
51+
c.services.Start(ctxNoCancel)
4952

5053
Log(c, "starting processes")
51-
c.processes.start(ctxValues)
54+
c.processes.Start(ctxNoCancel)
5255

5356
// Waiting for shutdown
5457

5558
select {
56-
case <-c.processes.errors:
57-
Log(c, "a runner stopped")
58-
case <-c.services.errors:
59+
case <-c.processes.WaitForShutdown():
60+
Log(c, "a process stopped")
61+
case <-c.services.WaitForShutdown():
5962
Log(c, "a service stopped")
6063
case <-ctx.Done():
6164
Log(c, "context cancelled")
@@ -64,10 +67,10 @@ func (c *component) Run(ctx context.Context) error {
6467
// shutdown
6568

6669
Log(c, "shutting down processes")
67-
c.processes.stop()
70+
c.processes.Stop()
6871

6972
Log(c, "shutting down services")
70-
c.services.stop()
73+
c.services.Stop()
7174

7275
Log(c, "shutdown complete")
7376
return ctx.Err()

component_test.go

+29-4
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,48 @@
11
package runnable
22

3-
func ExampleComponent() {
3+
import (
4+
"context"
5+
stdlog "log"
6+
"os"
7+
)
8+
9+
func ExampleComponent_cancelled() {
410
ctx, cancel := initializeForExample()
511
defer cancel()
612

713
c := Component()
814
c.Add(newDummyRunnable())
915

10-
c.Run(ctx)
16+
_ = c.Run(ctx)
1117

1218
// Output:
1319
// component: starting services
1420
// component: starting processes
15-
// group: dummyRunnable started
1621
// dummyRunnable: started
1722
// component: context cancelled
1823
// component: shutting down processes
1924
// dummyRunnable: stopped
20-
// group: dummyRunnable stopped
25+
// component: shutting down services
26+
// component: shutdown complete
27+
}
28+
29+
func ExampleComponent_failing() {
30+
ctx := context.Background()
31+
32+
SetLogger(stdlog.New(os.Stdout, "", 0))
33+
34+
c := Component()
35+
c.Add(newDyingRunnable())
36+
37+
_ = c.Run(ctx)
38+
39+
// Output:
40+
// component: starting services
41+
// component: starting processes
42+
// dyingRunnable: started
43+
// dyingRunnable: stopped with error: dying
44+
// component: a process stopped
45+
// component: shutting down processes
2146
// component: shutting down services
2247
// component: shutdown complete
2348
}

group.go

+49-42
Original file line numberDiff line numberDiff line change
@@ -2,69 +2,76 @@ package runnable
22

33
import (
44
"context"
5-
"errors"
6-
"sync/atomic"
75
"time"
86
)
97

108
type group struct {
119
runnables []Runnable
12-
running uint32
13-
errors chan error
14-
cancel func()
1510
timeout time.Duration
11+
12+
contextCancel func()
13+
started []*StartedRunnable
14+
shutdownCh chan struct{}
15+
stoppedCh chan *StartedRunnable
1616
}
1717

18-
func (g *group) start(ctx context.Context) {
19-
newCtx, cancel := context.WithCancel(ctx)
18+
func (g *group) add(rs ...Runnable) {
19+
g.runnables = append(g.runnables, rs...)
20+
}
2021

21-
g.cancel = cancel
22-
g.errors = make(chan error)
22+
// func (g *group) Run(ctx context.Context) error {
23+
// g.Start(ctx)
2324

24-
for _, r := range g.runnables {
25-
atomic.AddUint32(&g.running, 1)
26-
go g.run(newCtx, r)
27-
}
25+
// select {
26+
// case <-ctx.Done():
27+
// case <-g.shutdownCh:
28+
// }
29+
30+
// g.waitCh <- struct{}{}
31+
32+
// errs := append([]error{ctx.Err()}, g.Stop()...)
33+
// return errors.Join(errs...)
34+
// }
35+
36+
func (g *group) WaitForShutdown() chan struct{} {
37+
return g.shutdownCh
2838
}
2939

30-
func (g *group) stop() {
31-
g.cancel()
40+
func (g *group) Start(ctx context.Context) {
41+
ctx, g.contextCancel = context.WithCancel(ctx)
42+
43+
g.started = make([]*StartedRunnable, 0, len(g.runnables))
44+
g.shutdownCh = make(chan struct{}, len(g.runnables))
45+
g.stoppedCh = make(chan *StartedRunnable, len(g.runnables))
3246

33-
if g.running == 0 {
34-
return
47+
for _, r := range g.runnables {
48+
g.started = append(g.started, StartRunnable(ctx, r, g.shutdownCh, g.stoppedCh))
3549
}
3650

37-
ticker := time.NewTicker(5 * time.Second)
38-
defer ticker.Stop()
51+
}
52+
53+
func (g *group) Stop() (groupErrors []error) {
54+
g.contextCancel() // stop all other runnings
3955

40-
after := time.After(g.timeout)
56+
stopTimeout := time.After(g.timeout)
57+
logTicker := time.NewTicker(3 * time.Second)
58+
defer logTicker.Stop()
4159

42-
for {
60+
running := len(g.started)
61+
for running > 0 {
4362
select {
44-
case <-ticker.C:
45-
Log(g, "%d still running", g.running)
46-
case <-g.errors:
47-
if g.running == 0 {
48-
return
63+
case stopped := <-g.stoppedCh:
64+
running--
65+
if stopped.err != nil {
66+
groupErrors = append(groupErrors, stopped.err)
4967
}
50-
case <-after:
51-
Log(g, "waited %s, %d still running", g.timeout, g.running)
68+
case <-logTicker.C:
69+
Log(g, "%d still running", running)
70+
case <-stopTimeout:
71+
Log(g, "waited %s, %d still running", g.timeout, running)
5272
return
5373
}
5474
}
55-
}
56-
57-
func (g *group) run(ctx context.Context, runnable Runnable) {
58-
name := findName(runnable)
59-
60-
log.Printf("group: %s started", name)
61-
err := Recover(runnable).Run(ctx)
62-
if err == nil || errors.Is(err, context.Canceled) {
63-
log.Printf("group: %s stopped", name)
64-
} else {
65-
log.Printf("group: %s stopped with error: %+v", name, err)
66-
}
6775

68-
atomic.AddUint32(&g.running, ^uint32(0))
69-
g.errors <- err
76+
return
7077
}

start.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
)
8+
9+
type StartedRunnable struct {
10+
baseWrapper
11+
12+
runnable Runnable
13+
shutdownCh chan struct{}
14+
stoppedCh chan *StartedRunnable
15+
16+
mu sync.Mutex
17+
running bool
18+
err error
19+
}
20+
21+
func StartRunnable(ctx context.Context, r Runnable, shutdownCh chan struct{}, stoppedCh chan *StartedRunnable) *StartedRunnable {
22+
s := &StartedRunnable{
23+
baseWrapper: baseWrapper{"", r},
24+
runnable: r,
25+
shutdownCh: shutdownCh,
26+
stoppedCh: stoppedCh,
27+
}
28+
29+
go func() {
30+
Log(r, "started")
31+
err := Recover(r).Run(ctx)
32+
if err == nil || errors.Is(err, context.Canceled) {
33+
Log(r, "stopped")
34+
} else {
35+
Log(r, "stopped with error: %+v", err)
36+
}
37+
38+
s.mu.Lock()
39+
defer s.mu.Unlock()
40+
41+
s.running = false
42+
s.err = err
43+
44+
if s.shutdownCh != nil {
45+
s.shutdownCh <- struct{}{}
46+
}
47+
if s.stoppedCh != nil {
48+
s.stoppedCh <- s
49+
}
50+
}()
51+
52+
return s
53+
}
54+
55+
func (s *StartedRunnable) State() (running bool, err error) {
56+
s.mu.Lock()
57+
defer s.mu.Unlock()
58+
return s.running, s.err
59+
}

testing_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ func newDummyRunnable() *dummyRunnable {
2020
type dummyRunnable struct{}
2121

2222
func (r *dummyRunnable) Run(ctx context.Context) error {
23-
Log(r, "started")
23+
// Log(r, "DEBUG:started")
2424
<-ctx.Done()
25-
Log(r, "stopped")
25+
// Log(r, "DEBUG:stopped")
2626
return ctx.Err()
2727
}
2828

0 commit comments

Comments
 (0)