Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions src/main/java/Loader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import couchbase.loadgen.WorkLoadGenerate;
import couchbase.sdk.SDKClient;
import couchbase.sdk.Server;
import couchbase.sdk.SDKClientPool;
import elasticsearch.EsClient;
import utils.docgen.DRConstants;
import utils.docgen.DocRange;
Expand Down Expand Up @@ -145,7 +146,7 @@ public static void main(String[] args) throws IOException {
Option validate = new Option("validate", "validate", true, "Validate Data during Reads");
options.addOption(validate);

Option gtm = new Option("gtm", "gtm", true, "Go for max doc ops");
Option gtm = new Option("gtm", "gtm", true, "Go for max doc ops (disables rate limiting)");
options.addOption(gtm);

Option deleted = new Option("deleted", "deleted", true, "To verify deleted docs");
Expand Down Expand Up @@ -290,8 +291,10 @@ public static void main(String[] args) throws IOException {
} catch (ClassNotFoundException e1) {
e1.printStackTrace();
}
SDKClient client = new SDKClient(master, cmd.getOptionValue("bucket"), cmd.getOptionValue("scope", "_default"),
cmd.getOptionValue("collection", "_default"));
// Create SDK client pool with 5 client instances for connection isolation
// 5 pools × 200 connections per client = 1,000 total KV connections
// Each pool handles 6-7 workers (32 workers / 5 pools)
SDKClientPool clientPool = new SDKClientPool();
Comment on lines +294 to +297
EsClient esClient = null;
if (ws.elastic) {
if (cmd.getOptionValue(esAPIKey.getOpt()) != null){
Expand All @@ -303,29 +306,40 @@ public static void main(String[] args) throws IOException {
}

try {
client.initialiseSDK();
// Create 5 client instances in the pool
int numClients = 5;
clientPool.create_clients(cmd.getOptionValue("bucket"), master, numClients);
logger.info("Created SDK Client Pool with " + numClients + " client instances");
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to create SDK Client Pool", e);
System.exit(1);
}
for (int i = 0; i < ws.workers; i++) {
try {
String th_name = "Loader" + i;
boolean _trackFailures = Boolean.parseBoolean(cmd.getOptionValue("trackFailures", "false"));
if (Integer.parseInt(cmd.getOptionValue("retry", "0")) > 0)
_trackFailures = true;
tm.submit(new WorkLoadGenerate(th_name, dg, client, esClient, cmd.getOptionValue("durability", "NONE"),
WorkLoadGenerate wlg = new WorkLoadGenerate(th_name, dg, clientPool, esClient, cmd.getOptionValue("durability", "NONE"),
Integer.parseInt(cmd.getOptionValue("maxTTL", "0")),
cmd.getOptionValue("maxTTLUnit", "seconds"), _trackFailures,
Integer.parseInt(cmd.getOptionValue("retry", "0")), null));
TimeUnit.MILLISECONDS.sleep(500);
Integer.parseInt(cmd.getOptionValue("retry", "0")), null);
wlg.set_collection_for_load(
cmd.getOptionValue("bucket"),
cmd.getOptionValue("scope", "_default"),
cmd.getOptionValue("collection", "_default"));
Comment on lines +324 to +331
// Reduced startup delay from 500ms to 50ms for faster ramp-up
// With 32 workers: 15.5s → 1.55s startup time
tm.submit(wlg);
TimeUnit.MILLISECONDS.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
}
tm.getAllTaskResult();
tm.shutdown();
client.disconnectCluster();
client.shutdownEnv();
clientPool.shutdown();
Comment on lines 340 to +342
if (ws.elastic && esClient != null) {
try {
esClient.restClient.close();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/SIFTLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public static void main(String[] args) throws IOException {
String cb = cmd.getOptionValue(skipCB.getOpt(), "false");
if (!Boolean.parseBoolean(cb)) {
try {
clientPool.create_clients(cmd.getOptionValue("bucket"), master, 2);
clientPool.create_clients(cmd.getOptionValue("bucket"), master, 32);
} catch (Exception e) {
Comment on lines 224 to 228
// TODO Auto-generated catch block
e.printStackTrace();
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/couchbase/loadgen/WorkLoadGenerate.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,14 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
ops = 0;
Instant end = Instant.now();
timeElapsed = Duration.between(start, end);
if(!this.dg.ws.gtm && timeElapsed.toMillis() < 1000)
// Smarter throttling: only sleep if significantly under time limit, and reduce sleep duration
if(!this.dg.ws.gtm && timeElapsed.toMillis() < 900)
try {
long i = (long) ((1000-timeElapsed.toMillis()));
TimeUnit.MILLISECONDS.sleep(i);
// Reduce sleep to 70% of needed time to allow burst periods
long sleepTime = (long) ((900-timeElapsed.toMillis()) * 0.7);
if(sleepTime > 10) { // Only sleep if > 10ms
TimeUnit.MILLISECONDS.sleep(sleepTime);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
31 changes: 24 additions & 7 deletions src/main/java/couchbase/sdk/DocOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.couchbase.client.java.kv.ReplaceOptions;
import com.couchbase.client.java.kv.TouchOptions;
import com.couchbase.client.java.kv.UpsertOptions;
import com.couchbase.client.java.json.JsonObject;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -37,6 +38,8 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object

// Emit error Results as part of the stream and collect at the end
// This is thread-safe and avoids synchronization overhead
// Now 4000 since each worker has its own connection pool
int concurrency = Math.min(documents.size(), 4000);
return Flux.fromIterable(documents)
.flatMap(documentToInsert -> {
String k = documentToInsert.getT1();
Expand All @@ -45,7 +48,7 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object
return reactiveCollection.insert(k, v, insertOptions)
.then(Mono.<Result>empty())
.onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
})
}, concurrency)
.collectList()
.block();
}
Expand All @@ -56,6 +59,8 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object

// Emit error Results as part of the stream and collect at the end
// This is thread-safe and avoids synchronization overhead
// Now 4000 since each worker has its own connection pool
int concurrency = Math.min(documents.size(), 4000);
return Flux.fromIterable(documents)
.flatMap(documentToInsert -> {
String k = documentToInsert.getT1();
Expand All @@ -64,29 +69,37 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
return reactiveCollection.upsert(k, v, upsertOptions)
.then(Mono.<Result>empty())
.onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
})
}, concurrency)
.collectList()
.block();
}

public List<Tuple2<String, Object>> bulkGets(Collection collection, List<Tuple2<String, Object>> documents, GetOptions getOptions) {
final ReactiveCollection reactiveCollection = collection.reactive();
// Now 4000 since each worker has its own connection pool
int concurrency = Math.min(documents.size(), 4000);
List<Tuple2<String, Object>> returnValue = Flux.fromIterable(documents)
.flatMap(new Function<Tuple2<String, Object>, Publisher<Tuple2<String, Object>>>() {
public Publisher<Tuple2<String, Object>> apply(Tuple2<String, Object> documentToInsert) {
final String id = documentToInsert.getT1();
return reactiveCollection.get(id, getOptions)
.map(new Function<GetResult, Tuple2<String, Object>>() {
public Tuple2<String, Object> apply(GetResult result) {
return Tuples.of(id, result.contentAs(Person.class));
Object content = null;
try {
content = result.contentAs(JsonObject.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
Comment on lines +88 to +93
return Tuples.of(id, content);
}
}).onErrorResume(new Function<Throwable, Mono<Tuple2<String, Object>>>() {
public Mono<Tuple2<String, Object>> apply(Throwable error) {
return Mono.just(Tuples.of(id, error.getClass().getName()));
}
});
}
}).collectList().block();
}, concurrency).collectList().block();
return returnValue;
}

Expand All @@ -95,19 +108,22 @@ public List<Result> bulkDelete(Collection collection, List<String> keys, RemoveO

// Emit error Results as part of the stream and collect at the end
// This is thread-safe and avoids synchronization overhead
// Now 4000 since each worker has its own connection pool
int concurrency = Math.min(keys.size(), 4000);
return Flux.fromIterable(keys)
.flatMap(key -> {
return reactiveCollection.remove(key, removeOptions)
.then(Mono.<Result>empty())
.onErrorResume(error -> Mono.just(new Result(key, null, error, false)));
})
}, concurrency)
.collectList()
.block();
}

public List<ConcurrentHashMap<String, Object>> bulkReplace(Collection collection, List<Tuple2<String, Object>> documents,
ReplaceOptions replaceOptions) {
final ReactiveCollection reactiveCollection = collection.reactive();
int concurrency = Math.min(documents.size(), 2000);
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(documents)
.flatMap(new Function<Tuple2<String, Object>, Publisher<ConcurrentHashMap<String, Object>>>() {
public Publisher<ConcurrentHashMap<String, Object>> apply(Tuple2<String, Object> documentToInsert) {
Expand All @@ -134,13 +150,14 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
}
});
}
}).collectList().block();
}, concurrency).collectList().block();
return returnValue;
}

