Skip to content

Commit 90e15dc

Browse files
Add sharding (#17)
1 parent f5a546c commit 90e15dc

9 files changed

+220
-64
lines changed

config/database.php

+12
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,17 @@
88
'port' => env('REDIS_PORT', 6379),
99
'database' => 0,
1010
],
11+
'qless1' => [
12+
'host' => env('REDIS_HOST', '127.0.0.1'),
13+
'password' => env('REDIS_PASSWORD', null),
14+
'port' => env('REDIS_PORT', 6379),
15+
'database' => 1,
16+
],
17+
'qless2' => [
18+
'host' => env('REDIS_HOST', '127.0.0.1'),
19+
'password' => env('REDIS_PASSWORD', null),
20+
'port' => env('REDIS_PORT', 6379),
21+
'database' => 2,
22+
],
1123
],
1224
];

config/queue.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
'driver' => 'qless',
99
'connection' => 'qless',
1010
'queue' => 'default',
11-
'redis_connection' => 'qless',
11+
// 'redis_connection' => 'qless',
12+
'redis_connection' => ['qless1', 'qless2'],
1213
],
1314
],
1415
];

src/Job/AbstractJob.php

+6-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use Illuminate\Contracts\Queue\ShouldQueue;
77
use Illuminate\Contracts\Support\Arrayable;
88
use Illuminate\Foundation\Bus\Dispatchable;
9-
use LaravelQless\Queue\QlessQueue;
9+
use LaravelQless\Queue\QlessConnector;
1010
use Qless\Jobs\BaseJob;
1111
use LaravelQless\Contracts\QlessJob;
1212

@@ -63,16 +63,14 @@ protected function completeSync(): self
6363

