Skip to content

Commit f354996

Browse files
committed
队列实现优化
1 parent 865295f commit f354996

5 files changed

Lines changed: 99 additions & 39 deletions

File tree

README.md

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
composer require easyswoole/queue
77
```
88

9-
## 使用
10-
默认自带的队列驱动为Redis队列。
9+
### 队列驱动
10+
11+
任何队列驱动都必须实现```EasySwoole\Queue\QueueDriverInterface```这个接口定义。且实现的对象一定是必须可被克隆(可以看Queue中的Producer方法即知道为何)。
12+
队列在被加载到对应topic的Producer和Consumer时,都会被分别执行一次init()方法。
13+
1114
### 创建队列
1215
```php
1316

@@ -29,15 +32,17 @@ $queue = new Queue($driver);
2932
```
3033
$job = new Job();
3134
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
32-
$queue->producer()->push($job);
35+
$queue->producer("topic")->push($job);
3336
```
3437
### 普通消费
35-
```
36-
$job = $queue->consumer()->pop();
38+
```php
39+
40+
$job = $queue->consumer("topic")->pop();
3741
//或者是自定义进程中
38-
$queue->consumer()->listen(function (Job $job){
42+
$queue->consumer("topicName")->listen(function (Job $job){
3943
var_dump($job);
4044
});
45+
4146
```
4247

4348
## CLI 单独使用
@@ -67,13 +72,42 @@ $sc->add(function (){
6772
while (1){
6873
Coroutine::sleep(3);
6974
$job = new Job();
70-
$job->setJobData("job create at ".time());
71-
$queue->producer()->push($job);
75+
$job->setJobData("job test create at ".time());
76+
try {
77+
$queue->producer("test")->push($job);
78+
}catch (\Throwable $throwable){
79+
80+
}
81+
}
82+
});
83+
84+
Coroutine::create(function ()use($queue){
85+
while (1){
86+
Coroutine::sleep(5);
87+
$job = new Job();
88+
$job->setJobData("job another create at ".time());
89+
try {
90+
$queue->producer("another")->push($job);
91+
}catch (\Throwable $throwable){
92+
93+
}
7294
}
7395
});
7496

75-
$queue->consumer()->listen(function (Job $job){
76-
var_dump($job->getJobData());
97+
Coroutine::create(function ()use($queue){
98+
$queue->consumer("test")->listen(function (Job $job){
99+
var_dump($job->getJobData() ." hande in test");
100+
},[],function (){
101+
102+
});
103+
});
104+
105+
Coroutine::create(function ()use($queue){
106+
$queue->consumer("another")->listen(function (Job $job){
107+
var_dump($job->getJobData() ." hande in another");
108+
},[],function (){
109+
110+
});
77111
});
78112

79113
});
@@ -86,17 +120,17 @@ $sc->start();
86120
$job = new Job();
87121
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
88122
$job->setDelayTime(5);//设置延后时间
89-
$queue->producer()->push($job);
123+
$queue->producer("topic")->push($job);
90124
```
91125
## 可信任务
92126
```
93127
$job = new Job();
94128
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
95129
$job->setRetryTimes(3);//任务如果没有确认,则会执行三次
96130
$job->setWaitConfirmTime(5);//如果5秒内没确认任务,会重新回到队列。默认为3秒
97-
$queue->producer()->push($job);//投递任务
131+
$queue->producer("topic")->push($job);//投递任务
98132
//确认一个任务
99-
$queue->consumer()->confirm($job);
133+
$queue->consumer("topic")->confirm($job);
100134
```
101135

102136
## 消费者控制
@@ -105,7 +139,7 @@ $queue->consumer()->confirm($job);
105139

106140
```php
107141
/** @var \EasySwoole\Queue\Queue $queue */
108-
$queue->consumer()->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
142+
$queue->consumer("topic")->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
109143
// todo
110144
})->listen(function (\EasySwoole\Queue\Job $job){ });
111145
```
@@ -114,5 +148,5 @@ $queue->consumer()->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
114148

115149
```php
116150
/** @var \EasySwoole\Queue\Queue $queue */
117-
$queue->consumer()->setBreakTime(0.1);
151+
$queue->consumer("topic")->setBreakTime(0.1);
118152
```

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"require": {
1818
"ext-swoole": ">=4.4.0",
1919
"easyswoole/redis": "^2.0",
20-
"easyswoole/pool": "^1.2"
20+
"easyswoole/pool": "^2.0"
2121
},
2222
"require-dev": {
2323
"easyswoole/swoole-ide-helper": "^1.0"

src/Driver/RedisQueue.php

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,33 @@
1515
class RedisQueue implements QueueDriverInterface
1616
{
1717
protected $pool;
18-
protected $queueName;
18+
protected $queueName = 'es_q';
1919
protected $lastCheckDelay = null;
2020

21-
public function __construct(Config $config,string $queueName = 'es_q')
21+
protected $config;
22+
23+
protected $hasInit = false;
24+
25+
public function __construct(Config $config)
26+
{
27+
$this->config = new PoolConfig();
28+
$this->config->setExtraConf($config);
29+
}
30+
31+
public function init(string $topicName, ?string $nodeId): bool
2232
{
23-
$poolConfig = new PoolConfig();
24-
$poolConfig->setExtraConf($config);
25-
$this->pool = new RedisPool($poolConfig);
26-
$this->queueName = $queueName;
33+
if(!$this->hasInit){
34+
$this->hasInit = true;
35+
}
36+
$this->queueName = $topicName;
37+
$this->pool = new RedisPool($this->config);
38+
return true;
2739
}
2840

41+
2942
public function getPoolConfig(): PoolConfig
3043
{
31-
return $this->pool->getConfig();
44+
return $this->config;
3245
}
3346

3447
public function push(Job $job,float $timeout = 3.0): bool
@@ -155,4 +168,9 @@ public function flush():bool
155168
});
156169
return true;
157170
}
171+
172+
public function __clone()
173+
{
174+
175+
}
158176
}

