Skip to content

Commit bfb8681

Browse files
authored
Merge pull request #22 from pfnet-research/exit-on-empty-wait-duration
support grace period for exit on empty
2 parents 6d048aa + 73d5b35 commit bfb8681

File tree

4 files changed

+28
-12
lines changed

4 files changed

+28
-12
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,8 @@ worker:
529529
exitOnSuspend: true
530530
# If true, worker exits when the queue was empty
531531
exitOnEmpty: false
532+
# If exitOnEmpty is true, worker waits for exit in the grace period
533+
exitOnEmptyGracePeriod: 10s
532534
# If the value was positive, worker will exit
533535
# after processing the number of tasks
534536
numTasks: 1000

cmd/start_worker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ func init() {
102102
flag.Bool("exit-on-empty", cmdOpts.Worker.ExitOnEmpty, "if set, worker exits when queue is empty")
103103
viperBindPFlag("Worker.ExitOnEmpty", strconv.FormatBool(cmdOpts.Worker.ExitOnEmpty), flag.Lookup("exit-on-empty"))
104104

105+
flag.Duration("exit-on-empty-grace-period", cmdOpts.Worker.ExitOnEmptyGracePeriod, "if exit-on-empty is true, worker waits for exit in the grace period")
106+
viperBindPFlag("Worker.ExitOnEmptyGracePeriod", cmdOpts.Worker.ExitOnEmptyGracePeriod.String(), flag.Lookup("exit-on-empty-grace-period"))
107+
105108
flag.Int("num-tasks", cmdOpts.Worker.NumTasks, "if set positive value, worker exits after processing the number of tasks")
106109
viperBindPFlag("Worker.NumTasks", strconv.Itoa(cmdOpts.Worker.NumTasks), flag.Lookup("num-tasks"))
107110

pkg/apis/worker/worker.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,15 @@ type Worker struct {
6262
}
6363

6464
type WorkerSpec struct {
65-
Name string `json:"name" yaml:"name" default:"-" validate:"required,min=1,max=256"`
66-
Concurrency int `json:"concurrency" yaml:"concurrency" default:"1" validate:"required,min=1"`
67-
TaskHandler TaskHandlerSpec `json:"taskHandler" yaml:"taskHandler" validate:"required"`
68-
HeartBeat HeartBeatSpec `json:"heartBeat" yaml:"heartBeat" validate:"required"`
69-
ExitOnSuspend bool `json:"exitOnSuspend" yaml:"exitOnSuspend" default:"true"`
70-
ExitOnEmpty bool `json:"exitOnEmpty" yaml:"exitOnEmpty" default:"false"`
71-
NumTasks int `json:"numTasks" yaml:"numTasks" default:"100"`
72-
WorkDir string `json:"workDir" yaml:"workDir" default:"/tmp" validate:"isWorkDirValid,required"`
65+
Name string `json:"name" yaml:"name" default:"-" validate:"required,min=1,max=256"`
66+
Concurrency int `json:"concurrency" yaml:"concurrency" default:"1" validate:"required,min=1"`
67+
TaskHandler TaskHandlerSpec `json:"taskHandler" yaml:"taskHandler" validate:"required"`
68+
HeartBeat HeartBeatSpec `json:"heartBeat" yaml:"heartBeat" validate:"required"`
69+
ExitOnSuspend bool `json:"exitOnSuspend" yaml:"exitOnSuspend" default:"true"`
70+
ExitOnEmpty bool `json:"exitOnEmpty" yaml:"exitOnEmpty" default:"false"`
71+
ExitOnEmptyGracePeriod time.Duration `json:"exitOnEmptyGracePeriod" yaml:"exitOnEmptyGracePeriod" default:"10s"`
72+
NumTasks int `json:"numTasks" yaml:"numTasks" default:"100"`
73+
WorkDir string `json:"workDir" yaml:"workDir" default:"/tmp" validate:"isWorkDirValid,required"`
7374
}
7475

7576
type TaskHandlerSpec struct {

pkg/worker/worker.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func (w *Worker) Start() error {
122122
}
123123

124124
func (w *Worker) startProcessTasks() error {
125+
var queueEmptyDetectedAt *time.Time
125126
L:
126127
for i := w.config.NumTasks; i != 0; i-- {
127128
select {
@@ -159,12 +160,21 @@ L:
159160
w.smphr.Release(1)
160161
continue
161162
case backend.TaskQueueEmptyError:
162-
if w.config.ExitOnEmpty {
163-
w.logger.Info().Bool("exitOnEmpty", w.config.ExitOnEmpty).Msg("Queue is empty. Stopping worker")
163+
if queueEmptyDetectedAt == nil {
164+
now := time.Now()
165+
queueEmptyDetectedAt = &now
166+
}
167+
logger := w.logger.With().
168+
Bool("exitOnEmpty", w.config.ExitOnEmpty).
169+
Dur("exitOnEmptyGracePeriod", w.config.ExitOnEmptyGracePeriod).
170+
Time("detectedQueueEmptyAt", *queueEmptyDetectedAt).Logger()
171+
shouldExitNow := !time.Now().Before(queueEmptyDetectedAt.Add(w.config.ExitOnEmptyGracePeriod))
172+
if w.config.ExitOnEmpty && shouldExitNow {
173+
logger.Info().Msg("Queue is empty. Stopping worker")
164174
w.smphr.Release(1)
165175
return nil
166176
}
167-
w.logger.Debug().Bool("exitOnEmpty", w.config.ExitOnEmpty).Msg("Queue is empty. retrying in 5 seconds.")
177+
logger.Info().Msg("Queue is empty. retrying in 5 seconds.")
168178
util.SleepContext(w.ctx, 5*time.Second)
169179
w.smphr.Release(1)
170180
continue
@@ -176,7 +186,7 @@ L:
176186
}
177187
}
178188
w.logger.Debug().Interface("task", taskFetched).Msg("Task fetched")
179-
189+
queueEmptyDetectedAt = nil
180190
w.wg.Add(1)
181191
go func() {
182192
defer w.smphr.Release(1)

0 commit comments

Comments
 (0)