Skip to content

Commit 30188f6

Browse files
Hardening and defensive fixes across REST, workload, SDK, and task manager layers
- TaskRequest: Add null checks before submitting tasks to prevent NPE on missing task names - TaskRequest: Wrap getTaskResult() calls in try/catch to prevent unhandled exceptions bubbling up - TaskRequest: Remove redundant reset_sdk_client_pool() from init_taskmanager() (already called in shutdown_taskmanager) - WorkLoadGenerate: Omit durability field when level is NONE to let server enforce bucket-level durability - WorkLoadGenerate: Bound get_client_for_bucket retry loop to 30 retries to prevent infinite wait - WorkLoadGenerate: Catch exceptions from get_client_for_bucket inside the retry loop - WorkLoadGenerate: Add catch-all exception handler around actual_run() and guard release_client on null sdk - SDKClient: Remove unused static env1/env2 ClusterEnvironment fields (now managed by SharedClusterManager) - SharedClusterManager: Replace per-connection non-TLS environment with shared sharedNonTLSEnvironment - SharedClusterManager: Add separate TLS/non-TLS environment lifecycle tracking and shutdown guards - TaskManager: Add null check for missing Future in getTaskResult() and abortTask() - TaskManager: Remove task from map on ExecutionException to prevent stale entries Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
1 parent 17d39e9 commit 30188f6

5 files changed

Lines changed: 164 additions & 67 deletions

File tree

