diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 000000000..e0f15db2e
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,3 @@
+{
+ "java.configuration.updateBuildConfiguration": "automatic"
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index 4aa6eeb3a..b144273a2 100644
--- a/README.md
+++ b/README.md
@@ -175,6 +175,64 @@ rates below are specified as *records/second*.
| High (167 Directives) | 426 | 127,946,398 | 82,677,845,324 | 106,367.27 |
| High (167 Directives) | 426 | 511,785,592 | 330,711,381,296 | 105,768.93 |
+## Byte Size and Time Duration Parsers
+
+The Wrangler library now includes built-in support for parsing and aggregating byte sizes and time durations. This feature allows you to easily work with data that includes size measurements (e.g., "1.5MB", "2GB") and time intervals (e.g., "500ms", "2.5h").
+
+### Supported Units
+
+#### Byte Size Units
+- B (Bytes)
+- KB (Kilobytes)
+- MB (Megabytes)
+- GB (Gigabytes)
+- TB (Terabytes)
+- PB (Petabytes)
+
+#### Time Duration Units
+- ns (Nanoseconds)
+- us (Microseconds)
+- ms (Milliseconds)
+- s (Seconds)
+- m (Minutes)
+- h (Hours)
+- d (Days)
+
+### Using the Aggregate Stats Directive
+
+The `aggregate-stats` directive allows you to aggregate byte sizes and time durations across rows. Here's the syntax:
+
+```
+aggregate-stats :size_column :time_column total_size_column total_time_column [output_size_unit] [output_time_unit]
+```
+
+Parameters:
+- `:size_column` - Column containing byte sizes (e.g., "1.5MB", "2GB")
+- `:time_column` - Column containing time durations (e.g., "500ms", "2.5h")
+- `total_size_column` - Name of the output column for total size
+- `total_time_column` - Name of the output column for total time
+- `output_size_unit` - (Optional) Unit for the output size (default: "MB")
+- `output_time_unit` - (Optional) Unit for the output time (default: "s")
+
+Example:
+```
+# Input data:
+# | data_size | response_time |
+# |-----------|---------------|
+# | 1.5MB | 500ms |
+# | 2.5MB | 750ms |
+# | 1MB | 250ms |
+
+# Directive:
+aggregate-stats :data_size :response_time total_size total_time MB s
+
+# Output:
+# | total_size | total_time |
+# |------------|------------|
+# | 5.0 | 1.5 |
+```
+
+The directive automatically handles mixed units in the input data, converting everything to a common base unit (bytes for sizes, nanoseconds for times) before aggregating and then converting to the requested output units.
## Contact
@@ -216,3 +274,60 @@ Cask is a trademark of Cask Data, Inc. All rights reserved.
Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.
+
+# Wrangler
+
+A data preparation tool for cleaning, transforming, and preparing data for analysis.
+
+## Unit Parsers
+
+### Byte Size Parser
+The byte size parser supports the following units:
+- B (bytes)
+- KB (kilobytes)
+- MB (megabytes)
+- GB (gigabytes)
+- TB (terabytes)
+- PB (petabytes)
+
+Example usage:
+```
+1B // 1 byte
+1KB // 1 kilobyte
+1MB // 1 megabyte
+1GB // 1 gigabyte
+1TB // 1 terabyte
+1PB // 1 petabyte
+```
+
+### Time Duration Parser
+The time duration parser supports the following units:
+- ns (nanoseconds)
+- us (microseconds)
+- ms (milliseconds)
+- s (seconds)
+- m (minutes)
+- h (hours)
+- d (days)
+
+Example usage:
+```
+1ns // 1 nanosecond
+1us // 1 microsecond
+1ms // 1 millisecond
+1s // 1 second
+1m // 1 minute
+1h // 1 hour
+1d // 1 day
+```
+
+### Usage in Directives
+Both byte size and time duration values can be used in directives for data transformation and aggregation:
+
+```
+// Aggregate byte sizes
+aggregate-stats :column1 sum as total_size;
+
+// Aggregate time durations
+aggregate-stats :column2 average as avg_duration;
+```
diff --git a/clickhouse-flatfile-ingestion/README.md b/clickhouse-flatfile-ingestion/README.md
new file mode 100644
index 000000000..0b6c65ce6
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/README.md
@@ -0,0 +1,137 @@
+# ClickHouse Flat File Ingestion Tool
+
+A web-based application for bidirectional data ingestion between ClickHouse database and Flat File platform.
+
+## Features
+
+- Bidirectional data ingestion between ClickHouse and Flat Files
+- JWT-based authentication
+- Schema discovery and validation
+- Progress tracking for large data transfers
+- Support for various file formats (CSV, JSON, etc.)
+- Configurable data mapping
+- Error handling and logging
+
+## Technology Stack
+
+### Backend
+- Spring Boot
+- Spring Security with JWT
+- ClickHouse JDBC Driver
+- Apache Commons CSV
+- Jackson for JSON processing
+
+### Frontend
+- React
+- Material-UI
+- Axios
+- React Router
+- React Query
+
+## Prerequisites
+
+- Java 17 or higher
+- Node.js 16 or higher
+- ClickHouse server
+- PostgreSQL (for user management)
+
+## Environment Variables
+
+Create a `.env` file in the backend directory with the following variables:
+
+```properties
+# Database Configuration
+DB_URL=jdbc:postgresql://localhost:5432/ingestion_db
+DB_USERNAME=your_db_username
+DB_PASSWORD=your_db_password
+
+# ClickHouse Configuration
+CLICKHOUSE_HOST=your_clickhouse_host
+CLICKHOUSE_PORT=8443
+CLICKHOUSE_DATABASE=your_database
+CLICKHOUSE_USER=your_username
+CLICKHOUSE_PASSWORD=your_password
+
+# JWT Configuration
+JWT_SECRET=your_jwt_secret_key
+
+# File Upload Configuration
+UPLOAD_DIR=./uploads
+```
+
+## Installation
+
+1. Clone the repository
+2. Set up environment variables
+3. Build and run the backend:
+ ```bash
+ cd backend
+ ./mvnw clean install
+ ./mvnw spring-boot:run
+ ```
+4. Build and run the frontend:
+ ```bash
+ cd frontend
+ npm install
+ npm start
+ ```
+
+## Usage
+
+1. Access the application at `http://localhost:3000`
+2. Log in with your credentials
+3. Select source (ClickHouse or Flat File)
+4. Configure connection parameters
+5. Select tables and columns
+6. Preview data
+7. Start ingestion process
+
+## Security Considerations
+
+- All sensitive information is stored in environment variables
+- JWT tokens expire after 24 hours
+- Passwords are hashed using BCrypt
+- SSL/TLS encryption for database connections
+- Input validation and sanitization
+- Rate limiting on API endpoints
+
+## API Documentation
+
+### Authentication
+- POST /api/auth/login - Login endpoint
+- POST /api/auth/refresh - Refresh token endpoint
+
+### Ingestion
+- POST /api/ingestion/export - Export data from ClickHouse to file
+- POST /api/ingestion/import - Import data from file to ClickHouse
+- GET /api/ingestion/progress/{jobId} - Get ingestion progress
+- GET /api/ingestion/schema - Get table schema
+- GET /api/ingestion/preview - Get data preview
+
+## Error Handling
+
+The application includes comprehensive error handling for:
+- Invalid credentials
+- Connection failures
+- Schema mismatches
+- File format errors
+- Data validation errors
+- Network timeouts
+
+## Logging
+
+- Application logs are stored in `logs/application.log`
+- Log levels can be configured in `application.properties`
+- Structured logging format for better analysis
+
+## Contributing
+
+1. Fork the repository
+2. Create a feature branch
+3. Commit your changes
+4. Push to the branch
+5. Create a Pull Request
+
+## License
+
+This project is licensed under the MIT License - see the LICENSE file for details.
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/pom.xml b/clickhouse-flatfile-ingestion/backend/pom.xml
new file mode 100644
index 000000000..4a120cb2c
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/pom.xml
@@ -0,0 +1,100 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.7.0
+
+ com.wrangler
+ clickhouse-flatfile-ingestion
+ 1.0-SNAPSHOT
+ clickhouse-flatfile-ingestion
+ Bidirectional ClickHouse & Flat File Data Ingestion Tool
+
+
+ 11
+ 0.3.2
+ 0.11.5
+ 1.9.0
+ 1.18.24
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+
+
+ com.clickhouse
+ clickhouse-jdbc
+ ${clickhouse-jdbc.version}
+
+
+
+
+ io.jsonwebtoken
+ jjwt-api
+ ${jjwt.version}
+
+
+ io.jsonwebtoken
+ jjwt-impl
+ ${jjwt.version}
+ runtime
+
+
+ io.jsonwebtoken
+ jjwt-jackson
+ ${jjwt.version}
+ runtime
+
+
+
+
+ org.apache.commons
+ commons-csv
+ ${commons-csv.version}
+
+
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/ClickHouseFlatFileIngestionApplication.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/ClickHouseFlatFileIngestionApplication.java
new file mode 100644
index 000000000..f2c3703af
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/ClickHouseFlatFileIngestionApplication.java
@@ -0,0 +1,30 @@
+package com.ingestion;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+@EnableAsync
+public class ClickHouseFlatFileIngestionApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClickHouseFlatFileIngestionApplication.class, args);
+ }
+
+ @Bean
+ public WebMvcConfigurer corsConfigurer() {
+ return new WebMvcConfigurer() {
+ @Override
+ public void addCorsMappings(CorsRegistry registry) {
+ registry.addMapping("/**")
+ .allowedOrigins("*")
+ .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
+ .allowedHeaders("*");
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/IngestionApplication.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/IngestionApplication.java
new file mode 100644
index 000000000..5384e838d
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/IngestionApplication.java
@@ -0,0 +1,29 @@
+package com.ingestion;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+public class IngestionApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(IngestionApplication.class, args);
+ }
+
+ @Bean
+ public WebMvcConfigurer corsConfigurer() {
+ return new WebMvcConfigurer() {
+ @Override
+ public void addCorsMappings(CorsRegistry registry) {
+ registry.addMapping("/**")
+ .allowedOrigins("http://localhost:3000")
+ .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
+ .allowedHeaders("*")
+ .allowCredentials(true);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/DatabaseConnectionException.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/DatabaseConnectionException.java
new file mode 100644
index 000000000..13894838f
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/DatabaseConnectionException.java
@@ -0,0 +1,15 @@
+package com.ingestion.config;
+
+/**
+ * Exception thrown when there are issues connecting to the database.
+ */
+public class DatabaseConnectionException extends RuntimeException {
+
+ public DatabaseConnectionException(String message) {
+ super(message);
+ }
+
+ public DatabaseConnectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/GlobalExceptionHandler.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/GlobalExceptionHandler.java
new file mode 100644
index 000000000..37ca16fee
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/GlobalExceptionHandler.java
@@ -0,0 +1,93 @@
+package com.ingestion.config;
+
+import com.ingestion.dto.ErrorResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.context.request.WebRequest;
+import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Global exception handler for the application.
+ * Centralizes error handling and provides consistent error responses.
+ */
+@ControllerAdvice
+public class GlobalExceptionHandler extends ResponseEntityExceptionHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
+
+ /**
+ * Handles all unhandled exceptions
+ */
+ @ExceptionHandler(Exception.class)
+ public ResponseEntity handleAllExceptions(Exception ex, WebRequest request) {
+ logger.error("Unhandled exception occurred", ex);
+
+ ErrorResponse errorResponse = new ErrorResponse(
+ HttpStatus.INTERNAL_SERVER_ERROR.value(),
+ "An unexpected error occurred",
+ ex.getMessage(),
+ LocalDateTime.now()
+ );
+
+ return new ResponseEntity<>(errorResponse, HttpStatus.INTERNAL_SERVER_ERROR);
+ }
+
+ /**
+ * Handles validation exceptions
+ */
+ @ExceptionHandler(IllegalArgumentException.class)
+ public ResponseEntity handleValidationExceptions(IllegalArgumentException ex, WebRequest request) {
+ logger.warn("Validation error occurred", ex);
+
+ ErrorResponse errorResponse = new ErrorResponse(
+ HttpStatus.BAD_REQUEST.value(),
+ "Validation error",
+ ex.getMessage(),
+ LocalDateTime.now()
+ );
+
+ return new ResponseEntity<>(errorResponse, HttpStatus.BAD_REQUEST);
+ }
+
+ /**
+ * Handles resource not found exceptions
+ */
+ @ExceptionHandler(ResourceNotFoundException.class)
+ public ResponseEntity handleResourceNotFoundException(ResourceNotFoundException ex, WebRequest request) {
+ logger.warn("Resource not found", ex);
+
+ ErrorResponse errorResponse = new ErrorResponse(
+ HttpStatus.NOT_FOUND.value(),
+ "Resource not found",
+ ex.getMessage(),
+ LocalDateTime.now()
+ );
+
+ return new ResponseEntity<>(errorResponse, HttpStatus.NOT_FOUND);
+ }
+
+ /**
+ * Handles database connection exceptions
+ */
+ @ExceptionHandler(DatabaseConnectionException.class)
+ public ResponseEntity handleDatabaseConnectionException(DatabaseConnectionException ex, WebRequest request) {
+ logger.error("Database connection error", ex);
+
+ ErrorResponse errorResponse = new ErrorResponse(
+ HttpStatus.SERVICE_UNAVAILABLE.value(),
+ "Database connection error",
+ ex.getMessage(),
+ LocalDateTime.now()
+ );
+
+ return new ResponseEntity<>(errorResponse, HttpStatus.SERVICE_UNAVAILABLE);
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/HealthCheckConfig.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/HealthCheckConfig.java
new file mode 100644
index 000000000..1cb62bb9f
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/HealthCheckConfig.java
@@ -0,0 +1,43 @@
+package com.ingestion.config;
+
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.HealthIndicator;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Configuration for health checks and monitoring.
+ */
+@Configuration
+public class HealthCheckConfig {
+
+ /**
+ * Creates a health indicator for the database connection.
+ */
+ @Bean
+ public HealthIndicator dbHealthIndicator(JdbcTemplate jdbcTemplate) {
+ return () -> {
+ try {
+ jdbcTemplate.queryForObject("SELECT 1", Integer.class);
+ return Health.up().withDetail("database", "ClickHouse").build();
+ } catch (Exception e) {
+ return Health.down()
+ .withDetail("database", "ClickHouse")
+ .withException(e)
+ .build();
+ }
+ };
+ }
+
+ /**
+ * Creates a health indicator for the application.
+ */
+ @Bean
+ public HealthIndicator applicationHealthIndicator() {
+ return () -> Health.up()
+ .withDetail("application", "ClickHouse Flat File Ingestion")
+ .withDetail("status", "Running")
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/JwtAuthenticationFilter.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/JwtAuthenticationFilter.java
new file mode 100644
index 000000000..3dd75539c
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/JwtAuthenticationFilter.java
@@ -0,0 +1,64 @@
+package com.ingestion.config;
+
+import jakarta.servlet.FilterChain;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.UserDetails;
+import org.springframework.security.core.userdetails.UserDetailsService;
+import org.springframework.security.web.authentication.WebAuthenticationDetailsSource;
+import org.springframework.stereotype.Component;
+import org.springframework.web.filter.OncePerRequestFilter;
+
+import java.io.IOException;
+
+/**
+ * JWT Authentication Filter that validates JWT tokens in incoming requests.
+ */
+@Component
+public class JwtAuthenticationFilter extends OncePerRequestFilter {
+
+ private final JwtService jwtService;
+ private final UserDetailsService userDetailsService;
+
+ public JwtAuthenticationFilter(JwtService jwtService, UserDetailsService userDetailsService) {
+ this.jwtService = jwtService;
+ this.userDetailsService = userDetailsService;
+ }
+
+ @Override
+ protected void doFilterInternal(
+ HttpServletRequest request,
+ HttpServletResponse response,
+ FilterChain filterChain
+ ) throws ServletException, IOException {
+ final String authHeader = request.getHeader("Authorization");
+ final String jwt;
+ final String username;
+
+ if (authHeader == null || !authHeader.startsWith("Bearer ")) {
+ filterChain.doFilter(request, response);
+ return;
+ }
+
+ jwt = authHeader.substring(7);
+ username = jwtService.extractUsername(jwt);
+
+ if (username != null && SecurityContextHolder.getContext().getAuthentication() == null) {
+ UserDetails userDetails = this.userDetailsService.loadUserByUsername(username);
+
+ if (jwtService.isTokenValid(jwt, userDetails)) {
+ UsernamePasswordAuthenticationToken authToken = new UsernamePasswordAuthenticationToken(
+ userDetails,
+ null,
+ userDetails.getAuthorities()
+ );
+ authToken.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
+ SecurityContextHolder.getContext().setAuthentication(authToken);
+ }
+ }
+ filterChain.doFilter(request, response);
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/JwtService.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/JwtService.java
new file mode 100644
index 000000000..3f93ad717
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/JwtService.java
@@ -0,0 +1,124 @@
+package com.ingestion.config;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.security.Keys;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.security.core.userdetails.UserDetails;
+import org.springframework.stereotype.Service;
+
+import java.security.Key;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Service for JWT token operations including generation and validation.
+ */
+@Service
+public class JwtService {
+
+ @Value("${jwt.secret}")
+ private String secretKey;
+
+ @Value("${jwt.expiration}")
+ private long jwtExpiration;
+
+ @Value("${jwt.refresh-token.expiration}")
+ private long refreshExpiration;
+
+ /**
+ * Extracts the username from a JWT token.
+ */
+ public String extractUsername(String token) {
+ return extractClaim(token, Claims::getSubject);
+ }
+
+ /**
+ * Extracts a specific claim from a JWT token.
+ */
+ public T extractClaim(String token, Function claimsResolver) {
+ final Claims claims = extractAllClaims(token);
+ return claimsResolver.apply(claims);
+ }
+
+ /**
+ * Generates a JWT token for a user.
+ */
+ public String generateToken(UserDetails userDetails) {
+ return generateToken(new HashMap<>(), userDetails);
+ }
+
+ /**
+ * Generates a JWT token with extra claims for a user.
+ */
+ public String generateToken(Map extraClaims, UserDetails userDetails) {
+ return buildToken(extraClaims, userDetails, jwtExpiration);
+ }
+
+ /**
+ * Generates a refresh token for a user.
+ */
+ public String generateRefreshToken(UserDetails userDetails) {
+ return buildToken(new HashMap<>(), userDetails, refreshExpiration);
+ }
+
+ /**
+ * Builds a JWT token with the specified claims and expiration.
+ */
+ private String buildToken(Map extraClaims, UserDetails userDetails, long expiration) {
+ return Jwts
+ .builder()
+ .setClaims(extraClaims)
+ .setSubject(userDetails.getUsername())
+ .setIssuedAt(new Date(System.currentTimeMillis()))
+ .setExpiration(new Date(System.currentTimeMillis() + expiration))
+ .signWith(getSignInKey(), SignatureAlgorithm.HS256)
+ .compact();
+ }
+
+ /**
+ * Validates a JWT token for a user.
+ */
+ public boolean isTokenValid(String token, UserDetails userDetails) {
+ final String username = extractUsername(token);
+ return (username.equals(userDetails.getUsername())) && !isTokenExpired(token);
+ }
+
+ /**
+ * Checks if a token is expired.
+ */
+ private boolean isTokenExpired(String token) {
+ return extractExpiration(token).before(new Date());
+ }
+
+ /**
+ * Extracts the expiration date from a token.
+ */
+ private Date extractExpiration(String token) {
+ return extractClaim(token, Claims::getExpiration);
+ }
+
+ /**
+ * Extracts all claims from a token.
+ */
+ private Claims extractAllClaims(String token) {
+ return Jwts
+ .parserBuilder()
+ .setSigningKey(getSignInKey())
+ .build()
+ .parseClaimsJws(token)
+ .getBody();
+ }
+
+ /**
+ * Gets the signing key for JWT operations.
+ */
+ private Key getSignInKey() {
+ byte[] keyBytes = Decoders.BASE64.decode(secretKey);
+ return Keys.hmacShaKeyFor(keyBytes);
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/MetricsConfig.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/MetricsConfig.java
new file mode 100644
index 000000000..180f93b74
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/MetricsConfig.java
@@ -0,0 +1,21 @@
+package com.ingestion.config;
+
+import io.micrometer.core.aop.TimedAspect;
+import io.micrometer.core.instrument.MeterRegistry;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Configuration for application metrics and monitoring.
+ */
+@Configuration
+public class MetricsConfig {
+
+ /**
+ * Creates a TimedAspect bean for method timing.
+ */
+ @Bean
+ public TimedAspect timedAspect(MeterRegistry registry) {
+ return new TimedAspect(registry);
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/ResourceNotFoundException.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/ResourceNotFoundException.java
new file mode 100644
index 000000000..eab29796b
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/ResourceNotFoundException.java
@@ -0,0 +1,15 @@
+package com.ingestion.config;
+
+/**
+ * Exception thrown when a requested resource is not found.
+ */
+public class ResourceNotFoundException extends RuntimeException {
+
+ public ResourceNotFoundException(String message) {
+ super(message);
+ }
+
+ public ResourceNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/SecurityConfig.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/SecurityConfig.java
new file mode 100644
index 000000000..36e2e03bf
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/config/SecurityConfig.java
@@ -0,0 +1,62 @@
+package com.ingestion.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
+import org.springframework.security.config.annotation.web.builders.HttpSecurity;
+import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
+import org.springframework.security.config.http.SessionCreationPolicy;
+import org.springframework.security.web.SecurityFilterChain;
+import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
+import org.springframework.web.cors.CorsConfiguration;
+import org.springframework.web.cors.CorsConfigurationSource;
+import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
+
+import java.util.Arrays;
+
+@Configuration
+@EnableWebSecurity
+@EnableMethodSecurity
+public class SecurityConfig {
+
+ private final JwtAuthenticationFilter jwtAuthenticationFilter;
+
+ public SecurityConfig(JwtAuthenticationFilter jwtAuthenticationFilter) {
+ this.jwtAuthenticationFilter = jwtAuthenticationFilter;
+ }
+
+ @Bean
+ public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
+ http
+ .csrf(csrf -> csrf.disable())
+ .cors(cors -> cors.configurationSource(corsConfigurationSource()))
+ .authorizeHttpRequests(auth -> auth
+ .requestMatchers("/api/auth/**").permitAll()
+ .requestMatchers("/api/public/**").permitAll()
+ .requestMatchers("/swagger-ui/**", "/v3/api-docs/**").permitAll()
+ .requestMatchers("/api/**").authenticated()
+ .anyRequest().authenticated()
+ )
+ .sessionManagement(session -> session
+ .sessionCreationPolicy(SessionCreationPolicy.STATELESS)
+ )
+ .addFilterBefore(jwtAuthenticationFilter, UsernamePasswordAuthenticationFilter.class);
+
+ return http.build();
+ }
+
+ @Bean
+ public CorsConfigurationSource corsConfigurationSource() {
+ CorsConfiguration configuration = new CorsConfiguration();
+ configuration.setAllowedOrigins(Arrays.asList("*")); // In production, specify exact origins
+ configuration.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "DELETE", "OPTIONS"));
+ configuration.setAllowedHeaders(Arrays.asList("Authorization", "Content-Type", "X-Requested-With"));
+ configuration.setExposedHeaders(Arrays.asList("Authorization"));
+ configuration.setAllowCredentials(true);
+ configuration.setMaxAge(3600L);
+
+ UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
+ source.registerCorsConfiguration("/**", configuration);
+ return source;
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/controller/IngestionStatusController.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/controller/IngestionStatusController.java
new file mode 100644
index 000000000..8bf0b46bf
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/controller/IngestionStatusController.java
@@ -0,0 +1,115 @@
+package com.ingestion.controller;
+
+import com.ingestion.model.IngestionStatus;
+import com.ingestion.model.IngestionStatusEnum;
+import com.ingestion.service.IngestionStatusService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+
+@RestController
+@RequestMapping("/api/ingestion-status")
+public class IngestionStatusController {
+
+ @Autowired
+ private IngestionStatusService ingestionStatusService;
+
+ @PostMapping
+ public ResponseEntity createStatus(
+ @RequestParam String tableName,
+ @RequestParam String fileName) {
+ IngestionStatus status = ingestionStatusService.createIngestionStatus(tableName, fileName);
+ return ResponseEntity.ok(status);
+ }
+
+ @PutMapping("/{id}/status")
+ public ResponseEntity updateStatus(
+ @PathVariable String id,
+ @RequestParam IngestionStatusEnum status) {
+ IngestionStatus updatedStatus = ingestionStatusService.updateStatus(id, status);
+ if (updatedStatus != null) {
+ return ResponseEntity.ok(updatedStatus);
+ }
+ return ResponseEntity.notFound().build();
+ }
+
+ @PutMapping("/{id}/progress")
+ public ResponseEntity updateProgress(
+ @PathVariable String id,
+ @RequestParam long processedRows,
+ @RequestParam long failedRows) {
+ IngestionStatus updatedStatus = ingestionStatusService.updateProgress(id, processedRows, failedRows);
+ if (updatedStatus != null) {
+ return ResponseEntity.ok(updatedStatus);
+ }
+ return ResponseEntity.notFound().build();
+ }
+
+ @PutMapping("/{id}/total-rows")
+ public ResponseEntity updateTotalRows(
+ @PathVariable String id,
+ @RequestParam long totalRows) {
+ IngestionStatus updatedStatus = ingestionStatusService.updateTotalRows(id, totalRows);
+ if (updatedStatus != null) {
+ return ResponseEntity.ok(updatedStatus);
+ }
+ return ResponseEntity.notFound().build();
+ }
+
+ @PutMapping("/{id}/error")
+ public ResponseEntity updateErrorMessage(
+ @PathVariable String id,
+ @RequestParam String errorMessage) {
+ IngestionStatus updatedStatus = ingestionStatusService.updateErrorMessage(id, errorMessage);
+ if (updatedStatus != null) {
+ return ResponseEntity.ok(updatedStatus);
+ }
+ return ResponseEntity.notFound().build();
+ }
+
+ @PutMapping("/{id}/retry")
+ public ResponseEntity incrementRetryCount(
+ @PathVariable String id) {
+ IngestionStatus updatedStatus = ingestionStatusService.incrementRetryCount(id);
+ if (updatedStatus != null) {
+ return ResponseEntity.ok(updatedStatus);
+ }
+ return ResponseEntity.notFound().build();
+ }
+
+ @GetMapping("/table/{tableName}")
+ public ResponseEntity> getStatusByTableName(
+ @PathVariable String tableName) {
+ List statuses = ingestionStatusService.getStatusByTableName(tableName);
+ return ResponseEntity.ok(statuses);
+ }
+
+ @GetMapping("/file/{fileName}")
+ public ResponseEntity> getStatusByFileName(
+ @PathVariable String fileName) {
+ List statuses = ingestionStatusService.getStatusByFileName(fileName);
+ return ResponseEntity.ok(statuses);
+ }
+
+ @GetMapping("/incomplete")
+ public ResponseEntity> getIncompleteIngestions() {
+ List statuses = ingestionStatusService.getIncompleteIngestions();
+ return ResponseEntity.ok(statuses);
+ }
+
+ @GetMapping("/status/{status}")
+ public ResponseEntity> getStatusByStatus(
+ @PathVariable String status) {
+ List statuses = ingestionStatusService.getStatusByStatus(status);
+ return ResponseEntity.ok(statuses);
+ }
+
+ @GetMapping("/{id}")
+ public ResponseEntity getStatusById(@PathVariable String id) {
+ return ingestionStatusService.getStatusById(id)
+ .map(ResponseEntity::ok)
+ .orElse(ResponseEntity.notFound().build());
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/controller/TableMappingController.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/controller/TableMappingController.java
new file mode 100644
index 000000000..71a738439
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/controller/TableMappingController.java
@@ -0,0 +1,55 @@
+package com.ingestion.controller;
+
+import com.ingestion.dto.TableMappingRequest;
+import com.ingestion.entity.TableMapping;
+import com.ingestion.service.TableMappingService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import javax.validation.Valid;
+import java.util.List;
+
+@RestController
+@RequestMapping("/api/table-mappings")
+public class TableMappingController {
+
+ private final TableMappingService tableMappingService;
+
+ @Autowired
+ public TableMappingController(TableMappingService tableMappingService) {
+ this.tableMappingService = tableMappingService;
+ }
+
+ @PostMapping
+ public ResponseEntity createTableMapping(@Valid @RequestBody TableMappingRequest request) {
+ TableMapping tableMapping = tableMappingService.createTableMapping(request);
+ return ResponseEntity.ok(tableMapping);
+ }
+
+ @GetMapping("/{tableName}")
+ public ResponseEntity getTableMapping(@PathVariable String tableName) {
+ TableMapping tableMapping = tableMappingService.getTableMapping(tableName);
+ return ResponseEntity.ok(tableMapping);
+ }
+
+ @GetMapping
+ public ResponseEntity> getAllTableMappings() {
+ List tableMappings = tableMappingService.getAllTableMappings();
+ return ResponseEntity.ok(tableMappings);
+ }
+
+ @PutMapping("/{tableName}")
+ public ResponseEntity updateTableMapping(
+ @PathVariable String tableName,
+ @Valid @RequestBody TableMappingRequest request) {
+ TableMapping tableMapping = tableMappingService.updateTableMapping(tableName, request);
+ return ResponseEntity.ok(tableMapping);
+ }
+
+ @DeleteMapping("/{tableName}")
+ public ResponseEntity deleteTableMapping(@PathVariable String tableName) {
+ tableMappingService.deleteTableMapping(tableName);
+ return ResponseEntity.ok().build();
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/ConnectionRequest.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/ConnectionRequest.java
new file mode 100644
index 000000000..b004603d3
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/ConnectionRequest.java
@@ -0,0 +1,31 @@
+package com.ingestion.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import jakarta.validation.constraints.NotBlank;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConnectionRequest {
+
+ @NotBlank
+ private String host;
+
+ @NotBlank
+ private String port;
+
+ @NotBlank
+ private String database;
+
+ @NotBlank
+ private String username;
+
+ @NotBlank
+ private String password;
+
+ private String connectionType; // CLICKHOUSE or FLATFILE
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/ErrorResponse.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/ErrorResponse.java
new file mode 100644
index 000000000..ce0670e41
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/ErrorResponse.java
@@ -0,0 +1,21 @@
+package com.ingestion.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+/**
+ * Standardized error response for the API.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ErrorResponse {
+
+ private int status;
+ private String error;
+ private String message;
+ private LocalDateTime timestamp;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/FileUploadRequest.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/FileUploadRequest.java
new file mode 100644
index 000000000..d8f7f31dc
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/FileUploadRequest.java
@@ -0,0 +1,29 @@
+package com.ingestion.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import jakarta.validation.constraints.NotBlank;
+import org.springframework.web.multipart.MultipartFile;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class FileUploadRequest {
+
+ @NotBlank(message = "File name is required")
+ private String fileName;
+
+ @NotBlank(message = "Table name is required")
+ private String tableName;
+
+ private String delimiter;
+
+ private Boolean hasHeader;
+
+ private String encoding;
+
+ private MultipartFile file;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/FlatFileConfig.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/FlatFileConfig.java
new file mode 100644
index 000000000..192739cd7
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/FlatFileConfig.java
@@ -0,0 +1,37 @@
+package com.ingestion.dto;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+public class FlatFileConfig {
+
+ @NotBlank(message = "File name is required")
+ private String fileName;
+
+ @NotBlank(message = "File path is required")
+ private String filePath;
+
+ @NotBlank(message = "File type is required")
+ private String fileType;
+
+ @NotBlank
+ private String delimiter;
+
+ @NotBlank
+ private String encoding;
+
+ @NotNull
+ private Boolean hasHeader;
+
+ @NotNull
+ private Boolean skipEmptyLines;
+
+ private String dateFormat;
+ private String timeFormat;
+ private String timestampFormat;
+ private String timezone;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionRequest.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionRequest.java
new file mode 100644
index 000000000..43492a8a1
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionRequest.java
@@ -0,0 +1,25 @@
+package com.ingestion.dto;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+public class IngestionRequest {
+
+ @NotBlank(message = "Connection ID is required")
+ private String connectionId;
+
+ @NotBlank(message = "Table name is required")
+ private String tableName;
+
+ @NotBlank(message = "File name is required")
+ private String fileName;
+
+ @NotNull(message = "File configuration is required")
+ @Valid
+ private FlatFileConfig fileConfig;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionStatusDTO.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionStatusDTO.java
new file mode 100644
index 000000000..b9148e717
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionStatusDTO.java
@@ -0,0 +1,21 @@
+package com.ingestion.dto;
+
+import com.ingestion.entity.IngestionStatusEnum;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+@Data
+public class IngestionStatusDTO {
+ private String id;
+ private String tableName;
+ private String fileName;
+ private IngestionStatusEnum status;
+ private Integer progress;
+ private Long totalRows;
+ private String errorMessage;
+ private Integer retryCount;
+ private LocalDateTime createdAt;
+ private LocalDateTime updatedAt;
+ private LocalDateTime completedAt;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionStatusRequest.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionStatusRequest.java
new file mode 100644
index 000000000..e769909f9
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/IngestionStatusRequest.java
@@ -0,0 +1,29 @@
+package com.ingestion.dto;
+
+import com.ingestion.entity.IngestionStatusEnum;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IngestionStatusRequest {
+ @NotBlank
+ private String tableName;
+
+ @NotBlank
+ private String fileName;
+
+ @NotNull
+ private IngestionStatusEnum status;
+
+ private Integer progress;
+ private Long totalRows;
+ private String errorMessage;
+ private Integer retryCount;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/TableMappingRequest.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/TableMappingRequest.java
new file mode 100644
index 000000000..18dc00922
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/dto/TableMappingRequest.java
@@ -0,0 +1,38 @@
+package com.ingestion.dto;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotEmpty;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+public class TableMappingRequest {
+
+ @NotBlank(message = "Table name is required")
+ private String tableName;
+
+ @NotBlank(message = "Connection ID is required")
+ private String connectionId;
+
+ @NotEmpty(message = "At least one column mapping is required")
+ @Valid
+ private List columnMappings;
+
+ @Data
+ @NoArgsConstructor
+ public static class ColumnMapping {
+ @NotBlank(message = "Source column name is required")
+ private String sourceColumn;
+
+ @NotBlank(message = "Target column name is required")
+ private String targetColumn;
+
+ private String dataType;
+ private String transformation;
+ private Boolean isNullable;
+ private String defaultValue;
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/IngestionStatus.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/IngestionStatus.java
new file mode 100644
index 000000000..5b927366d
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/IngestionStatus.java
@@ -0,0 +1,73 @@
+package com.ingestion.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import jakarta.persistence.*;
+import java.time.LocalDateTime;
+
+@Entity
+@Table(name = "ingestion_status")
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class IngestionStatus {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ private Long id;
+
+ @Column(name = "file_name", nullable = false)
+ private String fileName;
+
+ @Column(name = "table_name", nullable = false)
+ private String tableName;
+
+ @Column(name = "status", nullable = false)
+ @Enumerated(EnumType.STRING)
+ private IngestionStatusType status;
+
+ @Column(name = "records_processed")
+ private Long recordsProcessed;
+
+ @Column(name = "records_failed")
+ private Long recordsFailed;
+
+ @Column(name = "start_time")
+ private LocalDateTime startTime;
+
+ @Column(name = "end_time")
+ private LocalDateTime endTime;
+
+ @Column(name = "error_message")
+ private String errorMessage;
+
+ @Column(name = "created_at")
+ private LocalDateTime createdAt;
+
+ @Column(name = "updated_at")
+ private LocalDateTime updatedAt;
+
+ @PrePersist
+ protected void onCreate() {
+ createdAt = LocalDateTime.now();
+ updatedAt = LocalDateTime.now();
+ if (status == null) {
+ status = IngestionStatusType.PENDING;
+ }
+ if (recordsProcessed == null) {
+ recordsProcessed = 0L;
+ }
+ if (recordsFailed == null) {
+ recordsFailed = 0L;
+ }
+ }
+
+ @PreUpdate
+ protected void onUpdate() {
+ updatedAt = LocalDateTime.now();
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/IngestionStatusEnum.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/IngestionStatusEnum.java
new file mode 100644
index 000000000..4be6a84e3
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/IngestionStatusEnum.java
@@ -0,0 +1,8 @@
+package com.ingestion.entity;
+
+public enum IngestionStatusEnum {
+ IN_PROGRESS,
+ SUCCESS,
+ FAILED,
+ CANCELLED
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/TableMapping.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/TableMapping.java
new file mode 100644
index 000000000..e8c02474b
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/entity/TableMapping.java
@@ -0,0 +1,50 @@
+package com.ingestion.entity;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.AllArgsConstructor;
+
+import jakarta.persistence.*;
+import java.time.LocalDateTime;
+import java.util.Map;
+
+@Entity
+@Table(name = "table_mappings")
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TableMapping {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ private Long id;
+
+ @Column(name = "source_table", nullable = false)
+ private String sourceTable;
+
+ @Column(name = "target_table", nullable = false)
+ private String targetTable;
+
+ @ElementCollection
+ @CollectionTable(name = "column_mappings", joinColumns = @JoinColumn(name = "table_mapping_id"))
+ @MapKeyColumn(name = "source_column")
+ @Column(name = "target_column")
+ private Map columnMappings;
+
+ @Column(name = "created_at")
+ private LocalDateTime createdAt;
+
+ @Column(name = "updated_at")
+ private LocalDateTime updatedAt;
+
+ @PrePersist
+ protected void onCreate() {
+ createdAt = LocalDateTime.now();
+ updatedAt = LocalDateTime.now();
+ }
+
+ @PreUpdate
+ protected void onUpdate() {
+ updatedAt = LocalDateTime.now();
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/mapper/IngestionStatusMapper.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/mapper/IngestionStatusMapper.java
new file mode 100644
index 000000000..90a250b4a
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/mapper/IngestionStatusMapper.java
@@ -0,0 +1,51 @@
+package com.ingestion.mapper;
+
+import com.ingestion.dto.IngestionStatusDTO;
+import com.ingestion.entity.IngestionStatus;
+import org.springframework.stereotype.Component;
+
+@Component
+public class IngestionStatusMapper {
+
+ public IngestionStatusDTO toDTO(IngestionStatus entity) {
+ if (entity == null) {
+ return null;
+ }
+
+ IngestionStatusDTO dto = new IngestionStatusDTO();
+ dto.setId(entity.getId());
+ dto.setTableName(entity.getTableName());
+ dto.setFileName(entity.getFileName());
+ dto.setStatus(entity.getStatus());
+ dto.setProgress(entity.getProgress());
+ dto.setTotalRows(entity.getTotalRows());
+ dto.setErrorMessage(entity.getErrorMessage());
+ dto.setRetryCount(entity.getRetryCount());
+ dto.setCreatedAt(entity.getCreatedAt());
+ dto.setUpdatedAt(entity.getUpdatedAt());
+ dto.setCompletedAt(entity.getCompletedAt());
+
+ return dto;
+ }
+
+ public IngestionStatus toEntity(IngestionStatusDTO dto) {
+ if (dto == null) {
+ return null;
+ }
+
+ IngestionStatus entity = new IngestionStatus();
+ entity.setId(dto.getId());
+ entity.setTableName(dto.getTableName());
+ entity.setFileName(dto.getFileName());
+ entity.setStatus(dto.getStatus());
+ entity.setProgress(dto.getProgress());
+ entity.setTotalRows(dto.getTotalRows());
+ entity.setErrorMessage(dto.getErrorMessage());
+ entity.setRetryCount(dto.getRetryCount());
+ entity.setCreatedAt(dto.getCreatedAt());
+ entity.setUpdatedAt(dto.getUpdatedAt());
+ entity.setCompletedAt(dto.getCompletedAt());
+
+ return entity;
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/ClickHouseConnection.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/ClickHouseConnection.java
new file mode 100644
index 000000000..38cd42e56
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/ClickHouseConnection.java
@@ -0,0 +1,35 @@
+package com.ingestion.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.Builder;
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class ClickHouseConnection {
+
+ @NotBlank
+ private String host;
+
+ @NotNull
+ private Integer port;
+
+ @NotBlank
+ private String database;
+
+ @NotBlank
+ private String username;
+
+ @NotBlank
+ private String password;
+
+ @NotBlank(message = "JWT token is required")
+ private String jwtToken;
+
+ private boolean useHttps = false;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/FlatFileConfig.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/FlatFileConfig.java
new file mode 100644
index 000000000..7a6c5a769
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/FlatFileConfig.java
@@ -0,0 +1,13 @@
+package com.ingestion.model;
+
+import lombok.Data;
+
+@Data
+public class FlatFileConfig {
+ private char delimiter;
+ private char quoteCharacter;
+ private char escapeCharacter;
+ private boolean hasHeader;
+ private boolean skipEmptyLines;
+ private boolean trimValues;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionRequest.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionRequest.java
new file mode 100644
index 000000000..f38bed13a
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionRequest.java
@@ -0,0 +1,32 @@
+package com.ingestion.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IngestionRequest {
+
+ @NotBlank(message = "Table name is required")
+ private String tableName;
+
+ @NotNull(message = "ClickHouse connection details are required")
+ @Valid
+ private ClickHouseConnection connection;
+
+ @NotNull(message = "Flat file configuration is required")
+ @Valid
+ private FlatFileConfig fileConfig;
+
+ private String batchSize = "10000";
+
+ private String maxRetries = "3";
+
+ private String retryInterval = "5000"; // milliseconds
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionResponse.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionResponse.java
new file mode 100644
index 000000000..f11fcdf4d
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionResponse.java
@@ -0,0 +1,36 @@
+package com.ingestion.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IngestionResponse {
+
+ private String status; // SUCCESS, FAILED, IN_PROGRESS
+
+ private String message;
+
+ private LocalDateTime startTime;
+
+ private LocalDateTime endTime;
+
+ private long totalRows;
+
+ private long processedRows;
+
+ private long failedRows;
+
+ private String errorDetails;
+
+ private String jobId;
+
+ @Builder.Default
+ private boolean isComplete = false;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionStatus.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionStatus.java
new file mode 100644
index 000000000..315e4856e
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionStatus.java
@@ -0,0 +1,41 @@
+package com.ingestion.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IngestionStatus {
+
+ private String jobId;
+
+ private String status; // SUCCESS, FAILED, IN_PROGRESS, CANCELLED
+
+ private LocalDateTime startTime;
+
+ private LocalDateTime lastUpdatedTime;
+
+ private long totalRows;
+
+ private long processedRows;
+
+ private long failedRows;
+
+ private String errorMessage;
+
+ private String tableName;
+
+ private String fileName;
+
+ @Builder.Default
+ private boolean isComplete = false;
+
+ @Builder.Default
+ private int retryCount = 0;
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionStatusEnum.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionStatusEnum.java
new file mode 100644
index 000000000..fc96acc50
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/model/IngestionStatusEnum.java
@@ -0,0 +1,19 @@
+package com.ingestion.model;
+
+public enum IngestionStatusEnum {
+
+ SUCCESS("Success"),
+ FAILED("Failed"),
+ IN_PROGRESS("In Progress"),
+ CANCELLED("Cancelled");
+
+ private final String displayName;
+
+ IngestionStatusEnum(String displayName) {
+ this.displayName = displayName;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/repository/IngestionStatusRepository.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/repository/IngestionStatusRepository.java
new file mode 100644
index 000000000..cdfedec37
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/repository/IngestionStatusRepository.java
@@ -0,0 +1,20 @@
+package com.ingestion.repository;
+
+import com.ingestion.entity.IngestionStatus;
+import com.ingestion.entity.IngestionStatusEnum;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface IngestionStatusRepository extends JpaRepository {
+
+ List findByTableName(String tableName);
+
+ List findByFileName(String fileName);
+
+ List findByStatus(IngestionStatusEnum status);
+
+ List findByCompletedAtIsNull();
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/repository/TableMappingRepository.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/repository/TableMappingRepository.java
new file mode 100644
index 000000000..98d92ea97
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/repository/TableMappingRepository.java
@@ -0,0 +1,15 @@
+package com.ingestion.repository;
+
+import com.ingestion.entity.TableMapping;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public interface TableMappingRepository extends JpaRepository {
+
+ Optional findByTableName(String tableName);
+
+ boolean existsByTableName(String tableName);
+}
\ No newline at end of file
diff --git a/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/service/ClickHouseConnectionService.java b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/service/ClickHouseConnectionService.java
new file mode 100644
index 000000000..bf44c0a38
--- /dev/null
+++ b/clickhouse-flatfile-ingestion/backend/src/main/java/com/ingestion/service/ClickHouseConnectionService.java
@@ -0,0 +1,274 @@
+package com.ingestion.service;
+
+import com.ingestion.dto.ConnectionRequest;
+import com.ingestion.model.ClickHouseConnection;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import ru.yandex.clickhouse.ClickHouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+@Slf4j
+public class ClickHouseConnectionService {
+
+ private final Map connectionPool = new ConcurrentHashMap<>();
+
+ /**
+ * Creates a connection to ClickHouse database
+ *
+ * @param connectionRequest Connection details
+ * @return Connection ID
+ */
+ public String createConnection(ConnectionRequest connectionRequest) {
+ String connectionId = generateConnectionId(connectionRequest);
+
+ if (connectionPool.containsKey(connectionId)) {
+ log.info("Connection {} already exists", connectionId);
+ return connectionId;
+ }
+
+ try {
+ ClickHouseProperties properties = new ClickHouseProperties();
+ properties.setUser(connectionRequest.getUsername());
+ properties.setPassword(connectionRequest.getPassword());
+
+ String url = String.format("jdbc:clickhouse://%s:%s/%s",
+ connectionRequest.getHost(),
+ connectionRequest.getPort(),
+ connectionRequest.getDatabase());
+
+ ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
+
+ // Test connection
+ try (Connection connection = dataSource.getConnection()) {
+ log.info("Successfully connected to ClickHouse database: {}", connectionRequest.getDatabase());
+ }
+
+ connectionPool.put(connectionId, dataSource);
+ return connectionId;
+ } catch (SQLException e) {
+ log.error("Failed to create connection to ClickHouse: {}", e.getMessage());
+ throw new RuntimeException("Failed to create connection to ClickHouse: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Tests a connection to ClickHouse database
+ *
+ * @param connectionRequest Connection details
+ * @return true if connection is successful, false otherwise
+ */
+ public boolean testConnection(ConnectionRequest connectionRequest) {
+ try {
+ ClickHouseProperties properties = new ClickHouseProperties();
+ properties.setUser(connectionRequest.getUsername());
+ properties.setPassword(connectionRequest.getPassword());
+
+ String url = String.format("jdbc:clickhouse://%s:%s/%s",
+ connectionRequest.getHost(),
+ connectionRequest.getPort(),
+ connectionRequest.getDatabase());
+
+ ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
+
+ try (Connection connection = dataSource.getConnection()) {
+ log.info("Successfully tested connection to ClickHouse database: {}", connectionRequest.getDatabase());
+ return true;
+ }
+ } catch (SQLException e) {
+ log.error("Failed to test connection to ClickHouse: {}", e.getMessage());
+ return false;
+ }
+ }
+
+ /**
+ * Gets a connection from the pool
+ *
+ * @param connectionId Connection ID
+ * @return Connection
+ */
+ public Connection getConnection(String connectionId) {
+ ClickHouseDataSource dataSource = connectionPool.get(connectionId);
+ if (dataSource == null) {
+ throw new RuntimeException("Connection not found: " + connectionId);
+ }
+
+ try {
+ return dataSource.getConnection();
+ } catch (SQLException e) {
+ log.error("Failed to get connection: {}", e.getMessage());
+ throw new RuntimeException("Failed to get connection: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Gets all tables from a database
+ *
+ * @param connectionId Connection ID
+ * @return List of table names
+ */
+ public List getTables(String connectionId) {
+ List tables = new ArrayList<>();
+
+ try (Connection connection = getConnection(connectionId);
+ ResultSet resultSet = connection.getMetaData().getTables(null, null, "%", new String[]{"TABLE"})) {
+
+ while (resultSet.next()) {
+ tables.add(resultSet.getString("TABLE_NAME"));
+ }
+
+ return tables;
+ } catch (SQLException e) {
+ log.error("Failed to get tables: {}", e.getMessage());
+ throw new RuntimeException("Failed to get tables: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Gets the schema of a table
+ *
+ * @param connectionId Connection ID
+ * @param tableName Table name
+ * @return Map of column names to column types
+ */
+ public Map getTableSchema(String connectionId, String tableName) {
+ Map schema = new HashMap<>();
+
+ try (Connection connection = getConnection(connectionId);
+ ResultSet resultSet = connection.getMetaData().getColumns(null, null, tableName, "%")) {
+
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ String columnType = resultSet.getString("TYPE_NAME");
+ schema.put(columnName, columnType);
+ }
+
+ return schema;
+ } catch (SQLException e) {
+ log.error("Failed to get table schema: {}", e.getMessage());
+ throw new RuntimeException("Failed to get table schema: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Executes a query and returns the result
+ *
+ * @param connectionId Connection ID
+ * @param query SQL query
+ * @return ResultSet
+ */
+ public ResultSet executeQuery(String connectionId, String query) {
+ try {
+ Connection connection = getConnection(connectionId);
+ return connection.createStatement().executeQuery(query);
+ } catch (SQLException e) {
+ log.error("Failed to execute query: {}", e.getMessage());
+ throw new RuntimeException("Failed to execute query: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Executes a query with parameters and returns the result
+ *
+ * @param connectionId Connection ID
+ * @param query SQL query with parameters
+ * @param parameters Query parameters
+ * @return ResultSet
+ */
+ public ResultSet executeQuery(String connectionId, String query, Object... parameters) {
+ try {
+ Connection connection = getConnection(connectionId);
+ java.sql.PreparedStatement preparedStatement = connection.prepareStatement(query);
+
+ for (int i = 0; i < parameters.length; i++) {
+ preparedStatement.setObject(i + 1, parameters[i]);
+ }
+
+ return preparedStatement.executeQuery();
+ } catch (SQLException e) {
+ log.error("Failed to execute query with parameters: {}", e.getMessage());
+ throw new RuntimeException("Failed to execute query with parameters: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Executes a batch insert
+ *
+ * @param connectionId Connection ID
+ * @param tableName Table name
+ * @param columns Column names
+ * @param values List of value arrays
+ * @return Number of rows affected
+ */
+ public int executeBatchInsert(String connectionId, String tableName, List columns, List