Skip to content

Commit 94ef7e5

Browse files
ashwin2002 and claude committed
Fix SDK client starvation under high thread count (200-300 collection loaders)
- SDKClientPool: replace ConcurrentLinkedQueue with LinkedBlockingQueue for idle/busy pools
- SDKClientPool: get_client_for_bucket now blocks via poll(30, MINUTES) instead of returning null immediately
- SDKClientPool: remove idlePool.isEmpty() pre-check that caused spurious null returns under contention
- WorkLoadGenerate: remove busy-poll retry loop (while retries < 30) — blocking now handled in pool
- WorkLoadGenerate: simplify run() client acquisition to single call with InterruptedException handling

Root cause: with 200-300 threads competing for a finite client pool and long-running doc loads holding clients, the 30×1s busy-poll cap expired before clients were released, causing threads to fail with "Failed to acquire SDK client after 30 retries". Blocking in the pool parks threads efficiently and removes the arbitrary time ceiling.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 65970ee commit 94ef7e5

2 files changed

Lines changed: 48 additions & 48 deletions

File tree

src/main/java/couchbase/loadgen/WorkLoadGenerate.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -474,30 +474,23 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
474474
@Override
475475
public void run() {
476476
if (this.sdkClientPool != null) {
477-
int retries = 0;
478-
while (this.sdk == null && retries < 30) {
479-
try {
480-
this.sdk = this.sdkClientPool.get_client_for_bucket(
477+
try {
478+
// Pool blocks internally until a client is available (up to its configured timeout).
479+
// No busy-poll needed here — blocking in the pool is cheaper and has no retry cap.
480+
this.sdk = this.sdkClientPool.get_client_for_bucket(
481481
this.bucket_name, this.scope, this.collection);
482-
} catch (Exception e) {
483-
logger.error("Error getting SDK client from pool for bucket "
484-
+ this.bucket_name + ": " + e.getMessage());
485-
}
486-
if (this.sdk == null) {
487-
try {
488-
TimeUnit.SECONDS.sleep(1);
489-
} catch (InterruptedException e) {
490-
logger.error("Interrupted while waiting for SDK client", e);
491-
Thread.currentThread().interrupt();
492-
this.result = false;
493-
return;
494-
}
495-
retries++;
496-
}
482+
} catch (InterruptedException e) {
483+
logger.error("Interrupted while waiting for SDK client for bucket "
484+
+ this.bucket_name, e);
485+
Thread.currentThread().interrupt();
486+
this.result = false;
487+
return;
488+
} catch (Exception e) {
489+
logger.error("Error acquiring SDK client for bucket "
490+
+ this.bucket_name + ": " + e.getMessage(), e);
497491
}
498492
if (this.sdk == null) {
499-
logger.error("Failed to acquire SDK client for bucket "
500-
+ this.bucket_name + " after " + retries + " retries");
493+
logger.error("Failed to acquire SDK client for bucket " + this.bucket_name);
501494
this.result = false;
502495
return;
503496
}

src/main/java/couchbase/sdk/SDKClientPool.java

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package couchbase.sdk;
22

3-
import java.util.ArrayList;
4-
import java.util.ConcurrentModificationException;
53
import java.util.concurrent.ConcurrentHashMap;
6-
import java.util.concurrent.ConcurrentLinkedQueue;
4+
import java.util.concurrent.LinkedBlockingQueue;
5+
import java.util.concurrent.TimeUnit;
76
import java.util.concurrent.atomic.AtomicInteger;
87

98
import org.apache.log4j.LogManager;
@@ -16,9 +15,12 @@ public class SDKClientPool {
1615
// Thread-safe client collection cache
1716
private ConcurrentHashMap<String, ClientInfo> clientCache = new ConcurrentHashMap<>();
1817

18+
// Block up to this long waiting for an idle client before giving up
19+
private static final int CLIENT_WAIT_TIMEOUT_MINUTES = 30;
20+
1921
// Thread-safe client pools by bucket
20-
private ConcurrentHashMap<String, ConcurrentLinkedQueue<SDKClient>> idleClients = new ConcurrentHashMap<>();
21-
private ConcurrentHashMap<String, ConcurrentLinkedQueue<SDKClient>> busyClients = new ConcurrentHashMap<>();
22+
private ConcurrentHashMap<String, LinkedBlockingQueue<SDKClient>> idleClients = new ConcurrentHashMap<>();
23+
private ConcurrentHashMap<String, LinkedBlockingQueue<SDKClient>> busyClients = new ConcurrentHashMap<>();
2224

2325
public SDKClientPool() {
2426
super();
@@ -29,8 +31,8 @@ public void shutdown() {
2931

3032
// Process all buckets
3133
for (String bucketName : idleClients.keySet()) {
32-
ConcurrentLinkedQueue<SDKClient> idle = idleClients.get(bucketName);
33-
ConcurrentLinkedQueue<SDKClient> busy = busyClients.get(bucketName);
34+
LinkedBlockingQueue<SDKClient> idle = idleClients.get(bucketName);
35+
LinkedBlockingQueue<SDKClient> busy = busyClients.get(bucketName);
3436

3537
if (idle != null) {
3638
for (SDKClient client : idle) {
@@ -54,8 +56,8 @@ public void shutdown() {
5456
}
5557

5658
public void force_close_clients_for_bucket(String bucket_name) {
57-
ConcurrentLinkedQueue<SDKClient> idle = idleClients.get(bucket_name);
58-
ConcurrentLinkedQueue<SDKClient> busy = busyClients.get(bucket_name);
59+
LinkedBlockingQueue<SDKClient> idle = idleClients.get(bucket_name);
60+
LinkedBlockingQueue<SDKClient> busy = busyClients.get(bucket_name);
5961

6062
if (idle != null) {
6163
for (SDKClient client : idle) {
@@ -74,19 +76,20 @@ public void force_close_clients_for_bucket(String bucket_name) {
7476

7577
public void create_clients(String bucket_name, Server server, int req_clients) throws Exception {
7678
// Initialize thread-safe client pools for this bucket if not already present
77-
idleClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>());
78-
busyClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>());
79-
80-
ConcurrentLinkedQueue<SDKClient> idlePool = idleClients.get(bucket_name);
79+
idleClients.computeIfAbsent(bucket_name, k -> new LinkedBlockingQueue<>());
80+
busyClients.computeIfAbsent(bucket_name, k -> new LinkedBlockingQueue<>());
8181

82+
LinkedBlockingQueue<SDKClient> idlePool = idleClients.get(bucket_name);
83+
8284
for (int i = 0; i < req_clients; i++) {
8385
SDKClient client = new SDKClient(server, bucket_name);
8486
client.initialiseSDK();
8587
idlePool.add(client);
8688
}
8789
}
8890

89-
public SDKClient get_client_for_bucket(String bucket_name, String scope, String collection) {
91+
public SDKClient get_client_for_bucket(String bucket_name, String scope, String collection)
92+
throws InterruptedException {
9093
String cache_key = bucket_name + ":" + scope + ":" + collection;
9194

9295
// Check if client is already cached for this collection
@@ -95,28 +98,32 @@ public SDKClient get_client_for_bucket(String bucket_name, String scope, String
9598
existing.counter.incrementAndGet();
9699
return existing.client;
97100
}
98-
101+
99102
// Get idle client pool for this bucket
100-
ConcurrentLinkedQueue<SDKClient> idlePool = idleClients.get(bucket_name);
101-
if (idlePool == null || idlePool.isEmpty()) {
103+
LinkedBlockingQueue<SDKClient> idlePool = idleClients.get(bucket_name);
104+
if (idlePool == null) {
102105
return null;
103106
}
104-
105-
// Get client from idle pool atomically
106-
SDKClient client = idlePool.poll();
107+
108+
// Block until a client becomes available or timeout expires.
109+
// With 200-300 threads sharing a finite pool, spinning with a fixed retry cap
110+
// causes spurious failures on long-running loads — blocking here is cheaper and correct.
111+
SDKClient client = idlePool.poll(CLIENT_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
107112
if (client == null) {
113+
logger.error("Timed out waiting " + CLIENT_WAIT_TIMEOUT_MINUTES
114+
+ " min for idle SDK client for bucket " + bucket_name);
108115
return null;
109116
}
110-
117+
111118
// Configure client for this collection
112119
client.selectCollection(scope, collection);
113-
120+
114121
// Add to busy pool atomically
115-
busyClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>()).add(client);
116-
122+
busyClients.computeIfAbsent(bucket_name, k -> new LinkedBlockingQueue<>()).add(client);
123+
117124
// Cache client reference with thread-safe counter
118125
clientCache.put(cache_key, new ClientInfo(client, new AtomicInteger(1)));
119-
126+
120127
return client;
121128
}
122129

@@ -142,8 +149,8 @@ public void release_client(SDKClient client) {
142149
clientCache.remove(cache_key);
143150

144151
// Remove from busy pool and add to idle pool atomically
145-
ConcurrentLinkedQueue<SDKClient> busyPool = busyClients.get(bucket_key);
146-
ConcurrentLinkedQueue<SDKClient> idlePool = idleClients.get(bucket_key);
152+
LinkedBlockingQueue<SDKClient> busyPool = busyClients.get(bucket_key);
153+
LinkedBlockingQueue<SDKClient> idlePool = idleClients.get(bucket_key);
147154

148155
if (busyPool != null) {
149156
busyPool.remove(client);

0 commit comments

Comments
 (0)