Skip to content
This repository was archived by the owner on Mar 5, 2026. It is now read-only.

Commit d96fa47

Browse files
committed
add supervisor & refactor code
1 parent 4f1125f commit d96fa47

File tree

13 files changed

+975
-519
lines changed

13 files changed

+975
-519
lines changed

README.md

Lines changed: 90 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,43 +4,38 @@ A Golang package for supporting worker supervisor model.
44
The package help developer easy to run tasks.
55
It's scalable with minimum effort.
66

7+
easyworker inspired by Erlang OTP.
8+
79
# Design
810

911
The package has two main part:
1012

11-
* The supervisor: Manage worker, give a task to worker & collect result.
12-
* The worker: Receive task, run task then send result to supervisor.
13+
* The supervisor: Manage child, give a task to child & collect result.
14+
* The child: Receive task, run task then send result to supervisor.
1315

1416
## Supervisor
1517

1618
Start worker and moniter worker.
1719
Send task to worker and get result.
1820

19-
## Worker
21+
## Child
2022

2123
Run task with user's function and handle error.
2224
If user's function panic worker will check retry config and re-run if needed.
2325

2426
# Guide
2527

26-
easywork support 2 type of worker:
28+
easywork support 2 type of worker and a type of supervisor:
2729

28-
* Task. Add a list of task and run worker
29-
* Stream. Start worker then push data to worker from channel
30+
* Task, Add a list of task and run worker.
31+
* Stream, Start worker then push data to worker from channel.
32+
* Supervisor, Start a supervisor for custom worker.
3033

3134
## EasyTask
3235

3336
This is simple way to run parallel task.
3437
User doesn't to manage goroutine, channel,...
3538

36-
## EasyStream
37-
38-
This is used for streaming type.
39-
In this case, tasks are continuously send to worker by user's channel.
40-
Results will receive from other channle of user.
41-
42-
# Example
43-
4439
EasyTask example:
4540

4641
```go
@@ -52,66 +47,73 @@ fnSum = func(a ...int) int {
5247
return sum
5348
}
5449

50+
// number of workers
5551
numWorkers := 3
52+
53+
// retry times
5654
retryTimes := 0
57-
retrySleep := 0
5855

59-
config, _ := NewConfig(fnSum, numWorkers, retryTimes, retrySleep)
56+
// sleep time before re-run
57+
retrySleep := 0
6058

61-
task, err := NewTask(config)
59+
// new config for EasyTask
60+
config, _ := easyworker.NewConfig(fnSum, numWorkers, retryTimes, retrySleep)
6261

63-
if err != nil {
64-
t.Error("cannot create task, ", err)
65-
return
66-
}
62+
// new EasyTask
63+
task, _ := easyworker.NewTask(config)
6764

65+
// add tasks
6866
myTask.AddTask(1, 2, 3)
6967
myTask.AddTask(3, 4, 5, 6, 7)
7068

69+
// start workers
7170
r, e := myTask.Run()
71+
7272
if e != nil {
7373
t.Error("run task failed, ", e)
7474
} else {
7575
fmt.Println("task result:", r)
7676
}
7777
```
7878

79+
## EasyStream
80+
81+
This is used for streaming type.
82+
In this case, tasks are continuously send to worker by user's channel.
83+
Results will receive from other channle of user.
84+
7985
EasyStream example:
8086

