Jobs sometimes executed multiple times. #27

Open
@sidepelican

Issue

I have found a concurrency safety issue.
A dispatched job can be dequeued twice with the same job ID and payload.

Reproducing

Reproduction repository: https://github.com/sidepelican/QueuesFluentDriverMultipleExecution
It dispatches a simple job several times and automatically detects when the same job is executed more than once.

$ swift run
...
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
p_id=51675C99-B581-4555-9072-A376D1E95770 is multiple executed!

Cause

  1. Queues calls Queue.set and Queue.push in Queue.dispatch.

https://github.com/vapor/queues/blob/c95c891c3c04817eac1165587fb02457c749523a/Sources/Queues/Queue.swift#L84-L86

  2. FluentQueue.set saves a JobModel. JobModel.state has .pending as its initial value.

public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
    let jobModel = JobModel(id: id, queue: queueName.string, jobData: JobDataModel(jobData: jobStorage))
    // If the job must run at a later time, ensure it won't be picked earlier since
    // we sort pending jobs by date when querying
    jobModel.runAtOrAfter = jobStorage.delayUntil ?? Date()
    return jobModel.save(on: db).map { metadata in
        return
    }
}

  3. FluentQueue.push sets the job's state to .pending. Because .pending is already the default value of state, this write appears to be redundant.

public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
    guard let sqlDb = db as? SQLDatabase else {
        return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
    }
    return sqlDb
        .update(JobModel.schema)
        .set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
        .where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id.string))
        .run()
}

  4. The job saved in step 2 is immediately available for workers to dequeue. If a worker dequeues it between steps 2 and 3, the worker sets the state to .processing, and step 3 then overwrites it with .pending.
  5. Because the state is .pending again, another worker can dequeue the same job, and it is executed a second time.
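
The interleaving described in the steps above can be replayed deterministically with a small in-memory model. This is only a sketch: `JobState`, `JobStore`, and `simulateRace` are hypothetical names invented for illustration, the dictionary stands in for the jobs table, and none of it is the real FluentQueue code.

```swift
// Simplified stand-in for the job state column (hypothetical, illustration only).
enum JobState { case pending, processing }

// In-memory stand-in for the jobs table; not the real FluentQueue.
struct JobStore {
    private(set) var states: [String: JobState] = [:]

    // Models FluentQueue.set: the row is saved with state .pending.
    mutating func set(_ id: String) { states[id] = .pending }

    // Models FluentQueue.push: the state is unconditionally written as .pending.
    mutating func push(_ id: String) { states[id] = .pending }

    // Models a worker claiming a pending job by flipping it to .processing.
    mutating func dequeue() -> String? {
        guard let id = states.first(where: { $0.value == .pending })?.key else { return nil }
        states[id] = .processing
        return id
    }
}

// Replays the problematic order: set, worker A, push, worker B.
func simulateRace() -> [String] {
    var store = JobStore()
    var executed: [String] = []
    store.set("job-1")                                   // set saves the job as .pending
    if let id = store.dequeue() { executed.append(id) }  // worker A claims it before push runs
    store.push("job-1")                                  // push resets .processing to .pending
    if let id = store.dequeue() { executed.append(id) }  // worker B claims the same job
    return executed
}

let executed = simulateRace()
print(executed) // ["job-1", "job-1"]
```

The same job ID appears twice in the result, which matches the duplicated log lines in the reproduction output.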

How to fix?

I think there are two ways.
One is to add .initialized to QueuesFluentJobState and use it as the initial value of JobModel.state, so a job cannot be dequeued until push has marked it .pending.
The other is to do nothing in FluentQueue.push.
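
A minimal sketch of the first option, using an in-memory model rather than the real driver. `FixedJobState` and `FixedJobStore` are hypothetical names, and the guard inside `push` (only promoting a job that is still `.initialized`) is an extra assumption that goes slightly beyond the proposal.

```swift
// Hypothetical state enum with the proposed extra .initialized case.
enum FixedJobState { case initialized, pending, processing }

// In-memory stand-in for the jobs table; not the real FluentQueue.
struct FixedJobStore {
    private(set) var states: [String: FixedJobState] = [:]

    // set saves the job as .initialized, so workers cannot see it yet.
    mutating func set(_ id: String) { states[id] = .initialized }

    // push promotes the job to .pending.
    // Assumption: only promote when the job is still .initialized,
    // so a state written by a worker is never overwritten.
    mutating func push(_ id: String) {
        if states[id] == .initialized { states[id] = .pending }
    }

    // A worker claims a pending job by flipping it to .processing.
    mutating func dequeue() -> String? {
        guard let id = states.first(where: { $0.value == .pending })?.key else { return nil }
        states[id] = .processing
        return id
    }
}

var fixedStore = FixedJobStore()
var fixedRuns: [String] = []
fixedStore.set("job-1")
if let id = fixedStore.dequeue() { fixedRuns.append(id) }  // nothing to claim before push
fixedStore.push("job-1")
if let id = fixedStore.dequeue() { fixedRuns.append(id) }  // first worker claims the job
if let id = fixedStore.dequeue() { fixedRuns.append(id) }  // second worker finds nothing
print(fixedRuns) // ["job-1"]
```

In this model, the second option corresponds to keeping `.pending` as the initial state and giving `push` an empty body, which also leaves nothing that could overwrite `.processing`.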
