Description
I am using NGINX Unit in Docker with Kafka. I have the following setup.
Dockerfile
FROM unit:1.32.1-php8.2
ENV PHP_OPCACHE_VALIDATE_TIMESTAMPS="0" \
    PHP_OPCACHE_MAX_ACCELERATED_FILES="10000" \
    PHP_OPCACHE_MEMORY_CONSUMPTION="192" \
    PHP_OPCACHE_MAX_WASTED_PERCENTAGE="10"
RUN docker-php-ext-install opcache
COPY config/opcache.ini /usr/local/etc/php/conf.d/opcache.ini
RUN apt-get update \
    && apt-get install -y librdkafka-dev git zip unzip wget \
    && pecl install rdkafka \
    && docker-php-ext-enable rdkafka \
    # composer
    && curl -s https://getcomposer.org/installer | php \
    && mv composer.phar /usr/local/bin/composer
# Install dependencies and PCNTL extension
RUN apt-get update && apt-get install -y \
    libonig-dev \
    libxml2-dev \
    && docker-php-ext-install pcntl
# port used by the listener in config.json
EXPOSE 80
WORKDIR /var/www/html/
COPY config.json /docker-entrypoint.d/config.json
docker-compose.yml
services:
  php:
    build:
      context: .
      dockerfile: Dockerfile
    working_dir: /var/www/html/
    container_name: phpApp
    ports:
      - 80:80
    volumes:
      - .:/var/www/html/
    networks:
      - default
  kafka:
    image: 'bitnami/kafka:3.7.0'
    container_name: kafka-3.7
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      # KRaft settings. No longer using zookeeper method.
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=NDllYzhlNzNjMmZmNDEyNT
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS= 0
      # Many things I added as part of troubleshooting this issue, as per online info from various sources.
      - KAFKA_CFG_PRODUCER_ACKS= 0
      - KAFKA_CFG_PRODUCER_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
      - KAFKA_CFG_PRODUCER_BATCH_SIZE=106384
      - KAFKA_CFG_PRODUCER_LINGER_MS=0
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR= 1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR= 1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE:true
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093,PLAINTEXT_HOST://localhost:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
PHP producer, based on the basic example from https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples-producer.html
<?php
if (isset($_POST['testData'])) {
    $payload = trim($_POST['testData']);
    $conf = new RdKafka\Conf();
    //$conf->set('bootstrap.servers', 'kafka:9092');
    $conf->set('metadata.broker.list', 'kafka:9092');
    $conf->set('socket.timeout.ms', 60); // or socket.blocking.max.ms, depending on librdkafka version
    $conf->set('request.timeout.ms', 60); // request timeout
    if (function_exists('pcntl_sigprocmask')) {
        pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
        $conf->set('internal.termination.signal', SIGIO);
    } else {
        $conf->set('queue.buffering.max.ms', 1);
    }
    $producer = new RdKafka\Producer($conf);
    $topic = "testData";
    $kafkaTopic = $producer->newTopic($topic);
    $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
    $producer->poll(0);
    for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            echo "Message Published";
            break;
        }
    }
}
k6 test script
import http from 'k6/http';
import { check } from 'k6';
export let options = {
  vus: 300, // 300 virtual users
  duration: '30s', // Run test for 30 seconds
};
let counter = 0;
export default function () {
  // Generate XML data
  let testData = generateSensorDataStringForKafka();
  // Make a POST request with XML data as payload
  let res = http.post('http://localhost/producer.php', `testData=${testData}`, {
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
    },
  });
  console.log(`Response body: ${res.body}`);
}
// Function to generate fake data
function generateSensorDataStringForKafka() {
  let xml = '';
  xml += 'dS=' + (counter + 1) + ',FreezeCounter=0,alarmStatus=0,';
  xml += '\',temperature=';
  xml += Math.floor(Math.random() * (80 - 70 + 1)) + 70;
  xml += ',humidity=' + (Math.floor(Math.random() * (60 - 50 + 1)) + 50);
  // Reset the counter once it passes 10000 (must run before the return)
  if (counter > 10000) counter = 0;
  return xml;
}
Resulted in this output:
The test runs fine until the request count reaches a little above 28,000, and then it gives the following error.
phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2836| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT)
phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2816| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT, 1 identical error(s) suppressed)
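To read the log lines above more easily, here is a minimal sketch in plain JavaScript that splits one line into its fields. The field layout (`%<level>|<unix time>|<facility>|<client name>| <message>`) is my reading of librdkafka's default log format, and `parseRdkafkaLogLine` is a hypothetical helper, not part of any library. The numeric suffix of the client name appears to be a per-process instance counter, so a value like 2836 would suggest that this worker process has created thousands of producer instances, presumably one per request.

```javascript
// Parse one librdkafka log line as printed in the container logs.
// Assumed layout: "<container> | %<level>|<unix time>|<facility>|<client name>| <message>"
function parseRdkafkaLogLine(line) {
  const match = line.match(/%(\d)\|([\d.]+)\|([A-Z_]+)\|([^|]+)\| (.*)$/);
  if (!match) return null;
  const [, level, timestamp, facility, clientName, message] = match;
  // The trailing number in e.g. "rdkafka#producer-2836" is the instance counter.
  const instance = clientName.match(/-(\d+)$/);
  return {
    level: Number(level), // syslog-style severity, 3 = error
    timestamp,            // kept as a string to avoid float rounding
    facility,
    clientName,
    instance: instance ? Number(instance[1]) : null,
    message,
  };
}

const sample =
  'phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2836| [thrd:kafka:9092/bootstrap]: ' +
  'kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: ' +
  'Cannot assign requested address (after 6ms in state CONNECT)';

const parsed = parseRdkafkaLogLine(sample);
console.log(parsed.facility, parsed.clientName, parsed.instance);
```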
rdkafka support | enabled
php-rdkafka version | 6.0.3
build date | Jun 21 2024 16:21:00
librdkafka version (runtime) | 1.6.0
librdkafka version (build) | 1.6.0.255
If I wait a minute or so and run the test again, it runs normally at around 1300 req/s until it hits the 28k mark, and then stalls again.
Any idea what is causing this stall? I have been pulling my hair out over this for a while and can't seem to figure it out. I would much appreciate any assistance.
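One hypothesis worth checking, given the numbers above, is that the PHP container runs out of ephemeral (source) ports. "Cannot assign requested address" on connect usually means no local port was available. The sketch below is only arithmetic under assumptions I have not verified for this container: the Linux default `net.ipv4.ip_local_port_range` of 32768 to 60999, a 60-second TIME_WAIT period, and roughly one new TCP connection to `kafka:9092` per request because each request creates a new producer. The helper name `ephemeralPortBudget` is hypothetical.

```javascript
// Assumed Linux defaults (not confirmed for this container):
//   net.ipv4.ip_local_port_range = 32768 60999
//   closed client-side connections stay in TIME_WAIT for 60 seconds
const PORT_RANGE = { low: 32768, high: 60999 };
const TIME_WAIT_SECONDS = 60;

// Estimate how many connections fit before the port range is used up,
// assuming one new connection per request to the same destination.
function ephemeralPortBudget(requestsPerSecond) {
  const ports = PORT_RANGE.high - PORT_RANGE.low + 1;
  return {
    ports,
    // No port is released during the first TIME_WAIT window,
    // so exhaustion is reached after ports / rate seconds.
    secondsToExhaust: ports / requestsPerSecond,
    // Connection rate that could be sustained indefinitely.
    sustainableRate: ports / TIME_WAIT_SECONDS,
  };
}

const budget = ephemeralPortBudget(1300);
console.log(budget.ports);                       // 28232
console.log(budget.secondsToExhaust.toFixed(1)); // "21.7"
console.log(Math.floor(budget.sustainableRate)); // 470
```

Under these assumptions the limit of 28232 ports lines up with the stall at a little above 28,000 requests, and the 60-second TIME_WAIT lines up with the test working again after about a minute. The actual range can be read inside the container from `/proc/sys/net/ipv4/ip_local_port_range`.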
php-rdkafka Version
6.0.3
librdkafka Version
1.6
PHP Version
PHP 8.2
Operating System
Windows WSL Docker
Kafka Version
bitnami/kafka:3.7.0