Skip to content

Commit 9e0b166

Browse files
committed
Fix Worker crash when a job handler calls release() or delete() on the job during handling
1 parent bf3dfc4 commit 9e0b166

File tree

2 files changed

+96
-4
lines changed

2 files changed

+96
-4
lines changed

src/QueueManager/Worker.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,10 @@ public function run(QueueInterface $queue, WorkerOptions $options = new WorkerOp
204204
// Exec
205205
$this->executeJob($job);
206206

207-
// Delete job
208-
$job->delete();
207+
// Delete job (if not already handled by the handler)
208+
if (false === $job->isDeleted() && false === $job->isReleased()) {
209+
$job->delete();
210+
}
209211

210212
$this->logger?->info(
211213
sprintf(
@@ -216,8 +218,10 @@ public function run(QueueInterface $queue, WorkerOptions $options = new WorkerOp
216218
),
217219
);
218220
} catch (Throwable $exception) {
219-
// Release job
220-
$job->release($this->nextDelayAfterFailure($job, $options));
221+
// Release job (if not already handled by the handler)
222+
if (false === $job->isDeleted() && false === $job->isReleased()) {
223+
$job->release($this->nextDelayAfterFailure($job, $options));
224+
}
221225

222226
$this->logger?->error(
223227
sprintf(

tests/QueueManager/WorkerTest.php

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,4 +245,92 @@ public function testNextDelayAfterFailure(
245245

246246
$this->assertEquals($exceptedDelay, $this->worker->nextDelayAfterFailure($jobMock, $options));
247247
}
248+
249+
public function testRunJobAlreadyDeletedByHandler(): void
250+
{
251+
$jobMock = $this->createMock(JobInterface::class);
252+
253+
$this->queueMock->method('consume')->willReturn($jobMock);
254+
$jobMock->method('getId')->willReturn('Job123');
255+
$jobMock->method('getQueue')->willReturn($this->queueMock);
256+
257+
// Handler deletes the job itself
258+
$this->jobHandlerMock->method('handle')->willReturnCallback(
259+
function (JobInterface $job) use ($jobMock) {
260+
// Simulate that the handler called $job->delete()
261+
$jobMock->method('isDeleted')->willReturn(true);
262+
}
263+
);
264+
265+
// delete() should NOT be called by the Worker since handler already did it
266+
$jobMock->expects($this->never())->method('delete');
267+
$jobMock->expects($this->never())->method('release');
268+
269+
$exitCode = $this->worker->run($this->queueMock, new WorkerOptions(limit: 1));
270+
$this->assertSame(WorkerExit::LIMIT_EXCEEDED->code(), $exitCode);
271+
}
272+
273+
public function testRunJobAlreadyReleasedByHandler(): void
274+
{
275+
$jobMock = $this->createMock(JobInterface::class);
276+
277+
$this->queueMock->method('consume')->willReturn($jobMock);
278+
$jobMock->method('getId')->willReturn('Job123');
279+
$jobMock->method('getQueue')->willReturn($this->queueMock);
280+
281+
// Handler releases the job itself
282+
$this->jobHandlerMock->method('handle')->willReturnCallback(
283+
function (JobInterface $job) use ($jobMock) {
284+
// Simulate that the handler called $job->release()
285+
$jobMock->method('isReleased')->willReturn(true);
286+
}
287+
);
288+
289+
// Neither delete() nor release() should be called by the Worker
290+
$jobMock->expects($this->never())->method('delete');
291+
$jobMock->expects($this->never())->method('release');
292+
293+
$exitCode = $this->worker->run($this->queueMock, new WorkerOptions(limit: 1));
294+
$this->assertSame(WorkerExit::LIMIT_EXCEEDED->code(), $exitCode);
295+
}
296+
297+
public function testRunJobAlreadyReleasedByHandlerThenException(): void
298+
{
299+
$jobMock = $this->createMock(JobInterface::class);
300+
301+
$this->queueMock->method('consume')->willReturn($jobMock);
302+
$jobMock->method('getId')->willReturn('Job123');
303+
$jobMock->method('getQueue')->willReturn($this->queueMock);
304+
$jobMock->method('isReleased')->willReturn(true);
305+
306+
// Handler releases the job then throws
307+
$this->jobHandlerMock->method('handle')->willThrowException(new Exception('Job failed'));
308+
309+
// release() should NOT be called again by the Worker
310+
$jobMock->expects($this->never())->method('release');
311+
$jobMock->expects($this->never())->method('delete');
312+
313+
$exitCode = $this->worker->run($this->queueMock, new WorkerOptions(limit: 1));
314+
$this->assertSame(WorkerExit::LIMIT_EXCEEDED->code(), $exitCode);
315+
}
316+
317+
public function testRunJobAlreadyDeletedByHandlerThenException(): void
318+
{
319+
$jobMock = $this->createMock(JobInterface::class);
320+
321+
$this->queueMock->method('consume')->willReturn($jobMock);
322+
$jobMock->method('getId')->willReturn('Job123');
323+
$jobMock->method('getQueue')->willReturn($this->queueMock);
324+
$jobMock->method('isDeleted')->willReturn(true);
325+
326+
// Handler deletes the job then throws
327+
$this->jobHandlerMock->method('handle')->willThrowException(new Exception('Job failed'));
328+
329+
// Neither delete() nor release() should be called by the Worker
330+
$jobMock->expects($this->never())->method('release');
331+
$jobMock->expects($this->never())->method('delete');
332+
333+
$exitCode = $this->worker->run($this->queueMock, new WorkerOptions(limit: 1));
334+
$this->assertSame(WorkerExit::LIMIT_EXCEEDED->code(), $exitCode);
335+
}
248336
}

0 commit comments

Comments
 (0)