Skip to content

Commit c073acf

Browse files
author
Pior Bastida
committed
WIP
1 parent 25e50ad commit c073acf

File tree

5 files changed

+148
-57
lines changed

5 files changed

+148
-57
lines changed

component.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ func (c *component) WithShutdownTimeout(process, service time.Duration) *compone
3131

3232
// Add registers runnables as process. Processes will be shutdown before services.
3333
func (c *component) Add(runner ...Runnable) {
34-
c.processes.runnables = append(c.processes.runnables, runner...)
34+
c.processes.add(runner...)
3535
}
3636

3737
// Add registers runnables as services. Services will be shutdown after processes.
3838
func (c *component) AddService(service ...Runnable) {
39-
c.services.runnables = append(c.services.runnables, service...)
39+
c.services.add(service...)
4040
}
4141

4242
func (c *component) Run(ctx context.Context) error {
@@ -45,17 +45,17 @@ func (c *component) Run(ctx context.Context) error {
4545
// Starting
4646

4747
Log(c, "starting services")
48-
c.services.start(ctxValues)
48+
c.services.Start(ctxValues)
4949

5050
Log(c, "starting processes")
51-
c.processes.start(ctxValues)
51+
c.processes.Start(ctxValues)
5252

5353
// Waiting for shutdown
5454

5555
select {
56-
case <-c.processes.errors:
57-
Log(c, "a runner stopped")
58-
case <-c.services.errors:
56+
case <-c.processes.WaitForShutdown():
57+
Log(c, "a process stopped")
58+
case <-c.services.WaitForShutdown():
5959
Log(c, "a service stopped")
6060
case <-ctx.Done():
6161
Log(c, "context cancelled")
@@ -64,10 +64,10 @@ func (c *component) Run(ctx context.Context) error {
6464
// shutdown
6565

6666
Log(c, "shutting down processes")
67-
c.processes.stop()
67+
c.processes.Stop()
6868

6969
Log(c, "shutting down services")
70-
c.services.stop()
70+
c.services.Stop()
7171

7272
Log(c, "shutdown complete")
7373
return ctx.Err()

component_test.go

+29-4
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,48 @@
11
package runnable
22

3-
func ExampleComponent() {
3+
import (
4+
"context"
5+
stdlog "log"
6+
"os"
7+
)
8+
9+
func ExampleComponent_cancelled() {
410
ctx, cancel := initializeForExample()
511
defer cancel()
612

713
c := Component()
814
c.Add(newDummyRunnable())
915

10-
c.Run(ctx)
16+
_ = c.Run(ctx)
1117

1218
// Output:
1319
// component: starting services
1420
// component: starting processes
15-
// group: dummyRunnable started
1621
// dummyRunnable: started
1722
// component: context cancelled
1823
// component: shutting down processes
1924
// dummyRunnable: stopped
20-
// group: dummyRunnable stopped
25+
// component: shutting down services
26+
// component: shutdown complete
27+
}
28+
29+
func ExampleComponent_failing() {
30+
ctx := context.Background()
31+
32+
SetLogger(stdlog.New(os.Stdout, "", 0))
33+
34+
c := Component()
35+
c.Add(newDyingRunnable())
36+
37+
_ = c.Run(ctx)
38+
39+
// Output:
40+
// component: starting services
41+
// component: starting processes
42+
// dyingRunnable: started
43+
// dyingRunnable: stopped with error: dying
44+
// component: a process stopped
45+
// component: shutting down processes
2146
// component: shutting down services
2247
// component: shutdown complete
2348
}

group.go

+49-42
Original file line numberDiff line numberDiff line change
@@ -2,69 +2,76 @@ package runnable
22

33
import (
44
"context"
5-
"errors"
6-
"sync/atomic"
75
"time"
86
)
97

108
type group struct {
119
runnables []Runnable
12-
running uint32
13-
errors chan error
14-
cancel func()
1510
timeout time.Duration
11+
12+
contextCancel func()
13+
started []*StartedRunnable
14+
shutdownCh chan struct{}
15+
stoppedCh chan *StartedRunnable
1616
}
1717

18-
func (g *group) start(ctx context.Context) {
19-
newCtx, cancel := context.WithCancel(ctx)
18+
func (g *group) add(rs ...Runnable) {
19+
g.runnables = append(g.runnables, rs...)
20+
}
2021

21-
g.cancel = cancel
22-
g.errors = make(chan error)
22+
// func (g *group) Run(ctx context.Context) error {
23+
// g.Start(ctx)
2324

24-
for _, r := range g.runnables {
25-
atomic.AddUint32(&g.running, 1)
26-
go g.run(newCtx, r)
27-
}
25+
// select {
26+
// case <-ctx.Done():
27+
// case <-g.shutdownCh:
28+
// }
29+
30+
// g.waitCh <- struct{}{}
31+
32+
// errs := append([]error{ctx.Err()}, g.Stop()...)
33+
// return errors.Join(errs...)
34+
// }
35+
36+
func (g *group) WaitForShutdown() chan struct{} {
37+
return g.shutdownCh
2838
}
2939

30-
func (g *group) stop() {
31-
g.cancel()
40+
func (g *group) Start(ctx context.Context) {
41+
ctx, g.contextCancel = context.WithCancel(ctx)
42+
43+
g.started = make([]*StartedRunnable, 0, len(g.runnables))
44+
g.shutdownCh = make(chan struct{}, len(g.runnables))
45+
g.stoppedCh = make(chan *StartedRunnable, len(g.runnables))
3246

33-
if g.running == 0 {
34-
return
47+
for _, r := range g.runnables {
48+
g.started = append(g.started, StartRunnable(ctx, r, g.shutdownCh, g.stoppedCh))
3549
}
3650

37-
ticker := time.NewTicker(5 * time.Second)
38-
defer ticker.Stop()
51+
}
52+
53+
func (g *group) Stop() (groupErrors []error) {
54+
g.contextCancel() // stop all other runnings
3955

40-
after := time.After(g.timeout)
56+
stopTimeout := time.After(g.timeout)
57+
logTicker := time.NewTicker(3 * time.Second)
58+
defer logTicker.Stop()
4159

42-
for {
60+
running := len(g.started)
61+
for running > 0 {
4362
select {
44-
case <-ticker.C:
45-
Log(g, "%d still running", g.running)
46-
case <-g.errors:
47-
if g.running == 0 {
48-
return
63+
case stopped := <-g.stoppedCh:
64+
running--
65+
if stopped.err != nil {
66+
groupErrors = append(groupErrors, stopped.err)
4967
}
50-
case <-after:
51-
Log(g, "waited %s, %d still running", g.timeout, g.running)
68+
case <-logTicker.C:
69+
Log(g, "%d still running", running)
70+
case <-stopTimeout:
71+
Log(g, "waited %s, %d still running", g.timeout, running)
5272
return
5373
}
5474
}
55-
}
56-
57-
func (g *group) run(ctx context.Context, runnable Runnable) {
58-
name := findName(runnable)
59-
60-
log.Printf("group: %s started", name)
61-
err := Recover(runnable).Run(ctx)
62-
if err == nil || errors.Is(err, context.Canceled) {
63-
log.Printf("group: %s stopped", name)
64-
} else {
65-
log.Printf("group: %s stopped with error: %+v", name, err)
66-
}
6775

68-
atomic.AddUint32(&g.running, ^uint32(0))
69-
g.errors <- err
76+
return
7077
}

start.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
)
8+
9+
type StartedRunnable struct {
10+
baseWrapper
11+
12+
runnable Runnable
13+
shutdownCh chan struct{}
14+
stoppedCh chan *StartedRunnable
15+
16+
mu sync.Mutex
17+
running bool
18+
err error
19+
}
20+
21+
func StartRunnable(ctx context.Context, r Runnable, shutdownCh chan struct{}, stoppedCh chan *StartedRunnable) *StartedRunnable {
22+
s := &StartedRunnable{
23+
baseWrapper: baseWrapper{"", r},
24+
runnable: r,
25+
shutdownCh: shutdownCh,
26+
stoppedCh: stoppedCh,
27+
}
28+
29+
go func() {
30+
Log(r, "started")
31+
err := Recover(r).Run(ctx)
32+
if err == nil || errors.Is(err, context.Canceled) {
33+
Log(r, "stopped")
34+
} else {
35+
Log(r, "stopped with error: %+v", err)
36+
}
37+
38+
s.mu.Lock()
39+
defer s.mu.Unlock()
40+
41+
s.running = false
42+
s.err = err
43+
44+
if s.shutdownCh != nil {
45+
s.shutdownCh <- struct{}{}
46+
}
47+
if s.stoppedCh != nil {
48+
s.stoppedCh <- s
49+
}
50+
}()
51+
52+
return s
53+
}
54+
55+
func (s *StartedRunnable) State() (running bool, err error) {
56+
s.mu.Lock()
57+
defer s.mu.Unlock()
58+
return s.running, s.err
59+
}

testing_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ func newDummyRunnable() *dummyRunnable {
2020
type dummyRunnable struct{}
2121

2222
func (r *dummyRunnable) Run(ctx context.Context) error {
23-
Log(r, "started")
23+
// Log(r, "DEBUG:started")
2424
<-ctx.Done()
25-
Log(r, "stopped")
25+
// Log(r, "DEBUG:stopped")
2626
return ctx.Err()
2727
}
2828

0 commit comments

Comments
 (0)