
Building a Kafka Cluster ☄️

Kafka Cluster Setup

Docker Compose File
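
The stack below runs a single ZooKeeper node, three Kafka brokers (each with an INTERNAL listener for traffic inside the Docker network and an EXTERNAL listener for clients on the host), and Kafdrop, a web UI for inspecting the cluster.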

version: '3.9'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - ${ZOOKEEPER_PORT}:${ZOOKEEPER_PORT}
    environment:
      - ALLOW_ANONYMOUS_LOGIN=${ALLOW_ANONYMOUS_LOGIN}

  kafka1:
    image: bitnami/kafka:2.7.0
    container_name: kafka-broker1
    restart: unless-stopped
    ports:
      - ${KAFKA_1_PORT_1}:${KAFKA_1_PORT_1}
      - ${KAFKA_1_PORT_2}:${KAFKA_1_PORT_2}
    depends_on:
      - zookeeper      
    environment:
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_INTERNAL},EXTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_EXTERNAL}
      - KAFKA_CFG_LISTENERS=INTERNAL:${KAFKA_1_CFG_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_1_CFG_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL:${KAFKA_1_CFG_ADVERTISED_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_1_CFG_ADVERTISED_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ZOOKEEPER_CONNECT=${KAFKA_CFG_ZOOKEEPER_CONNECT}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - ALLOW_PLAINTEXT_LISTENER=${ALLOW_PLAINTEXT_LISTENER}
      
  kafka2:
    image: bitnami/kafka:2.7.0
    container_name: kafka-broker2
    restart: unless-stopped
    ports:
      - ${KAFKA_2_PORT_1}:${KAFKA_2_PORT_1}
      - ${KAFKA_2_PORT_2}:${KAFKA_2_PORT_2}
    depends_on:
      - zookeeper      
    environment:
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_INTERNAL},EXTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_EXTERNAL}
      - KAFKA_CFG_LISTENERS=INTERNAL:${KAFKA_2_CFG_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_2_CFG_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL:${KAFKA_2_CFG_ADVERTISED_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_2_CFG_ADVERTISED_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ZOOKEEPER_CONNECT=${KAFKA_CFG_ZOOKEEPER_CONNECT}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - ALLOW_PLAINTEXT_LISTENER=${ALLOW_PLAINTEXT_LISTENER}
    

  kafka3:
    image: bitnami/kafka:2.7.0
    container_name: kafka-broker3
    restart: unless-stopped
    ports:
      - ${KAFKA_3_PORT_1}:${KAFKA_3_PORT_1}
      - ${KAFKA_3_PORT_2}:${KAFKA_3_PORT_2}
    depends_on:
      - zookeeper      
    environment:
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_INTERNAL},EXTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_EXTERNAL}
      - KAFKA_CFG_LISTENERS=INTERNAL:${KAFKA_3_CFG_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_3_CFG_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL:${KAFKA_3_CFG_ADVERTISED_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_3_CFG_ADVERTISED_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ZOOKEEPER_CONNECT=${KAFKA_CFG_ZOOKEEPER_CONNECT}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - ALLOW_PLAINTEXT_LISTENER=${ALLOW_PLAINTEXT_LISTENER}

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    restart: unless-stopped
    ports:
      - ${KAFDROP_PORT}:${KAFDROP_PORT}
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    environment:
      - KAFKA_BROKERCONNECT=${KAFKA_BROKERCONNECT}
      - JVM_OPTS=${JVM_OPTS}
      - SERVER_SERVLET_CONTEXTPATH=${SERVER_SERVLET_CONTEXTPATH}

Environment Variables
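
Every ${...} placeholder in the Compose file is resolved from this .env file (passed via --env-file in the run command below).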

# ================
# =   Zookeeper  =
# ================
ZOOKEEPER_HOST=zookeeper
ZOOKEEPER_PORT=2181
ALLOW_ANONYMOUS_LOGIN=yes

# =======================================
# =   Kafka Brokers Common Properties   =
# =======================================
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_INTERNAL=PLAINTEXT
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_EXTERNAL=PLAINTEXT
KAFKA_CFG_ZOOKEEPER_CONNECT=${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT}
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
ALLOW_PLAINTEXT_LISTENER=yes

# =========================
# =     Kafka Broker 1    =
# =========================
KAFKA_1_HOST=kafka1
KAFKA_1_INTERNAL_PORT=29092
KAFKA_1_EXTERNAL_PORT=9092
KAFKA_1_CFG_LISTENERS_INTERNAL=//:${KAFKA_1_INTERNAL_PORT}
KAFKA_1_CFG_LISTENERS_EXTERNAL=//:${KAFKA_1_EXTERNAL_PORT}
KAFKA_1_CFG_ADVERTISED_LISTENERS_INTERNAL=//${KAFKA_1_HOST}:${KAFKA_1_INTERNAL_PORT}
KAFKA_1_CFG_ADVERTISED_LISTENERS_EXTERNAL=//localhost:${KAFKA_1_EXTERNAL_PORT}
KAFKA_1_PORT_1=${KAFKA_1_EXTERNAL_PORT}
KAFKA_1_PORT_2=${KAFKA_1_INTERNAL_PORT}

# =========================
# =     Kafka Broker 2    =
# =========================
KAFKA_2_HOST=kafka2
KAFKA_2_INTERNAL_PORT=29093
KAFKA_2_EXTERNAL_PORT=9093
KAFKA_2_CFG_LISTENERS_INTERNAL=//:${KAFKA_2_INTERNAL_PORT}
KAFKA_2_CFG_LISTENERS_EXTERNAL=//:${KAFKA_2_EXTERNAL_PORT}
KAFKA_2_CFG_ADVERTISED_LISTENERS_INTERNAL=//${KAFKA_2_HOST}:${KAFKA_2_INTERNAL_PORT}
KAFKA_2_CFG_ADVERTISED_LISTENERS_EXTERNAL=//localhost:${KAFKA_2_EXTERNAL_PORT}
KAFKA_2_PORT_1=${KAFKA_2_EXTERNAL_PORT}
KAFKA_2_PORT_2=${KAFKA_2_INTERNAL_PORT}


# =========================
# =     Kafka Broker 3    =
# =========================
KAFKA_3_HOST=kafka3
KAFKA_3_INTERNAL_PORT=29094
KAFKA_3_EXTERNAL_PORT=9094
KAFKA_3_CFG_LISTENERS_INTERNAL=//:${KAFKA_3_INTERNAL_PORT}
KAFKA_3_CFG_LISTENERS_EXTERNAL=//:${KAFKA_3_EXTERNAL_PORT}
KAFKA_3_CFG_ADVERTISED_LISTENERS_INTERNAL=//${KAFKA_3_HOST}:${KAFKA_3_INTERNAL_PORT}
KAFKA_3_CFG_ADVERTISED_LISTENERS_EXTERNAL=//localhost:${KAFKA_3_EXTERNAL_PORT}
KAFKA_3_PORT_1=${KAFKA_3_EXTERNAL_PORT}
KAFKA_3_PORT_2=${KAFKA_3_INTERNAL_PORT}

# ==================
# =     KAFDROP    =
# ==================
KAFDROP_PORT=9000
KAFKA_BROKERCONNECT=${KAFKA_1_HOST}:${KAFKA_1_INTERNAL_PORT},${KAFKA_2_HOST}:${KAFKA_2_INTERNAL_PORT},${KAFKA_3_HOST}:${KAFKA_3_INTERNAL_PORT}
JVM_OPTS=-Xms32M -Xmx64M
SERVER_SERVLET_CONTEXTPATH=/
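
For broker 1, the variables above substitute into the Compose file to produce the following effective listener configuration (brokers 2 and 3 follow the same pattern on ports 29093/9093 and 29094/9094):

KAFKA_CFG_LISTENERS=INTERNAL://:29092,EXTERNAL://:9092
KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka1:29092,EXTERNAL://localhost:9092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

Clients inside the Docker network (the other brokers, Kafdrop) reach broker 1 through the INTERNAL listener at kafka1:29092, while clients on the host (the Java producer and consumer below) use the EXTERNAL listener at localhost:9092.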

Running Kafka Cluster

  • The goal is to start the Kafka cluster and observe it in the Kafdrop UI.

Kafka Cluster Up

lyes-s ( ◥◣_◢◤ ) ~/Documents/Kafka $ docker-compose -f kafka-cluster.yml --env-file .env up -d
Creating network "kafka_default" with the default driver
Creating zookeeper ...
Creating zookeeper ... done
Creating kafka-broker3 ...
Creating kafka-broker2 ...
Creating kafka-broker1 ...
Creating kafka-broker3 ... done
Creating kafka-broker2 ... done
Creating kafka-broker1 ... done
Creating kafdrop       ...
Creating kafdrop       ... done
lyes-s ( ◥◣_◢◤ ) ~/Documents/Kafka $ docker container ls
CONTAINER ID   IMAGE                             COMMAND                  CREATED          STATUS          PORTS                                                        NAMES
b3003a480f30   obsidiandynamics/kafdrop:latest   "/kafdrop.sh"            13 minutes ago   Up 13 minutes   0.0.0.0:9000->9000/tcp                                       kafdrop
ce7c43edd60a   bitnami/kafka:2.7.0               "/opt/bitnami/script…"   13 minutes ago   Up 13 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp             kafka-broker1
ed5b71477969   bitnami/kafka:2.7.0               "/opt/bitnami/script…"   13 minutes ago   Up 13 minutes   0.0.0.0:9094->9094/tcp, 9092/tcp, 0.0.0.0:29094->29094/tcp   kafka-broker3
8a714002bcc9   bitnami/kafka:2.7.0               "/opt/bitnami/script…"   13 minutes ago   Up 13 minutes   0.0.0.0:9093->9093/tcp, 9092/tcp, 0.0.0.0:29093->29093/tcp   kafka-broker2
5730ef211b2d   bitnami/zookeeper:latest          "/opt/bitnami/script…"   13 minutes ago   Up 13 minutes   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp         zookeeper

Kafka Cluster Overview

http://localhost:9000/

(screenshot: Kafdrop cluster overview)

Creating Topics with Replication

Kafdrop Topics Creation

  1. "kafka.learning.orders" topic creation with 3 partitions and 2 replicas

(screenshot: Kafdrop topic creation form for kafka.learning.orders)

  1. "kafka.learning.tweets" topic creation with 4 partitions and 3 replicas

(screenshot: Kafdrop topic creation form for kafka.learning.tweets)
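
The same topics could also be created programmatically with Kafka's AdminClient instead of through the Kafdrop form. A minimal sketch (the class name KafkaTopicCreator is just for illustration; it connects through the EXTERNAL listeners, like the producer and consumer below):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class KafkaTopicCreator {

    public static void main(String[] args) throws Exception {

        //Connect through the EXTERNAL listeners exposed on the host
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(adminProps)) {

            //Same partition/replica counts as the Kafdrop forms above
            NewTopic orders = new NewTopic("kafka.learning.orders", 3, (short) 2);
            NewTopic tweets = new NewTopic("kafka.learning.tweets", 4, (short) 3);

            //Block until the brokers confirm both topics were created
            admin.createTopics(Arrays.asList(orders, tweets)).all().get();
        }
    }
}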

Kafdrop Home Page

(screenshot: Kafdrop home page listing the new topics)

Explore "kafka.learning.orders" Topic

(screenshot: kafka.learning.orders topic view)

Explore "kafka.learning.tweets" Topic

(screenshot: kafka.learning.tweets topic view)

Explore Broker ID 1001

(screenshot: broker 1001 view)

Kafka Cluster in Action

  • Run KafkaSimpleProducer and KafkaSimpleConsumer, then visualize the interactions in the Kafdrop UI.

Producer Java Code

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaSimpleProducer {

    public static void main(String[] args) {

        //Setup Properties for Kafka Producer
        Properties kafkaProps = new Properties();

        //List of brokers to connect to
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        //Serializer class used to convert Keys to Byte Arrays
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Serializer class used to convert Messages to Byte Arrays
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Create a Kafka producer from configuration
        KafkaProducer<String, String> simpleProducer =
                new KafkaProducer<>(kafkaProps);

        //Publish 20 messages at 2-second intervals, starting from a random key
        try {

            int startKey = (new Random()).nextInt(1000);

            for (int i = startKey; i < startKey + 20; i++) {

                //Create a producer Record
                ProducerRecord<String, String> kafkaRecord =
                        new ProducerRecord<String, String>(
                                "kafka.learning.orders",    //Topic name
                                String.valueOf(i),          //Key for the message
                                "This is order" + i         //Message Content
                        );

                System.out.println("Sending Message : "+ kafkaRecord.toString());

                //Publish to Kafka
                simpleProducer.send(kafkaRecord);

                Thread.sleep(2000);
            }
        }
        catch (Exception e) {
            //Don't swallow failures silently; log before closing
            e.printStackTrace();
        }
        finally {
            simpleProducer.close();
        }

    }
}
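
Note that send() is asynchronous: the loop above fires and forgets each record. To confirm which partition and offset each keyed message lands on (the same information Kafdrop displays), the plain send(kafkaRecord) call could be replaced with the overload that takes a completion callback; a sketch:

                simpleProducer.send(kafkaRecord, (metadata, exception) -> {
                    if (exception != null) {
                        //The send failed after any retries
                        exception.printStackTrace();
                    } else {
                        System.out.println("Sent to partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                });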

Consumer Java Code

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaSimpleConsumer {

    public static void main(String[] args) {

        //Setup Properties for consumer
        Properties kafkaProps = new Properties();

        //List of Kafka brokers to connect to
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        //Deserializer class to convert Keys from Byte Array to String
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //Deserializer class to convert Messages from Byte Array to String
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //Consumer Group ID for this consumer
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
                "kafka-java-consumer");

        //Set to consume from the earliest message, on start when no offset is
        //available in Kafka
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                "earliest");

        //Create a Consumer
        KafkaConsumer<String, String> simpleConsumer =
                new KafkaConsumer<String, String>(kafkaProps);

        //Subscribe to the kafka.learning.orders topic
        simpleConsumer.subscribe(Arrays.asList("kafka.learning.orders"));

        //Continuously poll for new messages
        while(true) {

            //Poll with a timeout of 100 milliseconds
            ConsumerRecords<String, String> messages =
                    simpleConsumer.poll(Duration.ofMillis(100));

            //Print batch of records consumed
            for (ConsumerRecord<String, String> message : messages)
                System.out.println("Message fetched : " + message);
        }


    }
}
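
The poll loop above runs forever and never closes the consumer, which is acceptable for a demo. For a clean shutdown, a common pattern (sketched below, not part of the original example) wakes the consumer from a JVM shutdown hook so the loop can exit and close() can run:

        //Wake the consumer from a shutdown hook, then wait for the loop to finish
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            simpleConsumer.wakeup();    //Makes the blocked poll() throw WakeupException
            try {
                mainThread.join();      //Wait for the poll loop to exit cleanly
            } catch (InterruptedException ignored) {
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> messages =
                        simpleConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> message : messages)
                    System.out.println("Message fetched : " + message);
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            //Expected during shutdown, nothing to handle
        } finally {
            simpleConsumer.close();     //Leave the group and release connections
        }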