Asynchronous multi-GPU file processing and indexing#328
Conversation
ba4f60a to
b35d8ae
Compare
There was a problem hiding this comment.
Pull request overview
This PR refactors the live-retrieval/indexer API to make uploads asynchronous and to parallelize processing/indexing across multiple GPUs via an in-memory job queue, adding job-status polling/SSE endpoints and updating Milvus configuration/docs accordingly.
Changes:
- Introduces an in-memory
JobQueueand job-status endpoints (/v1/jobs/{jobId}+ SSE stream) to decouple uploads from processing/indexing. - Splits processing/indexing work across GPUs by pinning jobs and model replicas per device (processors + dense/sparse embedding models).
- Updates API responses/status codes, production/example configs, and documentation; adds tests for the new async behavior.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_live_retriever_api.py | Updates API tests for async uploads and adds job/SSE coverage. |
| tests/test_job_queue.py | Adds unit tests for the new in-memory job queue. |
| src/mmore/utils.py | Caches indexers per (collection, device) and threads device/output_path through processing. |
| src/mmore/run_index_api.py | Converts upload/update endpoints to async job submission, adds job status + SSE endpoints, and per-device subprocess processing. |
| src/mmore/rag/retriever.py | Adds queue/concurrency settings to RetrieverConfig. |
| src/mmore/rag/model/sparse/splade.py | Allows passing an explicit device for sparse embeddings. |
| src/mmore/rag/model/sparse/base.py | Threads device through sparse model construction. |
| src/mmore/rag/model/dense/base.py | Threads device through HF embedding model construction. |
| src/mmore/process/processors/pdf_processor.py | Adds per-device model caching/loading for parallel PDF processing. |
| src/mmore/process/processors/media_processor.py | Adds per-device pipeline caching/loading for parallel media processing. |
| src/mmore/process/execution_state.py | Makes execution state init/shutdown concurrency-safe with refcounting. |
| src/mmore/process/dispatcher.py | Adds optional device pinning and lazily initializes the shared multiprocessing pool. |
| src/mmore/job_queue.py | Implements the in-memory queue, device checkout, and retention/eviction. |
| src/mmore/index/indexer.py | Loads embedding models on a given device and tolerates concurrent collection creation. |
| production-config/retriever_api/config.yaml | Switches to Milvus Standalone config and adds queue settings. |
| examples/retriever_api/config.yaml | Notes Standalone Milvus URI option in the example config. |
| docs/source/developer_documentation/index_api.md | Updates API docs to describe async uploads, job status endpoints, and concurrency knobs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
71fb1ee to
85fc0fd
Compare
| ): | ||
| self.devices = devices or _detect_devices() | ||
| self.n_workers = len(self.devices) * jobs_per_gpu | ||
| self.max_queue_size = max_queue_size or self.n_workers * 10 |
There was a problem hiding this comment.
or self.n_workers * 10 sounds arbitrary, why?
There was a problem hiding this comment.
It is completely arbitrary yes, it makes 10 buffered jobs per worker as a default value which can be reasonable (not too small not too big)
Maybe in a production environment it can be a bit too little depending on how many users we have, let me know if I should increase the default value
In any case it can be changed in the config files
There was a problem hiding this comment.
I guess we can leave it this way if it works with this rule of thumb, it would be cool to test this some day when urgent things have been handled
fabnemEPFL
left a comment
There was a problem hiding this comment.
sounds good to me, let's wait for feedback from the Moove team
|
the current feedback is "migration is longer than hoped" |
|
🥲 |
Summary
Related issues: #324, #326
In production the live-retrieval API processed uploads synchronously, so a single user uploading a document blocked every other request until it finished. This PR makes the processing and indexing pipelines asynchronous and parallel across GPUs: uploads now return immediately with a job id, and documents are processed and indexed concurrently thanks to a queue. To handle concurrent updates on the Milvus database, its version was changed from Lite to Standalone.
Changes
JobQueue(src/mmore/job_queue.py) that schedule jobs on GPUs with size limit and eviction of finished jobsPOST /v1/files,POST /v1/files/bulkandPUT /v1/files/{fileId}endpoints now return202 Acceptedwith ajobIdinstead of blocking. Duplicate ids are rejected (409), a saturated queue returns503GET /v1/jobs/{jobId}for a one-time status andGET /v1/jobs/{jobId}/eventsfor a server-pushed status stream over Server-Sent Events (SSE)artifacts_by_device/pipelines_by_device), and per-device embedding models (dense + splade)jobs_per_gpu,max_queue_size, Milvus Standalone uri) and the API documentationImprovements
GET /v1/jobs/{jobId}or subscribe to the SSE streamjobs_per_gpu(defaults to 1) lets you overlap CPU and GPU work for higher utilization but each replica loads its own models (needs some tuning wrt VRAM)Milvus Standalone server
http://localhost:19530), not Milvus Lite which is single-processTo create it:
Alternatively:
The port exposed by default is
19530DEMO
The experiment runs on a server with 4 NVIDIA Tesla V100 32 GB GPUs, of which only CUDA devices 1, 2 and 3 are available to the indexer (device 0 is reserved for other containers). The workload is a single HTTP request to the bulk-upload endpoint carrying 4 large PDFs (18 MB, 450 pages each), one per available GPU
experiment.mp4
Notes:
CUDA_VISIBLE_DEVICES=1,2,3. Inside the process these are then re-indexed ascuda:0,1,2and_detect_devices()sees 3 GPUsjobs_per_gpu=1and 3 visible GPUs, the 4th PDF queues until one of the first three finishes, then runs