Skip to content

Commit b8f2043

Browse files
committed
fix: set tasks as done for disabled parallelism
1 parent ca76c09 commit b8f2043

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

utils/worker/worker_temp.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type ITask interface {
1717
RunTask(worker_id worker_types.WorkerID) error
1818
IsDone() bool
1919
setError(error)
20+
GetError() error
2021
GetStatusCode() worker_types.TaskStatusCode
2122
SetAsDone()
2223
}
@@ -32,6 +33,7 @@ func (t *Task) IsDone() bool { return t.done }
3233

3334
func (t *Task) SetAsDone() { t.done = true }
3435
func (t *Task) setError(err error) { t.err = err }
36+
func (t *Task) GetError() error { return t.err }
3537

3638
func (t *Task) GetStatusCode() worker_types.TaskStatusCode {
3739
if t.err == nil && t.done {
@@ -108,7 +110,9 @@ func (j *TaskPool) launchWorker(worker_id worker_types.WorkerID, tasks <-chan IT
108110

109111
if r := recover(); r != nil {
110112
logus.Log.Error("Recovered in doRunf", typelog.Any("panic", r))
111-
task_err <- errors.New(fmt.Sprintln("task paniced", r))
113+
err := errors.New(fmt.Sprintln("task paniced", r))
114+
task.setError(err)
115+
task_err <- err
112116
}
113117
}()
114118
task_err <- task.RunTask(worker_id)
@@ -178,6 +182,7 @@ func RunTasksInTempPool(name string, tasks []ITask, opts ...TaskPoolOption) erro
178182
if taskPool.disable_parallelism {
179183
for pseudo_worker_id, task := range tasks {
180184
task.RunTask(worker_types.WorkerID(pseudo_worker_id))
185+
task.SetAsDone()
181186
finished_tasks = append(finished_tasks, task)
182187
}
183188
} else {
@@ -192,8 +197,11 @@ func RunTasksInTempPool(name string, tasks []ITask, opts ...TaskPoolOption) erro
192197
for task_number, task := range tasks {
193198
task_id := worker_types.TaskID(task_number)
194199
if !task.IsDone() {
195-
worker_logus.Log.Error("task failed", worker_logus.TaskID(task_id))
196-
return errors.New(fmt.Sprintf("task failed, task_id=", task_id))
200+
worker_logus.Log.Error("task failed", worker_logus.TaskID(task_id), typelog.OptError(task.GetError()))
201+
err := task.GetError()
202+
if err != nil {
203+
return errors.New(fmt.Sprintln("task failed, task_id=", task_id, "error=", err.Error()))
204+
}
197205
}
198206
worker_logus.Log.Debug("task succeed", worker_logus.TaskID(task_id))
199207
}

0 commit comments

Comments
 (0)