Skip to content

Commit 4477ac4

Browse files
authored
Merge pull request #1 from pdffiller/feature/topics
Added topics supported
2 parents 032833a + 5813da9 commit 4477ac4

File tree

5 files changed

+173
-22
lines changed

5 files changed

+173
-22
lines changed

README.md

+45
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,51 @@ And redis connection in `config/database.php`
5353

5454
Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues
5555

56+
Also you can use additional features.
57+
58+
### Topics
59+
Topic help you to put a job to different queues.
60+
First, you must to create a subscription. You can use pattern for name of topics.
61+
Symbol `*` - one word, `#` - few words divided by point `.`.
62+
Examples: `first.second.*`, `*.second.*`, `#.third`.
63+
64+
```php
65+
/**
66+
* Subscribe
67+
*/
68+
69+
\Queue::subscribe('*.*.test', 'queue1');
70+
71+
\Queue::subscribe('this.*.test', 'queue2');
72+
73+
\Queue::subscribe('this.*.orange', 'queue3');
74+
75+
```
76+
77+
Than you can put job to all subscribers.
78+
79+
```php
80+
/**
81+
* Put job to few queues
82+
*/
83+
\Queue::pushToSubscriber('this.is.test', TestQless::class, ['test' => 'test']);
84+
// Push job to queue1 and queue2, but not to queue3
85+
86+
```
87+
88+
### Recurring Jobs
89+
Sometimes it's not enough simply to schedule one job, but you want to run jobs regularly.
90+
In particular, maybe you have some batch operation that needs to get run once an hour and you don't care what
91+
worker runs it. Recurring jobs are specified much like other jobs:
92+
93+
```php
94+
/**
95+
* Recurring Job
96+
*/
97+
98+
\Queue::recur($intervalInSeconds, $jobClass, $data, $queueName);
99+
100+
```
56101

57102
## Testing
58103

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
}
1616
],
1717
"require": {
18-
"pdffiller/qless-php": "^3.1"
18+
"ext-json": "*",
19+
"pdffiller/qless-php": "3.3.0"
1920
},
2021
"require-dev": {
2122
"illuminate/events": "5.6.*|5.7.*",

src/LaravelQlessServiceProvider.php

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
namespace LaravelQless;
44

55
use Illuminate\Queue\QueueManager;
6+
use Illuminate\Support\Facades\Config;
67
use Illuminate\Support\ServiceProvider;
78
use LaravelQless\Queue\QlessConnector;
9+
use LaravelQless\Topics\QlessTopic;
10+
use Qless\Client;
11+
use Qless\Topics\Topic;
812

913
/**
1014
* Class LaravelQlessServiceProvider
@@ -23,7 +27,7 @@ public function boot()
2327
$queue = $this->app['queue'];
2428

2529
$queue->addConnector('qless', function () {
26-
return new QlessConnector($this->app['events']);
30+
return new QlessConnector;
2731
});
2832
}
2933
}

src/Queue/QlessQueue.php

+64-17
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Illuminate\Queue\Queue;
88
use LaravelQless\Job\QlessJob;
99
use Qless\Client;
10+
use Qless\Topics\Topic;
1011

1112
/**
1213
* Class QlessQueue
@@ -47,22 +48,6 @@ public function __construct(Client $connect, array $config)
4748
$this->config = $config;
4849
}
4950

50-
/**
51-
* @return Client
52-
*/
53-
private function getConnection(): Client
54-
{
55-
return $this->connect;
56-
}
57-
58-
/**
59-
* @return array
60-
*/
61-
private function getConfig(): array
62-
{
63-
return $this->config;
64-
}
65-
6651
/**
6752
* Get the size of the queue.
6853
*
@@ -123,7 +108,11 @@ public function push($job, $data = '', $queueName = null)
123108
*/
124109
public function later($delay, $job, $data = '', $queueName = null)
125110
{
126-
return $this->pushRaw($this->makePayload($job, $data, ['timeout' => $delay]), $queueName, ['timeout' => $delay]);
111+
return $this->pushRaw(
112+
$this->makePayload($job, $data, ['timeout' => $delay]),
113+
$queueName,
114+
['timeout' => $delay]
115+
);
127116
}
128117

129118
/**
@@ -171,6 +160,56 @@ public function pop($queueName = null)
171160
);
172161
}
173162

163+
/**
164+
* @param string $topic
165+
* @param string|null $queueName
166+
* @return bool
167+
*/
168+
public function subscribe(string $topic, string $queueName = null): bool
169+
{
170+
$queueName = $queueName ?? $this->defaultQueue;
171+
172+
/** @var \Qless\Queues\Queue $queue */
173+
$queue = $this->getConnection()->queues[$queueName];
174+
175+
return $queue->subscribe($topic);
176+
}
177+
178+
/**
179+
* @param string $topic
180+
* @param string|null $queueName
181+
* @return bool
182+
*/
183+
public function unSubscribe(string $topic, string $queueName = null): bool
184+
{
185+
$queueName = $queueName ?? $this->defaultQueue;
186+
187+
/** @var \Qless\Queues\Queue $queue */
188+
$queue = $this->getConnection()->queues[$queueName];
189+
190+
return $queue->unSubscribe($topic);
191+
}
192+
193+
/**
194+
* @param string $topic
195+
* @param string $job
196+
* @param array $data
197+
* @param array $options
198+
* @return array
199+
*/
200+
public function pushToTopic(string $topic, string $job, array $data = [], array $options = []): array
201+
{
202+
$topic = new Topic($topic, $this->getConnection());
203+
204+
return $topic->put(
205+
$job,
206+
$data,
207+
null,
208+
$options['timeout'] ?? null,
209+
$options['maxTries'] ?? null
210+
);
211+
}
212+
174213
/**
175214
* @param string $job
176215
* @param mixed|string $data
@@ -218,4 +257,12 @@ public function setConnectionName($name): self
218257

219258
return $this;
220259
}
260+
261+
/**
262+
* @return Client
263+
*/
264+
private function getConnection(): Client
265+
{
266+
return $this->connect;
267+
}
221268
}