6464
private function completeImmediately(): void
6565
{
66-
/**@var QlessQueue $queue */
67-
$queue = app(QlessQueue::class, [
68-
'config' => [
69-
'queue' => $this->queue,
70-
'connection' => $this->connection
71-
]
66+
$connector = new QlessConnector();
67+
$queue = $connector->connect([
68+
'queue' => $this->queue,
69+
'connection' => $this->connection
7270
]);
7371

7472
$jid = $queue->push($this, $this->data, $this->queue);
75-
$connection = $queue->getConnection()->queues[$this->queue];
73+
$connection = $queue->getCurrentConnection()->queues[$this->queue];
7674

7775
/**@var BaseJob $job */
7876
$job = $connection->popByJid($jid);

src/Queue/QlessConnectionHandler.php

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
namespace LaravelQless\Queue;
4+
5+
use ArrayIterator;
6+
use Qless\Client;
7+
8+
/**
9+
* Class QlessConnectionHandler
10+
* @package LaravelQless\Queue
11+
*/
12+
class QlessConnectionHandler
13+
{
14+
15+
/** @var Client[] */
16+
private $clients;
17+
18+
/** @var ArrayIterator */
19+
private $clientIterator;
20+
21+
/**
22+
* QlessConnectionHandler constructor.
23+
* @param Client[] $clients
24+
*/
25+
public function __construct(array $clients)
26+
{
27+
$this->init($clients);
28+
}
29+
30+
private function init(array $clients): void
31+
{
32+
foreach ($clients as $client) {
33+
if (!$client instanceof Client) {
34+
continue;
35+
}
36+
$this->clients[] = $client;
37+
38+
}
39+
if (empty($this->clients)) {
40+
throw new \Exception("No configs found");
41+
}
42+
43+
$this->clientIterator = new ArrayIterator($this->clients);
44+
}
45+
46+
public function getRandomClient(): Client
47+
{
48+
return $this->clients[array_rand($this->clients)];
49+
}
50+
51+
public function getAllClients(): array
52+
{
53+
return $this->clients;
54+
}
55+
56+
public function getCurrentClient(): Client
57+
{
58+
if ($this->clientIterator->current() === null) {
59+
return $this->getNextClient();
60+
}
61+
62+
return $this->clientIterator->current();
63+
}
64+
65+
public function getNextClient(): Client
66+
{
67+
if ($this->clientIterator->current() === null) {
68+
$this->clientIterator->rewind();
69+
}
70+
71+
$currentClient = $this->clientIterator->current();
72+
$this->clientIterator->next();
73+
74+
return $currentClient;
75+
}
76+
77+
}

src/Queue/QlessConnector.php

+25-12
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,35 @@
1313
*/
1414
class QlessConnector implements ConnectorInterface
1515
{
16+
private const REDIS_CONNECTION_CONFIG_KEY = 'redis_connection';
17+
private const CONFIG_PATH_PREFIX = 'database.redis.';
18+
19+
private const DEFAULT_CONNECTION_CONFIG = 'qless';
20+
1621
/**
17-
* Establish a queue connection.
18-
*
19-
* @param array $config
20-
*
21-
* @return QlessQueue
22-
*/
22+
* Establish a queue connection.
23+
*
24+
* @param array $config
25+
*
26+
* @return QlessQueue
27+
*/
2328
public function connect(array $config): QlessQueue
2429
{
25-
$redisConnection = Arr::get($config, 'redis_connection', 'qless');
30+
$redisConnection = Arr::get($config, self::REDIS_CONNECTION_CONFIG_KEY, self::DEFAULT_CONNECTION_CONFIG);
31+
32+
if (!is_array($redisConnection)) {
33+
$redisConnection = [$redisConnection];
34+
}
2635

27-
$redisConfig = Config::get('database.redis.' . $redisConnection, []);
36+
$clients = [];
37+
foreach ($redisConnection as $connection) {
38+
$qlessConfig = Config::get(self::CONFIG_PATH_PREFIX . $connection, []);
39+
$clients[] = new Client($qlessConfig);
40+
}
2841

29-
return new QlessQueue(
30-
new Client($redisConfig),
31-
$config
32-
);
42+
return new QlessQueue(
43+
new QlessConnectionHandler($clients),
44+
$config
45+
);
3346
}
3447
}

src/Queue/QlessQueue.php

+59-36
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@ class QlessQueue extends Queue implements QueueContract
2222

2323
private const WORKER_PREFIX = 'laravel_';
2424

25-
/**
26-
* @var Client
27-
*/
28-
private $connect;
29-
3025
/**
3126
* @var string
3227
*/
@@ -37,14 +32,17 @@ class QlessQueue extends Queue implements QueueContract
3732
*/
3833
private $config;
3934

35+
/** @var QlessConnectionHandler */
36+
private $clients;
37+
4038
/**
4139
* QlessQueue constructor.
42-
* @param Client $connect
40+
* @param QlessConnectionHandler $clients
4341
* @param array $config
4442
*/
45-
public function __construct(Client $connect, array $config)
43+
public function __construct(QlessConnectionHandler $clients, array $config)
4644
{
47-
$this->connect = $connect;
45+
$this->clients = $clients;
4846
$this->defaultQueue = $config['queue'] ?? null;
4947
$this->connectionName = $config['connection'] ?? '';
5048
$this->config = $config;
@@ -53,20 +51,20 @@ public function __construct(Client $connect, array $config)
5351
/**
5452
* Get the size of the queue.
5553
*
56-
* @param string $queue
54+
* @param string $queue
5755
* @return int
5856
*/
5957
public function size($queue = null): int
6058
{
61-
return $this->getConnection()->length($queue ?? '');
59+
return $this->getNextConnection()->length($queue ?? '');
6260
}
6361

6462
/**
6563
* Push a raw payload onto the queue.
6664
*
67-
* @param string $payload
68-
* @param string $queueName
69-
* @param array $options
65+
* @param string $payload
66+
* @param string $queueName
67+
* @param array $options
7068
* @return mixed
7169
*/
7270
public function pushRaw($payload, $queueName = null, array $options = [])
@@ -75,7 +73,7 @@ public function pushRaw($payload, $queueName = null, array $options = [])
7573

7674
$queueName = $queueName ?? $this->defaultQueue;
7775

78-
$queue = $this->getConnection()->queues[$queueName];
76+
$queue = $this->getRandomConnection()->queues[$queueName];
7977

8078
$qlessOptions = $payloadData['data'][self::JOB_OPTIONS_KEY] ?? [];
8179

@@ -96,23 +94,23 @@ public function pushRaw($payload, $queueName = null, array $options = [])
9694
/**
9795
* Push a new job onto the queue.
9896
*
99-
* @param string|object $job
100-
* @param mixed $data
101-
* @param string $queueName
97+
* @param string|object $job
98+
* @param mixed $data
99+
* @param string $queueName
102100
* @return mixed
103101
*/
104102
public function push($job, $data = '', $queueName = null)
105103
{
106-
return $this->pushRaw($this->makePayload($job, (array) $data), $queueName);
104+
return $this->pushRaw($this->makePayload($job, (array)$data), $queueName);
107105
}
108106

109107
/**
110108
* Push a new job onto the queue after a delay.
111109
*
112-
* @param \DateTimeInterface|\DateInterval|int $delay
113-
* @param string|object $job
114-
* @param mixed $data
115-
* @param string $queueName
110+
* @param \DateTimeInterface|\DateInterval|int $delay
111+
* @param string|object $job
112+
* @param mixed $data
113+
* @param string $queueName
116114
* @return mixed
117115
*/
118116
public function later($delay, $job, $data = '', $queueName = null)
@@ -139,7 +137,7 @@ public function later($delay, $job, $data = '', $queueName = null)
139137
public function recur(int $interval, string $job, array $data, ?string $queueName = null): string
140138
{
141139
/** @var \Qless\Queues\Queue $queue */
142-
$queue = $this->getConnection()->queues[$queueName];
140+
$queue = $this->getNextConnection()->queues[$queueName];
143141

144142
$options = $data[self::JOB_OPTIONS_KEY] ?? [];
145143
$options = array_merge($options, ['interval' => $interval]);
@@ -160,16 +158,18 @@ public function recur(int $interval, string $job, array $data, ?string $queueNam
160158
/**
161159
* Pop the next job off of the queue.
162160
*
163-
* @param string $queueName
161+
* @param string $queueName
164162
* @return QlessJob|null
165163
*/
166164
public function pop($queueName = null)
167165
{
166+
$connection = $this->getNextConnection();
167+
168168
/** @var \Qless\Queues\Queue $queue */
169-
$queue = $this->getConnection()->queues[$queueName];
169+
$queue = $connection->queues[$queueName];
170170

171171
/** @var BaseJob $job */
172-
$job = $queue->pop(self::WORKER_PREFIX . $this->connect->getWorkerName());
172+
$job = $queue->pop(self::WORKER_PREFIX . $connection->getWorkerName());
173173

174174
if (!$job) {
175175
return null;
@@ -195,10 +195,14 @@ public function subscribe(string $topic, string $queueName = null): bool
195195
{
196196
$queueName = $queueName ?? $this->defaultQueue;
197197

198-
/** @var \Qless\Queues\Queue $queue */
199-
$queue = $this->getConnection()->queues[$queueName];
198+
$result = true;
199+
foreach ($this->getAllConnections() as $connection) {
200+
/** @var \Qless\Queues\Queue $queue */
201+
$queue = $connection->queues[$queueName];
202+
$result = $queue->subscribe($topic) && $result;
203+
}
200204

201-
return $queue->subscribe($topic);
205+
return $result;
202206
}
203207

204208
/**
@@ -210,10 +214,14 @@ public function unSubscribe(string $topic, string $queueName = null): bool
210214
{
211215
$queueName = $queueName ?? $this->defaultQueue;
212216

213-
/** @var \Qless\Queues\Queue $queue */
214-
$queue = $this->getConnection()->queues[$queueName];
217+
$result = true;
218+
foreach ($this->getAllConnections() as $connection) {
219+
/** @var \Qless\Queues\Queue $queue */
220+
$queue = $connection->queues[$queueName];
221+
$result = $queue->unSubscribe($topic) && $result;
222+
}
215223

216-
return $queue->unSubscribe($topic);
224+
return $result;
217225
}
218226

219227
/**
@@ -225,7 +233,7 @@ public function unSubscribe(string $topic, string $queueName = null): bool
225233
*/
226234
public function pushToTopic(string $topicName, string $job, array $data = [], array $options = [])
227235
{
228-
$topic = new Topic($topicName, $this->getConnection());
236+
$topic = new Topic($topicName, $this->getRandomConnection());
229237

230238
$qlessOptions = $payloadData['data'][self::JOB_OPTIONS_KEY] ?? [];
231239
$options = array_merge($qlessOptions, $options);
@@ -281,10 +289,25 @@ protected function makePayload($job, $data = [], $options = []): string
281289
}
282290

283291
/**
284-
* @return Client
292+
* @return Client[]
285293
*/
286-
public function getConnection(): Client
294+
public function getAllConnections(): array
295+
{
296+
return $this->clients->getAllClients();
297+
}
298+
299+
public function getRandomConnection(): Client
300+
{
301+
return $this->clients->getNextClient();
302+
}
303+
304+
public function getNextConnection(): Client
305+
{
306+
return $this->clients->getNextClient();
307+
}
308+
309+
public function getCurrentConnection(): Client
287310
{
288-
return $this->connect;
311+
return $this->clients->getCurrentClient();
289312
}
290313
}

0 commit comments

Comments
 (0)