Skip to content

Commit bb992d9

Browse files
Merge pull request #382 from JLG-WOCFR-DEV/codex/introduce-queue-driver-interface-and-implementation
Add pluggable queue drivers and distributed worker support
2 parents e7082e3 + e3e9d8b commit bb992d9

File tree

11 files changed

+984
-53
lines changed

11 files changed

+984
-53
lines changed

docs/queue-drivers.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# File d'attente distribuée
2+
3+
Le plugin propose désormais deux modes de file :
4+
5+
- **WP-Cron (par défaut)** pour conserver le comportement historique où WordPress planifie les lots via `wp_schedule_single_event`.
6+
- **Redis Streams/Listes** pour externaliser la file sur une instance Redis et faire tourner un worker WP-CLI (`wp blc:worker run`).
7+
8+
## Paramètres disponibles
9+
10+
Dans l’onglet « Réglages », section *File d’attente distribuée* :
11+
12+
- **Pilote de file** : sélectionnez `WP-Cron` ou `Redis`.
13+
- **Hôte Redis** : nom d’hôte ou IP de votre serveur.
14+
- **Port Redis** : port TCP à utiliser (6379 par défaut).
15+
- **Mot de passe** : secret d’authentification si votre instance est protégée (`requirepass`).
16+
- **Travailleurs simultanés** : nombre maximal de workers WP-CLI/externes que vous autorisez en parallèle. Cette valeur est incluse dans les jobs sérialisés pour adapter le dimensionnement côté worker.
17+
18+
## Worker WP-CLI
19+
20+
Lancez un worker via :
21+
22+
```bash
23+
wp blc:worker run --max-jobs=25 --sleep=2
24+
```
25+
26+
- `--max-jobs` limite le nombre de lots traités (0 = illimité).
27+
- `--sleep` définit la pause (en secondes) lorsque la file est vide.
28+
29+
Si le worker ne parvient pas à se connecter à Redis, il programme automatiquement le lot initial via WP-Cron afin de ne pas bloquer la campagne de scan.
30+
31+
## Extensibilité
32+
33+
Un hook `blc_queue_driver_registered` est déclenché pour chaque pilote chargé, et `blc_queue_driver_resolved` lorsque le pilote actif est sélectionné. Cela permet d’ajouter des adaptateurs (AWS SQS, RabbitMQ, etc.) depuis une extension tierce.

liens-morts-detector-jlg/includes/Scanner/LinkScanController.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ public function __construct(ScanQueue $queue)
1212
$this->queue = $queue;
1313
}
1414

15-
public function runBatch(int $batch = 0, bool $isFullScan = false, bool $bypassRestWindow = false)
15+
public function runBatch(int $batch = 0, bool $isFullScan = false, bool $bypassRestWindow = false, array $jobContext = [])
1616
{
17-
return $this->queue->runBatch($batch, $isFullScan, $bypassRestWindow);
17+
return $this->queue->runBatch($batch, $isFullScan, $bypassRestWindow, $jobContext);
1818
}
1919

