Skip to content

Commit 2fe9517

Browse files
committed
Add concurrency limit for task
1 parent 8b95b52 commit 2fe9517

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

common/task/task.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Group struct {
2323
tasks []taskItem
2424
cleanup func()
2525
fastFail bool
26+
queue chan struct{}
2627
}
2728

2829
func (g *Group) Append(name string, f func(ctx context.Context) error) {
@@ -46,6 +47,13 @@ func (g *Group) FastFail() {
4647
g.fastFail = true
4748
}
4849

50+
func (g *Group) Concurrency(n int) {
51+
g.queue = make(chan struct{}, n)
52+
for i := 0; i < n; i++ {
53+
g.queue <- struct{}{}
54+
}
55+
}
56+
4957
func (g *Group) Run(contextList ...context.Context) error {
5058
return g.RunContextList(contextList)
5159
}
@@ -65,6 +73,14 @@ func (g *Group) RunContextList(contextList []context.Context) error {
6573
for _, task := range g.tasks {
6674
currentTask := task
6775
go func() {
76+
if g.queue != nil {
77+
<-g.queue
78+
select {
79+
case <-taskCancelContext.Done():
80+
return
81+
default:
82+
}
83+
}
6884
err := currentTask.Run(taskCancelContext)
6985
errorAccess.Lock()
7086
if err != nil {
@@ -83,6 +99,9 @@ func (g *Group) RunContextList(contextList []context.Context) error {
8399
taskCancel(errTaskSucceed{})
84100
taskFinish(errTaskSucceed{})
85101
}
102+
if g.queue != nil {
103+
g.queue <- struct{}{}
104+
}
86105
}()
87106
}
88107

0 commit comments

Comments
 (0)