8187
```go
88+
// fun will do task
8289
fnStr = func (a int, suffix string) string {
8390
if a%3 == 0 {
8491
panic("panic from user func")
8592
}
8693
return fmt.Sprintf("%d_%s", a, suffix)
8794
}
8895

89-
inCh := make(chan []interface{}, 1)
96+
inCh := make(chan []interface{})
9097
outCh := make(chan interface{})
9198

9299
// number of workers = number of cpu cores (logical cores)
93-
config, _ := NewConfig(fnSum, DefaultNumWorker(), 3, 1000)
100+
config, _ := easyworker.NewConfig(fnSum, DefaultNumWorker(), 3, 1000)
94101

95102
// test with stream.
96-
myStream, err := NewStream(config, inCh, outCh)
97-
if err != nil {
98-
t.Error("create EasyWorker failed, ", err)
99-
}
103+
myStream, _ := easyworker.NewStream(config, inCh, outCh)
100104

101-
e := myStream.Run()
102-
if e != nil {
103-
t.Error("run stream task failed, ", e)
104-
} else {
105-
fmt.Println("stream is running")
106-
}
105+
// start stream.
106+
myStream.Run()
107107

108+
// receive data from stream.
108109
go func() {
109110
for {
110111
r := <-outCh
111112
fmt.Println("stream result: ", r)
112113
}
113114
}()
114115

116+
// send data to stream.
115117
go func() {
116118
for i := 0; i < 15; i++ {
117119
input := []interface{}{i, "3"}
@@ -121,8 +123,60 @@ go func() {
121123
}
122124
}()
123125

124-
time.Sleep(2 * time.Second)
126+
// wait for get result
127+
time.Sleep(time.Second)
125128

126-
// Stop all worker
129+
// stop all worker
127130
myStream.Stop()
128-
```
131+
```
132+
133+
## Supervisor
134+
135+
This is used for generic purpose worker(child).
136+
Every children has a owner restart strategy.
137+
138+
Currently, child has three type of restart strategy:
139+
140+
* ALWAYS_RESTART, supervisor always restart child if it panic/done.
141+
* NORMAL_RESTART, supervisor will restart if child was panic.
142+
* NO_RESTART, supervisor will don't restart for any reason.
143+
144+
Child will be started after add to supervisor.
145+
146+
supervisor example:
147+
148+
```go
149+
// example function need to run in child.
150+
loopRun1 = func(a int) {
151+
for i := 0; i < a; i++ {
152+
time.Sleep(time.Second)
153+
fmt.Println("loop at", i)
154+
}
155+
fmt.Println("Loop exit..")
156+
}
157+
158+
// example function run in child. It will panic if counter > 3
159+
LoopRunWithPanic = func(a int) {
160+
for i := 0; i < a; i++ {
161+
time.Sleep(time.Second)
162+
fmt.Println("loop at", i)
163+
if i > 3 {
164+
panic("test loop with panic")
165+
}
166+
}
167+
// maybe you won't see this.
168+
fmt.Println("Loop exit..")
169+
}
170+
171+
// create a supervisor
172+
sup := easyworker.NewSupervisor()
173+
174+
// add direct child to supervisor.
175+
sup.NewChild(easyworker.NORMAL_RESTART, LoopRun, 5)
176+
177+
// create a child
178+
child, _ := easyworker.NewChild(ALWAYS_RESTART, LoopRunWithPanic, 5)
179+
180+
// add exists child.
181+
sup.AddChild(child)
182+
```

child.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package easyworker
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
)
7+
8+
const (
9+
// Always restart child
10+
ALWAYS_RESTART = iota
11+
12+
// Just restart if child got an error
13+
NORMAL_RESTART
14+
15+
// No restart child
16+
NO_RESTART
17+
)
18+
19+
const (
20+
CHILD_PANIC = iota
21+
CHILD_DONE
22+
CHILD_RUNING
23+
CHILD_RESTARTING
24+
CHILD_STOPPED
25+
)
26+
27+
var (
28+
// use to store last id of child. id is auto_increment.
29+
childLastId int
30+
)
31+
32+
type Child struct {
33+
id int
34+
restart_type int
35+
cmdCh chan cmd
36+
status int
37+
38+
fun interface{}
39+
params []interface{}
40+
}
41+
42+
func NewChild(restart int, fun interface{}, params ...interface{}) (ret Child, retErr error) {
43+
if restart < ALWAYS_RESTART || restart > NO_RESTART {
44+
retErr = fmt.Errorf("in correct restart type, input: %d", restart)
45+
return
46+
}
47+
48+
if retErr = verifyFunc(fun); retErr != nil {
49+
return
50+
}
51+
52+
childLastId++
53+
54+
return Child{
55+
id: childLastId,
56+
restart_type: restart,
57+
fun: fun,
58+
params: params,
59+
}, nil
60+
}
61+
62+
func (c *Child) run() {
63+
defer func() {
64+
msg := cmd{
65+
id: c.id,
66+
typeCmd: CHILD_DONE,
67+
}
68+
if r := recover(); r != nil {
69+
fmt.Println(c.id, ", worker was panic, ", r)
70+
msg.typeCmd = CHILD_PANIC
71+
msg.data = r
72+
}
73+
74+
c.cmdCh <- msg
75+
}()
76+
77+
var (
78+
ret reflect.Value
79+
err error
80+
)
81+
82+
c.status = CHILD_RUNING
83+
l:
84+
for {
85+
// call user define function.
86+
ret, err = invokeFun(c.fun, c.params...)
87+
88+
switch c.restart_type {
89+
case ALWAYS_RESTART:
90+
if err != nil {
91+
fmt.Println(c.id, "failed, reason:", err)
92+
93+
}
94+
fmt.Println(c.id, "child re-run")
95+
continue l
96+
case NORMAL_RESTART:
97+
if err == nil {
98+
fmt.Println(c.id, "done, child no re-run")
99+
break l
100+
} else {
101+
fmt.Println(c.id, "failed, child re-run, reason:", err)
102+
continue l
103+
}
104+
case NO_RESTART:
105+
if err != nil {
106+
fmt.Println(c.id, "failed, no re-run, reason:", err)
107+
break l
108+
}
109+
}
110+
111+
}
112+
113+
if err != nil {
114+
fmt.Println(c.id, ", call function failed, error: ", err)
115+
} else {
116+
fmt.Println(c.id, ", function return ", ret)
117+
}
118+
}

child_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package easyworker
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
)
8+
9+
func LoopRun2(a int) {
10+
for i := 0; i < a; i++ {
11+
time.Sleep(time.Second)
12+
fmt.Println("loop at", i)
13+
}
14+
fmt.Println("Loop exit..")
15+
}
16+
17+
func TestChildIncorrectedParams(t *testing.T) {
18+
_, err := NewChild(ALWAYS_RESTART, "LoopRun", 5)
19+
20+
if err == nil {
21+
t.Error("missed checking function from user")
22+
}
23+
24+
_, err = NewChild(1000, LoopRun, 5)
25+
26+
if err == nil {
27+
t.Error("missed checking restart strategy from user")
28+
}
29+
}

0 commit comments

Comments
 (0)