Skip to content

Commit da2224f

Browse files
committed
Stop using exception flow control to stop consumer
1 parent 7186416 commit da2224f

File tree

3 files changed

+33
-46
lines changed

3 files changed

+33
-46
lines changed

src/Clients/Consumer/KafkaConsumer.php

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
use RdKafka\Message;
1212
use RdKafka\TopicPartition;
1313
use SimPod\Kafka\Clients\Consumer\Exception\IncompatibleStatus;
14-
use SimPod\Kafka\Common\Exception\Wakeup;
1514

1615
use function array_map;
1716
use function rd_kafka_err2str;
@@ -30,6 +29,8 @@ final class KafkaConsumer extends RdKafkaConsumer
3029

3130
private LoggerInterface $logger;
3231

32+
private bool $shouldRun = true;
33+
3334
public function __construct(ConsumerConfig $config, ?LoggerInterface $logger = null)
3435
{
3536
$this->logger = $logger ?? new NullLogger();
@@ -144,50 +145,47 @@ function (Message $message) use (
144145
}
145146

146147
/**
147-
* @param callable(Message) : void $onSuccess
148-
* @param callable() : void $onPartitionEof
149-
* @param callable() : void $onTimedOut
148+
* @param callable(Message): void $onSuccess
149+
* @param callable() : void $onPartitionEof
150+
* @param callable() : void $onTimedOut
150151
*/
151152
private function doStart(
152153
int $timeoutMs,
153154
callable $onSuccess,
154155
?callable $onPartitionEof = null,
155156
?callable $onTimedOut = null
156157
): void {
157-
$this->registerSignals();
158+
$this->registerSignals($this->shouldRun);
158159

159-
try {
160-
while (true) {
161-
pcntl_signal_dispatch();
160+
while ($this->shouldRun) {
161+
$message = $this->consume($timeoutMs);
162162

163-
$message = $this->consume($timeoutMs);
163+
switch ($message->err) {
164+
case RD_KAFKA_RESP_ERR_NO_ERROR:
165+
$onSuccess($message);
164166

165-
switch ($message->err) {
166-
case RD_KAFKA_RESP_ERR_NO_ERROR:
167-
$onSuccess($message);
168-
169-
break;
170-
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
171-
if ($onPartitionEof !== null) {
172-
$onPartitionEof();
173-
}
167+
break;
168+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
169+
if ($onPartitionEof !== null) {
170+
$onPartitionEof();
171+
}
174172

175-
$this->logger->info('No more messages. Will wait for more');
173+
$this->logger->info('No more messages. Will wait for more');
176174

177-
break;
178-
case RD_KAFKA_RESP_ERR__TIMED_OUT:
179-
$this->logger->info(sprintf('Timed out with timeout %d ms', $timeoutMs));
180-
if ($onTimedOut !== null) {
181-
$onTimedOut();
182-
}
175+
break;
176+
case RD_KAFKA_RESP_ERR__TIMED_OUT:
177+
$this->logger->info(sprintf('Timed out with timeout %d ms', $timeoutMs));
178+
if ($onTimedOut !== null) {
179+
$onTimedOut();
180+
}
183181

184-
break;
185-
default:
186-
$exception = IncompatibleStatus::fromMessage($message);
187-
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
188-
}
182+
break;
183+
default:
184+
$exception = IncompatibleStatus::fromMessage($message);
185+
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
189186
}
190-
} catch (Wakeup $wakeup) {
187+
188+
pcntl_signal_dispatch();
191189
}
192190

193191
$this->degisterSignals();
@@ -229,6 +227,6 @@ public function shutdown(): void
229227
{
230228
$this->logger->info('Shutting down');
231229

232-
throw new Wakeup();
230+
$this->shouldRun = false;
233231
}
234232
}

src/Clients/Consumer/WithSignalControl.php

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
namespace SimPod\Kafka\Clients\Consumer;
66

7-
use SimPod\Kafka\Common\Exception\Wakeup;
8-
97
use function pcntl_signal;
108
use function Safe\pcntl_sigprocmask;
119

@@ -24,10 +22,10 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void
2422
$config->set('internal.termination.signal', SIGIO);
2523
}
2624

27-
private function registerSignals(): void
25+
private function registerSignals(bool &$shouldRun): void
2826
{
29-
$terminationCallback = static function (): void {
30-
throw new Wakeup();
27+
$terminationCallback = static function () use (&$shouldRun): void {
28+
$shouldRun = false;
3129
};
3230

3331
pcntl_signal(SIGTERM, $terminationCallback);

src/Common/Exception/Wakeup.php

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)