Skip to content

Commit 5a28d18

Browse files
author
Pior Bastida
committed
Add a Component runner with support for dependencies
1 parent a06be4b commit 5a28d18

File tree

4 files changed

+242
-0
lines changed

4 files changed

+242
-0
lines changed

component.go

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type C interface {
9+
Add(runner Runnable)
10+
AddService(service Runnable)
11+
}
12+
13+
type component struct {
14+
processes *group
15+
services *group
16+
}
17+
18+
func Component(processes ...Runnable) *component {
19+
return &component{
20+
processes: &group{timeout: 30 * time.Second, runnables: processes},
21+
services: &group{timeout: 30 * time.Second},
22+
}
23+
}
24+
25+
// WithShutdownTimeout changes the default timeout (30s).
26+
func (c *component) WithShutdownTimeout(process, service time.Duration) *component {
27+
c.processes.timeout = process
28+
c.services.timeout = service
29+
return c
30+
}
31+
32+
// Add registers runnables as process. Processes will be shutdown before services.
33+
func (c *component) Add(runner ...Runnable) {
34+
c.processes.runnables = append(c.processes.runnables, runner...)
35+
}
36+
37+
// Add registers runnables as services. Services will be shutdown after processes.
38+
func (c *component) AddService(service ...Runnable) {
39+
c.services.runnables = append(c.services.runnables, service...)
40+
}
41+
42+
func (c *component) Run(ctx context.Context) error {
43+
ctxValues := ContextValues(ctx)
44+
45+
// Starting
46+
47+
log.Printf("group: starting services")
48+
c.services.start(ctxValues)
49+
50+
log.Printf("group: starting processes")
51+
c.processes.start(ctxValues)
52+
53+
// Waiting for shutdown
54+
55+
select {
56+
case <-c.processes.errors:
57+
log.Printf("group: a runner stopped")
58+
case <-c.services.errors:
59+
log.Printf("group: a service stopped")
60+
case <-ctx.Done():
61+
log.Printf("group: context cancelled")
62+
}
63+
64+
// shutdown
65+
66+
log.Printf("group: shutting down processes")
67+
c.processes.stop()
68+
69+
log.Printf("group: shutting down services")
70+
c.services.stop()
71+
72+
log.Printf("group: shutdown complete")
73+
return ctx.Err()
74+
}

examples/component/jobqueue.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
type StupidJobQueue struct {
12+
queue chan string
13+
executed int64
14+
}
15+
16+
func NewStupidJobQueue() *StupidJobQueue {
17+
return &StupidJobQueue{queue: make(chan string, 100)}
18+
}
19+
20+
func (s *StupidJobQueue) Enqueue(url string) {
21+
s.queue <- url
22+
}
23+
24+
func (s *StupidJobQueue) Waiting() int {
25+
return len(s.queue)
26+
}
27+
28+
func (s *StupidJobQueue) Executed() int64 {
29+
return atomic.LoadInt64(&s.executed)
30+
}
31+
32+
func (s *StupidJobQueue) Run(ctx context.Context) error {
33+
var wg sync.WaitGroup
34+
wg.Add(1)
35+
go func() {
36+
for url := range s.queue {
37+
fmt.Printf("Performing job: %s\n", url)
38+
atomic.AddInt64(&s.executed, 1)
39+
time.Sleep(time.Second)
40+
}
41+
wg.Done()
42+
}()
43+
44+
<-ctx.Done()
45+
fmt.Printf("Draining job queue\n")
46+
close(s.queue)
47+
wg.Wait()
48+
49+
return ctx.Err()
50+
}

examples/component/main.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"strconv"
8+
"time"
9+
10+
"github.com/pior/runnable"
11+
)
12+
13+
func monitor(jobs *StupidJobQueue) runnable.RunnableFunc {
14+
return func(ctx context.Context) error {
15+
fmt.Printf("Task executed: %d\t\ttasks waiting: %d\n", jobs.Executed(), jobs.Waiting())
16+
return nil
17+
}
18+
}
19+
20+
func main() {
21+
jobs := NewStupidJobQueue()
22+
23+
// curl http://localhost:8000/?count=3
24+
server := &http.Server{
25+
Addr: "localhost:8000",
26+
Handler: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
27+
r.ParseForm()
28+
count := r.Form.Get("count")
29+
if count == "" {
30+
count = "1"
31+
}
32+
countInt, _ := strconv.Atoi(count)
33+
34+
for i := 0; i < countInt; i++ {
35+
jobs.Enqueue(r.URL.Path)
36+
fmt.Fprintln(rw, "Job enqueued!")
37+
}
38+
}),
39+
}
40+
serverRunner := runnable.HTTPServer(server)
41+
42+
monitor := runnable.Every(runnable.Func(monitor(jobs)), 2*time.Second)
43+
44+
c := runnable.Component().WithShutdownTimeout(5*time.Second, 3*time.Second)
45+
c.Add(serverRunner)
46+
c.AddService(jobs)
47+
c.Add(monitor)
48+
49+
fmt.Println("Enqueue jobs with: curl http://localhost:8000/?count=3")
50+
51+
runnable.Run(c)
52+
}

group.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync/atomic"
7+
"time"
8+
)
9+
10+
type group struct {
11+
runnables []Runnable
12+
running uint32
13+
errors chan error
14+
cancel func()
15+
timeout time.Duration
16+
}
17+
18+
func (g *group) start(ctx context.Context) {
19+
newCtx, cancel := context.WithCancel(ctx)
20+
21+
g.cancel = cancel
22+
g.errors = make(chan error)
23+
24+
for _, r := range g.runnables {
25+
atomic.AddUint32(&g.running, 1)
26+
go g.run(newCtx, r)
27+
}
28+
}
29+
30+
func (g *group) stop() {
31+
g.cancel()
32+
33+
ticker := time.NewTicker(5 * time.Second)
34+
defer ticker.Stop()
35+
36+
after := time.After(g.timeout)
37+
38+
for {
39+
select {
40+
case <-ticker.C:
41+
log.Printf("group: %d still running", g.running)
42+
case <-g.errors:
43+
if g.running == 0 {
44+
return
45+
}
46+
case <-after:
47+
log.Printf("group: waited %s, %d still running", g.timeout, g.running)
48+
return
49+
}
50+
}
51+
}
52+
53+
func (g *group) run(ctx context.Context, runnable Runnable) {
54+
name := findName(runnable)
55+
56+
log.Printf("group: %s started", name)
57+
err := Recover(runnable).Run(ctx)
58+
if err == nil || errors.Is(err, context.Canceled) {
59+
log.Printf("group: %s stopped", name)
60+
} else {
61+
log.Printf("group: %s stopped with error: %+v", name, err)
62+
}
63+
64+
atomic.AddUint32(&g.running, ^uint32(0))
65+
g.errors <- err
66+
}

0 commit comments

Comments
 (0)