Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
8 changes: 4 additions & 4 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
"lint": "tsc --noEmit"
},
"dependencies": {
"@fastify/cors": "^11.2.0",
"@fastify/helmet": "^13.0.2",
"@fastify/cors": "^9.0.1",
"@fastify/helmet": "^11.1.1",
"@fastify/multipart": "^8.3.0",
"@fastify/rate-limit": "^10.3.0",
"@fastify/rate-limit": "^9.1.0",
"@langchain/core": "^0.3.0",
"@prisma/client": "^5.22.0",
"bullmq": "^5.12.0",
"dotenv": "^16.4.0",
"fastembed": "^2.0.0",
"fastify": "^4.28.0",
"fastify": "^4.29.1",
"ioredis": "^5.4.0",
"langchain": "^0.3.0",
"pino": "^9.0.0",
Expand Down
32 changes: 25 additions & 7 deletions apps/backend/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import multipart from '@fastify/multipart';
import Fastify, { FastifyInstance } from 'fastify';
import { logger } from './logging/logger.js';
import { metricsHook, metricsRoute } from './metrics/prometheus.js';
import { authMiddleware } from './middleware/auth-middleware.js';
import { configureRateLimit } from './middleware/rate-limit.js';
import { configureSecurity, securityHooks } from './middleware/security.js';
import { closeQueue } from './queue/processing-queue.js';
import { initWorker, shutdownWorker } from './queue/worker-init.js';
import { listRoute } from './routes/documents/list-route.js';
import { statusRoute } from './routes/documents/status-route.js';
import { uploadRoute } from './routes/documents/upload-route.js';
Expand All @@ -30,6 +33,13 @@ export async function createApp(): Promise<FastifyInstance> {
// Rate limiting
await configureRateLimit(app);

// Register multipart
await app.register(multipart, {
limits: {
fileSize: 50 * 1024 * 1024, // 50MB
},
});

// Metrics collection
metricsHook(app);

Expand All @@ -42,17 +52,25 @@ export async function createApp(): Promise<FastifyInstance> {
// Internal routes (no auth)
await callbackRoute(app);

// Auth middleware for protected routes
app.addHook('onRequest', authMiddleware);
// Initialize BullMQ worker (if not in test mode)
if (process.env.NODE_ENV !== 'test') {
initWorker();
}

// Register protected routes
await uploadRoute(app);
await statusRoute(app);
await listRoute(app);
await searchRoute(app);
// Register protected routes (Auth applied here)
await app.register(async (protectedScope) => {
protectedScope.addHook('onRequest', authMiddleware);

await uploadRoute(protectedScope);
await statusRoute(protectedScope);
await listRoute(protectedScope);
await searchRoute(protectedScope);
});

// Cleanup on shutdown
app.addHook('onClose', async () => {
await shutdownWorker();
await closeQueue();
await disconnectPrisma();
});

Expand Down
1 change: 1 addition & 0 deletions apps/backend/src/queue/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export function createJobProcessor(connection: any): Worker<ProcessingJob> {
{
connection,
concurrency: 5,
lockDuration: 300000,
}
);

Expand Down
31 changes: 12 additions & 19 deletions apps/backend/src/queue/processing-queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import { Redis } from 'ioredis';

export interface ProcessingJob {
documentId: string;
Expand All @@ -11,47 +11,40 @@ export interface ProcessingJob {
};
}

let queue: Queue<ProcessingJob> | null = null;
// Bỏ generic, để TS tự infer
let queue: Queue | null = null;

export function createProcessingQueue(): Queue<ProcessingJob> {
if (queue) return queue;
if (queue) return queue as Queue<ProcessingJob>;

const connection = new IORedis(process.env.REDIS_URL!, {
const connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
});

queue = new Queue<ProcessingJob>('document-processing', {
queue = new Queue('document-processing', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000, // 5s → 10s → 20s
delay: 5000,
},
timeout: 300000, // 5 minutes
removeOnComplete: {
age: 3600, // 1 hour
age: 3600,
count: 1000,
},
removeOnFail: {
age: 86400, // 24 hours
age: 86400,
},
},
});

return queue;
return queue as Queue<ProcessingJob>;
}

export function getProcessingQueue(): Queue<ProcessingJob> {
if (!queue) {
return createProcessingQueue();
}
return queue;
}

export async function closeQueue(): Promise<void> {
if (queue) {
await queue.close();
queue = null;
}
}
return queue as Queue<ProcessingJob>;
}
33 changes: 33 additions & 0 deletions apps/backend/src/queue/worker-init.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { createJobProcessor } from './job-processor.js';

let worker: Worker | null = null;

/**
* Initialize the document processing worker
*/
export function initWorker(): Worker {
if (worker) return worker;

const connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
});

worker = createJobProcessor(connection);

console.log('🤖 BullMQ Worker initialized');

return worker;
}

/**
* Gracefully shut down the worker
*/
export async function shutdownWorker(): Promise<void> {
if (worker) {
await worker.close();
worker = null;
console.log('🤖 BullMQ Worker shut down');
}
}
4 changes: 2 additions & 2 deletions apps/backend/src/routes/documents/list-route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getPrismaClient } from '@/services/database';
import { ListQuerySchema } from '@/validators';
import { getPrismaClient } from '@/services/database.js';
import { ListQuerySchema } from '@/validators/index.js';
import { FastifyInstance } from 'fastify';

export async function listRoute(fastify: FastifyInstance): Promise<void> {
Expand Down
29 changes: 9 additions & 20 deletions apps/backend/src/routes/documents/upload-route.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
import { ChunkerService, EmbeddingService, HashService } from '@/services';
import { getPrismaClient } from '@/services/database';
import { detectFormat, getProcessingLane, validateUpload } from '@/validators';
import multipart from '@fastify/multipart';
import { getProcessingQueue } from '@/queue/processing-queue.js';
import { getPrismaClient } from '@/services/database.js';
import { ChunkerService, EmbeddingService, HashService } from '@/services/index.js';
import { detectFormat, getProcessingLane, validateUpload } from '@/validators/index.js';
import { FastifyInstance } from 'fastify';
import { mkdir, readFile, rm, writeFile } from 'fs/promises';
import path, { basename } from 'path';

const UPLOAD_DIR = process.env.UPLOAD_DIR || '/tmp/uploads';

// Mock queue for now - will be replaced in Phase 05
const mockQueue = {
add: async (name: string, data: any) => {
console.log(`[Mock Queue] Job added: ${name}`, data);
},
};
// Queue initialization
const queue = getProcessingQueue();

export async function uploadRoute(fastify: FastifyInstance): Promise<void> {
// Register multipart
await fastify.register(multipart, {
limits: {
fileSize: 50 * 1024 * 1024, // 50MB
},
});

fastify.post('/api/documents', async (request, reply) => {
try {
console.log('📤 Upload request received');
Expand Down Expand Up @@ -215,10 +204,10 @@ export async function uploadRoute(fastify: FastifyInstance): Promise<void> {
} else {
// Heavy lane: Queue for processing (PDF)
console.log('📬 Adding to queue...');
await mockQueue.add('process', {
await queue.add('process', {
documentId: document.id,
filePath,
format,
filePath: filePath,
format: format as any,
config: {
ocrMode: 'auto',
ocrLanguages: ['en'],
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export default defineConfig({
alias: {
'@': path.resolve(__dirname, 'src'),
'@tests': path.resolve(__dirname, '../../tests'),
// Fix Prisma resolution for shared tests
'@prisma/client': path.resolve(__dirname, 'node_modules/@prisma/client'),
},
},
});
Loading
Loading