Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,59 @@ pixelrag embed --shard-dir ./tiles --output-dir ./embeddings --gpu-ids 0,1
pixelrag build-index --embeddings-dir ./embeddings --output-dir ./index
```

### Qdrant backend

[Qdrant](https://qdrant.tech) is an open-source vector search engine built for high performance at massive scale. [FAISS](https://ai.meta.com/tools/faiss/) remains the default backend for local indexes. Use Qdrant when you need configurable quantization, disk-backed vectors, payload filtering, or a single collection shared by multiple PixelRAG servers.

Quantization compresses vectors to reduce memory use and speed up search, with a recall tradeoff that depends on the method and settings.

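To configure quantization, save any Qdrant `quantization_config` object to a JSON file (for example `quantization.json`) and pass its path with `--qdrant-quantization-config`. The setting is applied only when the collection is created or recreated.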
See Qdrant's [quantization guide](https://qdrant.tech/documentation/manage-data/quantization/#setting-up-quantization-in-qdrant) for supported methods and parameters.

```json
{
"scalar": {
"type": "int8",
"quantile": 0.99,
"always_ram": true
}
}
```

```bash
pip install 'pixelrag[serve,qdrant]' # or 'pixelrag[index,qdrant]'

# Build against a Qdrant server.
# Start one locally with: docker run -p 6333:6333 qdrant/qdrant
pixelrag build-index --embeddings-dir ./embeddings --output-dir ./index \
--backend qdrant --qdrant-url http://localhost:6333 --collection pixelrag \
--qdrant-quantization-config ./quantization.json

# Add documents to an existing collection.
pixelrag build-index --embeddings-dir ./more --output-dir ./index \
--backend qdrant --qdrant-url http://localhost:6333 --collection pixelrag --append

# Replace an existing collection and its configuration.
pixelrag build-index --embeddings-dir ./embeddings --output-dir ./index \
--backend qdrant --qdrant-url http://localhost:6333 --collection pixelrag --recreate

# Serve the collection. PixelRAG reads the backend from summary.json.
pixelrag serve --index-dir ./index --qdrant-url http://localhost:6333 \
--qdrant-client-config ./qdrant-client.json --port 30001
```

Configure the orchestrator in `pixelrag.yaml`:

```yaml
index:
backend: qdrant
qdrant_url: http://localhost:6333
collection: pixelrag
client_config: ./qdrant-client.json
quantization_config: ./quantization.json
# Set append: true or recreate: true when the collection already exists.
```

### Training

Fine-tuning lives in `train/` — a **separate uv project** (`wiki-screenshot-training`) with its own
Expand Down
187 changes: 176 additions & 11 deletions embed/src/pixelrag_embed/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import os
import sys
import time
import uuid
from functools import partial
from pathlib import Path

Expand Down Expand Up @@ -243,7 +244,7 @@ def build_ivf(

# Summary
summary = {
"backend": "ivf",
"backend": "faiss",
"total_vectors": n,
"dimension": dim,
"nlist": nlist,
Expand All @@ -263,6 +264,114 @@ def build_ivf(
print(f"Summary: {summary_path}")


def build_qdrant(
embeddings_dir: str,
output_dir: str,
url: str | None = None,
collection: str = "pixelrag",
api_key: str | None = None,
client_config: dict | None = None,
metric: str = "ip",
quantization_config: dict | None = None,
append: bool = False,
recreate: bool = False,
parallel: int = 1,
batch: int = 1000,
):
from pydantic import TypeAdapter
from qdrant_client import QdrantClient, models

client_options = dict(client_config or {})
if url:
client_options["url"] = url
if api_key:
client_options["api_key"] = api_key
if not any(
key in client_options for key in ("url", "host", "location", "path")
):
raise SystemExit(
"Qdrant requires --qdrant-url or an endpoint in --qdrant-client-config"
)

client = QdrantClient(**client_options)
exists = client.collection_exists(collection)
if exists and not (append or recreate):
raise ValueError(
f"collection {collection!r} already exists. Use --append or --recreate"
)

os.makedirs(output_dir, exist_ok=True)
merged = _merge_all_shards(_load_shards(embeddings_dir))
vectors = np.ascontiguousarray(merged["embeddings"], dtype=np.float32)
dim = merged["dim"]

distance = models.Distance.COSINE if metric == "ip" else models.Distance.EUCLID
quantization = (
TypeAdapter(models.QuantizationConfig).validate_python(quantization_config)
if quantization_config
else None
)
if recreate or not exists:
if exists:
client.delete_collection(collection)
client.create_collection(
collection,
vectors_config=models.VectorParams(
size=dim, distance=distance, on_disk=True
),
quantization_config=quantization,
)

# min_tile_height is the only payload filter used during search.
client.create_payload_index(
collection, "tile_height", field_schema=models.PayloadSchemaType.INTEGER
)

fields = {
"article_id": merged["article_ids"],
"tile_index": merged["tile_indices"],
"chunk_index": merged["chunk_indices"],
"y_offset": merged["y_offsets"],
"tile_height": merged["tile_heights"],
}

# Qdrant only allows UUIDs and +ve integers as point IDs.
# Ref: https://qdrant.tech/documentation/manage-data/points/#point-ids
ids = (
str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{article_id}:{tile_index}:{chunk_index}"))
for article_id, tile_index, chunk_index in zip(
fields["article_id"], fields["tile_index"], fields["chunk_index"]
)
)
payloads = (
{name: int(values[i]) for name, values in fields.items()}
for i in range(len(vectors))
)

client.upload_collection(
collection_name=collection,
vectors=vectors,
payload=payloads,
ids=ids,
parallel=parallel,
batch_size=batch,
wait=True,
)

total = client.count(collection_name=collection, exact=True).count
summary = {
"backend": "qdrant",
"total_vectors": total,
"dimension": dim,
"metric": metric,
"collection": collection,
}
summary_path = os.path.join(output_dir, "summary.json")
with open(summary_path, "w") as f:
json.dump(summary, f, indent=2)
print(f"Uploaded {total:,} points to '{collection}'")


def test_search(index_dir: str, nprobe: int = 128, k: int = 10):
"""Test search on a built IVF index."""
import faiss
Expand Down Expand Up @@ -306,7 +415,7 @@ def main():
sub = parser.add_subparsers(dest="command", required=True)

# build
p_build = sub.add_parser("build", help="Build IVF index (default)")
p_build = sub.add_parser("build", help="Build a vector index")
p_build.add_argument("--embeddings-dir", default="./data/embeddings")
p_build.add_argument("--output-dir", default="./output/search_index")
p_build.add_argument(
Expand Down Expand Up @@ -336,6 +445,40 @@ def main():
default=-1,
help="GPU for K-means training (-1 = CPU only)",
)
p_build.add_argument(
"--backend",
choices=["faiss", "qdrant"],
default="faiss",
help="Index backend (default: faiss)",
)
p_build.add_argument(
"--qdrant-url", default=None, help="Qdrant server/Cloud URL (qdrant backend)"
)
p_build.add_argument(
"--qdrant-api-key", default=os.environ.get("QDRANT_API_KEY")
)
p_build.add_argument(
"--qdrant-client-config",
help="Path to a JSON object of QdrantClient constructor arguments",
)
p_build.add_argument(
"--collection", default="pixelrag", help="Qdrant collection name"
)
p_build.add_argument(
"--qdrant-quantization-config",
help="Qdrant quantization_config JSON for a new or recreated collection",
)
qdrant_mode = p_build.add_mutually_exclusive_group()
qdrant_mode.add_argument(
"--append",
action="store_true",
help="Upsert into an existing Qdrant collection",
)
qdrant_mode.add_argument(
"--recreate",
action="store_true",
help="Delete and recreate an existing Qdrant collection",
)

# test
p_test = sub.add_parser("test", help="Test search on built index")
Expand All @@ -346,15 +489,37 @@ def main():
args = parser.parse_args()

if args.command == "build":
build_ivf(
args.embeddings_dir,
args.output_dir,
nlist=args.nlist,
nprobe=args.nprobe,
train_sample=args.train_sample,
metric=args.metric,
gpu_id=args.gpu_id,
)
if args.backend == "qdrant":
client_config = None
if args.qdrant_client_config:
with open(args.qdrant_client_config) as f:
client_config = json.load(f)
quantization_config = None
if args.qdrant_quantization_config:
with open(args.qdrant_quantization_config) as f:
quantization_config = json.load(f)
build_qdrant(
args.embeddings_dir,
args.output_dir,
url=args.qdrant_url,
collection=args.collection,
api_key=args.qdrant_api_key,
client_config=client_config,
metric=args.metric,
quantization_config=quantization_config,
append=args.append,
recreate=args.recreate,
)
else:
build_ivf(
args.embeddings_dir,
args.output_dir,
nlist=args.nlist,
nprobe=args.nprobe,
train_sample=args.train_sample,
metric=args.metric,
gpu_id=args.gpu_id,
)
elif args.command == "test":
test_search(args.index_dir, nprobe=args.nprobe, k=args.k)

Expand Down
75 changes: 47 additions & 28 deletions index/src/pixelrag_index/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,35 +249,54 @@ def _repl(m: re.Match) -> str:
cmd += ["--backend", embed_cfg["backend"]]
subprocess.run(cmd, check=True)

# Stage 4: Build FAISS index
# Auto-adjust nlist based on vector count (IVF needs nlist <= n_vectors)
import numpy as np
# Stage 4: Build the configured search index.
index_cfg = config.get("index", {})
backend = index_cfg.get("backend", "faiss")
cmd = [
sys.executable,
"-m",
"pixelrag_embed.index",
"build",
"--embeddings-dir",
str(embeddings_dir),
"--output-dir",
str(output),
"--backend",
backend,
]
if backend == "qdrant":
logger.info("Stage 4/4: Building Qdrant collection...")
if index_cfg.get("qdrant_url"):
cmd += ["--qdrant-url", index_cfg["qdrant_url"]]
if index_cfg.get("collection"):
cmd += ["--collection", index_cfg["collection"]]
if index_cfg.get("client_config"):
cmd += ["--qdrant-client-config", index_cfg["client_config"]]
if index_cfg.get("quantization_config"):
cmd += [
"--qdrant-quantization-config",
index_cfg["quantization_config"],
]
if index_cfg.get("append"):
cmd += ["--append"]
if index_cfg.get("recreate"):
cmd += ["--recreate"]
else:
# Auto-adjust nlist based on vector count (IVF needs nlist <= n_vectors)
import numpy as np

npz_files = sorted(embeddings_dir.glob("shard_*.npz"))
total_vectors = sum(
np.load(f, mmap_mode="r")["embeddings"].shape[0] for f in npz_files
)
nlist = min(4096, max(1, total_vectors // 40))
logger.info(
"Stage 4/4: Building FAISS index (%d vectors, nlist=%d)...",
total_vectors,
nlist,
)
subprocess.run(
[
sys.executable,
"-m",
"pixelrag_embed.index",
"build",
"--embeddings-dir",
str(embeddings_dir),
"--output-dir",
str(output),
"--nlist",
str(nlist),
],
check=True,
)
npz_files = sorted(embeddings_dir.glob("shard_*.npz"))
total_vectors = sum(
np.load(f, mmap_mode="r")["embeddings"].shape[0] for f in npz_files
)
nlist = min(4096, max(1, total_vectors // 40))
logger.info(
"Stage 4/4: Building FAISS index (%d vectors, nlist=%d)...",
total_vectors,
nlist,
)
cmd += ["--nlist", str(nlist)]
subprocess.run(cmd, check=True)

logger.info("Index built at %s", output)
return output
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ serve = [
"pydantic>=2.0.0",
]
index = ["pixelrag[embed]", "pyyaml>=6.0", "markdown>=3.4"]
all = ["pixelrag[embed,serve,index]"]
qdrant = ["qdrant-client>=1.18.0"]
all = ["pixelrag[embed,serve,index,qdrant]"]
gpu = ["faiss-gpu-cu12>=1.13.2; sys_platform == 'linux'"]
playwright = ["playwright>=1.40.0"]
pdf = ["pdf2image>=1.16.0"]
Expand Down
Loading