Skip to content

Commit d993927

Browse files
committed
Merge branch '3.x' of https://github.com/easy-swoole/queue into 3.x
2 parents 4f3ed10 + 718227a commit d993927

4 files changed

Lines changed: 53 additions & 4 deletions

File tree

README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,22 @@ $job->setWaitConfirmTime(5);//如果5秒内没确认任务,会重新回到队
4848
$queue->producer()->push($job);//投递任务
4949
//确认一个任务
5050
$queue->consumer()->confirm($job);
51-
```
51+
```
52+
53+
## 消费者控制
54+
55+
当队列未`pop``job`时:
56+
57+
```php
58+
/** @var \EasySwoole\Queue\Queue $queue */
59+
$queue->consumer()->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
60+
// todo
61+
})->listen(function (\EasySwoole\Queue\Job $job){ });
62+
```
63+
64+
设置`breakTime`
65+
66+
```php
67+
/** @var \EasySwoole\Queue\Queue $queue */
68+
$queue->consumer()->setBreakTime(0.1);
69+
```

src/Consumer.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
class Consumer
1010
{
1111
private $driver;
12+
1213
private $enableListen = false;
14+
private $onBreak;
15+
private $breakTime = 0.01;
1316

1417
function __construct(QueueDriverInterface $driver)
1518
{
@@ -31,7 +34,10 @@ function listen(callable $call,array $params = [],?callable $onException = null)
3134
if ($job) {
3235
call_user_func($call, $job);
3336
} else {
34-
Coroutine::sleep(0.001);
37+
if($this->onBreak){
38+
call_user_func($this->onBreak,$this);
39+
}
40+
Coroutine::sleep($this->breakTime);
3541
}
3642
}catch (\Throwable $throwable){
3743
if($onException){
@@ -53,4 +59,16 @@ function stopListen(): Consumer
5359
$this->enableListen = false;
5460
return $this;
5561
}
62+
63+
function setOnBreak(callable $call): Consumer
64+
{
65+
$this->onBreak = $call;
66+
return $this;
67+
}
68+
69+
function setBreakTime(float $time): Consumer
70+
{
71+
$this->breakTime = $time;
72+
return $this;
73+
}
5674
}

src/Driver/RedisQueue.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,19 @@ public function info(): ?array
132132
/** @var $redis Redis */
133133
return [
134134
'runningQueue'=>$redis->lLen($this->queueName),
135-
'delayQueue'=>$redis->zCard("{$this->queueName}_c")
135+
'delayQueue'=>$redis->zCard("{$this->queueName}_d")
136136
];
137137
});
138138
}
139-
}
139+
140+
public function flush():bool
141+
{
142+
$this->pool->invoke(function ($redis){
143+
/** @var $redis Redis */
144+
$redis->del("{$this->queueName}_c");
145+
$redis->del("{$this->queueName}_d");
146+
$redis->del("{$this->queueName}");
147+
});
148+
return true;
149+
}
150+
}

src/QueueDriverInterface.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ public function pop(float $timeout = 3.0, array $params = []): ?Job;
1313
public function info(): ?array;
1414

1515
public function confirm(Job $job,float $timeout = 3.0): bool;
16+
17+
public function flush():bool;
1618
}

0 commit comments

Comments
 (0)