-
-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathQueuesCommand.swift
More file actions
301 lines (258 loc) · 12.5 KB
/
QueuesCommand.swift
File metadata and controls
301 lines (258 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import ConsoleKit
@preconcurrency import Dispatch
import Vapor
import NIOConcurrencyHelpers
import NIOCore
import Atomics
/// The command to start the Queue job
///
/// Depending on the parsed signature, `run(using:signature:)` either boots repeating worker
/// tasks for a single queue (`startJobs(on:)`) or schedules every configured scheduled job
/// (`startScheduledJobs()`). All mutable state lives in a lock-protected `Box` so the command
/// can be `Sendable`. `shutdown()` or `asyncShutdown()` must be called before the command is
/// deallocated; `deinit` asserts on this.
public final class QueuesCommand: AsyncCommand, Sendable {
    // See `Command.signature`.
    public let signature = Signature()

    // See `Command.Signature`.
    public struct Signature: CommandSignature {
        public init() {}

        @Option(name: "queue", help: "Specifies a single queue to run")
        var queue: String?

        @Flag(name: "scheduled", help: "Runs the scheduled queue jobs")
        var scheduled: Bool
    }

    // See `Command.help`.
    public var help: String { "Starts the Vapor Queues worker" }

    private let application: Application

    /// Lock-protected mutable state shared between the caller's thread, the signal queue and
    /// the event loop callbacks created by this command.
    private let box: NIOLockedValueBox<Box>

    struct Box: Sendable {
        /// Repeating worker tasks created by `startJobs(on:)`.
        var jobTasks: [RepeatedTask]
        /// Currently scheduled tasks, keyed by job name.
        var scheduledTasks: [String: AnyScheduledJob.Task]
        /// Periodic stale job recovery task (like Sidekiq Beat)
        var recoveryTask: RepeatedTask?
        /// Signal sources installed by `run(using:signature:)`; retained so they stay alive.
        var signalSources: [any DispatchSourceSignal]
        /// Set once a shutdown has been requested; checked by the tasks to stop themselves.
        var didShutdown: Bool
    }

    /// Create a new ``QueuesCommand``.
    ///
    /// - Parameters:
    ///   - application: The active Vapor `Application`.
    ///   - scheduled: This parameter is a historical artifact and has no effect.
    public init(application: Application, scheduled: Bool = false) {
        self.application = application
        self.box = .init(.init(jobTasks: [], scheduledTasks: [:], recoveryTask: nil, signalSources: [], didShutdown: false))
    }

    // See `AsyncCommand.run(using:signature:)`.
    public func run(using context: CommandContext, signature: QueuesCommand.Signature) async throws {
        // shutdown future
        let promise = self.application.eventLoopGroup.any().makePromise(of: Void.self)
        self.application.running = .start(using: promise)

        // setup signal sources for shutdown
        let signalQueue = DispatchQueue(label: "codes.vapor.jobs.command")
        func makeSignalSource(_ code: Int32) {
            #if canImport(Darwin)
            /// https://github.com/swift-server/swift-service-lifecycle/blob/main/Sources/UnixSignals/UnixSignalsSequence.swift#L77-L82
            signal(code, SIG_IGN)
            #endif

            let source = DispatchSource.makeSignalSource(signal: code, queue: signalQueue)
            source.setEventHandler {
                print() // clear ^C
                promise.succeed(())
            }
            source.resume()
            // Keep a reference so the source is not released while the command runs.
            self.box.withLockedValue { $0.signalSources.append(source) }
        }
        makeSignalSource(SIGTERM)
        makeSignalSource(SIGINT)

        if signature.scheduled {
            self.application.logger.info("Starting scheduled jobs worker")
            try self.startScheduledJobs()
        } else {
            let queue: QueueName = signature.queue.map { .init(string: $0) } ?? .default

            self.application.logger.info("Starting jobs worker", metadata: ["queue": .string(queue.string)])
            try self.startJobs(on: queue)
        }
    }

    /// Starts an in-process jobs worker for queued tasks
    ///
    /// When stale job recovery is enabled in the queues configuration, a one-off recovery is
    /// kicked off first (without waiting for it) and a periodic recovery task is scheduled
    /// after the workers have been created.
    ///
    /// - Parameter queueName: The queue to run the jobs on
    public func startJobs(on queueName: QueueName) throws {
        let queue = self.application.queues.queue(queueName, on: self.application.eventLoopGroup.any())

        // Recover stale jobs on startup (if recovery is enabled)
        if self.application.queues.configuration.enableStaleJobRecovery {
            let recoveryFuture = queue.recoverStaleJobs()

            // Log recovery result, but don't block - start workers regardless
            recoveryFuture.whenComplete { result in
                switch result {
                case .success(let count):
                    if count > 0 {
                        self.application.logger.info("Recovered stale jobs", metadata: ["count": "\(count)", "queue": .string(queueName.string)])
                    } else {
                        self.application.logger.trace("No stale jobs to recover", metadata: ["queue": .string(queueName.string)])
                    }
                case .failure(let error):
                    self.application.logger.error("Failed to recover stale jobs", metadata: [
                        "queue": .string(queueName.string),
                        "error": "\(String(reflecting: error))"
                    ])
                }
            }
        } else {
            self.application.logger.trace("Stale job recovery is disabled", metadata: ["queue": .string(queueName.string)])
        }

        let workerCount: Int
        switch self.application.queues.configuration.workerCount {
        case .default:
            // Default: one worker per event loop in the group.
            workerCount = self.application.eventLoopGroup.makeIterator().reduce(0, { n, _ in n + 1 })
            self.application.logger.trace("Using default worker count", metadata: ["workerCount": "\(workerCount)"])
        case .custom(let custom):
            workerCount = custom
            self.application.logger.trace("Using custom worker count", metadata: ["workerCount": "\(workerCount)"])
        }

        var tasks: [RepeatedTask] = []
        // `prefix` caps the number of workers at the number of event loops in the group.
        for eventLoop in self.application.eventLoopGroup.makeIterator().prefix(workerCount) {
            self.application.logger.trace("Booting worker")

            let worker = self.application.queues.queue(queueName, on: eventLoop).worker
            let task = eventLoop.scheduleRepeatedAsyncTask(
                initialDelay: .zero,
                delay: worker.queue.configuration.refreshInterval
            ) { task in
                worker.queue.logger.trace("Running refresh task")
                return worker.run().map {
                    worker.queue.logger.trace("Worker ran the task successfully")
                }.recover { error in
                    // A failed run is logged and swallowed so the repeated task keeps going.
                    worker.queue.logger.error("Job run failed", metadata: ["error": "\(String(reflecting: error))"])
                }.map {
                    if self.box.withLockedValue({ $0.didShutdown }) {
                        worker.queue.logger.trace("Shutting down, cancelling the task")
                        task.cancel()
                    }
                }
            }
            tasks.append(task)
        }

        self.box.withLockedValue { $0.jobTasks = tasks }
        self.application.logger.trace("Finished adding jobTasks, total count: \(tasks.count)")

        // Schedule periodic stale job recovery (like Sidekiq Beat - runs every 15 seconds by default)
        // Only if recovery is enabled
        if self.application.queues.configuration.enableStaleJobRecovery {
            let recoveryInterval = self.application.queues.configuration.staleJobRecoveryInterval
            let recoveryQueue = self.application.queues.queue(queueName, on: self.application.eventLoopGroup.any())

            let recoveryTask = self.application.eventLoopGroup.any().scheduleRepeatedAsyncTask(
                initialDelay: recoveryInterval, // Wait before first check (after startup recovery)
                delay: recoveryInterval
            ) { task in
                recoveryQueue.logger.trace("Running periodic stale job recovery check")
                return recoveryQueue.recoverStaleJobs().map { count in
                    if count > 0 {
                        recoveryQueue.logger.info("Periodic recovery: recovered stale jobs", metadata: ["count": "\(count)"])
                    }
                }.recover { error in
                    recoveryQueue.logger.error("Periodic recovery failed", metadata: ["error": "\(String(reflecting: error))"])
                }.map {
                    // Check if shutdown was requested
                    if self.box.withLockedValue({ $0.didShutdown }) {
                        recoveryQueue.logger.trace("Shutting down, cancelling recovery task")
                        task.cancel()
                    }
                }
            }

            self.box.withLockedValue { $0.recoveryTask = recoveryTask }

            // The interval is logged in whole seconds (fractional part truncated).
            let recoveryIntervalSeconds = Double(recoveryInterval.nanoseconds) / 1_000_000_000.0
            self.application.logger.info("Started periodic stale job recovery", metadata: [
                "interval": "\(Int(recoveryIntervalSeconds))s",
                "queue": .string(queueName.string)
            ])
        } else {
            self.application.logger.trace("Periodic stale job recovery is disabled", metadata: ["queue": .string(queueName.string)])
        }
    }

    /// Starts the scheduled jobs in-process
    ///
    /// Logs a warning and returns without scheduling anything when the queues configuration
    /// contains no scheduled jobs.
    public func startScheduledJobs() throws {
        self.application.logger.trace("Checking for scheduled jobs to begin the worker")

        guard !self.application.queues.configuration.scheduledJobs.isEmpty else {
            self.application.logger.warning("No scheduled jobs exist, exiting scheduled jobs worker.")
            return
        }

        self.application.logger.trace("Beginning the scheduling process")
        self.application.queues.configuration.scheduledJobs.forEach {
            self.application.logger.trace("Scheduling job", metadata: ["name": "\($0.job.name)"])
            self.schedule($0)
        }
    }

    /// Schedules the next run of `job` and arranges for it to be rescheduled once that run
    /// completes. Does nothing once a shutdown has been requested, or when
    /// `job.schedule(context:)` returns `nil`.
    private func schedule(_ job: AnyScheduledJob) {
        self.box.withLockedValue { box in
            if box.didShutdown {
                self.application.logger.trace("Application is shutting down, not scheduling job", metadata: ["name": "\(job.job.name)"])
                return
            }

            let context = QueueContext(
                queueName: QueueName(string: "scheduled"),
                configuration: self.application.queues.configuration,
                application: self.application,
                logger: self.application.logger,
                on: self.application.eventLoopGroup.any()
            )

            guard let task = job.schedule(context: context) else {
                return
            }

            self.application.logger.trace("Job was scheduled successfully", metadata: ["name": "\(job.job.name)"])
            box.scheduledTasks[job.job.name] = task

            task.done.whenComplete { result in
                switch result {
                case .failure(let error):
                    context.logger.error("Scheduled job failed", metadata: ["name": "\(job.job.name)", "error": "\(String(reflecting: error))"])
                case .success: break
                }
                // Explicitly spin the event loop so we don't deadlock on a reentrant call to this method.
                context.eventLoop.execute {
                    self.schedule(job)
                }
            }
        }
    }

    /// Marks the command as shut down, stops the running application, cancels and drops the
    /// signal sources, and returns a snapshot of the tasks that still have to be cancelled.
    ///
    /// The snapshot lets callers cancel the tasks *after* the lock has been released.
    private func beginShutdown() -> ([RepeatedTask], [String: AnyScheduledJob.Task], RepeatedTask?) {
        return self.box.withLockedValue { box in
            box.didShutdown = true

            // stop running in case shutting down from signal
            self.application.running?.stop()

            // clear signal sources
            box.signalSources.forEach { $0.cancel() } // clear refs
            box.signalSources = []

            return (box.jobTasks, box.scheduledTasks, box.recoveryTask)
        }
    }

    /// Shuts down the jobs worker
    ///
    /// The tasks are cancelled only after the state lock has been released. The repeated-task
    /// closures created in `startJobs(on:)` and the rescheduling hop in `schedule(_:)` take
    /// the same lock from event loop callbacks, so waiting on a cancellation while holding
    /// the lock risks a deadlock if `syncCancel` waits for an in-flight run to finish
    /// (`syncCancel` is defined outside this file).
    public func shutdown() {
        let (jobTasks, scheduledTasks, recoveryTask) = self.beginShutdown()

        // stop all job queue workers
        for jobTask in jobTasks {
            jobTask.syncCancel(on: self.application.eventLoopGroup.any())
        }

        // stop all scheduled jobs
        for scheduledTask in scheduledTasks.values {
            scheduledTask.task.syncCancel(on: self.application.eventLoopGroup.any())
        }

        // stop periodic recovery task
        if let recoveryTask = recoveryTask {
            recoveryTask.syncCancel(on: self.application.eventLoopGroup.any())
        }
    }

    /// Asynchronous counterpart of `shutdown()`: performs the same steps but awaits each
    /// cancellation instead of using the synchronous `syncCancel` helper.
    public func asyncShutdown() async {
        // Release the lock before we start any suspensions
        let (jobTasks, scheduledTasks, recoveryTask) = self.beginShutdown()

        // stop all job queue workers
        for jobTask in jobTasks {
            await jobTask.asyncCancel(on: self.application.eventLoopGroup.any())
        }

        // stop all scheduled jobs
        for scheduledTask in scheduledTasks.values {
            await scheduledTask.task.asyncCancel(on: self.application.eventLoopGroup.any())
        }

        // stop periodic recovery task
        if let recoveryTask = recoveryTask {
            await recoveryTask.asyncCancel(on: self.application.eventLoopGroup.any())
        }
    }

    deinit {
        // Debug-only check that one of the shutdown methods ran before deallocation.
        assert(self.box.withLockedValue { $0.didShutdown }, "QueuesCommand did not shutdown before deinit")
    }
}