Skip to content

Commit e8a82ca

Browse files
committed
Add $retryTime property for DbQueue and AwsSqsQueue
1 parent c94d7fe commit e8a82ca

File tree

6 files changed

+36
-18
lines changed

6 files changed

+36
-18
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77

88
## [1.0.0-beta7] - 2025-01-27
99

10+
### Added
11+
12+
- `$retryTime` property for `DbQueue` and `AwsSqsQueue`
13+
1014
### Changed
1115

1216
- Methods `QueueInterface::push()` and `QueueInterface::pushRaw()`, accept `\DateTimeInterface` or `\DateInterval` for `$delay` parameter

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ $queue = new DbQueue(
161161
connection: $dbConnection, // Database connection
162162
name: 'default', // Queue name
163163
tableName: 'queue_jobs', // Name of MySQL table
164+
retryTime: 30, // Time to wait after failed job
164165
maxAttempts: 5, // Maximum attempts of a job
165166
);
166167
```
@@ -204,7 +205,7 @@ CREATE
204205
TRIGGER `queue_jobs_AFTER_DELETE` AFTER DELETE ON `queue_jobs`
205206
FOR EACH ROW BEGIN
206207
INSERT INTO `queue_jobs_done` (`job_id`, `create_time`, `queue`, `availability_time`, `attempts`, `lock_time`, `payload`)
207-
VALUES (OLD.`job_id`, OLD.`create_time`, OLD.`queue`, OLD.`availability_time`, OLD.`attempts`, OLD.`lock_time`, OLD.`payload`);
208+
VALUES (OLD.`job_id`, OLD.`create_time`, OLD.`queue`, OLD.`availability_time`, OLD.`attempts`, IFNULL(OLD.`lock_time`, CURRENT_TIMESTAMP), OLD.`payload`);
208209
END;$$
209210
DELIMITER ;
210211
```
@@ -258,5 +259,6 @@ $queue = new AwsSqsQueue(
258259
sqsClient: new SqsClient(...), // Database connection
259260
name: 'default', // Queue name
260261
queueUrl: '...', // AWS queue URL
262+
retryTime: 30, // Time to wait after failed job
261263
);
262264
```

src/Queue/AbstractQueue.php

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,6 @@ protected function getAvailableDateTime(DateTimeInterface|DateInterval|int $dela
8585
*/
8686
protected function getDelayInSeconds(DateTimeInterface $dateTime): int
8787
{
88-
$diff = $this->now()->diff($dateTime);
89-
90-
return
91-
$diff->days * (24 * 60 * 60) +
92-
$diff->h * (60 * 60) +
93-
$diff->i * (60) +
94-
$diff->s;
88+
return $dateTime->getTimestamp() - $this->now()->getTimestamp();
9589
}
9690
}

src/Queue/AwsSqsQueue.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public function __construct(
3030
private SqsClient $sqsClient,
3131
private string $queueUrl,
3232
string $name = 'default',
33+
private int $retryTime = 30,
3334
) {
3435
parent::__construct($name);
3536
}
@@ -88,6 +89,7 @@ public function consume(): ?SqsJob
8889
{
8990
$result = $this->sqsClient->receiveMessage([
9091
'QueueUrl' => $this->queueUrl,
92+
'VisibilityTimeout' => $this->retryTime,
9193
'AttributeNames' => ['ApproximateReceiveCount'],
9294
])->toArray()['Messages'][0] ?? null;
9395

@@ -117,7 +119,7 @@ public function push(JobDescriptorInterface $jobDescriptor, DateTimeInterface|Da
117119
public function pushRaw(mixed $payload, DateTimeInterface|DateInterval|int $delay = 0): string
118120
{
119121
$result = $this->sqsClient->sendMessage([
120-
'DelaySeconds' => $delay,
122+
'DelaySeconds' => $this->getDelayInSeconds($this->getAvailableDateTime($delay)),
121123
'MessageBody' => json_encode($payload),
122124
'QueueUrl' => $this->queueUrl,
123125
]);

src/Queue/DbQueue.php

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Hector\Connection\Connection;
2626
use Hector\Query\Component\Order;
2727
use Hector\Query\QueryBuilder;
28+
use Hector\Query\Statement\Conditions;
2829

2930
class_exists(QueryBuilder::class) || throw QueueManagerException::missingPackage('hectororm/query');
3031

@@ -34,11 +35,17 @@ public function __construct(
3435
private Connection $connection,
3536
string $name = 'default',
3637
private string $tableName = 'queue_jobs',
38+
private int $retryTime = 30,
3739
private int $maxAttempts = 5,
3840
) {
3941
parent::__construct($name);
4042
}
4143

44+
private function getRetryDateTimeLimit(): DateTimeInterface
45+
{
46+
return $this->now()->sub(new DateInterval('PT' . $this->retryTime . 'S'));
47+
}
48+
4249
private function getQueryBuilder(): QueryBuilder
4350
{
4451
$builder = new QueryBuilder($this->connection);
@@ -48,16 +55,26 @@ private function getQueryBuilder(): QueryBuilder
4855
return $builder;
4956
}
5057

58+
private function addBuilderConditions(QueryBuilder $builder): QueryBuilder
59+
{
60+
$retryLimit = $this->getRetryDateTimeLimit();
61+
62+
return $builder
63+
->whereLessThanOrEqual('availability_time', $this->now()->format('Y-m-d H:i:s'))
64+
->whereLessThan('attempts', $this->maxAttempts)
65+
->where(function (Conditions $where) use ($retryLimit): void {
66+
$where
67+
->whereNull('lock_time')
68+
->orWhere('lock_time', '<', $retryLimit->format('Y-m-d H:i:s'));
69+
});
70+
}
71+
5172
/**
5273
* @inheritDoc
5374
*/
5475
public function size(): int
5576
{
56-
return $this->getQueryBuilder()
57-
->whereLessThanOrEqual('availability_time', $this->now()->format('Y-m-d H:i:s'))
58-
->whereNull('lock_time')
59-
->whereLessThan('attempts', $this->maxAttempts)
60-
->count();
77+
return $this->addBuilderConditions($this->getQueryBuilder())->count();
6178
}
6279

6380
/**
@@ -68,10 +85,7 @@ public function consume(): ?DbJob
6885
$attempts = 0;
6986

7087
do {
71-
$jobRaw = $this->getQueryBuilder()
72-
->whereLessThanOrEqual('availability_time', $this->now()->format('Y-m-d H:i:s'))
73-
->whereNull('lock_time')
74-
->whereLessThan('attempts', $this->maxAttempts)
88+
$jobRaw = $this->addBuilderConditions($this->getQueryBuilder())
7589
->orderBy('job_id', Order::ORDER_ASC)
7690
->limit(1)
7791
->fetchOne();

tests/Queue/AwsSqsQueueTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public function testConsumeReturnsJob()
7575
[
7676
[
7777
'QueueUrl' => 'https://sqs.us-east-1.amazonaws.com/123456789012/testQueue',
78+
'VisibilityTimeout' => 30,
7879
'AttributeNames' => ['ApproximateReceiveCount'],
7980
]
8081
]
@@ -95,6 +96,7 @@ public function testConsumeThrowsChecksumException()
9596
[
9697
[
9798
'QueueUrl' => 'https://sqs.us-east-1.amazonaws.com/123456789012/testQueue',
99+
'VisibilityTimeout' => 30,
98100
'AttributeNames' => ['ApproximateReceiveCount'],
99101
]
100102
]

0 commit comments

Comments
 (0)