Skip to content

Commit d92e686

Browse files
committed
Fix RedisQueue::size() do not count available jobs in delayed queue
Fix `RedisJob::$attempts` not incremented correctly
1 parent c6df900 commit d92e686

File tree

7 files changed

+89
-91
lines changed

7 files changed

+89
-91
lines changed

.github/workflows/tests.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ jobs:
1616
coverage: [true]
1717
composer-flags: ['']
1818

19+
services:
20+
redis:
21+
image: redis
22+
ports:
23+
- 6379:6379
24+
options: --health-cmd="redis-cli ping" --health-interval=10s --health-timeout=5s --health-retries=5
25+
1926
steps:
2027
- uses: actions/checkout@v2
2128
with:

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8+
## [1.0.1] - 2025-06-05
9+
10+
### Changed
11+
12+
- Improve tests
13+
14+
### Fixed
15+
16+
- `RedisQueue::size()` do not count available jobs in delayed queue
17+
- `RedisJob::$attempts` not incremented correctly
18+
819
## [1.0.0] - 2025-05-19
920

1021
No changes were introduced since the previous beta 10 release.

src/Queue/RedisQueue.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public function __construct(
3939
*/
4040
public function size(): int
4141
{
42+
$this->freeDelayedJobs();
43+
4244
return $this->redis->llen($this->name) ?: 0;
4345
}
4446

@@ -114,7 +116,7 @@ protected function createJob(array $raw): RedisJob
114116
return new RedisJob(
115117
id: (string)$raw['jobId'],
116118
name: $name,
117-
attempts: $raw['attempts'] ?? 0,
119+
attempts: ($raw['attempts'] ?? 0) + 1,
118120
payload: $payload,
119121
queue: $this,
120122
);
@@ -174,7 +176,7 @@ public function release(RedisJob $job, int $delay = 0): void
174176
$job->isReleased() && throw JobException::alreadyReleased($job);
175177
$job->isDeleted() && throw JobException::alreadyDeleted($job);
176178

177-
$this->pushRaw($job, $delay);
179+
$this->push($job, $delay);
178180
}
179181

