Open
Description
Description
The following code:
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->setRebalanceCb(function ($rk, $err, $partitions) {
var_dump(rd_kafka_err2str($err), $partitions);
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$rk->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
$rk->assign(null);
break;
case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
var_dump("RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS");
break;
}
});
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timedout\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
Resulted in this output:
%3|1642065445.423|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 3 identical error(s) suppressed)
%3|1642065445.423|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 3 identical error(s) suppressed)
%3|1642065482.184|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)
%3|1642065482.185|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)
%3|1642065512.185|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 3 identical error(s) suppressed)
%3|1642065512.186|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 3 identical error(s) suppressed)
%3|1642065549.082|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)
%3|1642065549.082|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: localhost:9092/bootstrap: Receive failed: Invalid response size 1213486160 (0..100000000): increase receive.message.max.bytes (after 0ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)
But I expected this output instead:
php-rdkafka Version
6.0.0
librdkafka Version
librdkafka version (build) => 1.8.2.255
PHP Version
PHP 8.0.3
Operating System
CentOS Stream release 8
Kafka Version
No response