20-
public function run($batch = 0, $is_full_scan = false, $bypass_rest_window = false)
20+
public function run($batch = 0, $is_full_scan = false, $bypass_rest_window = false, array $jobContext = [])
2121
{
22-
return $this->runBatch((int) $batch, (bool) $is_full_scan, (bool) $bypass_rest_window);
22+
return $this->runBatch((int) $batch, (bool) $is_full_scan, (bool) $bypass_rest_window, $jobContext);
2323
}
2424
}
2525

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
<?php
2+
3+
namespace JLG\BrokenLinks\Scanner\QueueDrivers;
4+
5+
require_once __DIR__ . '/../ScanQueue.php';
6+
7+
use JLG\BrokenLinks\Scanner\QueueDriverInterface;
8+
9+
class RedisQueueDriver implements QueueDriverInterface
10+
{
11+
/** @var \Redis|null */
12+
private $client;
13+
14+
/** @var bool */
15+
private $connected = false;
16+
17+
/** @var string */
18+
private $host;
19+
20+
/** @var int */
21+
private $port;
22+
23+
/** @var string */
24+
private $password;
25+
26+
/** @var string */
27+
private $queueKey;
28+
29+
/** @var int */
30+
private $blockingTimeout;
31+
32+
public function __construct(array $config = [])
33+
{
34+
$this->host = isset($config['host']) ? (string) $config['host'] : '127.0.0.1';
35+
$this->port = isset($config['port']) ? (int) $config['port'] : 6379;
36+
$this->password = isset($config['password']) ? (string) $config['password'] : '';
37+
$this->queueKey = isset($config['queue']) ? (string) $config['queue'] : 'blc:scan-queue';
38+
$this->blockingTimeout = isset($config['blocking_timeout']) ? (int) $config['blocking_timeout'] : 5;
39+
}
40+
41+
public function getSlug(): string
42+
{
43+
return 'redis';
44+
}
45+
46+
public function getLabel(): string
47+
{
48+
if (function_exists('__')) {
49+
return __('Redis (Streams/Listes)', 'liens-morts-detector-jlg');
50+
}
51+
52+
return 'Redis';
53+
}
54+
55+
public function scheduleBatch(array $job, int $delaySeconds = 0): bool
56+
{
57+
if (!$this->connect()) {
58+
return false;
59+
}
60+
61+
$payload = [
62+
'batch' => isset($job['batch']) ? (int) $job['batch'] : 0,
63+
'is_full_scan' => isset($job['is_full_scan']) ? (bool) $job['is_full_scan'] : false,
64+
'bypass_rest_window' => isset($job['bypass_rest_window']) ? (bool) $job['bypass_rest_window'] : false,
65+
'context' => isset($job['context']) && is_array($job['context']) ? $job['context'] : [],
66+
'available_at' => time() + max(0, $delaySeconds),
67+
'enqueued_at' => time(),
68+
];
69+
70+
$encoded = $this->encode($payload);
71+
if ($encoded === '') {
72+
return false;
73+
}
74+
75+
try {
76+
return (bool) $this->client->rPush($this->queueKey, $encoded);
77+
} catch (\RedisException $exception) {
78+
$this->connected = false;
79+
return false;
80+
}
81+
}
82+
83+
public function receiveBatch(): ?array
84+
{
85+
if (!$this->connect()) {
86+
return null;
87+
}
88+
89+
try {
90+
$result = $this->client->brPop([$this->queueKey], max(1, $this->blockingTimeout));
91+
} catch (\RedisException $exception) {
92+
$this->connected = false;
93+
return null;
94+
}
95+
96+
if (!is_array($result) || count($result) < 2) {
97+
return null;
98+
}
99+
100+
$payload = $this->decode($result[1]);
101+
if (!is_array($payload)) {
102+
return null;
103+
}
104+
105+
$availableAt = isset($payload['available_at']) ? (int) $payload['available_at'] : 0;
106+
if ($availableAt > time()) {
107+
// Not ready yet: requeue at the end and wait.
108+
try {
109+
$this->client->rPush($this->queueKey, $result[1]);
110+
} catch (\RedisException $exception) {
111+
$this->connected = false;
112+
}
113+
114+
$sleepDuration = min(5, max(1, $availableAt - time()));
115+
sleep($sleepDuration);
116+
117+
return null;
118+
}
119+
120+
return $payload;
121+
}
122+
123+
public function acknowledge(array $job): void
124+
{
125+
// Items popped with BRPOP are removed immediately.
126+
}
127+
128+
public function reportFailure(array $job, \Throwable $error): void
129+
{
130+
if (!$this->connect()) {
131+
return;
132+
}
133+
134+
$job['error'] = $error->getMessage();
135+
$job['failed_at'] = time();
136+
137+
try {
138+
$this->client->lPush($this->queueKey . ':failed', $this->encode($job));
139+
} catch (\RedisException $exception) {
140+
$this->connected = false;
141+
}
142+
}
143+
144+
public function isConnected(): bool
145+
{
146+
return $this->connected;
147+
}
148+
149+
public function supportsAsyncPull(): bool
150+
{
151+
return true;
152+
}
153+
154+
private function connect(): bool
155+
{
156+
if ($this->connected && $this->client instanceof \Redis) {
157+
return true;
158+
}
159+
160+
if (!class_exists('Redis')) {
161+
return false;
162+
}
163+
164+
$this->client = new \Redis();
165+
166+
try {
167+
$this->connected = $this->client->connect($this->host, $this->port, 2.5);
168+
if (!$this->connected) {
169+
return false;
170+
}
171+
172+
if ($this->password !== '') {
173+
$authenticated = $this->client->auth($this->password);
174+
if (!$authenticated) {
175+
$this->connected = false;
176+
return false;
177+
}
178+
}
179+
180+
$this->client->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingTimeout);
181+
} catch (\RedisException $exception) {
182+
$this->connected = false;
183+
return false;
184+
}
185+
186+
return $this->connected;
187+
}
188+
189+
private function encode(array $payload): string
190+
{
191+
if (function_exists('wp_json_encode')) {
192+
$encoded = wp_json_encode($payload);
193+
} else {
194+
$encoded = json_encode($payload);
195+
}
196+
197+
if (!is_string($encoded)) {
198+
return '';
199+
}
200+
201+
return $encoded;
202+
}
203+
204+
private function decode($payload)
205+
{
206+
if (!is_string($payload) || $payload === '') {
207+
return null;
208+
}
209+
210+
$decoded = json_decode($payload, true);
211+
212+
if (!is_array($decoded)) {
213+
return null;
214+
}
215+
216+
return $decoded;
217+
}
218+
}
219+
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php
2+
3+
namespace JLG\BrokenLinks\Scanner\QueueDrivers;
4+
5+
require_once __DIR__ . '/../ScanQueue.php';
6+
7+
use JLG\BrokenLinks\Scanner\QueueDriverInterface;
8+
9+
class WpCronQueueDriver implements QueueDriverInterface
10+
{
11+
/**
12+
* @var string
13+
*/
14+
private $hook;
15+
16+
public function __construct(string $hook = 'blc_check_batch')
17+
{
18+
$this->hook = $hook;
19+
}
20+
21+
public function getSlug(): string
22+
{
23+
return 'wp_cron';
24+
}
25+
26+
public function getLabel(): string
27+
{
28+
if (function_exists('__')) {
29+
return __('WP-Cron natif', 'liens-morts-detector-jlg');
30+
}
31+
32+
return 'WP-Cron';
33+
}
34+
35+
public function scheduleBatch(array $job, int $delaySeconds = 0): bool
36+
{
37+
$delaySeconds = max(0, $delaySeconds);
38+
$timestamp = time() + $delaySeconds;
39+
40+
$batch = isset($job['batch']) ? (int) $job['batch'] : 0;
41+
$is_full_scan = isset($job['is_full_scan']) ? (bool) $job['is_full_scan'] : false;
42+
$bypass_rest_window = isset($job['bypass_rest_window']) ? (bool) $job['bypass_rest_window'] : false;
43+
$context = isset($job['context']) && is_array($job['context']) ? $job['context'] : [];
44+
45+
$result = wp_schedule_single_event($timestamp, $this->hook, [$batch, $is_full_scan, $bypass_rest_window, $context]);
46+
47+
if (false === $result && function_exists('do_action')) {
48+
do_action('blc_queue_schedule_failed', $job, $delaySeconds, $this);
49+
}
50+
51+
return false !== $result;
52+
}
53+
54+
public function receiveBatch(): ?array
55+
{
56+
return null;
57+
}
58+
59+
public function acknowledge(array $job): void
60+
{
61+
// Nothing to do – WP-Cron removes the event when executed.
62+
}
63+
64+
public function reportFailure(array $job, \Throwable $error): void
65+
{
66+
if (function_exists('do_action')) {
67+
do_action('blc_queue_job_failed', $job, $error, $this);
68+
}
69+
}
70+
71+
public function isConnected(): bool
72+
{
73+
return true;
74+
}
75+
76+
public function supportsAsyncPull(): bool
77+
{
78+
return false;
79+
}
80+
}
81+

0 commit comments

Comments
 (0)