Skip to content

Commit bcb0a9e

Browse files
committed
修复队列消费者问题
1 parent adc5894 commit bcb0a9e

File tree

1 file changed

+25
-25
lines changed

1 file changed

+25
-25
lines changed

Diff for: src/Service/BaseQueueConsumer.php

+25-25
Original file line numberDiff line numberDiff line change
@@ -70,29 +70,33 @@ public function start(?int $co = null)
7070
$task = function() use($config){
7171
$queue = $this->imiQueue->getQueue($this->name);
7272
do {
73-
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_POP', [
74-
'queue' => $queue,
75-
], $this, ConsumerBeforePopParam::class);
76-
$message = $queue->pop();
77-
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_POP', [
78-
'queue' => $queue,
79-
'message' => $message,
80-
], $this, ConsumerAfterPopParam::class);
81-
if(null === $message)
82-
{
83-
Coroutine::sleep($config->getTimespan());
84-
}
85-
else
86-
{
87-
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
73+
try {
74+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_POP', [
8875
'queue' => $queue,
89-
'message' => $message,
90-
], $this, ConsumerBeforeConsumeParam::class);
91-
$this->consume($message, $queue);
92-
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
76+
], $this, ConsumerBeforePopParam::class);
77+
$message = $queue->pop();
78+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_POP', [
9379
'queue' => $queue,
9480
'message' => $message,
95-
], $this, ConsumerAfterConsumeParam::class);
81+
], $this, ConsumerAfterPopParam::class);
82+
if(null === $message)
83+
{
84+
Coroutine::sleep($config->getTimespan());
85+
}
86+
else
87+
{
88+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
89+
'queue' => $queue,
90+
'message' => $message,
91+
], $this, ConsumerBeforeConsumeParam::class);
92+
$this->consume($message, $queue);
93+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
94+
'queue' => $queue,
95+
'message' => $message,
96+
], $this, ConsumerAfterConsumeParam::class);
97+
}
98+
} catch(\Throwable $th) {
99+
App::getBean('ErrorLog')->onException($th);
96100
}
97101
} while($this->working);
98102
};
@@ -107,11 +111,7 @@ public function start(?int $co = null)
107111
*/
108112
public function run(ITaskParam $param)
109113
{
110-
try {
111-
($param->getData()['task'])();
112-
} catch(\Throwable $th) {
113-
App::getBean('ErrorLog')->onException($th);
114-
}
114+
($param->getData()['task'])();
115115
}
116116

117117
});

0 commit comments

Comments
 (0)