public List<ConcurrentHashMap<String, Object>> bulkTouch(Collection collection, List<String> keys, final int exp,
TouchOptions touchOptions, Duration exp_duration) {
final ReactiveCollection reactiveCollection = collection.reactive();
int concurrency = Math.min(keys.size(), 2000);
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(keys)
Comment on lines 157 to 161
.flatMap(new Function<String, Publisher<ConcurrentHashMap<String, Object>>>() {
public Publisher<ConcurrentHashMap<String, Object>> apply(String key){
Expand All @@ -163,7 +180,7 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
}
}).defaultIfEmpty(returnValue);
}
}).collectList().block();
}, concurrency).collectList().block();
return returnValue;
}

Expand Down
45 changes: 26 additions & 19 deletions src/main/java/couchbase/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,10 @@ public class SDKClient {

private Bucket bucketObj;
private Cluster cluster;
private ClusterEnvironment environment;

public Collection connection;

public static ClusterEnvironment env1 = ClusterEnvironment.builder()
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
.securityConfig(SecurityConfig.enableTls(true)
.trustManagerFactory(InsecureTrustManagerFactory.INSTANCE))
.ioConfig(IoConfig.enableDnsSrv(true))
.ioConfig(IoConfig.numKvConnections(5))
.build();

public static ClusterEnvironment env2 = ClusterEnvironment.builder()
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
.ioConfig(IoConfig.enableDnsSrv(true)).ioConfig(IoConfig.numKvConnections(5))
.build();

public SDKClient(Server master, String bucket, String scope, String collection) {
super();
this.master = master;
Expand Down Expand Up @@ -72,11 +60,28 @@ public void initialiseSDK() throws Exception {

public void connectCluster(){
try{
// Create per-instance environment for proper connection isolation
// Each SDKClient gets its own connection pool
ClusterOptions cluster_options;
if(this.master.memcached_port.equals("11207"))
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(env1);
else
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(env2);
if(this.master.memcached_port.equals("11207")) {
this.environment = ClusterEnvironment.builder()
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
.securityConfig(SecurityConfig.enableTls(true)
.trustManagerFactory(InsecureTrustManagerFactory.INSTANCE))
.ioConfig(IoConfig.enableDnsSrv(true))
.ioConfig(IoConfig.numKvConnections(200)) // 200 per client instance
.ioConfig(IoConfig.configPollInterval(Duration.ofSeconds(10)))
.build();
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(this.environment);
} else {
this.environment = ClusterEnvironment.builder()
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
.ioConfig(IoConfig.enableDnsSrv(true))
.ioConfig(IoConfig.numKvConnections(200)) // 200 per client instance
.ioConfig(IoConfig.configPollInterval(Duration.ofSeconds(10)))
.build();
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(this.environment);
Comment on lines +71 to +83
}
this.cluster = Cluster.connect(master.ip, cluster_options);
logger.info("Cluster connection is successful");
}
Expand All @@ -91,8 +96,10 @@ public void disconnectCluster(){
}

public void shutdownEnv() {
// Just close an environment
this.cluster.environment().shutdown();
// Close the per-instance environment
if (this.environment != null) {
this.environment.shutdown();
}
}

private void connectBucket(String bucket){
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/couchbase/sdk/SDKClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ public void create_clients(String bucket_name, Server server, int req_clients) t
for(int i=0; i<req_clients; i++) {
SDKClient tem_client = new SDKClient(server, bucket_name);
tem_client.initialiseSDK();
logger.info("Created SDK client #" + (i+1) + " with bucket: " + bucket_name);
((ArrayList)bucket_hm.get("idle_clients")).add(tem_client);
}
logger.info("SDK Client Pool initialized with " + req_clients + " clients for bucket: " + bucket_name);
}

public SDKClient get_client_for_bucket(String bucket_name, String scope, String collection) {
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/utils/docgen/WorkLoadSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class WorkLoadSettings extends WorkLoadBase {
public String keyPrefix = "test_docs-";
public int workers = 10;
public int ops = 40000;
public int batchSize = ops/workers;
public int batchSize = Math.max(1000, (ops/workers) * 4);
public int keySize = 15;
public int docSize = 256;

Expand Down Expand Up @@ -72,7 +72,10 @@ public WorkLoadSettings(String keyPrefix,
this.expiry = e;
this.workers = workers;
this.ops = ops;
this.batchSize = this.ops/this.workers;
// Improved batch size calculation with reasonable upper bound
// Previous batchSize = Math.max(1000, (ops/workers) * 4) could be 125,000+ with high ops
// New formula caps at 10,000 to prevent memory issues while maintaining throughput
this.batchSize = Math.min(10000, Math.max(1000, (this.ops/this.workers) * 4));
Comment on lines +75 to +78
this.gtm = gtm;
this.expectDeleted = deleted;
this.validate = validate;
Expand Down Expand Up @@ -106,7 +109,7 @@ public WorkLoadSettings(String keyPrefix,
this.workers = workers;
this.ops = ops;

this.batchSize = this.ops/this.workers;
this.batchSize = Math.max(1000, (this.ops/this.workers) * 4);
this.gtm = gtm;
this.expectDeleted = deleted;
this.validate = validate;
Expand All @@ -133,7 +136,7 @@ public WorkLoadSettings(String keyPrefix,
this.expiry = e;
this.workers = workers;
this.ops = ops;
this.batchSize = this.ops/this.workers;
this.batchSize = Math.max(1000, (this.ops/this.workers) * 4);
this.gtm = gtm;
this.expectDeleted = deleted;
this.validate = validate;
Expand Down Expand Up @@ -201,7 +204,7 @@ public WorkLoadSettings(String keyPrefix,
this.is_subdoc_xattr = is_subdoc_xattr;
this.is_subdoc_sys_xattr = is_subdoc_sys_xattr;

this.batchSize = this.ops/this.workers;
this.batchSize = Math.max(1000, (this.ops/this.workers) * 4);
this.gtm = gtm;
this.expectDeleted = deleted;
this.validate = validate;
Expand Down Expand Up @@ -237,14 +240,14 @@ public WorkLoadSettings(String keyPrefix,
this.is_subdoc_xattr = is_subdoc_xattr;
this.is_subdoc_sys_xattr = is_subdoc_sys_xattr;

this.batchSize = this.ops/this.workers;
this.batchSize = Math.max(1000, (this.ops/this.workers) * 4);
this.gtm = gtm;
this.expectDeleted = deleted;
this.validate = validate;
this.mutated = mutated;
this.valueType = valueType;
this.keyType = keyType;

this.elastic = elastic;
this.model = model;
this.mockVector = mockVector;
Expand Down