src/Queue.php

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@
44
namespace EasySwoole\Queue;
55

66

7+
use EasySwoole\Queue\Exception\Exception;
78
use EasySwoole\Utility\Random;
89

910
class Queue
1011
{
1112
private $driver;
1213
private $nodeId;
1314

14-
private $consumer;
15-
private $producer;
15+
private $consumer = [];
16+
private $producer = [];
1617

1718
function __construct(QueueDriverInterface $driver)
1819
{
@@ -25,26 +26,30 @@ function queueDriver():QueueDriverInterface
2526
return $this->driver;
2627
}
2728

28-
function consumer(bool $renew = false):Consumer
29+
function consumer(string $topic,bool $renew = false):Consumer
2930
{
30-
if(!$renew){
31-
$this->consumer = new Consumer($this->driver);
31+
if((!$renew) || !isset($this->consumer[$topic])){
32+
$driver = clone $this->driver;
33+
if(!$driver->init($topic,$this->nodeId)){
34+
throw new Exception("init queue topic:{$topic} driver fail");
35+
}
36+
$temp = new Consumer($driver);
37+
$this->consumer[$topic] = $temp;
3238
}
33-
if($this->consumer == null){
34-
$this->consumer = new Consumer($this->driver);
35-
}
36-
return $this->consumer;
39+
return $this->consumer[$topic];
3740
}
3841

39-
function producer(bool $renew = false):Producer
42+
function producer(string $topic,bool $renew = false):Producer
4043
{
41-
if(!$renew){
42-
$this->producer = new Producer($this->driver, $this->nodeId);
43-
}
44-
if($this->producer == null){
45-
$this->producer = new Producer($this->driver, $this->nodeId);
44+
if((!$renew) || !isset($this->producer[$topic])){
45+
$driver = clone $this->driver;
46+
if(!$driver->init($topic,$this->nodeId)){
47+
throw new Exception("init queue topic:{$topic} driver fail");
48+
}
49+
$temp = new Producer($driver,$this->nodeId);
50+
$this->producer[$topic] = $temp;
4651
}
47-
return $this->producer;
52+
return $this->producer[$topic];
4853
}
4954

5055
function info():?array

src/QueueDriverInterface.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
interface QueueDriverInterface
88
{
9+
public function init(string $topicName,?string $nodeId):bool;
910
public function push(Job $job,float $timeout = 3.0): bool;
1011

1112
public function pop(float $timeout = 3.0, ?array $params = null): ?Job;
@@ -15,4 +16,6 @@ public function info(): ?array;
1516
public function confirm(Job $job,float $timeout = 3.0): bool;
1617

1718
public function flush():bool;
19+
20+
public function __clone();
1821
}

0 commit comments

Comments
 (0)