Skip to content

Commit 55f229c

Browse files
author
Pior Bastida
committed
Add a Component runner with support for dependencies
1 parent e7eef26 commit 55f229c

File tree

3 files changed

+235
-0
lines changed

3 files changed

+235
-0
lines changed

component.go

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync/atomic"
7+
"time"
8+
)
9+
10+
type C interface {
11+
Add(runner Runnable)
12+
AddService(service Runnable)
13+
}
14+
15+
type Component struct {
16+
processes *group
17+
services *group
18+
}
19+
20+
func NewComponent(runnables ...Runnable) *Component {
21+
return &Component{
22+
processes: &group{timeout: 30 * time.Second, runnables: runnables},
23+
services: &group{timeout: 30 * time.Second},
24+
}
25+
}
26+
27+
func (c *Component) WithShutdownTimeout(process, service time.Duration) *Component {
28+
c.processes.timeout = process
29+
c.services.timeout = service
30+
return c
31+
}
32+
33+
// Add registers a process.
34+
func (c *Component) Add(runner Runnable) {
35+
c.processes.runnables = append(c.processes.runnables, runner)
36+
}
37+
38+
// AddService registers a service.
39+
func (c *Component) AddService(service Runnable) {
40+
c.services.runnables = append(c.services.runnables, service)
41+
}
42+
43+
func (c *Component) Run(ctx context.Context) error {
44+
ctxValues := ContextValues(ctx)
45+
46+
// Starting
47+
48+
log.Printf("group: starting services")
49+
c.services.start(ctxValues)
50+
51+
log.Printf("group: starting processes")
52+
c.processes.start(ctxValues)
53+
54+
// Waiting for shutdown
55+
56+
select {
57+
case <-c.processes.errors:
58+
log.Printf("group: a runner stopped")
59+
case <-c.services.errors:
60+
log.Printf("group: a service stopped")
61+
case <-ctx.Done():
62+
log.Printf("group: context cancelled")
63+
}
64+
65+
// shutdown
66+
67+
log.Printf("group: shutting down processes")
68+
c.processes.stop()
69+
70+
log.Printf("group: shutting down services")
71+
c.services.stop()
72+
73+
log.Printf("group: shutdown complete")
74+
return ctx.Err()
75+
}
76+
77+
type group struct {
78+
runnables []Runnable
79+
running uint32
80+
errors chan error
81+
cancel func()
82+
timeout time.Duration
83+
}
84+
85+
func (g *group) start(ctx context.Context) {
86+
newCtx, cancel := context.WithCancel(ctx)
87+
88+
g.cancel = cancel
89+
g.errors = make(chan error)
90+
91+
for _, r := range g.runnables {
92+
atomic.AddUint32(&g.running, 1)
93+
go g.run(newCtx, r)
94+
}
95+
}
96+
97+
func (g *group) stop() {
98+
g.cancel()
99+
100+
ticker := time.NewTicker(5 * time.Second)
101+
defer ticker.Stop()
102+
103+
after := time.After(g.timeout)
104+
105+
for {
106+
select {
107+
case <-ticker.C:
108+
log.Printf("group: %d still running", g.running)
109+
case <-g.errors:
110+
if g.running == 0 {
111+
return
112+
}
113+
case <-after:
114+
log.Printf("group: waited %s, %d still running", g.timeout, g.running)
115+
return
116+
}
117+
}
118+
}
119+
120+
func (g *group) run(ctx context.Context, runnable Runnable) {
121+
name := findName(runnable)
122+
123+
log.Printf("group: %s started", name)
124+
err := Recover(runnable).Run(ctx)
125+
if err == nil || errors.Is(err, context.Canceled) {
126+
log.Printf("group: %s stopped", name)
127+
} else {
128+
log.Printf("group: %s stopped with error: %+v", name, err)
129+
}
130+
131+
atomic.AddUint32(&g.running, ^uint32(0))
132+
g.errors <- err
133+
}

examples/component/jobqueue.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
type StupidJobQueue struct {
12+
queue chan string
13+
executed int64
14+
}
15+
16+
func NewStupidJobQueue() *StupidJobQueue {
17+
return &StupidJobQueue{queue: make(chan string, 100)}
18+
}
19+
20+
func (s *StupidJobQueue) Enqueue(url string) {
21+
s.queue <- url
22+
}
23+
24+
func (s *StupidJobQueue) Waiting() int {
25+
return len(s.queue)
26+
}
27+
28+
func (s *StupidJobQueue) Executed() int64 {
29+
return atomic.LoadInt64(&s.executed)
30+
}
31+
32+
func (s *StupidJobQueue) Run(ctx context.Context) error {
33+
var wg sync.WaitGroup
34+
wg.Add(1)
35+
go func() {
36+
for url := range s.queue {
37+
fmt.Printf("Performing job: %s\n", url)
38+
atomic.AddInt64(&s.executed, 1)
39+
time.Sleep(time.Second)
40+
}
41+
wg.Done()
42+
}()
43+
44+
<-ctx.Done()
45+
fmt.Printf("Draining job queue\n")
46+
close(s.queue)
47+
wg.Wait()
48+
49+
return ctx.Err()
50+
}

examples/component/main.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"strconv"
8+
"time"
9+
10+
"github.com/pior/runnable"
11+
)
12+
13+
func monitor(jobs *StupidJobQueue) runnable.RunnableFunc {
14+
return func(ctx context.Context) error {
15+
fmt.Printf("Task executed: %d\t\ttasks waiting: %d\n", jobs.Executed(), jobs.Waiting())
16+
return nil
17+
}
18+
}
19+
20+
func main() {
21+
jobs := NewStupidJobQueue()
22+
23+
// curl http://localhost:8000/?count=3
24+
server := &http.Server{
25+
Addr: "localhost:8000",
26+
Handler: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
27+
r.ParseForm()
28+
count := r.Form.Get("count")
29+
if count == "" {
30+
count = "1"
31+
}
32+
countInt, _ := strconv.Atoi(count)
33+
34+
for i := 0; i < countInt; i++ {
35+
jobs.Enqueue(r.URL.Path)
36+
fmt.Fprintln(rw, "Job enqueued!")
37+
}
38+
}),
39+
}
40+
serverRunner := runnable.HTTPServer(server)
41+
42+
monitor := runnable.Every(runnable.Func(monitor(jobs)), 2*time.Second)
43+
44+
c := runnable.NewComponent().WithShutdownTimeout(5*time.Second, 3*time.Second)
45+
c.Add(serverRunner)
46+
c.AddService(jobs)
47+
c.Add(monitor)
48+
49+
fmt.Println("Enqueue jobs with: curl http://localhost:8000/?count=3")
50+
51+
runnable.Run(c)
52+
}

0 commit comments

Comments
 (0)