Objective: Build a service that processes a large data dump without crashing memory.
Architecture: Client → Ingestion API → Kafka → Processor Service → Postgres & OpenSearch
Functional Requirements
- Ingestion API:
  - Endpoint: `POST /api/upload`. Accepts a large CSV file.
  - Constraint: do not load the entire file into memory. Read it line-by-line and publish events to a Kafka topic (`raw-data`).
  - Return `202 Accepted` immediately.
- Processor (Consumer):
- Listen to Kafka.
- Validate: Discard invalid rows (e.g., negative amounts) and log errors.
- Persist: Save valid records to Postgres.
- Index: Sync data to OpenSearch.
- Search:
  - Endpoint: `POST /api/query`. Returns results from OpenSearch.
  - Pass raw SQL as the request body. (Accepting arbitrary SQL is poor security practice; this is for learning only.)
1. Docker Support
- The solution must run via a single command: `docker-compose up --build`.
- Database and Kafka must be containerized (no local installation required).
- Volumes: Data in Postgres must persist even if containers are restarted.
2. Java 21 Features
- Use Virtual Threads (`spring.threads.virtual.enabled=true`) for better I/O handling.
- Use Java Records for DTOs.
3. Resilience
- Services must wait for the Database/Kafka to be ready before starting (implement a retry or health-check wait).
4. Testing
- Include at least one integration test using Testcontainers.
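The readiness wait from the Resilience requirement can be sketched in plain Java. This is a minimal sketch under assumptions: `waitFor` and the simulated dependency are hypothetical names; a real service would pass a check such as opening a JDBC connection or pinging Kafka.

```java
import java.util.concurrent.Callable;

// Minimal retry loop: keep calling `check` until it succeeds or
// attempts run out. `waitFor` is a hypothetical helper name.
class StartupWait {
    static <T> T waitFor(Callable<T> check, int maxAttempts, long delayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return check.call();          // e.g. open a JDBC connection
            } catch (Exception e) {
                last = e;                     // dependency not ready yet
                Thread.sleep(delayMillis);    // back off before retrying
            }
        }
        throw last;                           // give up after maxAttempts
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated dependency that becomes "ready" on the third attempt.
        String result = waitFor(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("not ready");
            return "connected";
        }, 10, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In docker-compose the same goal can also be met declaratively with `depends_on` plus a service `healthcheck`.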
- DB Throughput Issue
  - Say a user is pushing 30k lines per second to the DB.
  - With 5 users in parallel, the DB is handling 150k lines per second.
  - This is far beyond the DB's throughput capability, so the DB will go down.
  - For high throughput, put Kafka in front as a buffer.
- In the /api/upload endpoint, uploading large files (say 15 GB) is not possible because of the server's maximum upload size threshold.
- The solution is to use threading together with streaming.
- Virtual Threads
  - Shift thread-management control from the OS to the JVM.
  - Allow creating millions of threads without worrying about the OS thread pool size.
  - Large performance boost for I/O-bound workloads.
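A minimal runnable sketch of the virtual-thread executor (Java 21+); the task body here is a trivial stand-in for blocking I/O work:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Spawn many virtual threads from a per-task executor; the JVM,
// not the OS, schedules them onto a small pool of carrier threads.
class VirtualThreadsDemo {
    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10_000; i++) {
                executor.submit(() -> {
                    // A blocking call here would unmount the virtual thread
                    // from its carrier instead of pinning an OS thread.
                    completed.incrementAndGet();
                });
            }
        } // close() waits for all submitted tasks to finish
        System.out.println("completed=" + completed.get());
    }
}
```

Creating 10,000 platform threads this way would exhaust OS resources; virtual threads make it routine.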
(Notes from: Node JS Tutorial for Beginners #13 - Streams and Buffers)
- Buffer:
  - Small temporary storage space.
  - Data is passed on to the next destination only after the temporary space is full.
- Streaming:
  - Also uses a small temporary storage space.
  - Data is passed on to the next destination as soon as it reaches the temp storage and the destination is accepting data.
  - Does not wait for the temporary storage to fill up.
Let’s say we have a 16 GB .csv file:
- Buffering
- Will cause OutOfMemory Error
- Buffering with Chunking
- How it works: Your code reads the input stream until it fills a predefined buffer or list. Once the chunk is full, a Virtual Thread is assigned to process that specific batch.
- Is it possible for 16GB? Yes, provided the chunk size is significantly smaller than your available RAM. If your Docker container has 4GB of RAM and you use 100MB chunks, the system will stay stable.
- Streaming
Streaming processes the file byte-by-byte or line-by-line as it flows from the network.
- How it works: You use `HttpServletRequest.getInputStream()` to access the raw data stream. A single Virtual Thread reads a line, processes it (or sends it to Kafka), and immediately moves to the next line.
- Memory Usage: Your memory footprint is extremely low and constant (usually just the size of a single line of text, plus a small 8KB internal buffer). It doesn't matter if the file is 16GB or 16TB; the RAM usage stays the same.
- Virtual Thread Benefit: When the thread waits for more data to arrive over the network, it "unmounts" from the CPU, allowing the system to handle other tasks.
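The streaming loop above can be sketched in plain Java. In the real endpoint the stream would come from `HttpServletRequest.getInputStream()`; a `ByteArrayInputStream` stands in here so the sketch is runnable:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Read line-by-line: only one line (plus BufferedReader's small
// internal buffer) is ever held in memory, regardless of file size.
class StreamingReadDemo {
    static int streamLines(InputStream in) throws Exception {
        int published = 0;
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // A real implementation would publish `line` to the
                // raw-data Kafka topic here.
                published++;
            }
        }
        return published;
    }

    public static void main(String[] args) throws Exception {
        byte[] csv = "title,year\nInception,2010\nThe Matrix,1999\n"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println("lines=" + streamLines(new ByteArrayInputStream(csv)));
    }
}
```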
- To ensure pure streaming:
  - No buffering/storing of data in the system (nothing beyond the line currently being processed).
- Because we need to dynamically create the database schema:
  - Every CSV file has its own DB schema; we cannot use a static DB schema.
  - We have to use a dynamic runtime DB schema.
  - A dynamic schema is not possible with JPA (entities map to a fixed schema at startup); it is only possible with plain JDBC.
Query Endpoint
URL: POST http://localhost:8080/api/query
Content-Type: text/plain or application/json
Request Body: Raw SQL query string
Response: JSON array of objects (each row as a key-value map)
Usage Examples
1. Using cURL (Command Line)
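A minimal invocation, assuming the service runs on localhost:8080 and a table named `Table_1` exists:

```shell
# POST a raw SQL string to the query endpoint; the response is a
# JSON array of row objects.
curl -X POST http://localhost:8080/api/query \
  -H "Content-Type: text/plain" \
  --data 'SELECT * FROM Table_1 LIMIT 10'
```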
- RUN: `docker-compose up --build`
- STOP: `docker-compose stop`
- Store all config variables in `.env.template`.
  - The user copies this template to `.env` and fills in the details.
  - Replace hard-coded values everywhere with variables from `.env`.
## The Problem
The upload endpoint needs to return the table name to the user immediately, but table creation happens asynchronously in a different thread (Kafka consumer). How do we communicate the table name between these two separate execution contexts?
---
## The Solution: CompletableFuture Bridge
A CompletableFuture acts as a communication bridge between the upload thread and the Kafka consumer thread.
---
## How It Works
### 1. Upload Starts
- User uploads CSV file
- Upload endpoint creates a new CompletableFuture (empty promise)
- Starts async processing (streaming to Kafka)
### 2. Background Processing
- CSV lines stream to Kafka topic
- Kafka consumer receives first line (header)
- Consumer generates table name and creates table
### 3. Signal Completion
- Consumer completes the CompletableFuture with table name
- Example: `future.complete("Table_5")`
### 4. Return to User
- Upload endpoint waits on CompletableFuture (max 30 seconds)
- Receives table name when completed
- Returns HTTP response: "Table Name: Table_5"
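The four steps above can be sketched in plain Java. A virtual thread stands in for the Kafka consumer, and the hardcoded `"Table_5"` stands in for the name generated from the sequence:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// One thread plays the Kafka consumer and completes the future;
// the "upload" side blocks on it with a timeout.
class FutureBridgeDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> tableNameFuture = new CompletableFuture<>();

        // Stand-in for the Kafka consumer thread that creates the table.
        Thread.ofVirtual().start(() -> {
            String tableName = "Table_5";        // would come from the sequence
            tableNameFuture.complete(tableName); // signal the upload thread
        });

        // Upload thread: block until the name arrives, at most 30 seconds.
        String name = tableNameFuture.get(30, TimeUnit.SECONDS);
        System.out.println("Table Name: " + name);
    }
}
```

If the consumer never calls `complete(...)`, `get(30, TimeUnit.SECONDS)` throws `TimeoutException`, which maps to the timeout-handling path described below.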
---
## Visual Flow
```
Upload Thread Kafka Consumer Thread
───────────── ─────────────────────
│
│ Create CompletableFuture
│ (empty promise)
│
│ Start streaming ──────────────────┐
│ │
│ Wait for table name │
│ (blocking, 30s timeout) │
│ ▼
│ Process header line
│ Generate: "Table_5"
│ CREATE TABLE
│ Complete future ────┐
│ future.complete() │
│ │ │
│ Receive "Table_5" ◄───────────────┴─────────┘
│
│ Return to user
│ "Table Name: Table_5"
│
▼
```
---
## Key Benefits
- **Synchronous response** - User gets table name immediately
- **No polling** - Efficient waiting mechanism
- **Thread-safe** - Safe communication between threads
- **Timeout protection** - Won't wait forever (30 second limit)
---
## Timeout Handling
If table creation takes longer than 30 seconds:
- CompletableFuture throws TimeoutException
- Upload endpoint returns error: "Timeout: Table creation took too long"
- Background processing continues (data still gets inserted)
- User can query for tables later using information_schema
---
The High Volume Ingestion Service implements a sophisticated dynamic schema creation system that automatically generates database tables based on CSV file headers. This eliminates the need for predefined schemas and allows the system to handle any CSV structure.
**Problem:** Traditional database systems require predefined schemas before data insertion.

**Solution:** Parse the CSV header row to dynamically create a table schema at runtime, then insert subsequent rows into that table.

**Key Innovation:** Each uploaded CSV file gets its own dedicated table with a schema derived from the file's header row.
┌─────────────────────────────────────────────────────────────────────────┐
│                      DYNAMIC SCHEMA CREATION FLOW                       │
└─────────────────────────────────────────────────────────────────────────┘
CSV File Upload
│
│ Line 1: "Movie Title,Release-Date,123Rating"
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ BatchInsertion.insertToTable(line) │
│ │
│ if (!isTableCreated) → createDynamicDatabaseSchema(line) │
└────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ STEP 1: Generate Table Name │
│ │
│ CREATE SEQUENCE IF NOT EXISTS dynamicTables START WITH 1 │
│ nextval('dynamicTables') → 1 │
│ tableName = "Table_1" │
└────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ STEP 2: Parse and Sanitize Headers │
│ │
│ Input: ["Movie Title", "Release-Date", "123Rating"] │
│ Output: ["movie_title", "release_date", "col_123rating"] │
│ │
│ Sanitization: │
│ • Trim whitespace │
│ • Replace [^a-zA-Z0-9_] with "_" │
│ • Convert to lowercase │
│ • Prefix "col_" if starts with digit │
└────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ STEP 3: Create Table │
│ │
│ CREATE TABLE IF NOT EXISTS Table_1 ( │
│ movie_title TEXT, │
│ release_date TEXT, │
│ col_123rating TEXT │
│ ); │
│ │
│ isTableCreated = true │
│ tableNameFuture.complete("Table_1") ← Signal to upload endpoint │
└────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ STEP 4: Prepare Insert Statement │
│ │
│ INSERT INTO Table_1 (movie_title, release_date, col_123rating) │
│ VALUES (?, ?, ?); │
│ │
│ placeholdersForInsertQuery = "?, ?, ?" │
└────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ SUBSEQUENT LINES: Insert Data │
│ │
│ Line 2: "Inception,2010-07-16,8.8" │
│ Line 3: "The Matrix,1999-03-31,8.7" │
│ ... │
│ │
│ For each line: │
│ 1. Split CSV (handle quoted commas) │
│ 2. Validate column count │
│ 3. Execute: jdbcTemplate.update(query, content) │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ PostgreSQL Database │
│ │
│ Table_1 │
│ ┌──────────────┬──────────────┬────────────────┐ │
│ │ movie_title │ release_date │ col_123rating │ │
│ ├──────────────┼──────────────┼────────────────┤ │
│ │ Inception │ 2010-07-16 │ 8.8 │ │
│ │ The Matrix │ 1999-03-31 │ 8.7 │ │
│ └──────────────┴──────────────┴────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
## Step-by-Step Logic
### 1. Table Name Generation
- Create a PostgreSQL sequence named `dynamicTables` (if not exists)
- Get the next value from the sequence
- Concatenate "Table_" with the sequence number
- Result: `Table_1`, `Table_2`, `Table_3`, etc.
**Purpose:** Ensures unique, predictable table names for each upload
---
### 2. Column Name Sanitization
**Input:** CSV header row (e.g., `Movie Title,Release-Date,123Rating,Box Office $`)
**Sanitization Rules:**
1. Trim whitespace from both ends
2. Replace all non-alphanumeric characters with underscores
3. Convert to lowercase
4. If column name starts with a digit, prefix with `col_`
**Example Transformations:**
| Original Header | Sanitized Column Name |
|---|---|
| `Movie Title` | `movie_title` |
| `Release-Date` | `release_date` |
| `Box Office $` | `box_office__` |
| `123Rating` | `col_123rating` |
| `Year (2024)` | `year__2024_` |
**Purpose:** Creates PostgreSQL-compliant column identifiers and prevents SQL injection
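The four rules above can be expressed as one small method. A minimal sketch (the method name `sanitize` is illustrative, not the project's actual identifier):

```java
// Apply the sanitization rules: trim, replace non-alphanumerics,
// lowercase, and prefix digit-leading names with "col_".
class ColumnSanitizer {
    static String sanitize(String header) {
        String name = header.trim()
                .replaceAll("[^a-zA-Z0-9_]", "_") // non-alphanumerics -> _
                .toLowerCase();
        if (!name.isEmpty() && Character.isDigit(name.charAt(0))) {
            name = "col_" + name;                 // identifiers can't start with a digit
        }
        return name;
    }

    public static void main(String[] args) {
        for (String h : new String[] {"Movie Title", "Release-Date",
                                      "Box Office $", "123Rating", "Year (2024)"}) {
            System.out.println(h + " -> " + sanitize(h));
        }
    }
}
```

Running this reproduces the transformation table above.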
---
### 3. Table Creation
- Build CREATE TABLE statement dynamically
- Use sanitized column names
- Set all columns to TEXT type (no type inference)
- Execute the CREATE TABLE statement
- Mark table as created
- Signal completion to upload endpoint
**Key Points:**
- All columns are TEXT type (maximum flexibility)
- No primary keys, foreign keys, or constraints
- Handles any CSV structure automatically
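Building the statement from already-sanitized column names is a simple string join. A minimal sketch (method and class names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

// Build the CREATE TABLE statement with every column typed TEXT,
// as described above. Columns must already be sanitized, since
// they are concatenated directly into the SQL.
class CreateTableBuilder {
    static String buildCreateTable(String tableName, List<String> columns) {
        String cols = columns.stream()
                .map(c -> c + " TEXT")
                .collect(Collectors.joining(", "));
        return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + cols + ");";
    }

    public static void main(String[] args) {
        System.out.println(buildCreateTable("Table_1",
                List.of("movie_title", "release_date", "col_123rating")));
    }
}
```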
---
### 4. Data Insertion
**For each subsequent line:**
- Split CSV line on commas (respecting quoted strings)
- Validate row has correct number of columns
- Skip "dirty rows" with too many columns
- Execute INSERT using prepared statement with placeholders
- Continue processing remaining lines
**Purpose:** Safely insert data while handling malformed rows gracefully
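The quote-respecting split can be sketched with a simple state toggle. This is a minimal sketch; real CSVs have more edge cases (escaped quotes, newlines inside fields) that a dedicated CSV library handles better:

```java
import java.util.ArrayList;
import java.util.List;

// Split on commas, but treat commas inside double quotes as data,
// so "Hello, World" stays a single field.
class CsvSplit {
    static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;           // toggle quoted state
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());         // last field
        return fields;
    }

    public static void main(String[] args) {
        List<String> fields = split("\"Hello, World\",2010,8.8");
        System.out.println("count=" + fields.size() + " " + fields);
    }
}
```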
- Multi-stage Dockerfile:
  - Stage 1 builds the JAR file.
  - Stage 2 copies the JAR into the runtime container.
  - The container then runs the JAR.
- Build (builds the whole project)
- Services in docker-compose: Spring Boot, Zookeeper, Postgres, Kafka
  - Spring Boot depends on: Kafka, Postgres
  - Kafka depends on: Zookeeper
- Spring Boot
- Docker
- Kafka
- Apache Kafka Crash Course | What is Kafka?
- Overview :: Spring Kafka
- Zookeeper acts as the cluster manager for Kafka
- Java Virtual Threads
- Buffering vs Streaming Data
- Node JS Tutorial for Beginners #13 - Streams and Buffers
- How Spring Boot Streams Large Files | Medium
- DO NOT use the MultipartFile class for uploads (it buffers and stores the whole file)
- INSTEAD use HttpServletRequest for true upload and download streaming
- The Streaming API – FileUpload
- Postgres with Spring Boot
- Spring Boot Integration With PostgreSQL as a Maven Project - GeeksforGeeks (generally we would use JPA, but this project cannot, because of the dynamic schema)
- Setting Up PostgreSQL with Docker Compose for Development and Production - DEV Community
- Spring JDBC Tutorial | Baeldung
- Batch Inserting
  - Bulk-insert service instead of line-by-line inserts.
  - Create a bulk-insert method that inserts all lines collected from Kafka every 5 seconds.
  - All collected lines are stored in a data structure (DS).
  - Two kinds of operations are performed on the DS:
    - Operation 1: push a line (String) into the DS.
    - Operation 2: bulk-insert the DS contents into Postgres.
  - The DS must be guarded by a mutex lock (critical section for synchronization).
  - Only one of Operation 1 or Operation 2 can run at a time.
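The two operations can be sketched as one small class. This is a minimal sketch: `BatchBuffer`, `push`, and `drain` are illustrative names, and a real service would call `drain()` from a 5-second scheduled task and bulk-insert the returned batch with a single JDBC `batchUpdate`:

```java
import java.util.ArrayList;
import java.util.List;

// Both operations synchronize on the same monitor, so a flush can
// never interleave with a push: exactly one runs at a time.
class BatchBuffer {
    private final List<String> lines = new ArrayList<>();

    // Operation 1: called by the Kafka listener for every consumed line.
    public synchronized void push(String line) {
        lines.add(line);
    }

    // Operation 2: called by the scheduler; swaps the batch out atomically.
    public synchronized List<String> drain() {
        List<String> batch = new ArrayList<>(lines);
        lines.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchBuffer buffer = new BatchBuffer();
        buffer.push("Inception,2010-07-16,8.8");
        buffer.push("The Matrix,1999-03-31,8.7");
        System.out.println("flushed=" + buffer.drain().size()
                + " remaining=" + buffer.drain().size());
    }
}
```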
- Testcontainers for integration testing
- OpenSearch Integration