180182
/**

tests/Queue/AwsSqsQueueTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public function testConsumeThrowsChecksumException()
120120
$this->queue->consume();
121121
}
122122

123-
public function testPushReturnsMessageId()
123+
public function testPushReturnsJobId()
124124
{
125125
$jobDescriptorMock = $this->createMock(JobDescriptorInterface::class);
126126

@@ -138,8 +138,8 @@ public function testPushReturnsMessageId()
138138
)
139139
->willReturn(new Result(['MessageId' => 'msg123']));
140140

141-
$messageId = $this->queue->push($jobDescriptorMock, 10);
142-
$this->assertSame('msg123', $messageId);
141+
$jobId = $this->queue->push($jobDescriptorMock, 10);
142+
$this->assertSame('msg123', $jobId);
143143
}
144144

145145
public function testPurge()

tests/Queue/DbQueueTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
use Berlioz\QueueManager\Queue\DbQueue;
1616
use Berlioz\QueueManager\Queue\QueueInterface;
1717
use Hector\Connection\Connection;
18+
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
1819

20+
#[RequiresPhpExtension('pdo_sqlite')]
1921
class DbQueueTest extends QueueTestCase
2022
{
2123
public static function newQueue(): QueueInterface

tests/Queue/QueueTestCase.php

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function testConsume()
4848

4949
$job = $queue->consume();
5050

51-
$this->assertEquals($jobId, $job->getId());
51+
$this->assertEquals($jobId, $job?->getId());
5252
$this->assertEquals(0, $queue->size());
5353
}
5454

@@ -58,6 +58,18 @@ public function testPush()
5858
$queue->push(new JobDescriptor('test', ['foo' => 'value']));
5959

6060
$this->assertEquals(1, $queue->size());
61+
62+
$job = $queue->consume();
63+
64+
$this->assertEquals('test', $job->getName());
65+
}
66+
67+
public function testPushRaw()
68+
{
69+
$queue = static::newQueue();
70+
$queue->pushRaw(new JobDescriptor('test', ['foo' => 'value']));
71+
72+
$this->assertEquals(1, $queue->size());
6173
}
6274

6375
public function testPurge()
@@ -73,4 +85,28 @@ public function testPurge()
7385

7486
$this->assertEquals(0, $queue->size());
7587
}
88+
89+
public function testReleaseJob()
90+
{
91+
$queue = static::newQueue();
92+
$queue->push(new JobDescriptor('test', ['foo' => 'value']));
93+
94+
$this->assertEquals(1, $queue->size());
95+
96+
$job = $queue->consume();
97+
98+
$this->assertEquals(1, $job->getAttempts());
99+
$this->assertEquals(0, $queue->size());
100+
$this->assertFalse($job->isReleased());
101+
102+
$job->release();
103+
104+
$this->assertEquals(1, $queue->size());
105+
$this->assertTrue($job->isReleased());
106+
107+
$job = $queue->consume();
108+
109+
$this->assertEquals(0, $queue->size());
110+
$this->assertEquals(2, $job->getAttempts());
111+
}
76112
}

tests/Queue/RedisQueueTest.php

Lines changed: 25 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* This file is part of Berlioz framework.
44
*
55
* @license https://opensource.org/licenses/MIT MIT License
6-
* @copyright 2025 Ronan GIRON
6+
* @copyright 2024 Ronan GIRON
77
* @author Ronan GIRON <https://github.com/ElGigi>
88
*
99
* For the full copyright and license information, please view the LICENSE
@@ -12,125 +12,65 @@
1212

1313
namespace Berlioz\QueueManager\Tests\Queue;
1414

15-
use Berlioz\QueueManager\Job\JobDescriptorInterface;
16-
use Berlioz\QueueManager\Job\RedisJob;
15+
use Berlioz\QueueManager\Queue\QueueInterface;
1716
use Berlioz\QueueManager\Queue\RedisQueue;
18-
use PHPUnit\Framework\TestCase;
17+
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
1918
use Redis;
19+
use RedisException;
2020

21-
class RedisQueueTest extends TestCase
21+
#[RequiresPhpExtension('redis')]
22+
class RedisQueueTest extends QueueTestCase
2223
{
23-
private Redis $redisMock;
24-
private RedisQueue $queue;
24+
private static ?Redis $redis = null;
2525

26-
protected function setUp(): void
27-
{
28-
$this->redisMock = $this->createMock(Redis::class);
29-
$this->queue = new RedisQueue($this->redisMock, 'testQueue');
30-
}
31-
32-
public function testGetName()
26+
public static function setUpBeforeClass(): void
3327
{
34-
$this->assertSame('testQueue', $this->queue->getName());
28+
try {
29+
self::$redis = new Redis();
30+
self::$redis->connect('127.0.0.1', 6379, 1);
31+
} catch (RedisException) {
32+
self::markTestSkipped('Redis is not available on 127.0.0.1:6379.');
33+
}
3534
}
3635

37-
public function testSize()
38-
{
39-
$this->redisMock
40-
->method('llen')
41-
->with('testQueue')
42-
->willReturn(5);
43-
44-
$this->assertSame(5, $this->queue->size());
45-
}
46-
47-
public function testConsumeReturnsJob()
48-
{
49-
$jobData = json_encode(['jobId' => '123', 'payload' => '{"key":"value"}', 'attempts' => 0]);
50-
51-
$this->redisMock
52-
->method('lpop')
53-
->with('testQueue')
54-
->willReturn($jobData);
55-
56-
$job = $this->queue->consume();
57-
$this->assertInstanceOf(RedisJob::class, $job);
58-
$this->assertSame('123', $job->getId());
59-
}
60-
61-
public function testConsumeReturnsNullWhenEmpty()
62-
{
63-
$this->redisMock
64-
->method('lpop')
65-
->with('testQueue')
66-
->willReturn(false);
67-
68-
$this->assertNull($this->queue->consume());
69-
}
70-
71-
public function testPushReturnsJobId()
36+
protected function setUp(): void
7237
{
73-
$jobDescriptorMock = $this->createMock(JobDescriptorInterface::class);
74-
$jobDescriptorMock->method('jsonSerialize')->willReturn(['key' => 'value']);
75-
76-
$this->redisMock
77-
->expects($this->once())
78-
->method('rPush')
79-
->with(
80-
'testQueue',
81-
$this->callback(function ($jobData) {
82-
$decoded = json_decode($jobData, true);
83-
return isset($decoded['jobId'], $decoded['payload']) && $decoded['payload'] === '{"key":"value"}';
84-
})
85-
);
86-
87-
$jobId = $this->queue->push($jobDescriptorMock);
88-
$this->assertNotEmpty($jobId);
38+
self::$redis->flushAll();
8939
}
9040

91-
public function testPushWithDelayAddsToDelayedQueue()
41+
public static function newQueue(): QueueInterface
9242
{
93-
$jobDescriptorMock = $this->createMock(JobDescriptorInterface::class);
94-
$jobDescriptorMock->method('jsonSerialize')->willReturn(['key' => 'value']);
95-
96-
$this->redisMock
97-
->expects($this->once())
98-
->method('zadd')
99-
->with('testQueue:delayed', ['NX'], $this->greaterThan(time()), $this->callback(function ($jobData) {
100-
$decoded = json_decode($jobData, true);
101-
return isset($decoded['jobId'], $decoded['payload']) && $decoded['payload'] === '{"key":"value"}';
102-
}));
103-
104-
$jobId = $this->queue->push($jobDescriptorMock, 10);
105-
$this->assertNotEmpty($jobId);
43+
return new RedisQueue(redis: self::$redis, name: 'default');
10644
}
10745

10846
public function testFreeDelayedJobs()
10947
{
48+
$redisMock = $this->createMock(Redis::class);
49+
$queue = new RedisQueue($redisMock, 'testQueue');
11050
$jobData = json_encode(['jobId' => '123', 'payload' => '{"key":"value"}', 'attempts' => 0]);
11151

112-
$this->redisMock
52+
$redisMock
11353
->expects($this->once())
11454
->method('set')
11555
->with('testQueue:delayed:lock', '1', ['nx', 'ex' => 10])
11656
->willReturn(true);
11757

118-
$this->redisMock
58+
$redisMock
11959
->expects($this->once())
12060
->method('zrangebyscore')
12161
->with('testQueue:delayed', '-inf', (string)time())
12262
->willReturn([$jobData]);
12363

124-
$this->redisMock
64+
$redisMock
12565
->expects($this->once())
12666
->method('zrem')
12767
->with('testQueue:delayed', $jobData);
12868

129-
$this->redisMock
69+
$redisMock
13070
->expects($this->once())
13171
->method('rpush')
13272
->with('testQueue', $jobData);
13373

134-
$this->queue->freeDelayedJobs();
74+
$queue->freeDelayedJobs();
13575
}
13676
}

0 commit comments

Comments
 (0)