Skip to content

Commit b4dc6d3

Browse files
committed
fix(worker): pick up new jobs while processing instead of waiting for
completion
1 parent 04dbac8 commit b4dc6d3

File tree

2 files changed

+142
-2
lines changed

2 files changed

+142
-2
lines changed

src/worker.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,25 @@ export class Worker {
276276
continue
277277
}
278278

279-
const completed = await this.#pool.waitForNextCompletion()
280-
yield { type: 'completed', queue: completed.queue, job: completed.job }
279+
const hasCapacity = this.#pool.hasCapacity(this.#concurrency)
280+
281+
// If we have capacity, don't block indefinitely waiting for a completion;
282+
// wake up periodically to try to acquire newly enqueued jobs.
283+
const result = await Promise.race([
284+
this.#pool
285+
.waitForNextCompletion()
286+
.then((completed) => ({ kind: 'completed' as const, completed })),
287+
...(hasCapacity
288+
? [setTimeout(this.#idleDelay).then(() => ({ kind: 'tick' as const }))]
289+
: []),
290+
])
291+
292+
if (result.kind === 'tick') {
293+
// No completion yet, but we woke up to check the queue again
294+
continue
295+
}
296+
297+
yield { type: 'completed', queue: result.completed.queue, job: result.completed.job }
281298
} catch (error) {
282299
yield {
283300
type: 'error',

tests/_utils/register_worker_concurrency_suite.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,127 @@ export function registerWorkerConcurrencyTestSuite(options: WorkerConcurrencyTes
193193
'No job should appear twice in execution order'
194194
)
195195
})
196+
197+
test('jobs dispatched with delays should run concurrently when capacity is available', async ({
198+
assert,
199+
cleanup,
200+
}) => {
201+
const jobStartTimes: Map<string, number> = new Map()
202+
const jobEndTimes: Map<string, number> = new Map()
203+
204+
class SlowJob extends Job<{ jobId: string }> {
205+
async execute() {
206+
jobStartTimes.set(this.payload.jobId, Date.now())
207+
// Simulate a slow job (300ms)
208+
await setTimeout(300)
209+
jobEndTimes.set(this.payload.jobId, Date.now())
210+
}
211+
}
212+
213+
const adapter = await options.createAdapter()
214+
Locator.register(SlowJob.name, SlowJob)
215+
216+
cleanup(() => Locator.clear())
217+
218+
const config: QueueManagerConfig = {
219+
default: 'test',
220+
adapters: { test: () => adapter },
221+
worker: {
222+
concurrency: 5,
223+
idleDelay: 50, // Short idle delay to pick up new jobs quickly
224+
},
225+
}
226+
227+
const worker = new Worker(config)
228+
229+
cleanup(async () => {
230+
await worker.stop()
231+
})
232+
233+
// Start processing in background
234+
const processingPromise = (async () => {
235+
let cycles = 0
236+
const maxCycles = 50
237+
while (cycles < maxCycles) {
238+
const cycle = await worker.processCycle(['default'])
239+
cycles++
240+
if (cycle?.type === 'idle' && jobEndTimes.size === 4) break
241+
}
242+
})()
243+
244+
// Push jobs with delays between them (simulating the user's scenario)
245+
await adapter.pushOn('default', {
246+
id: 'delayed-job-0',
247+
name: 'SlowJob',
248+
payload: { jobId: 'job-0' },
249+
attempts: 0,
250+
})
251+
252+
await setTimeout(50)
253+
254+
await adapter.pushOn('default', {
255+
id: 'delayed-job-1',
256+
name: 'SlowJob',
257+
payload: { jobId: 'job-1' },
258+
attempts: 0,
259+
})
260+
261+
await setTimeout(50)
262+
263+
await adapter.pushOn('default', {
264+
id: 'delayed-job-2',
265+
name: 'SlowJob',
266+
payload: { jobId: 'job-2' },
267+
attempts: 0,
268+
})
269+
270+
await setTimeout(50)
271+
272+
await adapter.pushOn('default', {
273+
id: 'delayed-job-3',
274+
name: 'SlowJob',
275+
payload: { jobId: 'job-3' },
276+
attempts: 0,
277+
})
278+
279+
await processingPromise
280+
281+
// All 4 jobs should have been executed
282+
assert.equal(jobStartTimes.size, 4, 'All 4 jobs should have started')
283+
assert.equal(jobEndTimes.size, 4, 'All 4 jobs should have completed')
284+
285+
// Verify concurrent execution: jobs 1, 2, 3 should start BEFORE job 0 ends
286+
// If they ran sequentially, job 1 would start after job 0's 300ms execution
287+
const job0Start = jobStartTimes.get('job-0')!
288+
const job0End = jobEndTimes.get('job-0')!
289+
const job1Start = jobStartTimes.get('job-1')!
290+
const job2Start = jobStartTimes.get('job-2')!
291+
const job3Start = jobStartTimes.get('job-3')!
292+
293+
// Job 1 should start before job 0 ends (proving concurrency)
294+
assert.isTrue(
295+
job1Start < job0End,
296+
`Job 1 should start (${job1Start}) before job 0 ends (${job0End}) - concurrent execution`
297+
)
298+
299+
// Job 2 should start before job 0 ends
300+
assert.isTrue(
301+
job2Start < job0End,
302+
`Job 2 should start (${job2Start}) before job 0 ends (${job0End}) - concurrent execution`
303+
)
304+
305+
// Job 3 should start before job 0 ends
306+
assert.isTrue(
307+
job3Start < job0End,
308+
`Job 3 should start (${job3Start}) before job 0 ends (${job0End}) - concurrent execution`
309+
)
310+
311+
// All jobs should start within a reasonable time window (not sequentially)
312+
const maxStartDiff = Math.max(job1Start, job2Start, job3Start) - job0Start
313+
assert.isBelow(
314+
maxStartDiff,
315+
250,
316+
`All jobs should start within 250ms of each other (actual: ${maxStartDiff}ms)`
317+
)
318+
})
196319
}

0 commit comments

Comments
 (0)