Skip to content

Commit 89dbbf6

Browse files
Pior Bastidapior
Pior Bastida
authored andcommitted
WIP
1 parent 419d9e4 commit 89dbbf6

File tree

8 files changed

+185
-144
lines changed

8 files changed

+185
-144
lines changed

component.go

+34-20
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66
)
77

8+
var DefaultComponentShutdownTimeout = 30 * time.Second
9+
810
type C interface {
911
Add(runner Runnable)
1012
AddService(service Runnable)
@@ -15,59 +17,71 @@ type component struct {
1517
services *group
1618
}
1719

20+
// Component manages two groups of Runnable, processes and services.
21+
// Services are stopped after processes.
22+
// The intended purposes is to manage the simplest form of runnable dependencies.
1823
func Component(processes ...Runnable) *component {
1924
return &component{
20-
processes: &group{timeout: 30 * time.Second, runnables: processes},
21-
services: &group{timeout: 30 * time.Second},
25+
processes: &group{timeout: DefaultComponentShutdownTimeout, runnables: processes},
26+
services: &group{timeout: DefaultComponentShutdownTimeout},
2227
}
2328
}
2429

25-
// WithShutdownTimeout changes the default timeout (30s).
26-
func (c *component) WithShutdownTimeout(process, service time.Duration) *component {
27-
c.processes.timeout = process
28-
c.services.timeout = service
30+
// WithProcessShutdownTimeout changes the shutdown timeout of the process runnables.
31+
// See also DefaultComponentShutdownTimeout.
32+
func (c *component) WithProcessShutdownTimeout(timeout time.Duration) *component {
33+
c.processes.timeout = timeout
34+
return c
35+
}
36+
37+
// WithServiceShutdownTimeout changes the shutdown timeout of the service runnables.
38+
// See also DefaultComponentShutdownTimeout.
39+
func (c *component) WithServiceShutdownTimeout(timeout time.Duration) *component {
40+
c.services.timeout = timeout
2941
return c
3042
}
3143

32-
// Add registers runnables as process. Processes will be shutdown before services.
33-
func (c *component) Add(runner ...Runnable) {
34-
c.processes.runnables = append(c.processes.runnables, runner...)
44+
// AddProcess registers runnables as process. Processes will be shutdown before services.
45+
func (c *component) AddProcess(runner ...Runnable) *component {
46+
c.processes.Add(runner...)
47+
return c
3548
}
3649

3750
// Add registers runnables as services. Services will be shutdown after processes.
38-
func (c *component) AddService(service ...Runnable) {
39-
c.services.runnables = append(c.services.runnables, service...)
51+
func (c *component) AddService(service ...Runnable) *component {
52+
c.services.Add(service...)
53+
return c
4054
}
4155

4256
func (c *component) Run(ctx context.Context) error {
43-
ctxValues := ContextValues(ctx)
57+
ctxNoCancel := context.WithoutCancel(ctx)
4458

4559
// Starting
4660

4761
Log(c, "starting services")
48-
c.services.start(ctxValues)
62+
c.services.Start(ctxNoCancel)
4963

5064
Log(c, "starting processes")
51-
c.processes.start(ctxValues)
65+
c.processes.Start(ctxNoCancel)
5266

5367
// Waiting for shutdown
5468

5569
select {
56-
case <-c.processes.errors:
57-
Log(c, "a runner stopped")
58-
case <-c.services.errors:
59-
Log(c, "a service stopped")
70+
case r := <-c.processes.StoppedRunnables():
71+
Log(c, "process stopped: %s", findName(r))
72+
case r := <-c.services.StoppedRunnables():
73+
Log(c, "service stopped: %s", findName(r))
6074
case <-ctx.Done():
6175
Log(c, "context cancelled")
6276
}
6377

6478
// shutdown
6579

6680
Log(c, "shutting down processes")
67-
c.processes.stop()
81+
c.processes.Stop()
6882

6983
Log(c, "shutting down services")
70-
c.services.stop()
84+
c.services.Stop()
7185

7286
Log(c, "shutdown complete")
7387
return ctx.Err()

component_test.go

+30-5
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,48 @@
11
package runnable
22

3-
func ExampleComponent() {
3+
import (
4+
"context"
5+
stdlog "log"
6+
"os"
7+
)
8+
9+
func ExampleComponent_cancelled() {
410
ctx, cancel := initializeForExample()
511
defer cancel()
612

713
c := Component()
8-
c.Add(newDummyRunnable())
14+
c.AddProcess(newDummyRunnable())
915

10-
c.Run(ctx)
16+
_ = c.Run(ctx)
1117

1218
// Output:
1319
// component: starting services
1420
// component: starting processes
15-
// group: dummyRunnable started
1621
// dummyRunnable: started
1722
// component: context cancelled
1823
// component: shutting down processes
1924
// dummyRunnable: stopped
20-
// group: dummyRunnable stopped
25+
// component: shutting down services
26+
// component: shutdown complete
27+
}
28+
29+
func ExampleComponent_failing() {
30+
ctx := context.Background()
31+
32+
SetLogger(stdlog.New(os.Stdout, "", 0))
33+
34+
c := Component()
35+
c.AddProcess(newDyingRunnable())
36+
37+
_ = c.Run(ctx)
38+
39+
// Output:
40+
// component: starting services
41+
// component: starting processes
42+
// dyingRunnable: started
43+
// dyingRunnable: stopped with error: dying
44+
// component: a process stopped
45+
// component: shutting down processes
2146
// component: shutting down services
2247
// component: shutdown complete
2348
}

examples/component/jobqueue.go

-50
This file was deleted.

examples/component/main.go

+19-22
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,44 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7-
"strconv"
87
"time"
98

109
"github.com/pior/runnable"
10+
"github.com/pior/runnable/examples/jobqueue"
1111
)
1212

13-
func monitor(jobs *StupidJobQueue) runnable.RunnableFunc {
13+
// Run it with:
14+
// go run ./examples/component/main.go
15+
16+
// Test it with:
17+
// curl "http://localhost:8000/?count=3"
18+
19+
func jobMonitor(jobs *jobqueue.JobQueue) runnable.RunnableFunc {
1420
return func(ctx context.Context) error {
1521
fmt.Printf("Task executed: %d\t\ttasks waiting: %d\n", jobs.Executed(), jobs.Waiting())
1622
return nil
1723
}
1824
}
1925

2026
func main() {
21-
jobs := NewStupidJobQueue()
27+
jobs := jobqueue.New()
2228

23-
// curl http://localhost:8000/?count=3
2429
server := &http.Server{
2530
Addr: "localhost:8000",
2631
Handler: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
27-
r.ParseForm()
28-
count := r.Form.Get("count")
29-
if count == "" {
30-
count = "1"
31-
}
32-
countInt, _ := strconv.Atoi(count)
33-
34-
for i := 0; i < countInt; i++ {
35-
jobs.Enqueue(r.URL.Path)
36-
fmt.Fprintln(rw, "Job enqueued!")
37-
}
32+
jobs.Enqueue(r.URL.Path)
33+
fmt.Fprintln(rw, "Job enqueued!")
3834
}),
3935
}
40-
serverRunner := runnable.HTTPServer(server)
41-
42-
monitor := runnable.Every(runnable.Func(monitor(jobs)), 2*time.Second)
4336

44-
c := runnable.Component().WithShutdownTimeout(5*time.Second, 3*time.Second)
45-
c.Add(serverRunner)
46-
c.AddService(jobs)
47-
c.Add(monitor)
37+
c := runnable.Component().
38+
WithProcessShutdownTimeout(5 * time.Second).
39+
WithServiceShutdownTimeout(5 * time.Second).
40+
AddProcess(runnable.HTTPServer(server)).
41+
AddService(jobs).
42+
AddProcess(
43+
runnable.Every(runnable.Func(jobMonitor(jobs)), 2*time.Second),
44+
)
4845

4946
fmt.Println("Enqueue jobs with: curl http://localhost:8000/?count=3")
5047

examples/example/main.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ import (
77
"time"
88

99
"github.com/pior/runnable"
10+
"github.com/pior/runnable/examples/jobqueue"
1011
)
1112

1213
func main() {
13-
jobs := NewStupidJobQueue()
14+
jobs := jobqueue.New()
1415

1516
server := &http.Server{
1617
Addr: "localhost:8000",
1718
Handler: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
18-
jobs.Perform(r.URL.Path)
19+
jobs.Enqueue(r.URL.Path)
1920
fmt.Fprintln(rw, "Job enqueued!")
2021
}),
2122
}

0 commit comments

Comments
 (0)