aksingh4545/kafka-project

Kafka Streaming Project – Student Performance Pipeline

Project Overview

This project demonstrates a real-time data streaming pipeline using Apache Kafka, Docker, and Python. The pipeline reads student performance data from a CSV file, streams it into Kafka using a producer, and consumes the data to store it into another CSV file. It showcases key concepts in data engineering, including event-driven architectures, containerization, and fault-tolerant processing.

The project is designed to be educational, highlighting common pitfalls in setting up streaming pipelines and how to resolve them. It uses a sample dataset on student performance metrics (e.g., hours studied, sleep hours, exam scores) to simulate real-world data ingestion and processing.

Objective

To simulate a real-world data engineering workflow involving:

  • Real-time data ingestion from a source (CSV file).
  • Event streaming using Kafka as the messaging system.
  • Consumer-side data processing and transformation.
  • Persistent storage of streamed data into an output file.
  • Debugging and troubleshooting common issues in distributed systems.

This setup can be extended to more complex scenarios like integrating with databases, cloud services, or analytics tools.

Architecture

The architecture follows a simple producer-consumer model with Kafka as the central broker:

CSV Dataset (Input) → Kafka Producer (Python Script) → Kafka Topic (student-performance) → Kafka Consumer (Python Script) → Output CSV (Processed Data)
  • Input Source: A CSV file containing student performance data.
  • Producer: Reads data row-by-row, serializes it to JSON, and publishes to a Kafka topic.
  • Kafka Broker: Handles message queuing, partitioning, and replication (single partition for simplicity).
  • Consumer: Subscribes to the topic, deserializes messages, and appends data to an output CSV.
  • Dependencies: Docker for containerizing Kafka and Zookeeper; Python libraries for Kafka integration and data handling.

Because the producer and consumer communicate only through the Kafka topic, each side can be started, stopped, or scaled without changing the other.
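The serialization step in this flow can be tried without a running broker. The sketch below is a minimal illustration, assuming a made-up sample record; `serialize` and `deserialize` are hypothetical helper names that mirror the encode and decode steps applied on either side of the topic.

```python
import json


def serialize(record: dict) -> bytes:
    """Encode a record as UTF-8 JSON bytes, as the producer does before sending."""
    return json.dumps(record).encode("utf-8")


def deserialize(payload: bytes) -> dict:
    """Decode UTF-8 JSON bytes back into a record, as the consumer does on receipt."""
    return json.loads(payload.decode("utf-8"))


# Hypothetical record for illustration; not taken from the dataset.
sample = {"math score": 85, "lunch": "standard"}
payload = serialize(sample)
restored = deserialize(payload)
```

Kafka itself only ever sees the bytes in `payload`; the JSON structure is a convention shared by the two scripts.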

Project Structure

The repository is organized as follows:

kafka-project/
├── docker-compose.yml      # Docker configuration for Kafka and Zookeeper
├── producer.py             # Python script for the Kafka producer
├── consumer.py             # Python script for the Kafka consumer
├── README.md               # This documentation file
├── data/                   # Directory for data files
│   ├── Student_Performance_Dataset.csv  # Input dataset (sample student data)
│   └── output.csv          # Output file generated by the consumer (initially empty)
  • data/Student_Performance_Dataset.csv: A sample CSV with columns like gender, race/ethnicity, parental level of education, lunch, test preparation course, math score, reading score, writing score. You can replace this with your own dataset.
  • output.csv: Automatically created/updated by the consumer.

Problems Faced

During development and testing, several issues arose that prevented the pipeline from functioning correctly. These are common in Kafka setups and are documented here for learning purposes:

  • The consumer failed to receive messages from the topic.
  • Data was not being stored in the output.csv file.
  • Connection issues between the Python consumer and the Kafka broker.
  • Errors in parsing incoming Kafka messages, leading to crashes.

These problems highlight the importance of configuration, error handling, and testing in streaming applications.

Root Causes

After investigation, the root causes were identified as follows:

  1. Kafka Broker Connection Failure:

    • The Python consumer could not connect to Kafka due to mismatched listener configurations in Docker.
    • Error Message: kafka.errors.NoBrokersAvailable.
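    • Reason: The listener settings were mismatched. The broker must bind to 0.0.0.0 inside the container (listeners), while the advertised listener must be an address that the host-side client can reach (localhost:9092).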
  2. Kafka Offset Issue:

    • Kafka tracks committed offsets per consumer group. A group that has already committed offsets resumes from them and does not re-read earlier messages unless the offsets are reset.
    • Reason: When a group has no committed offsets, the default auto_offset_reset setting (latest) starts the consumer at the end of the topic, so messages produced before it started are skipped.
  3. JSON Parsing Failure:

    • During manual testing with Kafka console tools, plain text messages were sent, but the consumer expected JSON-formatted data.
    • Error Message: json.decoder.JSONDecodeError: Expecting value.
    • Reason: Lack of robust deserialization handling for invalid or malformed messages.
  4. Startup Order Dependencies:

    • Kafka depends on Zookeeper; starting components out of order led to initialization failures.
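The JSON parsing failure (root cause 3) can be reproduced without Kafka. This is a small sketch, assuming the message body arrives as raw bytes; `try_parse` is a hypothetical helper, not part of the project scripts.

```python
import json


def try_parse(raw: bytes):
    """Return (parsed_value, None) on success or (None, error_text) on failure."""
    try:
        return json.loads(raw.decode("utf-8")), None
    except json.JSONDecodeError as exc:
        return None, str(exc)


# A JSON object parses; a plain-text line typed into the console producer does not.
good, good_err = try_parse(b'{"math score": 85}')
bad, bad_err = try_parse(b"hello kafka")
```

Here `bad_err` begins with "Expecting value", which is the same message reported in the consumer traceback.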

Solution Implemented

To resolve the issues, the following fixes were applied:

  • Fixed Kafka Listener Configuration: Updated docker-compose.yml to use PLAINTEXT://0.0.0.0:9092 for listeners and PLAINTEXT://localhost:9092 for advertised listeners, ensuring connectivity from host to container.
  • Added Consumer Group ID: Specified a unique group_id in the consumer to manage offsets independently.
  • Implemented Safe JSON Deserialization: Added a try-except block in the consumer to skip invalid messages gracefully.
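  • Reset Kafka Offsets: Set auto_offset_reset='earliest' so that a consumer group with no committed offsets starts from the beginning of the topic, and included commands to reset offsets manually for groups that have already committed offsets.
  • Correct Startup Order: Used depends_on in Docker Compose so that Zookeeper starts before Kafka, and ran the scripts only after both containers were up. Note that depends_on controls start order only; it does not wait for the broker to be ready to accept connections.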
  • Error Handling Enhancements: Added print statements for debugging and file existence checks in the consumer.

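With these changes, the consumer connects to the broker, reads the topic from the beginning on its first run, and skips malformed messages instead of crashing.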
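The offset behaviour behind the second root cause can be summarized as a small decision rule. The function below is a toy model for illustration, not Kafka client code; `starting_offset` and its parameters are hypothetical names.

```python
from typing import Optional


def starting_offset(committed: Optional[int], auto_offset_reset: str,
                    log_start: int, log_end: int) -> int:
    """Toy model of where a consumer group begins reading a partition."""
    if committed is not None:
        # A valid committed offset takes precedence; the reset policy is not consulted.
        return committed
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset}")
```

For a partition holding offsets 0 to 99, a new group starts at 0 with "earliest" and at 100 with "latest", while a group that committed offset 42 resumes at 42 under either setting. This is why changing auto_offset_reset alone has no effect on a group that has already consumed the topic.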

Docker Setup

Kafka and Zookeeper are containerized using Docker Compose for easy setup and portability.

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

This configuration uses Confluent Platform images for stability.
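The two listener settings are easy to confuse: KAFKA_LISTENERS is the address the broker binds to inside the container, while KAFKA_ADVERTISED_LISTENERS is the address the broker returns to clients for their follow-up connections. The sketch below is a hypothetical sanity check (not part of the project) that compares a client's bootstrap_servers value with an advertised listener string, assuming the single-broker setup used here.

```python
def listener_address(listener: str) -> str:
    """Extract 'host:port' from a listener such as 'PLAINTEXT://localhost:9092'."""
    _protocol, separator, address = listener.partition("://")
    if not separator or not address:
        raise ValueError(f"not a listener string: {listener!r}")
    return address


def matches_advertised(bootstrap_servers: str, advertised_listeners: str) -> bool:
    """Check whether the client address appears among the advertised listeners."""
    advertised = {
        listener_address(item.strip())
        for item in advertised_listeners.split(",")
    }
    return bootstrap_servers in advertised
```

With the configuration above, `matches_advertised("localhost:9092", "PLAINTEXT://localhost:9092")` is true, whereas a client pointed at `0.0.0.0:9092` would not match.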

Setup Instructions

Follow these steps to get the project running:

  1. Install Dependencies:

    • Ensure Docker and Docker Compose are installed.
    • Install Python dependencies: pip install kafka-python pandas.
  2. Start Kafka:

    docker-compose down -v  # Clean up previous volumes if needed
    docker-compose up -d   # Start in detached mode
    
  3. Verify Containers:

    docker ps  # Should show zookeeper and kafka running
    
  4. Create Kafka Topic:

    • Enter the Kafka container: docker exec -it kafka bash
    • Run:
      kafka-topics --create \
      --topic student-performance \
      --bootstrap-server localhost:9092 \
      --partitions 1 \
      --replication-factor 1
      
  5. Verify Topic:

    kafka-topics --list --bootstrap-server localhost:9092
    
  6. Prepare Data: Ensure data/Student_Performance_Dataset.csv exists. If not, create a sample CSV with relevant columns.
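If the dataset is missing, a small stand-in file is enough to exercise the pipeline. The sketch below writes one using only the standard library. The column names follow those listed under Project Structure; the two rows are invented placeholder values for smoke-testing, and `write_sample_csv` is a hypothetical helper.

```python
import csv
import os

COLUMNS = [
    "gender", "race/ethnicity", "parental level of education", "lunch",
    "test preparation course", "math score", "reading score", "writing score",
]

# Invented placeholder rows; replace them with real data before drawing conclusions.
SAMPLE_ROWS = [
    ["female", "group A", "some college", "standard", "none", 70, 75, 72],
    ["male", "group B", "high school", "standard", "completed", 65, 60, 58],
]


def write_sample_csv(path: str) -> None:
    """Write the header and placeholder rows to path, creating the folder if needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(COLUMNS)
        writer.writerows(SAMPLE_ROWS)
```

Calling `write_sample_csv("data/Student_Performance_Dataset.csv")` from the project root creates the file the producer expects. Note that it overwrites any existing file at that path.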

Producer Code

The producer reads the CSV, converts rows to JSON, and sends them to Kafka.

from kafka import KafkaProducer
import pandas as pd
import json
import time

print("Producer started...")

# Load the dataset
df = pd.read_csv("data/Student_Performance_Dataset.csv")

# Initialize producer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

topic_name = "student-performance"

# Send each row as a message
for _, row in df.iterrows():
    message = row.to_dict()
    producer.send(topic_name, message)
    print("Sent:", message)
    time.sleep(0.2)  # Delay for simulation

producer.flush()
print("Producer finished.")
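One caveat with row.to_dict(): pandas represents empty cells as NaN, and json.dumps writes those as the bare token NaN, which strict JSON parsers reject. The following is a minimal sketch, assuming missing values should be sent as JSON null instead; `to_json_safe` is a hypothetical helper and is not part of producer.py.

```python
import json
import math


def to_json_safe(record: dict) -> dict:
    """Replace float NaN values with None so they serialize as JSON null."""
    return {
        key: None if isinstance(value, float) and math.isnan(value) else value
        for key, value in record.items()
    }


# Hypothetical row with one missing value.
row = {"math score": 85, "reading score": float("nan")}
# allow_nan=False makes json.dumps raise if any NaN slipped through.
encoded = json.dumps(to_json_safe(row), allow_nan=False)
```

In the producer this could be applied as `to_json_safe(row.to_dict())` before sending. Whether null is the right representation depends on how downstream consumers treat missing scores.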

Consumer Code

The consumer subscribes to the topic, processes messages, and appends to the output CSV.

from kafka import KafkaConsumer
import json
import pandas as pd
import os

print("Consumer started...")

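def safe_json_deserializer(x):
    # Return None for payloads that are not valid UTF-8 encoded JSON,
    # so the main loop can skip them instead of crashing.
    try:
        return json.loads(x.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        print("Warning: Invalid JSON message skipped.")
        return None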

# Initialize consumer
consumer = KafkaConsumer(
    "student-performance",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    group_id="student-group-1",
    value_deserializer=safe_json_deserializer
)

output_file = "data/output.csv"

for message in consumer:
    data = message.value
    if data is None:
        continue

    print("Received:", data)

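    # Convert to DataFrame and append to CSV; write the header only
    # when the file is missing or still empty
    df = pd.DataFrame([data])
    if not os.path.isfile(output_file) or os.path.getsize(output_file) == 0:
        df.to_csv(output_file, index=False)
    else:
        df.to_csv(output_file, mode="a", header=False, index=False)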
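Building a one-row DataFrame for every message works, but it is heavier than the task requires. As an alternative sketch using only the standard library, each record can be appended with csv.DictWriter. `append_record` is a hypothetical helper, and it assumes every record carries the same keys in the same order.

```python
import csv
import os


def append_record(path: str, record: dict) -> None:
    """Append one record to a CSV file, writing the header only once."""
    # The header is needed when the file is missing or exists but is empty.
    needs_header = not os.path.isfile(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(record.keys()))
        if needs_header:
            writer.writeheader()
        writer.writerow(record)
```

Inside the consumer loop this would replace the DataFrame lines with `append_record(output_file, data)`.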

Execution Flow

  1. Start the consumer in one terminal:

    python consumer.py
    
  2. Start the producer in another terminal:

    python producer.py
    

The producer will send data, and the consumer will receive and store it in output.csv.

Kafka Console Testing

For manual testing:

  • Producer Test:

    docker exec -it kafka kafka-console-producer \
    --topic student-performance \
    --bootstrap-server localhost:9092
    

    Type JSON messages (e.g., {"math score": 85}) and press Enter.

  • Consumer Test:

    docker exec -it kafka kafka-console-consumer \
    --topic student-performance \
    --bootstrap-server localhost:9092 \
    --from-beginning
    

Debugging Guide

Common issues and fixes:

  • Check Kafka Logs:

    docker logs kafka
    
  • Restart Kafka:

    docker-compose down -v
    docker-compose up -d
    
  • Check Active Containers:

    docker ps
    
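  • Reset Offsets (if needed): Stop the consumer first, since offsets can be reset only while the group has no active members. Then enter the Kafka container and run: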

    kafka-consumer-groups --bootstrap-server localhost:9092 --group student-group-1 --reset-offsets --to-earliest --execute --topic student-performance
    
  • Connection Issues: Ensure ports 2181 and 9092 are free. Verify bootstrap_servers matches advertised listeners.

  • Parsing Errors: Use safe deserializer; inspect message formats.

  • No Messages Received: Confirm topic exists, producer is sending, and consumer group is correct.
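For connection problems, it can help to confirm that the broker port is reachable before starting the scripts. The sketch below is a generic TCP check, not a Kafka API; `wait_for_port` and its defaults are hypothetical. An open port shows only that something is listening, so the broker may still need a few more seconds before it serves requests.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0,
                  interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # The connection is closed immediately; only reachability matters here.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

A call such as `wait_for_port("localhost", 9092)` placed before the KafkaProducer or KafkaConsumer is created gives a clearer signal than a NoBrokersAvailable traceback when the container is still starting.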

Output

The consumer stores data in data/output.csv, which mirrors the input structure. View it with cat data/output.csv or open in a spreadsheet tool.
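A quick way to check the result is to compare row counts between the input and output files. This is a minimal sketch, assuming both files start with a header line; `count_data_rows` is a hypothetical helper.

```python
import csv


def count_data_rows(path: str) -> int:
    """Count the rows in a CSV file, excluding the header line."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header; harmless if the file is empty
        return sum(1 for _ in reader)
```

After one full run, `count_data_rows("data/output.csv")` should equal `count_data_rows("data/Student_Performance_Dataset.csv")`. Because the consumer appends, the output count grows if the topic is consumed again after an offset reset or under a new group ID.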

Learning Outcome

Through this project, key learnings include:

  • Kafka Producer and Consumer Architecture: Understanding serialization, deserialization, and topic management.
  • Dockerized Kafka Deployment: Container orchestration with dependencies.
  • Consumer Offset Management: Handling message consumption states for reliability.
  • Fault-Tolerant Stream Processing: Implementing error handling for real-world robustness.
  • Real-Time Data Streaming: Simulating end-to-end pipelines.
  • Debugging Distributed Systems: Using logs, console tools, and configurations to troubleshoot.

This builds foundational skills for big data engineering.

Future Enhancements

  • Integrate Apache Spark for advanced processing.
  • Add Confluent Schema Registry for message validation.
  • Store data in a database like Snowflake or PostgreSQL.
  • Build a real-time dashboard using tools like Grafana.
  • Scale with multiple partitions and brokers for high throughput.

Author

Developed by [Your Name or AK]. This is a Kafka Streaming Pipeline using Docker and Python. Feel free to fork and contribute!

If you encounter issues or need customizations, open an issue on GitHub.

About

This repository contains a single project that reads data from a CSV file, streams it through Kafka using Python, and stores it in output.csv.