src/main/java/RestServer/TaskRequest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ private void reset_mongo_sdk_client_pool() {
386386
private void init_taskmanager() {
387387
TaskRequest.taskManager = new TaskManager(this.num_workers);
388388
System.out.println("Init TaskManager workers=" + this.num_workers);
389-
this.reset_sdk_client_pool();
389+
// Note: SDK client pool is already reset in shutdown_taskmanager(),
390+
// no need to reset again here (avoid double shutdownAll() call)
390391
this.reset_mongo_sdk_client_pool();
391392
}
392393

@@ -501,7 +502,13 @@ public ResponseEntity<Map<String, Object>> reset_task_manager() {
501502
public ResponseEntity<Map<String, Object>> submit_task() {
502503
Map<String, Object> body = new HashMap<>();
503504
try {
504-
TaskRequest.taskManager.submit(TaskRequest.loader_tasks.get(this.taskName));
505+
WorkLoadGenerate task = TaskRequest.loader_tasks.get(this.taskName);
506+
if (task == null) {
507+
body.put("status", false);
508+
body.put("error", "Task " + this.taskName + " does not exist in loader_tasks");
509+
return new ResponseEntity<>(body, HttpStatus.OK);
510+
}
511+
TaskRequest.taskManager.submit(task);
505512
TimeUnit.MILLISECONDS.sleep(5);
506513
body.put("status", true);
507514
} catch (Exception e) {
@@ -514,7 +521,13 @@ public ResponseEntity<Map<String, Object>> submit_task() {
514521
public ResponseEntity<Map<String, Object>> submit_task_mongo() {
515522
Map<String, Object> body = new HashMap<>();
516523
try {
517-
TaskRequest.taskManager.submit(TaskRequest.mongo_loader_tasks.get(this.taskName));
524+
mongo.loadgen.WorkLoadGenerate task = TaskRequest.mongo_loader_tasks.get(this.taskName);
525+
if (task == null) {
526+
body.put("status", false);
527+
body.put("error", "Task " + this.taskName + " does not exist in mongo_loader_tasks");
528+
return new ResponseEntity<>(body, HttpStatus.OK);
529+
}
530+
TaskRequest.taskManager.submit(task);
518531
TimeUnit.MILLISECONDS.sleep(5);
519532
body.put("status", true);
520533
} catch (Exception e) {
@@ -529,7 +542,12 @@ public ResponseEntity<Map<String, Object>> get_task_result_mongo() {
529542
try {
530543
mongo.loadgen.WorkLoadGenerate task = TaskRequest.mongo_loader_tasks.get(this.taskName);
531544
if (task != null) {
532-
boolean okay = TaskRequest.taskManager.getTaskResult(task);
545+
boolean okay = false;
546+
try {
547+
okay = TaskRequest.taskManager.getTaskResult(task);
548+
} catch (Exception e) {
549+
body.put("error", "Exception during getTaskResult: " + e.toString());
550+
}
533551
body.put("status", okay);
534552
} else {
535553
body.put("error", "Task " + this.taskName + " does not exists");
@@ -548,7 +566,12 @@ public ResponseEntity<Map<String, Object>> get_task_result() {
548566
WorkLoadGenerate task = TaskRequest.loader_tasks.get(this.taskName);
549567
if (task != null) {
550568
Map<String, Object> failures = new HashMap<>();
551-
boolean okay = TaskRequest.taskManager.getTaskResult(task);
569+
boolean okay = false;
570+
try {
571+
okay = TaskRequest.taskManager.getTaskResult(task);
572+
} catch (Exception e) {
573+
body.put("error", "Exception during getTaskResult: " + e.toString());
574+
}
552575
TaskRequest.loader_tasks.remove(this.taskName);
553576
for (HashMap.Entry<String, List<Result>> optype : task.failedMutations.entrySet()) {
554577
optype.getValue().forEach(

src/main/java/couchbase/loadgen/WorkLoadGenerate.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.couchbase.client.core.deps.com.fasterxml.jackson.annotation.PropertyAccessor;
1818
import com.couchbase.client.core.deps.com.fasterxml.jackson.core.JsonProcessingException;
1919
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.couchbase.client.core.msg.kv.DurabilityLevel;
2021
import com.couchbase.client.core.error.DocumentExistsException;
2122
import com.couchbase.client.core.error.DocumentNotFoundException;
2223
import com.couchbase.client.core.error.ServerOutOfMemoryException;
@@ -179,31 +180,50 @@ public void actual_run() {
179180
this.dg.ws.setDurabilityLevel(this.durability);
180181
this.dg.ws.setRetryStrategy(this.retryStrategy);
181182

182-
upsertOptions = UpsertOptions.upsertOptions()
183+
// When DurabilityLevel.NONE is explicitly set, the SDK sends durability_level=0
184+
// in the KV protocol frame, which tells the server "client explicitly wants no
185+
// durability" and may override the bucket-level durability setting.
186+
// To let the server enforce bucket-level durability, we must NOT call
187+
// .durability() when the level is NONE - this omits the durability field
188+
// from the request entirely, allowing the server to apply its own level.
189+
boolean useClientDurability = this.dg.ws.durability != null
190+
&& this.dg.ws.durability != DurabilityLevel.NONE;
191+
192+
UpsertOptions upsertOpts = UpsertOptions.upsertOptions()
183193
.timeout(this.dg.ws.timeout)
184-
.durability(this.dg.ws.durability)
185194
.retryStrategy(this.dg.ws.retryStrategy);
186-
expiryOptions = InsertOptions.insertOptions()
195+
if (useClientDurability) upsertOpts = upsertOpts.durability(this.dg.ws.durability);
196+
upsertOptions = upsertOpts;
197+
198+
InsertOptions expiryOpts = InsertOptions.insertOptions()
187199
.timeout(this.dg.ws.timeout)
188-
.durability(this.dg.ws.durability)
189200
.expiry(this.dg.ws.getDuration(this.exp, this.exp_unit))
190201
.retryStrategy(this.dg.ws.retryStrategy);
191-
setOptions = InsertOptions.insertOptions()
202+
if (useClientDurability) expiryOpts = expiryOpts.durability(this.dg.ws.durability);
203+
expiryOptions = expiryOpts;
204+
205+
InsertOptions setOpts = InsertOptions.insertOptions()
192206
.timeout(this.dg.ws.timeout)
193-
.durability(this.dg.ws.durability)
194207
.retryStrategy(this.dg.ws.retryStrategy);
195-
removeOptions = RemoveOptions.removeOptions()
208+
if (useClientDurability) setOpts = setOpts.durability(this.dg.ws.durability);
209+
setOptions = setOpts;
210+
211+
RemoveOptions removeOpts = RemoveOptions.removeOptions()
196212
.timeout(this.dg.ws.timeout)
197-
.durability(this.dg.ws.durability)
198213
.retryStrategy(this.dg.ws.retryStrategy);
214+
if (useClientDurability) removeOpts = removeOpts.durability(this.dg.ws.durability);
215+
removeOptions = removeOpts;
216+
199217
getOptions = GetOptions.getOptions()
200218
.timeout(this.dg.ws.timeout)
201219
.retryStrategy(this.dg.ws.retryStrategy);
202-
mutateInOptions = MutateInOptions.mutateInOptions()
220+
221+
MutateInOptions mutateOpts = MutateInOptions.mutateInOptions()
203222
.expiry(this.dg.ws.getDuration(this.exp, this.exp_unit))
204223
.timeout(this.dg.ws.timeout)
205-
.durability(this.dg.ws.durability)
206224
.retryStrategy(this.dg.ws.retryStrategy);
225+
if (useClientDurability) mutateOpts = mutateOpts.durability(this.dg.ws.durability);
226+
mutateInOptions = mutateOpts;
207227
lookupInOptions = LookupInOptions.lookupInOptions();
208228

209229
if(dg.ws.expiry == 0) {
@@ -454,9 +474,15 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
454474
@Override
455475
public void run() {
456476
if (this.sdkClientPool != null) {
457-
while (this.sdk == null) {
458-
this.sdk = this.sdkClientPool.get_client_for_bucket(
459-
this.bucket_name, this.scope, this.collection);
477+
int retries = 0;
478+
while (this.sdk == null && retries < 30) {
479+
try {
480+
this.sdk = this.sdkClientPool.get_client_for_bucket(
481+
this.bucket_name, this.scope, this.collection);
482+
} catch (Exception e) {
483+
logger.error("Error getting SDK client from pool for bucket "
484+
+ this.bucket_name + ": " + e.getMessage());
485+
}
460486
if (this.sdk == null) {
461487
try {
462488
TimeUnit.SECONDS.sleep(1);
@@ -466,14 +492,25 @@ public void run() {
466492
this.result = false;
467493
return;
468494
}
495+
retries++;
469496
}
470497
}
498+
if (this.sdk == null) {
499+
logger.error("Failed to acquire SDK client for bucket "
500+
+ this.bucket_name + " after " + retries + " retries");
501+
this.result = false;
502+
return;
503+
}
471504
}
472505
try {
473506
this.actual_run();
474507
}
508+
catch (Exception e) {
509+
logger.error("Unhandled exception in task " + this.taskName + ": " + e.getMessage(), e);
510+
this.result = false;
511+
}
475512
finally{
476-
if (this.sdkClientPool != null)
513+
if (this.sdkClientPool != null && this.sdk != null)
477514
this.sdkClientPool.release_client(this.sdk);
478515
}
479516
}

src/main/java/couchbase/sdk/SDKClient.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,14 @@
11
package couchbase.sdk;
22

33
import java.time.Duration;
4-
import java.util.Properties;
54

65
import org.apache.log4j.LogManager;
76
import org.apache.log4j.Logger;
87

9-
import com.couchbase.client.core.deps.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
10-
import com.couchbase.client.core.env.IoConfig;
11-
import com.couchbase.client.core.env.SecurityConfig;
12-
import com.couchbase.client.core.env.TimeoutConfig;
138
import com.couchbase.client.core.error.AuthenticationFailureException;
149
import com.couchbase.client.java.Bucket;
1510
import com.couchbase.client.java.Cluster;
16-
import com.couchbase.client.java.ClusterOptions;
1711
import com.couchbase.client.java.Collection;
18-
import com.couchbase.client.java.env.ClusterEnvironment;
1912

2013
public class SDKClient {
2114
static Logger logger = LogManager.getLogger(SDKClient.class);
@@ -30,19 +23,6 @@ public class SDKClient {
3023

3124
public Collection connection;
3225

33-
public static ClusterEnvironment env1 = ClusterEnvironment.builder()
34-
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
35-
.securityConfig(SecurityConfig.enableTls(true)
36-
.trustManagerFactory(InsecureTrustManagerFactory.INSTANCE))
37-
.ioConfig(IoConfig.enableDnsSrv(true))
38-
.ioConfig(IoConfig.numKvConnections(5))
39-
.build();
40-
41-
public static ClusterEnvironment env2 = ClusterEnvironment.builder()
42-
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
43-
.ioConfig(IoConfig.enableDnsSrv(true)).ioConfig(IoConfig.numKvConnections(5))
44-
.build();
45-
4626
public SDKClient(Server master, String bucket, String scope, String collection) {
4727
super();
4828
this.master = master;

src/main/java/couchbase/sdk/SharedClusterManager.java

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,30 @@ public class SharedClusterManager {
2828
// Increased from default 5 to 500 to support 5,000 collections loading in parallel
2929
private static final int DEFAULT_KV_CONNECTIONS = 5;
3030

31-
// Shared ClusterEnvironment with optimized connection settings
31+
// Shared ClusterEnvironment for TLS connections
3232
private static ClusterEnvironment sharedEnvironment;
3333

34-
// Track whether environment has been shutdown
35-
private static volatile boolean environmentShutdown = false;
34+
// Shared ClusterEnvironment for non-TLS connections
35+
private static ClusterEnvironment sharedNonTLSEnvironment;
36+
37+
// Track whether environments have been shutdown
38+
private static volatile boolean tlsEnvironmentShutdown = false;
39+
private static volatile boolean nonTlsEnvironmentShutdown = false;
3640

3741
private static final Object environmentLock = new Object();
3842

3943
// Store cluster instances per server connection string
4044
private static ConcurrentHashMap<String, ClusterWrapper> clusterMap = new ConcurrentHashMap<>();
4145

42-
// Initialize the shared environment (lazy initialization with recreation)
46+
// Initialize the shared TLS environment (lazy initialization with recreation)
4347
private static void initializeSharedEnvironment() {
44-
if (sharedEnvironment == null || environmentShutdown) {
48+
if (sharedEnvironment == null || tlsEnvironmentShutdown) {
4549
synchronized (environmentLock) {
4650
// Double-check under lock
47-
if (sharedEnvironment == null || environmentShutdown) {
51+
if (sharedEnvironment == null || tlsEnvironmentShutdown) {
4852
try {
49-
if (sharedEnvironment != null && environmentShutdown) {
50-
logger.info("Shared Cluster Environment was shutdown, recreating");
53+
if (sharedEnvironment != null && tlsEnvironmentShutdown) {
54+
logger.info("Shared TLS Cluster Environment was shutdown, recreating");
5155
}
5256

5357
sharedEnvironment = ClusterEnvironment.builder()
@@ -57,10 +61,36 @@ private static void initializeSharedEnvironment() {
5761
.ioConfig(IoConfig.enableDnsSrv(true))
5862
.ioConfig(IoConfig.numKvConnections(DEFAULT_KV_CONNECTIONS))
5963
.build();
60-
environmentShutdown = false;
61-
logger.info("Shared Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections for massively parallel collection loads");
64+
tlsEnvironmentShutdown = false;
65+
logger.info("Shared TLS Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections");
6266
} catch (Exception e) {
63-
logger.error("Failed to initialize shared Cluster Environment", e);
67+
logger.error("Failed to initialize shared TLS Cluster Environment", e);
68+
}
69+
}
70+
}
71+
}
72+
}
73+
74+
// Initialize the shared non-TLS environment (lazy initialization with recreation)
75+
private static void initializeSharedNonTLSEnvironment() {
76+
if (sharedNonTLSEnvironment == null || nonTlsEnvironmentShutdown) {
77+
synchronized (environmentLock) {
78+
// Double-check under lock
79+
if (sharedNonTLSEnvironment == null || nonTlsEnvironmentShutdown) {
80+
try {
81+
if (sharedNonTLSEnvironment != null && nonTlsEnvironmentShutdown) {
82+
logger.info("Shared non-TLS Cluster Environment was shutdown, recreating");
83+
}
84+
85+
sharedNonTLSEnvironment = ClusterEnvironment.builder()
86+
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
87+
.ioConfig(IoConfig.enableDnsSrv(true))
88+
.ioConfig(IoConfig.numKvConnections(DEFAULT_KV_CONNECTIONS))
89+
.build();
90+
nonTlsEnvironmentShutdown = false;
91+
logger.info("Shared non-TLS Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections");
92+
} catch (Exception e) {
93+
logger.error("Failed to initialize shared non-TLS Cluster Environment", e);
6494
}
6595
}
6696
}
@@ -110,33 +140,54 @@ public static synchronized void releaseCluster(Server server) {
110140
}
111141

112142
/**
113-
* Shutdown all cluster instances and the shared environment
143+
* Shutdown all cluster instances and the shared environments
114144
*/
115145
public static synchronized void shutdownAll() {
116146
logger.info("Shutting down all shared Cluster instances");
117147
for (ClusterWrapper wrapper : clusterMap.values()) {
118148
if (wrapper.cluster != null) {
119-
wrapper.cluster.disconnect();
149+
try {
150+
wrapper.cluster.disconnect();
151+
} catch (Exception e) {
152+
logger.warn("Error disconnecting cluster: " + e.getMessage());
153+
}
120154
}
121155
}
122156
clusterMap.clear();
123157

124-
if (sharedEnvironment != null) {
125-
sharedEnvironment.shutdown();
126-
environmentShutdown = true;
127-
logger.info("Shared Cluster Environment shutdown complete");
158+
if (sharedEnvironment != null && !tlsEnvironmentShutdown) {
159+
try {
160+
sharedEnvironment.shutdown();
161+
} catch (Exception e) {
162+
logger.warn("Error shutting down TLS environment: " + e.getMessage());
163+
}
164+
tlsEnvironmentShutdown = true;
165+
logger.info("Shared TLS Cluster Environment shutdown complete");
166+
}
167+
168+
if (sharedNonTLSEnvironment != null && !nonTlsEnvironmentShutdown) {
169+
try {
170+
sharedNonTLSEnvironment.shutdown();
171+
} catch (Exception e) {
172+
logger.warn("Error shutting down non-TLS environment: " + e.getMessage());
173+
}
174+
nonTlsEnvironmentShutdown = true;
175+
logger.info("Shared non-TLS Cluster Environment shutdown complete");
128176
}
129177
}
130178

131179
private static Cluster createCluster(Server server) throws AuthenticationFailureException {
180+
initializeSharedEnvironment();
181+
initializeSharedNonTLSEnvironment();
182+
132183
ClusterOptions clusterOptions;
133184
try {
134185
if (server.memcached_port.equals("11207")) {
135186
clusterOptions = ClusterOptions.clusterOptions(server.rest_username, server.rest_password)
136187
.environment(sharedEnvironment);
137188
} else {
138189
clusterOptions = ClusterOptions.clusterOptions(server.rest_username, server.rest_password)
139-
.environment(createNonTLSEnvironment());
190+
.environment(sharedNonTLSEnvironment);
140191
}
141192

142193
Cluster cluster = Cluster.connect(server.ip, clusterOptions);
@@ -152,14 +203,6 @@ private static Cluster createCluster(Server server) throws AuthenticationFailure
152203
}
153204
}
154205

155-
private static ClusterEnvironment createNonTLSEnvironment() {
156-
return ClusterEnvironment.builder()
157-
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
158-
.ioConfig(IoConfig.enableDnsSrv(true))
159-
.ioConfig(IoConfig.numKvConnections(DEFAULT_KV_CONNECTIONS))
160-
.build();
161-
}
162-
163206
private static String getClusterKey(Server server) {
164207
return server.ip + ":" + server.memcached_port;
165208
}

0 commit comments

Comments
 (0)