Skip to content

Commit 211bb7f

Browse files
authored
Merge pull request #3 from haydenk/master
Refactor to run continuously until signal interrupt or termination
2 parents f063e27 + 9c75f31 commit 211bb7f

2 files changed

Lines changed: 37 additions & 9 deletions

File tree

src/Mshauneu/RdKafkaBundle/Command/TopicConsumeCommand.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ protected function configure()
3232
->addOption('offset', 'o', InputOption::VALUE_OPTIONAL, 'Offset', TopicCommunicator::OFFSET_BEGINNING)
3333
->addOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'Timeout in ms', 1000);
3434
}
35-
35+
3636
/**
3737
* {@inheritDoc}
3838
* @see \Symfony\Component\Console\Command\Command::execute()
@@ -68,8 +68,16 @@ protected function execute(InputInterface $input, OutputInterface $output)
6868
}
6969

7070
$topicConsumer->consumeStart($offset, $partition);
71-
$topicConsumer->consume($messageHandler, $partition, $timeout);
72-
$topicConsumer->consumeStop();
71+
72+
pcntl_signal(SIGTERM, [&$topicConsumer, 'stop']);
73+
pcntl_signal(SIGINT, [&$topicConsumer, 'stop']);
74+
pcntl_signal(SIGHUP, [&$topicConsumer, 'restart']);
75+
76+
while($topicConsumer->isConsuming()) {
77+
$topicConsumer->consume($messageHandler, $partition, $timeout);
78+
pcntl_signal_dispatch();
79+
}
80+
7381
}
7482

7583
}

src/Mshauneu/RdKafkaBundle/Topic/TopicConsumer.php

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,39 @@ public function consume($consumer, $partition = 0, $timeoutInMs = 1000) {
5656
if (true !== $this->isConsuming) {
5757
throw new \Exception ("Please call consumeStart first to start consuming message");
5858
}
59-
60-
while ($message = $this->consumerTopic->consume($partition, $timeoutInMs)) {
61-
$consumer->consume($message->topic_name, $message->partition, $message->offset, $message->key, $message->payload);
62-
}
59+
60+
if($message = $this->consumerTopic->consume($partition, $timeoutInMs)) {
61+
return $consumer->consume($message->topic_name, $message->partition, $message->offset, $message->key, $message->payload);
62+
}
63+
64+
return true;
6365
}
64-
66+
67+
public function stop()
68+
{
69+
$this->consumeStop();
70+
}
71+
6572
/**
6673
* @param number $partition
6774
*/
6875
public function consumeStop($partition = 0) {
6976
$this->consumerTopic->consumeStop($partition);
7077
$this->consumerTopic = null;
7178
$this->isConsuming = false;
72-
}
79+
}
80+
81+
public function restart()
82+
{
83+
// Implement a restart method
84+
}
85+
86+
/**
87+
* @return boolean
88+
*/
89+
public function isConsuming()
90+
{
91+
return $this->isConsuming;
92+
}
7393

7494
}

0 commit comments

Comments
 (0)