Description
The following code:
<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("10.0.0.1,10.0.0.2");
$topic = $rk->newTopic("test");
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
echo "consumer started";
while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
Resulted in this output:
%7|1642813622.832|OFFSET|rdkafka#consumer-1| [thrd:main]: test [0]: no current leader for partition: (re)starting offset query timer for offset BEGINNING
%7|1642813623.331|NOINFO|rdkafka#consumer-1| [thrd:main]: Topic test metadata information unknown
%7|1642813623.331|NOINFO|rdkafka#consumer-1| [thrd:main]: Topic test partition count is zero: should refresh metadata
%7|1642813623.331|CONNECT|rdkafka#consumer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1642813623.331|METADATA|rdkafka#consumer-1| [thrd:main]: Hinted cache of 1/1 topic(s) being queried
%7|1642813623.331|METADATA|rdkafka#consumer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers
%7|1642813623.331|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1642813623.332|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic test [0]: timed offset query for BEGINNING in state offset-query
%7|1642813623.332|OFFSET|rdkafka#consumer-1| [thrd:main]: test [0]: no current leader for partition: (re)starting offset query timer for offset BEGINNING
But I expected this output instead:
php-rdkafka Version
6.0.0
librdkafka Version
1.6.0-1
PHP Version
8.1
Operating System
Ubuntu 21
Kafka Version
2.12
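
Possible diagnostic (a minimal sketch, not a confirmed fix): the "no usable brokers" and "no cluster connection" lines in the debug output suggest the client may never have established a connection to either broker. The snippet below checks plain TCP reachability of each address passed to addBrokers(). The helpers parseBrokerList() and isReachable() are hypothetical names written for this illustration and are not part of php-rdkafka. Port 9092 is assumed because the broker list gives no port and that is librdkafka's default. The parsing only handles hostnames and IPv4 addresses.

```php
<?php
// Hypothetical helper (not part of php-rdkafka): split a broker list such as
// "host1:9093,host2" into [host, port] pairs. When an entry has no port,
// 9092 is assumed, which is librdkafka's default broker port.
function parseBrokerList(string $list, int $defaultPort = 9092): array
{
    $brokers = [];
    foreach (explode(',', $list) as $entry) {
        $entry = trim($entry);
        if ($entry === '') {
            continue; // skip empty entries, e.g. from a trailing comma
        }
        $parts = explode(':', $entry, 2);
        $port = isset($parts[1]) ? (int) $parts[1] : $defaultPort;
        $brokers[] = [$parts[0], $port];
    }
    return $brokers;
}

// Hypothetical helper: returns true if a TCP connection to $host:$port can be
// opened within $timeout seconds. This only tests network reachability, not
// whether a working Kafka broker is listening on that port.
function isReachable(string $host, int $port, float $timeout = 2.0): bool
{
    $errno = 0;
    $errstr = '';
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeout);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}

// Same broker list as in the report above.
foreach (parseBrokerList("10.0.0.1,10.0.0.2") as [$host, $port]) {
    printf("%s:%d %s\n", $host, $port, isReachable($host, $port) ? "reachable" : "unreachable");
}
```

If both addresses print "unreachable", the problem is likely at the network level (firewall, wrong address or port) rather than in php-rdkafka itself. If they are reachable, the advertised listener configuration on the brokers would be the next thing to look at.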