A production-ready event-driven file processing system built with NestJS, Azure Service Bus, PostgreSQL, and Docker.
**Requirements Coverage**

| Requirement | Status | Implementation |
|---|---|---|
| **File-Uploader (API)** | | |
| Accept batch of files (multipart/form-data) | ✅ | POST /files/upload with multer |
| Store files in Docker volume | ✅ | Azure Blob Storage emulator (Azurite) |
| Save metadata to DB with status=1 | ✅ | PostgreSQL with Prisma ORM |
| Publish message per file to queue | ✅ | Azure Service Bus emulator |
| Endpoint: Upload batch of files | ✅ | POST /files/upload |
| Endpoint: List files | ✅ | GET /files with pagination & filtering |
| Endpoint: Single file metadata | ✅ | GET /files/:id |
| Endpoint: Health check | ✅ | GET /health |
| **File-Worker (Consumer)** | | |
| Subscribe to queue | ✅ | Azure Service Bus receiver |
| Process messages concurrently | ✅ | p-queue with configurable concurrency |
| Load file metadata from DB | ✅ | PostgreSQL lookup |
| Compute checksum | ✅ | SHA-256 streaming checksum (see the sketch after this table) |
| Update status=2 (processed) | ✅ | Status update with timestamp |
| Endpoint: Health check | ✅ | GET /health |
| **Non-Functional** | | |
| NestJS + TypeScript | ✅ | Both services |
| Docker containerized | ✅ | Multi-stage Dockerfiles |
| docker-compose orchestration | ✅ | Full stack orchestration |
| PostgreSQL database | ✅ | PostgreSQL 16 Alpine |
| Modular structure | ✅ | Feature-based modules |
| Dependency injection | ✅ | NestJS DI throughout |
| Error handling | ✅ | Global exception filter |
| Logging | ✅ | Structured logging with correlation IDs |
| Health checks | ✅ | NestJS Terminus |
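The File-Worker rows above (queue subscription, bounded concurrency via p-queue, streaming checksum, status update) compose roughly as follows. This is a minimal sketch under stated assumptions, not the project's actual code: `loadMetadata`, `openBlobStream`, and `markProcessed` are hypothetical stand-ins for the real repository and storage services.

```typescript
// Minimal sketch of the worker's message handling. The "declare" helpers are
// hypothetical stand-ins, not the project's actual API.
import { createHash } from 'node:crypto';
import type { Readable } from 'node:stream';
import PQueue from 'p-queue';

// Hypothetical helpers standing in for the repository and blob storage layers:
declare function loadMetadata(fileId: string): Promise<{ blobName: string }>;
declare function openBlobStream(blobName: string): Promise<Readable>;
declare function markProcessed(fileId: string, checksum: string): Promise<void>;

// Bounded concurrency, as configured via WORKER_CONCURRENCY.
const queue = new PQueue({ concurrency: Number(process.env.WORKER_CONCURRENCY ?? 5) });

// Stream the blob through SHA-256 instead of buffering the whole file in memory.
async function sha256OfStream(stream: Readable): Promise<string> {
  const hash = createHash('sha256');
  for await (const chunk of stream) hash.update(chunk as Buffer);
  return hash.digest('hex');
}

// One call per Service Bus message: look up metadata, checksum the blob,
// then mark the file processed (status=2) with a timestamp.
async function handleMessage(fileId: string): Promise<void> {
  await queue.add(async () => {
    const meta = await loadMetadata(fileId);
    const checksum = await sha256OfStream(await openBlobStream(meta.blobName));
    await markProcessed(fileId, checksum);
  });
}
```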
**Additional Features**

| Feature | Description |
|---|---|
| Swagger/OpenAPI | Interactive API documentation at /api |
| Azure Blob Storage | Production-grade blob storage integration, emulated locally with Azurite |
| Dead Letter Queue | Failed message handling with DLQ management endpoints |
| Distributed Tracing | OpenTelemetry + Jaeger integration |
| Resilience Patterns | Circuit breaker, retry policies, timeouts (Cockatiel); see the sketch after this table |
| File Validation | MIME type checking, blocked extensions, filename sanitization |
| Rate Limiting | API rate limiting for upload endpoints |
| Metrics Endpoint | Processing statistics and queue status |
| Correlation IDs | Request tracing across services |
| Shared Library | Reusable code in @file-processing/shared package |
| Repository Pattern | Clean data access abstraction |
| Environment Validation | class-validator for config validation |
| Batch Upload Tracking | Upload batch IDs for grouping files |
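The resilience row above names Cockatiel. A minimal sketch of how its retry and circuit-breaker policies compose; the policy parameters are illustrative, not the project's actual configuration, and `publishFileUploadedMessage` is a hypothetical stand-in:

```typescript
// Sketch of composing Cockatiel retry + circuit-breaker policies.
// Parameters are illustrative, not the project's actual configuration.
import {
  circuitBreaker,
  ConsecutiveBreaker,
  ExponentialBackoff,
  handleAll,
  retry,
  wrap,
} from 'cockatiel';

// Hypothetical stand-in for the real Service Bus publisher:
declare function publishFileUploadedMessage(fileId: string): Promise<void>;

// Retry up to 3 attempts with exponential backoff on any error.
const retryPolicy = retry(handleAll, {
  maxAttempts: 3,
  backoff: new ExponentialBackoff(),
});

// Open the circuit after 5 consecutive failures; probe again after 10s.
const breakerPolicy = circuitBreaker(handleAll, {
  halfOpenAfter: 10_000,
  breaker: new ConsecutiveBreaker(5),
});

// The retry wraps the breaker, so a failed call is retried unless the
// circuit is open.
const resilient = wrap(retryPolicy, breakerPolicy);

async function publishWithResilience(fileId: string): Promise<void> {
  await resilient.execute(() => publishFileUploadedMessage(fileId));
}
```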
**Future Enhancements: Security**

| Feature | Description |
|---|---|
| JWT Authentication | Implement JSON Web Token authentication for API endpoints with refresh token support |
| OAuth2/OIDC Integration | Support for external identity providers (Azure AD, Google, Okta) |
| API Key Management | Generate and manage API keys for service-to-service communication |
| Virus Scanning | Integrate ClamAV or cloud-based antivirus to scan uploaded files before processing |
**Future Enhancements: File Management**

| Feature | Description |
|---|---|
| File Download API | GET /files/:id/download endpoint to retrieve processed files |
| Chunked Uploads | Support for resumable uploads of large files (>100MB) using tus protocol |
| File Versioning | Track file versions and allow rollback to previous versions |
| Bulk Operations | Delete, archive, or reprocess multiple files in a single request |
| File Expiration | Auto-delete files after configurable retention period |
| File Compression | Compress files before storage to save space |
| Duplicate Detection | Detect and handle duplicate file uploads using content hashing |
**Future Enhancements: Processing**

| Feature | Description |
|---|---|
| Scheduled Processing | Delay file processing until a scheduled time |
| Processing Webhooks | Notify external systems when file processing completes or fails |
| Retry Dashboard | UI to view and manually retry failed processing jobs |
**Future Enhancements: Monitoring & Alerting**

| Feature | Description |
|---|---|
| Alerting Integration | PagerDuty/Slack alerts for processing failures and system issues |
**Future Enhancements: DevOps & Deployment**

| Feature | Description |
|---|---|
| Kubernetes Manifests | Helm charts and K8s deployment configurations |
| CI/CD Pipeline | GitHub Actions for automated testing, building, and deployment |
| Infrastructure as Code | Terraform/Pulumi scripts for Azure resource provisioning |
| Auto-scaling | Horizontal pod autoscaling based on queue depth |
| Blue-Green Deployment | Zero-downtime deployment strategy |
| Secrets Management | Azure Key Vault integration for secure credential storage |
**Future Enhancements: Testing**

| Feature | Description |
|---|---|
| Load Testing | k6 or Artillery scripts for performance testing |
**Future Enhancements: User Experience**

| Feature | Description |
|---|---|
| Admin Dashboard | Web UI for file management, monitoring, and configuration |
| Upload Progress | Real-time upload progress with WebSocket updates |
| Batch Status API | Track processing status of entire upload batches |
| Search & Filtering | Full-text search and advanced filtering for file metadata |
| File Preview | In-browser preview for common file types (PDF, images, text) |
**Future Enhancements: Scalability**

| Feature | Description |
|---|---|
| Multi-region Support | Deploy across multiple Azure regions for geo-redundancy |
| Message Partitioning | Partition Service Bus queues for higher throughput |
| Read Replicas | PostgreSQL read replicas for query scalability |
**Architecture**

```
┌─────────────┐      ┌──────────────────┐      ┌────────────────┐
│   Client    │─────▶│  File-Uploader   │─────▶│   PostgreSQL   │
└─────────────┘      │   (Port 3000)    │      └────────────────┘
                     └────────┬─────────┘              ▲
                              │                        │
                     ┌────────▼─────────┐              │
                     │    Azure Blob    │              │
                     │     Storage      │              │
                     │    (Azurite)     │              │
                     └──────────────────┘              │
                              │                        │
                     ┌────────▼─────────┐              │
                     │  Azure Service   │              │
                     │   Bus Emulator   │              │
                     │   (Port 5672)    │              │
                     └────────┬─────────┘              │
                              │                        │
                     ┌────────▼─────────┐              │
                     │   File-Worker    │──────────────┘
                     │   (Port 3001)    │
                     └────────┬─────────┘
                              │
                     ┌────────▼─────────┐
                     │      Jaeger      │
                     │   (Port 16686)   │
                     └──────────────────┘
```
**Tech Stack**

- Framework: NestJS 11 (TypeScript)
- Database: PostgreSQL 16 with Prisma ORM
- Messaging: Azure Service Bus Emulator
- Storage: Azure Blob Storage (Azurite Emulator)
- Tracing: OpenTelemetry + Jaeger
- Containerization: Docker + Docker Compose
- API Documentation: Swagger/OpenAPI
- Testing: Jest + jest-mock-extended
**Prerequisites**

- Docker Desktop (Windows/Mac) or Docker Engine (Linux)
- Docker Compose v2.0+
- Git
**Quick Start**

Clone the repository:

```bash
git clone <repository-url>
cd gfr.ai
```

Copy the example environment file:

```bash
cp .env.example .env
```

The default values work out of the box for local development.

Build and start the stack:

```bash
docker-compose up --build
```

This starts all 7 services:
- PostgreSQL (port 5432) - Main database
- MSSQL (port 1433) - Required for Service Bus emulator
- Azure Service Bus Emulator (ports 5672, 5300) - Message queue
- Azurite (ports 10000-10002) - Blob storage emulator
- Jaeger (port 16686) - Distributed tracing UI
- File-Uploader (port 3000) - Upload API
- File-Worker (port 3001) - Background processor
Wait for all health checks to pass (about 30-60 seconds):

```bash
docker-compose ps
```

All services should show "healthy" status.
| Service | URL |
|---|---|
| Swagger API Docs | http://localhost:3000/api |
| File-Uploader Health | http://localhost:3000/health |
| File-Worker Health | http://localhost:3001/health |
| File-Worker Metrics | http://localhost:3001/metrics |
| Jaeger UI | http://localhost:16686 |
**Testing the System**

Upload files:

```bash
# Create test files
echo "Hello World" > test1.txt
echo "Test Content" > test2.txt

# Upload files
curl -X POST http://localhost:3000/files/upload \
  -F "files=@test1.txt" \
  -F "files=@test2.txt"
```

Response:

```json
{
"success": true,
"uploadBatchId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"filesUploaded": 2,
"files": [
{
"id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
"originalName": "test1.txt",
"status": "UPLOADED",
"sizeBytes": "12"
}
]
}
```

List files:

```bash
# Get all files
curl http://localhost:3000/files
# Filter by status
curl "http://localhost:3000/files?status=PROCESSED"
# With pagination
curl "http://localhost:3000/files?limit=10&offset=0"curl http://localhost:3000/files/<file-id># File-Uploader health
curl http://localhost:3000/health
# File-Worker health
curl http://localhost:3001/health
# Detailed health
curl http://localhost:3000/health/detailed
```

Worker metrics:

```bash
curl http://localhost:3001/metrics
```

Response:

```json
{
"totalProcessed": 100,
"successfulProcessed": 98,
"failedProcessed": 2,
"successRate": "98.00%",
"currentQueueSize": 0,
"activeProcessing": 0
}
```

End-to-end verification:

```bash
# 1. Upload files
curl -X POST http://localhost:3000/files/upload \
-F "files=@test1.txt" \
-F "files=@test2.txt"
# 2. Note the file IDs from response
# 3. Wait 2-3 seconds for processing
# 4. Check file status (should be "PROCESSED")
curl http://localhost:3000/files/<file-id>
# 5. Verify checksum is populated in response
# 6. Check worker metrics
curl http://localhost:3001/metrics
```
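The wait-and-check steps above can be scripted. A small polling sketch, assuming Node 18+ for the global `fetch`; the `status` and `checksum` fields match the responses shown earlier:

```typescript
// poll-status.ts — poll GET /files/:id until the file reaches PROCESSED.
// Usage: npx ts-node poll-status.ts <file-id>
const fileId = process.argv[2];
if (!fileId) throw new Error('usage: poll-status.ts <file-id>');

async function waitForProcessed(): Promise<void> {
  for (let attempt = 1; attempt <= 20; attempt++) {
    const res = await fetch(`http://localhost:3000/files/${fileId}`);
    const file = (await res.json()) as { status: string; checksum?: string };
    console.log(`attempt ${attempt}: status=${file.status}`);
    if (file.status === 'PROCESSED') {
      console.log(`checksum: ${file.checksum}`);
      return;
    }
    await new Promise((resolve) => setTimeout(resolve, 1000)); // 1s between polls
  }
  throw new Error('file was not processed within 20 seconds');
}

waitForProcessed().catch((err) => {
  console.error(err);
  process.exit(1);
});
```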
**Configuration**

All configuration is in the `.env` file:

```env
# Application
NODE_ENV=development
# Database
DB_HOST=postgres
DB_PORT=5432
DB_USERNAME=admin
DB_PASSWORD=admin123
DB_DATABASE=file_processing
# Azure Service Bus
AZURE_SERVICE_BUS_CONNECTION_STRING=Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;
AZURE_SERVICE_BUS_QUEUE_NAME=file-uploaded-queue
# Azure Blob Storage
AZURE_BLOB_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=...;BlobEndpoint=http://azurite:10000/devstoreaccount1;
AZURE_BLOB_STORAGE_CONTAINER_NAME=file-uploads
# File Upload Limits
MAX_FILE_SIZE=10485760
MAX_FILES_PER_BATCH=10
# Worker Configuration
WORKER_CONCURRENCY=5
```
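The Environment Validation feature uses class-validator (see the shared `config/` module below). A minimal sketch of that pattern applied to a few of the variables above; the class and property names here are illustrative, not the project's actual code:

```typescript
// Sketch of validating .env values with class-validator + class-transformer.
// Names and constraints are illustrative assumptions.
import { plainToInstance } from 'class-transformer';
import { IsInt, IsString, Max, Min, validateSync } from 'class-validator';

class EnvironmentVariables {
  @IsString()
  DB_HOST!: string;

  @IsInt()
  @Min(1)
  @Max(65535)
  DB_PORT!: number;

  @IsInt()
  @Min(1)
  WORKER_CONCURRENCY!: number;
}

export function validateEnv(config: Record<string, unknown>): EnvironmentVariables {
  const validated = plainToInstance(EnvironmentVariables, config, {
    enableImplicitConversion: true, // coerce "5432" -> 5432 for @IsInt checks
  });
  const errors = validateSync(validated, { skipMissingProperties: false });
  if (errors.length > 0) throw new Error(errors.toString());
  return validated;
}
```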
**Project Structure**

```
gfr.ai/
├── shared/                    # Shared library package
│   ├── src/
│   │   ├── config/            # Environment validation
│   │   ├── core/              # Filters, interceptors, middleware
│   │   ├── database/          # Prisma service
│   │   ├── dead-letter/       # DLQ handling
│   │   ├── dto/               # Shared DTOs
│   │   ├── interfaces/        # Contracts
│   │   ├── repositories/      # Data access layer
│   │   ├── resilience/        # Circuit breaker, retry
│   │   ├── storage/           # Blob storage service
│   │   ├── tracing/           # OpenTelemetry setup
│   │   └── testing/           # Test utilities
│   └── prisma/                # Database schema
│
├── file-uploader/             # Upload API service
│   ├── src/
│   │   ├── config/            # Service configuration
│   │   ├── files/             # File upload module
│   │   ├── health/            # Health checks
│   │   ├── messaging/         # Service Bus publisher
│   │   ├── metrics/           # Metrics service
│   │   └── storage/           # File storage
│   └── Dockerfile
│
├── file-worker/               # Background processor
│   ├── src/
│   │   ├── config/            # Service configuration
│   │   ├── dead-letter/       # DLQ management endpoints
│   │   ├── health/            # Health checks
│   │   ├── messaging/         # Service Bus consumer
│   │   ├── metrics/           # Processing metrics
│   │   └── processor/         # File processing logic
│   └── Dockerfile
│
├── docker-compose.yml         # Container orchestration
├── Config.json                # Service Bus emulator config
├── .env.example               # Environment template
└── README.md
```
**Local Development**

- Start infrastructure services:

```bash
docker-compose up postgres azurite servicebus-emulator mssql jaeger -d
```

- Install dependencies:
```bash
npm install
```

- Run services in development mode:

```bash
# Terminal 1 - File-Uploader
cd file-uploader && npm run start:dev
# Terminal 2 - File-Worker
cd file-worker && npm run start:dev
```

**Running Tests**

```bash
# Shared library
cd shared && npm run build
# File-Uploader tests
cd file-uploader && npm test
# File-Worker tests
cd file-worker && npm test
```

**Database Migrations**

```bash
# Generate migration
cd shared && npx prisma migrate dev --name <migration-name>
# Apply migrations (in Docker)
docker-compose exec file-uploader npx prisma migrate deploy
```

**Troubleshooting**

```bash
# Clean up everything and restart
docker-compose down -v
docker-compose up --build

# Check PostgreSQL logs
docker-compose logs postgres
# Access database directly
docker exec -it file-processing-postgres psql -U admin -d file_processing
# Check file metadata
SELECT id, original_name, status, checksum, processed_at FROM "FileMetadata";
```

If files are not being processed:

- Check worker logs: `docker-compose logs file-worker`
- Verify the Service Bus receiver: `curl http://localhost:3001/health`
- Check the dead letter queue: `curl http://localhost:3001/dead-letter/status`

Open the Jaeger UI at http://localhost:16686 to view request traces across services.
**Production Deployment**

- Azure Service Bus: Create a namespace, update the connection string (remove `UseDevelopmentEmulator=true`)
- Azure Blob Storage: Create a storage account, update the connection string
- Azure Database for PostgreSQL: Create managed database, update connection string
- Azure Container Apps/AKS: Deploy containers with managed identities
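For reference, a non-emulator Service Bus connection string takes this general shape (placeholders, not real values):

```
Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<key>
```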
Additional hardening before going live:

- Add rate limiting
- Implement file type validation
- Configure CORS properly
This project was created by Waseem Bakhsh as an assignment for GFR Software GmbH.