The total query time in a distributed vector search system is dominated by computation. While distributing the index reduces per-node computation, both random and K-Means partitioning increase total compute due to overlapping candidate regions, resulting in sub-linear scalability.
- How does Faiss handle distributed data partitioning with an HNSW index? Which step is most expensive?
- Within Faiss, does random partitioning provide linear throughput scaling compared to a single-host HNSW index?
    - What is the time spent on each step: query routing, computation, aggregation?
- Within Faiss, does K-Means partitioning yield linear throughput scaling compared to a single-host HNSW index?
    - What is the time spent on each step: query routing, computation, aggregation?
- With *n* compute nodes, can we achieve *n* times the QPS of a single host? Does this factor differ between random and K-Means partitioning?
- Which step (query routing, computation, aggregation, culling, returning) is most expensive or efficient?
The objective is to build and benchmark a distributed vector search baseline using FAISS, focusing on different data partitioning strategies in combination with HNSW indexing. The goal is a system that implements random partitioning and K-Means partitioning on top of Faiss, with timing probes inserted at each step to measure how much time each step takes.
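As a rough illustration of these timing probes, here is a minimal sketch built around `time.perf_counter_ns()` (the timer we plan to use); the `StageTimer` helper and the stage names are hypothetical and only meant to show where the probes would sit in the query path.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

class StageTimer:
    """Hypothetical helper that accumulates wall-clock time per pipeline stage."""

    def __init__(self):
        self.totals_ns = defaultdict(int)

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            self.totals_ns[name] += time.perf_counter_ns() - start

# Usage sketch: wrap each step of the query path.
timer = StageTimer()
with timer.stage("routing"):
    pass  # decide which nodes receive the query
with timer.stage("computation"):
    pass  # per-node HNSW search
with timer.stage("aggregation"):
    pass  # merge and cull partial results at the head node
print({name: ns / 1e6 for name, ns in timer.totals_ns.items()})  # milliseconds per stage
```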
- Query routing across partitions
    - There are three separate methods that we will be analyzing: random partitioning, K-Means partitioning, and no partitioning at all. They work as follows:
        - Random: the query vector(s) are simply sent to all of the nodes in parallel; the results are sent back to the head node, aggregated, and culled so that only the top-k are returned to the user.
        - K-Means: each node has a corresponding centroid that represents the mean of the vectors it holds. The query vector(s) are first compared against the centroid of each node, and the nodes corresponding to the top-n centroids are then queried, following the same process as above.
        - None: there is only one node, responsible for both querying and aggregation. There is no distribution; this configuration serves only as a control.
    - In all of the above architectures, the head node is responsible for aggregating and culling results as well as returning them to the client (see the routing/aggregation sketch after this list).
- Search execution per node
    - Each node will have an `IndexHNSW` responsible for the actual vector storage / querying.
- Result aggregation
    - Depends on partitioning:
        - Random requires aggregating from all nodes
        - K-Means requires aggregating from only the queried nodes (depends on the implementation, usually ~2)
- CPU load monitoring
    - Use Linux tools (e.g., `perf`)
    - Time will be measured using Python `time.perf_counter_ns()`
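The sketch below illustrates the head-node query path described in the list above: random partitioning fans the query out to every node, K-Means partitioning first scores the per-node centroids and contacts only the top-n nodes, and the head node then aggregates and culls the partial results to the global top-k. The `search_node` placeholder stands in for the per-node gRPC call; none of these names are Faiss APIs.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple

import numpy as np

def route_nodes(query: np.ndarray, num_nodes: int,
                centroids: Optional[np.ndarray], n_probe: int) -> List[int]:
    """Pick target nodes: every node for random partitioning, or the nodes whose
    centroids have the highest inner product with the query for K-Means."""
    if centroids is None:                           # random partitioning: fan out to all nodes
        return list(range(num_nodes))
    scores = centroids @ query                      # (num_nodes,) centroid similarities
    return list(np.argsort(-scores)[:n_probe])      # top-n_probe nodes only

def search_node(node_id: int, query: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]:
    """Placeholder for the per-node gRPC search call; returns (similarities, ids)."""
    raise NotImplementedError

def head_node_search(query: np.ndarray, k: int, num_nodes: int,
                     centroids: Optional[np.ndarray] = None, n_probe: int = 2):
    targets = route_nodes(query, num_nodes, centroids, n_probe)
    # Fan the query out to the selected nodes in parallel.
    with ThreadPoolExecutor(max_workers=len(targets)) as pool:
        partials = list(pool.map(lambda nid: search_node(nid, query, k), targets))
    # Aggregate the partial results and cull to the global top-k
    # (inner-product similarity, so larger is better).
    merged = [(sim, idx) for sims, ids in partials for sim, idx in zip(sims, ids)]
    return heapq.nlargest(k, merged, key=lambda pair: pair[0])
```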
The baseline is CPU-only and will be implemented in three configurations, each differing in data partitioning across nodes:
- Use Python FAISS library initially, later transition to C++ for performance
- Same API for all configurations
- Class: `DistributedHNSWNormal`
    - Wrapper over `IndexHNSW` from Python FAISS
- Class: `DistributedHNSWRandom`
    - Same API; delegates insertion randomly to nodes
    - All nodes must be searched
    - Communication via gRPC
    - Vectors are assigned to a given shard by a random function / hash
    - For a given query, the query vector is sent to all nodes in parallel and the results are aggregated
- Class: `DistributedHNSWKMeans`
    - Same API; insertion based on K-Means clustering
    - Only the most relevant node(s) are searched
    - Communication via gRPC
    - The number of centroids is determined by the number of available nodes (see the sharding sketch below).
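As a sketch of how `DistributedHNSWKMeans` might assign vectors to shards, the snippet below trains one centroid per available node with `faiss.Kmeans` and routes each vector to its nearest centroid; the data, cluster size, and variable names are illustrative assumptions, not the actual implementation.

```python
import faiss
import numpy as np

d = 96                    # vector dimensionality (e.g., Deep100M)
num_nodes = 4             # assumed cluster size: one centroid per available node

# Stand-in for the real dataset.
vectors = np.random.rand(100_000, d).astype("float32")

# Train one centroid per node (Faiss k-means uses L2 distance by default).
kmeans = faiss.Kmeans(d, num_nodes, niter=20, verbose=False)
kmeans.train(vectors)

# Assign each vector to the shard whose centroid is nearest.
_, assignments = kmeans.index.search(vectors, 1)                  # (n, 1) nearest-centroid ids
shards = [vectors[assignments.ravel() == nid] for nid in range(num_nodes)]

# For comparison, DistributedHNSWRandom would simply assign shards at random / by hash:
random_assignments = np.random.randint(0, num_nodes, size=len(vectors))
```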
This section explains the architecture of the `DistributedHNSW` class and its 3 different implementations (and the public API all 3 share).
- `def add(self, vectors: np.ndarray) -> None`
    - Adds a batch of vectors to the index
    - `vectors: np.ndarray` - the vectors to be added to the index
- `def search(self, queries: np.ndarray) -> Tuple[np.ndarray, np.ndarray]`
    - Searches and returns the top-k nearest neighbors
    - `queries: np.ndarray` - n-dimensional vectors whose top-k neighbors are to be searched for
- `def set_ef_search(self, ef: int) -> None`
    - Sets the `ef_search` parameter internally
- `def enable_profiling(self, profiling_options: dict) -> None`
    - Enables profiling according to the options in the dict
- `def get_profiling_status(self) -> dict`
    - Returns profiling data according to the options previously set; if none were set, the default is returned
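To make the shared public API concrete, here is a sketch of an abstract base class that the three implementations could inherit from; the signatures follow the list above, while the abstract-base-class structure itself is an assumption about how the code might be organized.

```python
from abc import ABC, abstractmethod
from typing import Tuple

import numpy as np

class DistributedHNSW(ABC):
    """Shared public API for DistributedHNSWNormal, DistributedHNSWRandom,
    and DistributedHNSWKMeans (sketch)."""

    @abstractmethod
    def add(self, vectors: np.ndarray) -> None:
        """Add a batch of vectors to the index."""

    @abstractmethod
    def search(self, queries: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Return (similarities, ids) of the top-k nearest neighbors for each query."""

    @abstractmethod
    def set_ef_search(self, ef: int) -> None:
        """Set the HNSW ef_search parameter on the underlying index/nodes."""

    @abstractmethod
    def enable_profiling(self, profiling_options: dict) -> None:
        """Enable profiling according to the options in the dict."""

    @abstractmethod
    def get_profiling_status(self) -> dict:
        """Return profiling data for the previously set options (defaults if none were set)."""
```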
- Datasets
    - SIFT1B
        - Float32, R^128
        - Fully utilized dimensions, image features without neural processing
        - Harder for kANN
    - Deep100M
        - Float32, R^96
        - Clustered, high redundancy, image semantics
        - Effective dimension ~60 with PCA
        - Easier for kANN
    - For our experiment, we will be using Deep100M as it is more representative of the data being processed in vector databases.
    - 10,000 query vectors will be sampled randomly from the Deep100M dataset for querying. Similarity calculated using inner product.
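As a sketch of the query setup, the snippet below samples 10,000 query vectors and computes exact inner-product ground truth with `faiss.IndexFlatIP` for later Recall@K evaluation; the random stand-in data and array names are assumptions, since Deep100M itself would be loaded from disk.

```python
import faiss
import numpy as np

d = 96                                                 # Deep100M dimensionality
rng = np.random.default_rng(42)

# Stand-in for the Deep100M base vectors.
base = rng.random((1_000_000, d), dtype=np.float32)

# Sample 10,000 query vectors at random from the dataset.
query_ids = rng.choice(len(base), size=10_000, replace=False)
queries = base[query_ids]

# Exact inner-product ground truth (brute force) for Recall@K.
gt_index = faiss.IndexFlatIP(d)
gt_index.add(base)
gt_sims, gt_ids = gt_index.search(queries, 100)        # top-100 exact neighbors per query
```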
- Software
    - FAISS v1.11.0
    - For now, gRPC will be used.
    - To calculate time, Python `time.perf_counter_ns()` will be used
- Hardware
    - CPU: TBD
    - Network Stack: TBD
- Evaluation Metrics
    - Throughput
        - How many queries per second (QPS)?
    - Latency
        - How long does a query take from start to finish?
    - Recall@K
        - What is the Recall@1, Recall@10, etc.?
    - Distance computations
        - How many distance computations are performed for each query?
    - Time spent on each stage (querying, aggregating, network requests, etc.)
        - How much time is spent on each stage?
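A minimal sketch of how Recall@K and QPS could be computed from a batch of search results and the exact ground truth; the function and argument names are illustrative and assume ground-truth ids shaped like those in the dataset sketch above.

```python
import time

import numpy as np

def recall_at_k(result_ids: np.ndarray, gt_ids: np.ndarray, k: int) -> float:
    """Fraction of queries whose true nearest neighbor appears in their top-k results."""
    hits = [gt_ids[q, 0] in result_ids[q, :k] for q in range(len(gt_ids))]
    return float(np.mean(hits))

def measure_qps(search_fn, queries: np.ndarray) -> float:
    """Queries per second for a batch of queries against a given search callable."""
    start = time.perf_counter_ns()
    search_fn(queries)
    elapsed_s = (time.perf_counter_ns() - start) / 1e9
    return len(queries) / elapsed_s
```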