Description
Is your feature request related to a problem? Please describe
The OpenSearch Bulk API executes multiple indexing/update/delete operations in a single call. For each of these operations, the name of the index/stream or the alias is required, and the user can additionally provide a custom doc ID. If a doc ID is not provided, OpenSearch auto-generates one: a 128-bit UUID which, for practical purposes, is unique. In the absence of custom routing, the doc ID determines the shard routing for the document via a murmur3 hash of the doc ID, taken modulo the number of routing shards. After this, TransportBulkAction on the coordinator node generates per-shard TransportShardBulkAction requests and sends them to the corresponding primaries. The coordinator node waits for responses from all shards before sending the response back to the client.
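For reference, the hash-mod routing described above is roughly the following (a simplified sketch of OpenSearch's `OperationRouting` logic; the custom-routing and partitioned-index paths are omitted):

```java
import org.opensearch.cluster.routing.Murmur3HashFunction;

// Simplified sketch of the current doc-ID-to-shard mapping; the real
// logic lives in OperationRouting and also handles custom routing and
// partitioned indices.
final class RoutingSketch {
    static int shardIdForDoc(String docId, int routingNumShards, int routingFactor) {
        // Murmur3 hash of the (auto-generated) doc ID, modulo the routing
        // shard count, scaled down by the routing factor to the actual
        // shard count so that future splits stay consistent.
        int hash = Murmur3HashFunction.hash(docId);
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }
}
```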
In a slow shard/node scenario (say, a shard in INITIALIZING state or a node undergoing garbage collection), the shard becomes a bottleneck in the bulk flow, increasing tail latencies and holding up resources and queues on the coordinator, potentially causing rejections.
The goal of this project is to tweak the document routing logic for auto-generated doc IDs to better handle slow shards/nodes, improve latencies by saving network round trips, and reduce the chatter between the coordinator and the nodes.
- [Part-1] Within a bulk request, index all documents belonging to an index to the same primary (selected at random). Do not fan out and send multiple per-shard transport requests.
- [Part-2] Within a bulk request, index all documents to the same node (if possible) or a minimal set of nodes.
- [Part-3] Adaptively select the candidate shard and node based on the most recently observed latencies (is recency sufficient?) and resource usage. This should deterministically improve indexing latency and also reduce the risk of hotspots.
The above optimizations should work within the following constraints:
- Get/Update by _id should work
- Does not break (Online) Index Split operations
- Does not disturb Custom routing
- Introduces no deviation in behavior between indexing the same document with index API or bulk API
- Minimal (< 1%) impact on CPU and disk usage.
Describe the solution you'd like
In this section, we discuss the approach to solve [Part-1].
Pre-generated DocIDs Cache/Store: Maintain a store of doc IDs tagged per shard. In the TransportShardBulkAction.doRun() execution, instead of computing doc IDs on the fly, the pre-computed values are assigned. A background thread periodically refills the store (we can also explore using async futures to execute the refill); the per-key (ShardID) refill count is a function of shard throughput. If the store has no IDs for a shard, the algorithm falls back to the brute-force approach: for 'm' DocWriteRequests in the bulk request and 'n' routing shards, generate (n * m) doc IDs and reject those that do not map to the randomly selected target shard. Minimal locking is used to get/refill/evict from the store in a thread-safe manner, using ConcurrentLinkedQueue and semaphores. A sketch of such a store follows.
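A minimal sketch of what the store could look like (the class and method names are hypothetical, not existing OpenSearch code; `UUIDs` and `Murmur3HashFunction` are the existing OpenSearch utilities):

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.opensearch.cluster.routing.Murmur3HashFunction;
import org.opensearch.common.UUIDs;

// Hypothetical per-shard store of pre-generated doc IDs.
final class PreGeneratedDocIdStore {
    private final Map<Integer, Queue<String>> idsPerShard = new ConcurrentHashMap<>();

    // Hot path: pop a pre-computed ID for the target shard, or return
    // null so the caller can fall back to the brute-force approach.
    String poll(int shardId) {
        Queue<String> q = idsPerShard.get(shardId);
        return q == null ? null : q.poll();
    }

    // Background refill: generate a batch of candidate IDs and file each
    // one under the shard it hashes to. Candidates for shards already at
    // quota are dropped, i.e. the "rejected ID" cost that the brute-force
    // path pays inline is paid here, off the hot path. quotaPerShard
    // would in practice be a function of per-shard throughput.
    void refill(int routingNumShards, int routingFactor, int quotaPerShard, int batchSize) {
        for (int i = 0; i < batchSize; i++) {
            String candidate = UUIDs.base64UUID();
            int shard = Math.floorMod(Murmur3HashFunction.hash(candidate), routingNumShards) / routingFactor;
            Queue<String> q = idsPerShard.computeIfAbsent(shard, s -> new ConcurrentLinkedQueue<>());
            if (q.size() < quotaPerShard) {
                q.add(candidate);
            }
        }
    }
}
```

ConcurrentLinkedQueue keeps poll/add lock-free on the hot path; the semaphore-based coordination mentioned above would gate the refill thread and eviction, which this sketch omits.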
The above approaches need to be benchmarked for the one-slow-shard vs. many-slow-shards scenarios. Measure indexing speed, CPU and memory usage (on the coordinator node), storage efficiency and lookup speed. As another dimension, measure cluster throughput and rejections.
Related component
Indexing:Performance
Describe alternatives you've considered
Biased Hash Function: Currently in OpenSearch, the routing shard is closely coupled with the docID. The generated docID is a UUID with the requirement of no collisions for practical purposes. This docID is run through a Murmur3 hash (non-cryptographic) and then a mod function to produce the shard ID integer, uniformly distributing documents across the shards. We want to explore whether it is possible to preserve these two primitives.
One of the simplest approaches is to encode the (randomly selected) shardId information in the docID alongside the UUID, and forgo the Murmur3 hash-mod routing shard calculation: `<encode_version>:<base36_shard_id>:<document_id>`. A sketch of the encoding follows.
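A minimal sketch of the proposed encoding/decoding (the class and the version tag are hypothetical and purely illustrative):

```java
// Hypothetical encoder/decoder for the proposed
// <encode_version>:<base36_shard_id>:<document_id> doc ID layout.
final class ShardEncodedDocId {
    private static final String ENCODE_VERSION = "1"; // illustrative version tag

    static String encode(int shardId, String uuid) {
        return ENCODE_VERSION + ":" + Integer.toString(shardId, 36) + ":" + uuid;
    }

    // Extracts the resident shard directly from the doc ID; no hashing.
    static int decodeShardId(String docId) {
        int first = docId.indexOf(':');
        int second = docId.indexOf(':', first + 1);
        return Integer.parseInt(docId.substring(first + 1, second), 36);
    }
}
```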
Update-by-_id or get-by-ids queries can be handled at the client communication layer by returning the doc ID as the concatenated string value. At the coordinator node's transport layer, a decoder extracts the version and shard ID and routes the request to that specific shard. Since there is no hash calculation, document routing is fast, with negligible footprint on CPU cycles and the JVM. However, there are drawbacks to this approach:
- Backward Incompatible - For indices created on an older code version, document routing falls back to the older request routing algorithm. However, in a version-upgrade scenario this behavior starts failing.
- Shard Split - Online shard splits need to be handled separately. After a shard split on an existing index, search-by-_id requests start failing because the resident shard can no longer be located. A workaround is to maintain the split history in a durable table (inside Index Metadata), consistent across cluster restarts. Say Shard (1) splits into 3 shards, giving Shard (11), Shard (12) and Shard (13): we maintain this information in the table and route requests based on the docID (to identify the source shard) and the shard split history (to identify the split shard). This approach, however, adds an additional map lookup and thus impacts latency; see the sketch below.
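A sketch of the split-history lookup under these assumptions (the history map, class name, and the child-selection step are all hypothetical; a real implementation would mirror the routing computation used at split time):

```java
import java.util.List;
import java.util.Map;

// Hypothetical resolution of a doc's current shard by walking the
// recorded split history, starting from the shard encoded in its ID.
final class SplitHistoryRouter {
    static int resolveCurrentShard(String docId, Map<Integer, List<Integer>> splitHistory) {
        int shard = ShardEncodedDocId.decodeShardId(docId); // source shard from the doc ID
        List<Integer> children;
        while ((children = splitHistory.get(shard)) != null) {
            // Pick the child shard the doc re-routes to after the split;
            // the hash used here is illustrative only. In practice this
            // step must match the split-time routing computation.
            shard = children.get(Math.floorMod(docId.hashCode(), children.size()));
        }
        return shard;
    }
}
```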
The below table provides the performance trade-offs for the various approaches mentioned:

| | Brute Force | Pre-generated DocIDs Store | Biased Hash Function |
|---|---|---|---|
| Memory | No impact | For slow-moving indices, doc IDs lie unused in the store until evicted, wasting memory. | On par with the current implementation |
| CPU | CPU cycles wasted generating the rejected doc IDs | Minor overhead in store maintenance | Depends on the complexity of the hash function; will consume more CPU |
| Latency | Higher than the other 2 approaches, but expected to beat the current implementation in the worst case (slow shard scenario) | With doc ID generation running in parallel, no latency regression is expected. | Fast |
| Throughput | No impact | Segment locking when consuming the store can reduce throughput | No impact |
Additional context
No response