Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ apps/backend/prisma/migrations/
uploads/


apps/backend/local_cache/
apps/backend/local_cache/

venv/
Binary file modified apps/ai-worker/.coverage
Binary file not shown.
91 changes: 52 additions & 39 deletions apps/backend/src/queue/job-processor.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
import { UnrecoverableError, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { getPrisma } from '../services/database.js';
import { ProcessingJob } from './processing-queue.js';

/**
 * Error codes that mark a failure as permanent, i.e. not worth retrying.
 *
 * The `failed` handler treats an error as permanent when its message contains
 * one of these codes. Declared `as const` so the list is readonly and every
 * entry keeps its literal type.
 */
const PERMANENT_ERRORS = [
  'PASSWORD_PROTECTED',
  'CORRUPT_FILE',
  'UNSUPPORTED_FORMAT',
] as const;

export function createJobProcessor(connection: any): Worker<ProcessingJob> {
export function createJobProcessor(connection: Redis): Worker<ProcessingJob> {
const worker = new Worker<ProcessingJob>(
'document-processing',
async (job) => {
const prisma = getPrisma();

// Update status to PROCESSING
await prisma.document.update({
where: { id: job.data.documentId },
data: {
status: 'PROCESSING',
retryCount: job.attemptsMade,
},
});
try {
await prisma.document.update({
where: { id: job.data.documentId },
data: {
status: 'PROCESSING',
retryCount: job.attemptsMade,
},
});

// Note: Actual processing happens in Python worker
// This Node.js worker just updates status and waits for callback
// In production, this would not process here but wait for Python

job.log(`Processing document ${job.data.documentId}`);

// The actual processing is done by Python worker polling Redis
// This processor just marks the document as PROCESSING
job.log(`Processing document ${job.data.documentId}`);

// TODO: Implement actual processing or wait for the Python callback

} catch (error) {
// If the document does not exist, do not retry
if ((error as any)?.code === 'P2025') {
throw new UnrecoverableError('Document not found');
}
throw error;
}
},
{
connection,
Expand All @@ -40,38 +43,48 @@ export function createJobProcessor(connection: any): Worker<ProcessingJob> {
}
);

// Handle job completion (via callback)
worker.on('completed', async (job) => {
worker.on('completed', (job) => {
console.log(`Job ${job?.id} completed`);
});

// Handle job failure
worker.on('failed', async (job, err) => {
if (!job) return;

const prisma = getPrisma();
const isPermanent = PERMANENT_ERRORS.some(code =>
err.message.includes(code)
);
try {
const prisma = getPrisma();
const isPermanent = PERMANENT_ERRORS.some(code =>
err.message.includes(code)
);

// Fix: also check for UnrecoverableError
const shouldMarkFailed =
isPermanent ||
err instanceof UnrecoverableError ||
job.attemptsMade >= (job.opts.attempts ?? 3);

if (isPermanent || job.attemptsMade >= 3) {
await prisma.document.update({
where: { id: job.data.documentId },
data: {
status: 'FAILED',
failReason: err.message,
retryCount: job.attemptsMade,
},
});
if (shouldMarkFailed) {
await prisma.document.update({
where: { id: job.data.documentId },
data: {
status: 'FAILED',
failReason: err.message.slice(0, 500), // Truncate để tránh DB error
retryCount: job.attemptsMade,
},
});
}
} catch (updateError) {
console.error('Failed to update document status:', updateError);
}
});

// Add an error handler for the worker
worker.on('error', (err) => {
console.error('Worker error:', err);
});

return worker;
}

/**
 * Wrap a message in BullMQ's `UnrecoverableError` to flag the failure as
 * permanent (no retry).
 *
 * @param message - Text carried by the resulting error.
 * @returns A new `UnrecoverableError` holding `message`.
 */
export function permanentError(message: string): UnrecoverableError {
  const permanent = new UnrecoverableError(message);
  return permanent;
}
}
33 changes: 25 additions & 8 deletions apps/backend/src/queue/processing-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ export interface ProcessingJob {
};
}

// Module-level singleton holding the shared processing queue (null until created).
let queue: Queue<ProcessingJob> | null = null;
// Redis connection backing `queue`; tracked so it can be disconnected on shutdown.
let connection: Redis | null = null;

export function createProcessingQueue(): Queue<ProcessingJob> {
if (queue) return queue as Queue<ProcessingJob>;
export function createProcessingQueue(forceNew = false): Queue<ProcessingJob> {
if (queue && !forceNew) return queue;

const connection = new Redis(process.env.REDIS_URL!, {
if (forceNew && queue) {
queue.close().catch(console.error);
connection?.disconnect();
}

connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});

queue = new Queue('document-processing', {
queue = new Queue<ProcessingJob>('document-processing', {
connection,
defaultJobOptions: {
attempts: 3,
Expand All @@ -39,12 +45,23 @@ export function createProcessingQueue(): Queue<ProcessingJob> {
},
});

return queue as Queue<ProcessingJob>;
return queue;
}

/**
 * Return the shared processing queue, creating it on first use.
 *
 * @returns The module-level queue, or the result of `createProcessingQueue()`
 *          when no queue exists yet.
 */
export function getProcessingQueue(): Queue<ProcessingJob> {
  // `queue` is either a Queue instance or null, so `??` matches the original
  // `if (!queue)` check without needing a type assertion.
  return queue ?? createProcessingQueue();
}

/**
 * Close the shared queue and drop its Redis connection, resetting both
 * module-level singletons to `null`.
 *
 * Cleanup runs in a `finally` block so that a rejected `queue.close()` cannot
 * leave the Redis connection open; the rejection still propagates to the caller.
 * Calling this when nothing was created is a no-op.
 */
export async function closeQueue(): Promise<void> {
  try {
    if (queue) {
      await queue.close();
    }
  } finally {
    // Reset state even when close() rejects, so the next getProcessingQueue()
    // call builds a fresh queue instead of reusing one whose connection is gone.
    queue = null;
    if (connection) {
      connection.disconnect();
      connection = null;
    }
  }
}
18 changes: 9 additions & 9 deletions apps/backend/src/queue/worker-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ import { Redis } from 'ioredis';
import { createJobProcessor } from './job-processor.js';

let worker: Worker | null = null;
let connection: Redis | null = null;

/**
* Initialize the document processing worker
*/
export function initWorker(): Worker {
if (worker) return worker;

const connection = new Redis(process.env.REDIS_URL!, {
connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});

worker = createJobProcessor(connection);
Expand All @@ -21,13 +20,14 @@ export function initWorker(): Worker {
return worker;
}

/**
 * Gracefully shut down the worker and release its Redis connection.
 *
 * Each step is skipped when its singleton is already `null`, so calling this
 * before `initWorker()` only logs. If `worker.close()` rejects, the connection
 * is still disconnected and the rejection propagates to the caller.
 */
export async function shutdownWorker(): Promise<void> {
  try {
    if (worker) {
      await worker.close();
    }
  } finally {
    // Clear both singletons even on failure: once the connection is dropped the
    // old worker is unusable, and initWorker() must be able to build a new one.
    worker = null;
    if (connection) {
      connection.disconnect();
      connection = null;
    }
  }
  console.log('🤖 BullMQ Worker shut down');
}
2 changes: 1 addition & 1 deletion apps/backend/src/routes/documents/status-route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getPrismaClient } from '@/services/database';
import { getPrismaClient } from '@/services/database.js';
import { FastifyInstance } from 'fastify';
import { z } from 'zod';

Expand Down
5 changes: 2 additions & 3 deletions apps/backend/src/routes/documents/upload-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import path, { basename } from 'path';

const UPLOAD_DIR = process.env.UPLOAD_DIR || '/tmp/uploads';

// Queue initialization
const queue = getProcessingQueue();
// NOTE: Queue is lazily initialized to allow env vars to be set first (important for tests)

export async function uploadRoute(fastify: FastifyInstance): Promise<void> {
fastify.post('/api/documents', async (request, reply) => {
Expand Down Expand Up @@ -204,7 +203,7 @@ export async function uploadRoute(fastify: FastifyInstance): Promise<void> {
} else {
// Heavy lane: Queue for processing (PDF)
console.log('📬 Adding to queue...');
await queue.add('process', {
await getProcessingQueue().add('process', {
documentId: document.id,
filePath: filePath,
format: format as any,
Expand Down
7 changes: 7 additions & 0 deletions apps/backend/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ export default defineConfig({
setupFiles: ['../../tests/setup/setup-file.ts'],
// Global setup for integration tests (starts testcontainers)
globalSetup: ['../../tests/setup/global-setup.ts'],
server: {
deps: {
inline: ['bullmq'],
},
},
coverage: {
provider: 'v8',
reporter: ['text', 'json', 'html'],
Expand All @@ -29,6 +34,8 @@ export default defineConfig({
'@tests': path.resolve(__dirname, '../../tests'),
// Fix Prisma resolution for shared tests
'@prisma/client': path.resolve(__dirname, 'node_modules/@prisma/client'),
'bullmq': path.resolve(__dirname, 'node_modules/bullmq'),
'ioredis': path.resolve(__dirname, 'node_modules/ioredis'),
},
},
});
4 changes: 3 additions & 1 deletion apps/backend/vitest.e2e.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export default defineConfig({
exclude: ['node_modules', 'dist', '../../tests/e2e/prisma-test.test.ts'],
// E2E tests manage their own setup/teardown per suite
// No global setup needed
// IMPORTANT: E2E tests must run sequentially because each file spins up its own containers
fileParallelism: false,
testTimeout: 120000, // 2 minutes for E2E tests
hookTimeout: 120000,
coverage: {
Expand All @@ -22,7 +24,7 @@ export default defineConfig({
alias: {
'@': path.resolve(__dirname, 'src'),
'@tests': path.resolve(__dirname, '../../tests'),
'@prisma/client': path.resolve(__dirname, '../node_modules/.prisma/client'),
'@prisma/client': path.resolve(__dirname, '../../node_modules/.pnpm/@prisma+client@5.22.0_prisma@5.22.0/node_modules/@prisma/client'),
},
extensions: ['.ts', '.js', '.mjs', '.json'],
},
Expand Down
1 change: 0 additions & 1 deletion docs/production-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ pnpm test tests/integration/production-readiness.test.ts

- [Production Deployment Guide](./production-deployment.md)
- [Deployment Checklist](./deployment-checklist.md)
- [Phase 09 Implementation Summary](./phase-09-implementation-summary.md)

## 🎓 Best Practices

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,61 @@
# Phase 06: E2E Pipeline (TDD)

**Parent:** [plan.md](./plan.md) | **Status:** Pending | **Priority:** P0
**Parent:** [plan.md](./plan.md) | **Status:** Complete | **Priority:** P0

## Objectives
Connect all components to form a complete pipeline from Upload to Vector Search.

## Acceptance Criteria
- [ ] Standard flow: Upload -> Queue -> (Mock) Python Worker -> Callback -> Embedded -> DB.
- [ ] Query API returns correct high-similarity chunks.
- [ ] System handles concurrent multi-file uploads.
- [x] Standard flow: Upload -> Queue -> (Mock) Python Worker -> Callback -> Embedded -> DB.
- [x] Query API returns correct high-similarity chunks.
- [ ] System handles concurrent multi-file uploads. *(Not implemented - see Notes)*

## Key Files
- `tests/e2e/pipeline.test.ts`: System-wide test scenarios.
- `src/app.ts`: Final integration config.
- `tests/e2e/pipeline/pdf-upload-flow.test.ts`: PDF upload lifecycle test.
- `tests/e2e/pipeline/json-fast-lane.test.ts`: Fast lane (JSON/TXT/MD) processing test.
- `tests/e2e/pipeline/query-flow.test.ts`: Vector search and query validation.
- `tests/e2e/pipeline/error-handling.test.ts`: Error scenarios (password-protected, corrupt, quality gate, duplicate).
- `tests/e2e/setup/e2e-setup.ts`: E2E environment with testcontainers (PostgreSQL + Redis).
- `apps/backend/vitest.e2e.config.ts`: E2E Vitest configuration.
- `apps/backend/src/app.ts`: Final integration config.

## Implementation Steps
1. Write E2E tests covering document lifecycle.
2. Fine-tune embedding and chunking parameters.
3. Ensure transaction integrity for vector/metadata storage.
## Implementation Summary

### Completed Tests
1. **PDF Upload Flow** (`pdf-upload-flow.test.ts`)
- Full pipeline: Upload → PENDING → Mock Callback → COMPLETED → Query
- Heavy lane routing verification

2. **JSON Fast Lane** (`json-fast-lane.test.ts`)
- JSON processed directly without Python worker
- TXT and Markdown via fast lane
- Heading metadata preservation for Markdown

3. **Query Flow** (`query-flow.test.ts`)
- Semantic query returns relevant results
- TopK limit respected
- Results ordered by similarity
- Metadata included in results
- Empty results handling

4. **Error Handling** (`error-handling.test.ts`)
- Password-protected PDF rejection
- Quality gate: TEXT_TOO_SHORT
- Quality gate: EXCESSIVE_NOISE
- Duplicate file detection (409 Conflict)
- Unsupported format rejection (400)
- File size limit (413 Payload Too Large)
- Corrupt file handling

### Test Infrastructure
- Uses `@testcontainers/postgresql` and `@testcontainers/redis`
- Automatic pgvector extension setup
- Prisma schema push (not migrations)
- 120s timeout for container startup

## Verification
- `npm run test:e2e`.
- Manual test: upload PDF -> check `POST /api/query` result.
- `pnpm --filter @ragbase/backend test:e2e`

## Notes
- **Concurrent multi-file uploads test NOT implemented**: Deferred; current tests focus on happy-path and error handling. Consider adding in future iteration if needed.
- **prisma-test.test.ts excluded**: Simple import test, excluded from E2E config.
Loading
Loading