Skip to content

Commit f09476d

Browse files
authored
Qless sharding work incorrect (#23)
1 parent aca9367 commit f09476d

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

src/Queue/QlessConnectionHandler.php

+8-5
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,17 @@ public function getCurrentClient(): Client
6161

6262
public function getNextClient(): Client
6363
{
64-
if ($this->clientIterator->current() === null) {
64+
$this->clientIterator->next();
65+
66+
if (!$this->clientIterator->valid()) {
6567
$this->clientIterator->rewind();
6668
}
6769

68-
$currentClient = $this->clientIterator->current();
69-
$this->clientIterator->next();
70-
71-
return $currentClient;
70+
return $this->clientIterator->current();
7271
}
7372

73+
public function getClientCount(): int
74+
{
75+
return $this->clientIterator->count();
76+
}
7477
}

src/Queue/QlessQueue.php

+18-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
use LaravelQless\Job\AbstractJob;
1010
use LaravelQless\Job\QlessJob;
1111
use Qless\Client;
12-
use Qless\Jobs\BaseJob;
1312
use Qless\Topics\Topic;
1413

1514
/**
@@ -163,13 +162,21 @@ public function recur(int $interval, string $job, array $data, ?string $queueNam
163162
*/
164163
public function pop($queueName = null)
165164
{
166-
$connection = $this->getNextConnection();
165+
$connectionCount = $this->getClientCount();
167166

168-
/** @var \Qless\Queues\Queue $queue */
169-
$queue = $connection->queues[$queueName];
167+
for ($i = 0; $i < $connectionCount; $i++) {
168+
$connection = $this->getNextConnection();
170169

171-
/** @var BaseJob $job */
172-
$job = $queue->pop(self::WORKER_PREFIX . $connection->getWorkerName());
170+
/** @var \Qless\Queues\Queue $queue */
171+
$queue = $connection->queues[$queueName];
172+
173+
/** @var \Qless\Jobs\BaseJob $job */
174+
$job = $queue->pop(self::WORKER_PREFIX . $connection->getWorkerName());
175+
176+
if ($job) {
177+
break;
178+
}
179+
}
173180

174181
if (!$job) {
175182
return null;
@@ -319,4 +326,9 @@ public function getConnection(): Client
319326
{
320327
return $this->getCurrentConnection();
321328
}
329+
330+
public function getClientCount(): int
331+
{
332+
return $this->clients->getClientCount();
333+
}
322334
}

0 commit comments

Comments
 (0)