tests/Queue/QlessQueueTest.php

+57-3
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,76 @@ public function testPushPop()
2727
{
2828
$queue = $this->getQueue();
2929

30-
$jobId = $queue->push(Job::class, ['firstKey' => 'firstValue'], 'test_job');
30+
$jobId = $queue->push(Job::class, ['firstKey' => 'firstValue'], 'test_push_pop');
3131

32-
$job = $queue->pop('test_job');
32+
$job = $queue->pop('test_push_pop');
3333

3434
$job->fire();
3535

3636
$this->assertInstanceOf(QlessJob::class, $job);
3737

3838
$this->assertEquals($jobId, $job->getJobId());
3939

40-
$this->assertEquals($job->getQueue(), 'test_job');
40+
$this->assertEquals($job->getQueue(), 'test_push_pop');
4141

4242
$this->assertEquals($job->getName(), Job::class);
4343

4444
$this->assertEquals($job->getData(), ['firstKey' => 'firstValue']);
4545
}
46+
47+
public function testSubscribe()
48+
{
49+
$queue = $this->getQueue();
50+
51+
for ($i = 1; $i<=3; $i++) {
52+
$queue->push(Job::class, ['firstKey' => 'firstValue'], 'qs_' . $i);
53+
$job = $queue->pop('qs_' . $i);
54+
$job->fire();
55+
}
56+
57+
$queue->subscribe('developing#', 'qs_1');
58+
$queue->subscribe('*.test.*', 'qs_2');
59+
$queue->subscribe('*.*.awesome', 'qs_3');
60+
61+
$queue->pushToTopic('developing.is.awesome', Job::class, ['key' => 'value']);
62+
63+
$job1 = $queue->pop('qs_1');
64+
$job1->fire();
65+
$job2 = $queue->pop('qs_2');
66+
$job3 = $queue->pop('qs_3');
67+
$job3->fire();
68+
69+
$this->assertEquals($job1->getName(), Job::class);
70+
$this->assertEquals($job2, null);
71+
$this->assertEquals($job3->getName(), Job::class);
72+
73+
$this->assertEquals($job1->getData(), ['key' => 'value']);
74+
$this->assertEquals($job3->getData(), ['key' => 'value']);
75+
}
76+
public function testUnSubscribe()
77+
{
78+
$queue = $this->getQueue();
79+
80+
for ($i = 1; $i<=2; $i++) {
81+
$queue->push(Job::class, ['firstKey' => 'firstValue'], 'qu_' . $i);
82+
$job = $queue->pop('qu_' . $i);
83+
$job->fire();
84+
}
85+
86+
$queue->subscribe('developing.*.*', 'qu_1');
87+
$queue->subscribe('*.*.awesome', 'qu_2');
88+
89+
$queue->unSubscribe('*.*.awesome', 'qu_2');
90+
91+
$queue->pushToTopic('developing.is.awesome', Job::class, ['key' => 'value']);
92+
93+
$job1 = $queue->pop('qu_1');
94+
$job1->fire();
95+
$job2 = $queue->pop('qu_2');
96+
97+
$this->assertEquals($job1->getName(), Job::class);
98+
$this->assertEquals($job2, null);
99+
}
46100

47101
public function testSize()
48102
{

0 commit comments

Comments
 (0)