This repository was archived by the owner on Mar 5, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.go
More file actions
113 lines (96 loc) · 1.94 KB
/
worker.go
File metadata and controls
113 lines (96 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package easyworker
import (
"log"
"time"
)
// Message type tags stored in msg.msgType.
//
// iSTREAM is the only negative tag; its use is not visible in this file.
const iSTREAM = -1

// The remaining tags are numbered consecutively starting at 1, so the zero
// value of msg.msgType matches none of them.
//
// Within this file iERROR, iFATAL_ERROR and iSUCCESS tag results sent by the
// worker, iQUIT is the command that stops it, and iTASK tags a task to run.
// iRETRY, iCANCEL and iRUN are not referenced here.
const (
	iERROR = iota + 1
	iFATAL_ERROR
	iSUCCESS
	iRETRY
	iCANCEL
	iRUN
	iQUIT
	iTASK
)
// msg is the envelope exchanged with a worker: it carries either a task or a
// command to the worker, or a result back from it.
type msg struct {
	// id is the task id (or the worker id in a fatal-error report);
	// msgType is one of the i* message type constants.
	id, msgType int

	// data is the payload; its concrete type depends on msgType.
	data any
}
// worker holds the settings and channels of a single worker.
type worker struct {
	// id identifies the worker; it is printed in log lines and used as the
	// message id of the fatal-error report sent after a panic.
	id int64

	// retryTimes is the user-defined number of extra attempts made after a
	// failed call; retrySleep is the pause before each of those attempts,
	// in milliseconds.
	retryTimes, retrySleep int

	// fun is the user-defined function invoked for every task.
	fun any

	// cmd delivers commands from the supervisor, inputCh delivers tasks
	// (the arguments to call fun with), and resultCh carries results back
	// to the supervisor.
	cmd, inputCh, resultCh chan msg
}
/*
run is the worker's main loop.

It blocks until either a task arrives on inputCh or a command arrives on cmd.

  - A message of type iTASK is executed by calling fun with the arguments
    carried in its data ([]any). A failed call is retried up to retryTimes
    more times, sleeping retrySleep milliseconds before each retry. The
    outcome is sent on resultCh under the task's id: iSUCCESS with the
    returned values, or iERROR with the last error.
  - Messages on inputCh of any other type are ignored.
  - An iQUIT command makes run return. Any other command is ignored.

If anything panics (including a task whose data is not []any), the panic is
recovered, reported on resultCh as iFATAL_ERROR under the worker's id, and
run returns.
*/
func (w *worker) run() {
	defer func() {
		if r := recover(); r != nil {
			if printLog {
				log.Println(w.id, ", worker was panic, ", r)
			}
			w.resultCh <- msg{id: int(w.id), msgType: iFATAL_ERROR, data: r}
		}
	}()

	for {
		select {
		case cmd := <-w.cmd:
			// receive a quit signal.
			if cmd.msgType == iQUIT {
				if printLog {
					log.Println(w.id, "is exited")
				}
				return
			}
			// Any other command is ignored. The task is scoped to its own
			// case below, so a non-quit command can no longer fall through
			// and re-run the previously received task.

		case task := <-w.inputCh:
			if task.msgType != iTASK {
				continue
			}

			args := task.data.([]any)

			// Per-task result variables: a result from an earlier task can
			// never be reported under this task's id.
			var (
				ret []any
				err error
			)

			// The first attempt always runs, even if retryTimes is negative.
			ret, err = invokeFun(w.fun, args...)
			for i := 1; err != nil && i <= w.retryTimes; i++ {
				time.Sleep(time.Millisecond * time.Duration(w.retrySleep))
				if printLog {
					log.Println(w.id, ", retry(", i, ") function with last args")
				}
				ret, err = invokeFun(w.fun, args...)
			}

			if err != nil {
				if printLog {
					log.Println(w.id, ", call function failed, error: ", err)
				}
				w.resultCh <- msg{id: task.id, msgType: iERROR, data: err}
				continue
			}
			w.resultCh <- msg{id: task.id, msgType: iSUCCESS, data: ret}
		}
	}
}