A dynamic, multi-user stock data streaming system built with Apache Kafka, Python Flask, and MySQL. Features real-time data visualization, admin approval workflows, and automatic topic management.
- Features
- System Architecture
- Tech Stack
- Installation
- Configuration
- Usage
- Project Structure
- API Endpoints
- Dynamic Topic Management - Topics created on-demand and managed via Kafka Admin API
- Real-Time Streaming - Live stock price data streamed through Kafka
- Multi-User Support - User authentication and personalized subscriptions
- Admin Approval Workflow - Topics require admin approval before activation
- Auto-Activation - Producer automatically activates approved topics
- User registration and authentication
- Subscribe/unsubscribe to stock topics
- Real-time price charts with Chart.js
- Live data feed with latest stock prices
- Multiple topic management
- Responsive modern UI
- 3-tab admin panel (Topics, Users, Subscriptions)
- Approve pending topics with company assignments
- Deactivate topics (auto-deletes from Kafka)
- View user subscriptions per topic
- View topic subscriptions per user
- Real-time subscription monitoring
- User โ Requests topic via UI โ Consumer API
- Consumer โ Creates pending topic in DB
- Admin โ Approves topic + assigns companies
- Producer โ Detects approval โ Creates Kafka topic โ Activates
- Consumer โ Starts consuming โ Streams data to users
- Admin โ Can deactivate โ Producer deletes Kafka topic
| Component | Technology |
|---|---|
| Backend | Python 3.8+, Flask, Flask-Session, Flask-CORS |
| Message Broker | Apache Kafka 3.x |
| Database | MySQL 8.0 |
| Frontend | HTML5, CSS3, JavaScript (ES6+) |
| Visualization | Chart.js 4.4.0 |
| Admin Panel | Python Tkinter |
| Data Source | yfinance (Yahoo Finance API) |
pip install flask flask-cors flask-session mysql-connector-python kafka-python yfinancegit clone https://github.com/yourusername/stock-streamer-pro.git
cd stock-streamer-propip install -r requirements.txtCREATE DATABASE stock_streamer_db;
USE stock_streamer_db;
-- Topics Table
CREATE TABLE topics (
id INT AUTO_INCREMENT PRIMARY KEY,
topic_name VARCHAR(255) UNIQUE NOT NULL,
companies TEXT,
status ENUM('pending', 'approved', 'active', 'deactivate') NOT NULL DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Users Table
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP NULL
);
-- User Subscriptions Table
CREATE TABLE user_subscriptions (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
topic_id INT NOT NULL,
subscribed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE,
UNIQUE KEY (user_id, topic_id)
);
-- Create test user (password: password123)
INSERT INTO users (username, email, password_hash)
VALUES ('testuser', 'test@example.com', SHA2('password123', 256));# Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka Broker
bin/kafka-server-start.sh config/server.propertiesImportant Kafka Configuration (config/server.properties):
delete.topic.enable=true
auto.create.topics.enable=false1. db_utils.py
MYSQL_CONFIG = {
"host": "YOUR_MYSQL_HOST",
"user": "YOUR_MYSQL_USER",
"password": "YOUR_MYSQL_PASSWORD",
"database": "stock_streamer_db"
}2. config.json
{
"bootstrap_servers": ["YOUR_KAFKA_HOST:9092"],
"db_config": {
"host": "YOUR_MYSQL_HOST",
"user": "YOUR_MYSQL_USER",
"password": "YOUR_MYSQL_PASSWORD",
"database": "stock_streamer_db"
},
"topic_check_interval": 5,
"publish_interval": 10
}3. index.html
const API_BASE = 'http://YOUR_CONSUMER_HOST:5000';Terminal 1/Node 1: Start Producer
python producer_main.pyTerminal 2/Node 2: Start Consumer API
python consumer.pyTerminal 3/Node 3: Start Admin Panel
python admin.pyTerminal 4/Node 4: Open Web UI
python3 -m http.server 3000
# Open user.html in browser
Browser : 'http://USER_IP:3000'-
Register/Login
- Open
index.htmlin browser - Register new account or login with existing credentials
- Open
-
Request Topic
- Enter topic name (e.g., "technology", "finance")
- Click "Subscribe Now"
- Status shows as "Pending Approval"
-
Admin Approves
- Admin opens GUI panel
- Selects pending topic
- Enters companies (e.g., "GOOGL,MSFT,AAPL")
- Clicks "Approve"
-
Auto-Activation
- Producer detects approval
- Creates Kafka topic
- Marks as "Active" in database
- Consumer starts streaming data
-
View Data
- User sees "Streaming Active" status
- Real-time chart updates
- Live stock prices displayed
-
Unsubscribe
- Click โ button on subscription
- Removed from database
- Can resubscribe anytime
Topics Tab:
- View all topics (pending, approved, active)
- Approve topics with company assignments
- Deactivate topics (triggers Kafka deletion)
- View subscribers for each topic
Users Tab:
- View all registered users
- See user details (last login, email)
- View subscriptions per user
Subscriptions Tab:
- Real-time subscription monitoring
- See which users subscribe to which topics
- Track subscription timestamps
stock-streamer-pro/
โ
โโโ producer/
โ โโโ producer_main.py # Main producer entry point
โ โโโ topic_watcher.py # Thread: Monitors DB for topic changes
โ โโโ input_listener.py # Thread: Fetches stock data
โ โโโ publisher.py # Thread: Publishes to Kafka
โ โโโ db_utils.py # Database helper functions
โ โโโ config.json # Producer configuration
โ
โโโ consumer/
โ โโโ consumer.py # Flask API + Kafka consumers
โ
โโโ admin/
โ โโโ admin.py # Tkinter admin GUI
โ
โโโ ui/
โ โโโ user.html # User web interface
โ
โโโ requirements.txt # Python dependencies
โโโ README.md # This file
| Method | Endpoint | Description |
|---|---|---|
| POST | /register |
Register new user |
| POST | /login |
User login |
| POST | /logout |
User logout |
| GET | /current-user |
Get logged-in user |
| Method | Endpoint | Description |
|---|---|---|
| POST | /request-topic |
Subscribe to topic |
| POST | /unsubscribe-topic |
Unsubscribe from topic |
| GET | /topic-status/<name> |
Check topic status |
| Method | Endpoint | Description |
|---|---|---|
| GET | /get-stock-data/<name> |
Get latest stock data |
| GET | /health |
Health check |
- Akshay R - @Akshay10258
- Amogh Shetty - @amoghshetty09
- Amrutha - @amrutha0307
- Yashas J - @Yashtech-714
