This project demonstrates a real-time data streaming pipeline using Apache Kafka, Docker, and Python. The pipeline reads student performance data from a CSV file, streams it into Kafka using a producer, and consumes the data to store it into another CSV file. It showcases key concepts in data engineering, including event-driven architectures, containerization, and fault-tolerant processing.
The project is designed to be educational, highlighting common pitfalls in setting up streaming pipelines and how to resolve them. It uses a sample dataset on student performance metrics (e.g., hours studied, sleep hours, exam scores) to simulate real-world data ingestion and processing.
The objective is to simulate a real-world data engineering workflow involving:
- Real-time data ingestion from a source (CSV file).
- Event streaming using Kafka as the messaging system.
- Consumer-side data processing and transformation.
- Persistent storage of streamed data into an output file.
- Debugging and troubleshooting common issues in distributed systems.
This setup can be extended to more complex scenarios like integrating with databases, cloud services, or analytics tools.
The architecture follows a simple producer-consumer model with Kafka as the central broker:
CSV Dataset (Input) → Kafka Producer (Python Script) → Kafka Topic (student-performance) → Kafka Consumer (Python Script) → Output CSV (Processed Data)
- Input Source: A CSV file containing student performance data.
- Producer: Reads data row-by-row, serializes it to JSON, and publishes to a Kafka topic.
- Kafka Broker: Handles message queuing, partitioning, and replication (single partition for simplicity).
- Consumer: Subscribes to the topic, deserializes messages, and appends data to an output CSV.
- Dependencies: Docker for containerizing Kafka and Zookeeper; Python libraries for Kafka integration and data handling.
This design ensures decoupling between data production and consumption, allowing for scalability and reliability.
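To make the hand-off between producer and consumer concrete, the sketch below mimics the serialization step without a broker: a row is encoded to JSON bytes the way a producer would send it, then decoded the way a consumer would read it. The helper names `serialize_row` and `deserialize_row` and the sample row are illustrative assumptions, not part of the project's scripts.

```python
import json


def serialize_row(row: dict) -> bytes:
    """Encode one CSV row (as a dict) to UTF-8 JSON bytes for a message value."""
    return json.dumps(row).encode("utf-8")


def deserialize_row(payload: bytes) -> dict:
    """Decode UTF-8 JSON bytes back into a dict on the consumer side."""
    return json.loads(payload.decode("utf-8"))


# Hypothetical sample row; the real column names come from the input CSV.
sample_row = {"math score": 72, "reading score": 72, "writing score": 74}

payload = serialize_row(sample_row)
restored = deserialize_row(payload)

print(type(payload).__name__)  # bytes travel through the topic
print(restored == sample_row)  # the consumer recovers the original row
```

Because both sides agree only on the JSON format, either script can be changed or restarted without touching the other.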
The repository is organized as follows:
kafka-project/
├── docker-compose.yml # Docker configuration for Kafka and Zookeeper
├── producer.py # Python script for the Kafka producer
├── consumer.py # Python script for the Kafka consumer
├── README.md # This documentation file
├── data/ # Directory for data files
│ ├── Student_Performance_Dataset.csv # Input dataset (sample student data)
│ └── output.csv # Output file generated by the consumer (initially empty)
- data/Student_Performance_Dataset.csv: A sample CSV with columns like gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score. You can replace this with your own dataset.
- output.csv: Automatically created/updated by the consumer.
During development and testing, several issues arose that prevented the pipeline from functioning correctly. These are common in Kafka setups and are documented here for learning purposes:
- The consumer failed to receive messages from the topic.
- Data was not being stored in the output.csv file.
- Connection issues between the Python consumer and the Kafka broker.
- Errors in parsing incoming Kafka messages, leading to crashes.
These problems highlight the importance of configuration, error handling, and testing in streaming applications.
After investigation, the root causes were identified as follows:
- Kafka Broker Connection Failure:
  - The Python consumer could not connect to Kafka due to mismatched listener configurations in Docker.
  - Error Message: kafka.errors.NoBrokersAvailable.
  - Reason: The broker's listener was bound to localhost inside the container, so it was unreachable from the host. The listener must bind to 0.0.0.0, while the advertised listener stays localhost:9092 for clients running on the host.
- Kafka Offset Issue:
  - Kafka maintains message offsets per consumer group. If messages were already consumed in a previous run, new consumers wouldn't see old messages unless offsets were reset.
  - Reason: With no committed offset for its group, a consumer starts from the latest offset by default and misses messages produced earlier.
- JSON Parsing Failure:
  - During manual testing with Kafka console tools, plain text messages were sent, but the consumer expected JSON-formatted data.
  - Error Message: json.decoder.JSONDecodeError: Expecting value.
  - Reason: Lack of robust deserialization handling for invalid or malformed messages.
- Startup Order Dependencies:
  - Kafka depends on Zookeeper; starting components out of order led to initialization failures.
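The offset behaviour described above can be illustrated with a toy in-memory model. This is only a sketch of the idea, not Kafka's API: the `ToyTopic` class and its methods are hypothetical, and a real broker tracks offsets per partition and commits them asynchronously.

```python
class ToyTopic:
    """In-memory stand-in for a single-partition topic (not Kafka's API)."""

    def __init__(self):
        self.log = []        # messages in arrival order
        self.committed = {}  # group_id -> next offset to read

    def produce(self, message):
        self.log.append(message)

    def poll(self, group_id, auto_offset_reset="latest"):
        """Return unread messages for a group and commit the new position."""
        if group_id in self.committed:
            start = self.committed[group_id]
        else:
            # No committed offset yet: the reset policy decides where to start.
            start = 0 if auto_offset_reset == "earliest" else len(self.log)
        messages = self.log[start:]
        self.committed[group_id] = len(self.log)
        return messages


topic = ToyTopic()
topic.produce("row-1")
topic.produce("row-2")

# Both groups join after the messages were produced.
late_latest = topic.poll("group-a", auto_offset_reset="latest")
late_earliest = topic.poll("group-b", auto_offset_reset="earliest")
second_poll = topic.poll("group-b", auto_offset_reset="earliest")

print(late_latest)    # [] : the 'latest' policy skips existing messages
print(late_earliest)  # ['row-1', 'row-2']
print(second_poll)    # [] : the committed offset now wins over the policy
```

The last line shows why a rerun with the same group can appear to receive nothing: once an offset is committed, the reset policy no longer applies.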
To resolve the issues, the following fixes were applied:
- Fixed Kafka Listener Configuration: Updated docker-compose.yml to use PLAINTEXT://0.0.0.0:9092 for listeners and PLAINTEXT://localhost:9092 for advertised listeners, ensuring connectivity from host to container.
- Added Consumer Group ID: Specified a unique group_id in the consumer to manage offsets independently.
- Implemented Safe JSON Deserialization: Added a try-except block in the consumer to skip invalid messages gracefully.
- Reset Kafka Offsets: Used auto_offset_reset='earliest' to consume from the beginning and included commands to reset offsets manually if needed.
- Correct Startup Order: Ensured Docker Compose starts Zookeeper before Kafka, and scripts are run after containers are healthy.
- Error Handling Enhancements: Added print statements for debugging and file existence checks in the consumer.
These changes made the pipeline robust and reliable.
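One way to run the scripts only after the containers are up is to wait until the broker's port accepts TCP connections. The sketch below uses only the standard library; the function name `wait_for_port` and its default timings are assumptions. An open port does not guarantee that the broker has finished initializing, so treat this as a coarse readiness check.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

A script could call `wait_for_port("localhost", 9092)` before creating its producer or consumer and exit with a clear message if the result is `False`, instead of failing later with `NoBrokersAvailable`.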
Kafka and Zookeeper are containerized using Docker Compose for easy setup and portability.

    version: '3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.5.0
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      kafka:
        image: confluentinc/cp-kafka:7.5.0
        container_name: kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

This configuration uses Confluent Platform images for stability.
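The two listener settings play different roles: the listener is the address the broker binds to, and the advertised listener is the address clients are told to connect to. As a rough sanity check, the sketch below parses listener strings and compares the advertised address with a client's `bootstrap_servers` value. The helper names `parse_listener` and `client_matches_advertised` are hypothetical, and the check assumes a single listener and a single bootstrap server.

```python
from urllib.parse import urlsplit


def parse_listener(listener: str) -> tuple:
    """Split a listener such as 'PLAINTEXT://localhost:9092' into (protocol, host, port)."""
    parts = urlsplit(listener)
    return parts.scheme.upper(), parts.hostname, parts.port


def client_matches_advertised(bootstrap_servers: str, advertised_listener: str) -> bool:
    """Check that the client's 'host:port' equals the advertised host and port."""
    _, host, port = parse_listener(advertised_listener)
    return bootstrap_servers == f"{host}:{port}"


# Values taken from the Compose file and the Python scripts in this project.
print(parse_listener("PLAINTEXT://0.0.0.0:9092"))
print(client_matches_advertised("localhost:9092", "PLAINTEXT://localhost:9092"))
```

If the second check prints `False`, the client is likely to connect to the bootstrap address and then be redirected to an address it cannot reach.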
Follow these steps to get the project running:
- Install Dependencies:
  - Ensure Docker and Docker Compose are installed.
  - Install Python dependencies: pip install kafka-python pandas.
- Start Kafka:

      docker-compose down -v   # Clean up previous volumes if needed
      docker-compose up -d     # Start in detached mode

- Verify Containers:

      docker ps   # Should show zookeeper and kafka running

- Create Kafka Topic:
  - Enter the Kafka container:

        docker exec -it kafka bash

  - Run:

        kafka-topics --create \
          --topic student-performance \
          --bootstrap-server localhost:9092 \
          --partitions 1 \
          --replication-factor 1

- Verify Topic:

      kafka-topics --list --bootstrap-server localhost:9092

- Prepare Data: Ensure data/Student_Performance_Dataset.csv exists. If not, create a sample CSV with relevant columns.
The producer reads the CSV, converts rows to JSON, and sends them to Kafka.

    from kafka import KafkaProducer
    import pandas as pd
    import json
    import time

    print("Producer started...")

    # Load the dataset
    df = pd.read_csv("data/Student_Performance_Dataset.csv")

    # Initialize producer
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8")
    )

    topic_name = "student-performance"

    # Send each row as a message
    for _, row in df.iterrows():
        message = row.to_dict()
        producer.send(topic_name, message)
        print("Sent:", message)
        time.sleep(0.2)  # Delay for simulation

    producer.flush()
    print("Producer finished.")

The consumer subscribes to the topic, processes messages, and appends to the output CSV.

    from kafka import KafkaConsumer
    import json
    import pandas as pd
    import os

    print("Consumer started...")

    def safe_json_deserializer(x):
        try:
            return json.loads(x.decode("utf-8"))
        except json.JSONDecodeError:
            print("Warning: Invalid JSON message skipped.")
            return None

    # Initialize consumer
    consumer = KafkaConsumer(
        "student-performance",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        group_id="student-group-1",
        value_deserializer=safe_json_deserializer
    )

    output_file = "data/output.csv"

    for message in consumer:
        data = message.value
        if data is None:
            continue
        print("Received:", data)
        # Convert to DataFrame and append to CSV
        df = pd.DataFrame([data])
        if not os.path.isfile(output_file):
            df.to_csv(output_file, index=False)
        else:
            df.to_csv(output_file, mode="a", header=False, index=False)

- Start the consumer in one terminal:

      python consumer.py

- Start the producer in another terminal:

      python producer.py

The producer will send data, and the consumer will receive and store it in output.csv.
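The append step in the consumer writes a header only when the output file does not exist yet. The sketch below shows the same idea with the standard library's `csv.DictWriter` instead of pandas, which is a swapped-in technique rather than the project's code. The function name `append_row` is hypothetical. It also treats an existing but empty file as new, a case the file-existence check alone would not cover.

```python
import csv
import os


def append_row(path: str, row: dict) -> None:
    """Append one record to a CSV file, writing the header only for a new or empty file."""
    is_new_file = not os.path.isfile(path) or os.path.getsize(path) == 0
    with open(path, mode="a", newline="", encoding="utf-8") as handle:
        # Column order follows the keys of the incoming record.
        writer = csv.DictWriter(handle, fieldnames=list(row.keys()))
        if is_new_file:
            writer.writeheader()
        writer.writerow(row)
```

This assumes every message carries the same keys in the same order; records with differing keys would produce misaligned columns.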
For manual testing:
- Producer Test:

      docker exec -it kafka kafka-console-producer \
        --topic student-performance \
        --bootstrap-server localhost:9092

  Type JSON messages (e.g., {"math score": 85}) and press Enter.
- Consumer Test:

      docker exec -it kafka kafka-console-consumer \
        --topic student-performance \
        --bootstrap-server localhost:9092 \
        --from-beginning

Common issues and fixes:
- Check Kafka Logs:

      docker logs kafka

- Restart Kafka:

      docker-compose down -v
      docker-compose up -d

- Check Active Containers:

      docker ps

- Reset Offsets (if needed): Stop the consumer first, since offsets can only be reset while the group is inactive. Then enter the Kafka container and run:

      kafka-consumer-groups --bootstrap-server localhost:9092 \
        --group student-group-1 \
        --reset-offsets --to-earliest --execute \
        --topic student-performance

- Connection Issues: Ensure ports 2181 and 9092 are free. Verify bootstrap_servers matches advertised listeners.
- Parsing Errors: Use safe deserializer; inspect message formats.
- No Messages Received: Confirm topic exists, producer is sending, and consumer group is correct.
The consumer stores data in data/output.csv, which mirrors the input structure. View it with cat data/output.csv or open in a spreadsheet tool.
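To check the claim that the output mirrors the input, a small comparison of column names and row counts is usually enough. The sketch below is one possible check using only the standard library; the function names `summarize_csv` and `mirrors_input` are assumptions.

```python
import csv


def summarize_csv(path: str) -> tuple:
    """Return (column names, number of data rows) for a CSV file."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle)
        header = next(reader, [])           # first line holds the column names
        row_count = sum(1 for _ in reader)  # remaining lines are data rows
    return header, row_count


def mirrors_input(input_path: str, output_path: str) -> bool:
    """True when both files share the same columns and data-row count."""
    return summarize_csv(input_path) == summarize_csv(output_path)
```

For this project the call would be `mirrors_input("data/Student_Performance_Dataset.csv", "data/output.csv")`. Because the consumer appends on every run, the counts only match after a single full run against a fresh output file.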
Through this project, key learnings include:
- Kafka Producer and Consumer Architecture: Understanding serialization, deserialization, and topic management.
- Dockerized Kafka Deployment: Container orchestration with dependencies.
- Consumer Offset Management: Handling message consumption states for reliability.
- Fault-Tolerant Stream Processing: Implementing error handling for real-world robustness.
- Real-Time Data Streaming: Simulating end-to-end pipelines.
- Debugging Distributed Systems: Using logs, console tools, and configurations to troubleshoot.
This builds foundational skills for big data engineering.
Possible future enhancements:
- Integrate Apache Spark for advanced processing.
- Add Confluent Schema Registry for message validation.
- Store data in a database like Snowflake or PostgreSQL.
- Build a real-time dashboard using tools like Grafana.
- Scale with multiple partitions and brokers for high throughput.
Developed by [Your Name or AK]. This is a Kafka Streaming Pipeline using Docker and Python. Feel free to fork and contribute!
If you encounter issues or need customizations, open an issue on GitHub.