[Feat] RAG Ingestion 7/11: Async Ingestion Pipeline #1270

@yossiovadia

Summary

Implement the async document processing pipeline: attach file to vector store → chunk → embed → store.

Part of #1262

Scope

Files to create:

  • pkg/vectorstore/pipeline.go - Ingestion pipeline orchestrator with worker pool
  • pkg/vectorstore/pipeline_test.go

Files to modify:

  • pkg/apiserver/route_vectorstore.go - Add file attachment endpoints

New Endpoints

Method  Path                                    Description
POST    /v1/vector_stores/{id}/files            Attach file → triggers async processing
GET     /v1/vector_stores/{id}/files            List attached files with status
DELETE  /v1/vector_stores/{id}/files/{file_id}  Detach file, remove chunks

Pipeline Flow (async)

POST attach → validate → queue job → return {status: "in_progress"}
                               ↓ (background goroutine)
                        Read file from FileStore
                        → ExtractText(content, filename)
                        → ChunkText(text, strategy)
                        → Embed chunks via Candle binding
                        → InsertChunks into backend
                        → Update status = "completed"

Key Design Decisions

  • Worker pool with configurable worker count (default 2)
  • Job queue via Go channel
  • File status tracking: in_progress → completed / failed
  • Uses candle_binding.GetEmbeddingBatched() for chunk embedding
  • Chunking strategy configurable per-attachment

Acceptance Criteria

  • Attach returns immediately with in_progress status
  • Background processing completes and updates status
  • Failed processing sets status to failed with error message
  • List endpoint shows current status
  • Detach removes all chunks from backend
  • Worker pool handles concurrent jobs
  • Tests for pipeline lifecycle

Dependencies

Depends on PR 1 (types), PR 2 (chunking), PR 3 (backends), PR 4 (file storage).

This is the largest PR (~500 lines).

Branch: feat/rag-07-ingestion-pipeline → feat/rag-ingestion

Labels: athena (v0.2 Athena milestone tasks), priority/P1 (Important / Should-Have)
