Improvements to docloader ops rate per sec #40

Open
ritalrw wants to merge 6 commits into main from test_perf

ritalrw (Collaborator) commented Feb 19, 2026

No description provided.

Copilot AI left a comment

Pull request overview

This PR tunes DocLoader’s throughput controls by increasing per-batch work sizes, raising Couchbase SDK connection concurrency via pooled clients, and adjusting the per-second throttling behavior.

Changes:

  • Reworked batchSize computation to scale more aggressively (and capped it in one constructor).
  • Switched Loader execution to use an SDKClientPool and increased pool sizes (including SIFTLoader).
  • Increased Reactor flatMap concurrency in bulk KV operations and adjusted per-second throttling sleep logic.
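
The per-second throttling mentioned in the last bullet can be pictured as a one-second window with an operation budget. The sketch below is a hypothetical illustration and not the logic in WorkLoadGenerate.java: the class `ThrottleSketch`, the method `sleepMillis`, and its parameters are invented names, and the fixed one-second window is an assumption.

```java
final class ThrottleSketch {
    /**
     * Hypothetical helper (not from the PR): how long a worker sleeps after a batch
     * so that at most opsPerSec operations run inside one one-second window.
     */
    static long sleepMillis(long opsInWindow, long opsPerSec, long elapsedMillis) {
        if (opsPerSec <= 0 || opsInWindow < opsPerSec) {
            // Throttling disabled or budget not yet spent: no sleep, so bursts are allowed.
            return 0;
        }
        // Budget spent: wait out whatever is left of the current window.
        return Math.max(0, 1000 - elapsedMillis);
    }
}
```

Under these assumptions a worker that spends its budget 200 ms into the window sleeps for the remaining 800 ms, while a worker below budget never sleeps. That is the sense in which a looser throttle permits bursts.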

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.

| File | Description |
| --- | --- |
| src/main/java/utils/docgen/WorkLoadSettings.java | Adjusts batch sizing logic to increase throughput (with inconsistent caps across constructors). |
| src/main/java/SIFTLoader.java | Increases Couchbase SDK client pool size from 2 to 32 for SIFT runs. |
| src/main/java/Loader.java | Migrates from a single SDKClient to SDKClientPool and reduces worker startup delay. |
| src/main/java/couchbase/sdk/SDKClientPool.java | Adds logging around client pool creation. |
| src/main/java/couchbase/sdk/SDKClient.java | Moves to per-client ClusterEnvironment and increases KV connections per client. |
| src/main/java/couchbase/sdk/DocOps.java | Adds explicit Reactor concurrency limits and switches GET decoding to JsonObject. |
| src/main/java/couchbase/loadgen/WorkLoadGenerate.java | Tweaks rate-limit sleep behavior to be less strict and allow bursts. |

Comment on lines +75 to +78
// Improved batch size calculation with reasonable upper bound
// Previous batchSize = Math.max(1000, (ops/workers) * 4) could be 125,000+ with high ops
// New formula caps at 10,000 to prevent memory issues while maintaining throughput
this.batchSize = Math.min(10000, Math.max(1000, (this.ops/this.workers) * 4));
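
The arithmetic in the code comment above can be checked with a small sketch. It assumes `ops` and `workers` are `int` fields (the hunk does not show their declarations); `BatchSizeSketch` and both method names are hypothetical.

```java
final class BatchSizeSketch {
    /** Formula the code comment describes as the previous one: floor of 1,000, no ceiling. */
    static int previousBatchSize(int ops, int workers) {
        return Math.max(1000, (ops / workers) * 4);
    }

    /** Formula on the changed line: same value, clamped to at most 10,000. */
    static int cappedBatchSize(int ops, int workers) {
        return Math.min(10000, Math.max(1000, (ops / workers) * 4));
    }
}
```

With `ops = 1_000_000` and `workers = 32`, integer division gives 31,250 per worker, so the previous formula yields 125,000 and the capped one yields 10,000. Both formulas divide by `workers`, so a value of 0 would throw `ArithmeticException`.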
Comment thread src/main/java/Loader.java
Comment on lines +294 to +297
// Create SDK client pool with 5 client instances for connection isolation
// 5 pools × 200 connections per client = 1,000 total KV connections
// Each pool handles 6-7 workers (32 workers / 5 pools)
SDKClientPool clientPool = new SDKClientPool();
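
The "6-7 workers" per client figure in the comment above follows from dividing 32 workers over 5 clients. The sketch below assumes round-robin assignment, which the hunk does not confirm; `PoolAssignmentSketch` and its methods are hypothetical names, not part of SDKClientPool.

```java
final class PoolAssignmentSketch {
    /** Counts workers per client, assuming worker w is served by client (w % clients). */
    static int[] workersPerClient(int workers, int clients) {
        int[] counts = new int[clients];
        for (int w = 0; w < workers; w++) {
            counts[w % clients]++;
        }
        return counts;
    }

    /** Total KV connections if every client opens the same number of connections. */
    static int totalKvConnections(int clients, int connectionsPerClient) {
        return clients * connectionsPerClient;
    }
}
```

For 32 workers and 5 clients this gives two clients with 7 workers and three with 6, and 5 × 200 = 1,000 KV connections in total, matching the numbers in the comment.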
Comment thread src/main/java/Loader.java
Comment on lines +324 to +331
+ WorkLoadGenerate wlg = new WorkLoadGenerate(th_name, dg, clientPool, esClient, cmd.getOptionValue("durability", "NONE"),
          Integer.parseInt(cmd.getOptionValue("maxTTL", "0")),
          cmd.getOptionValue("maxTTLUnit", "seconds"), _trackFailures,
-         Integer.parseInt(cmd.getOptionValue("retry", "0")), null));
- TimeUnit.MILLISECONDS.sleep(500);
+         Integer.parseInt(cmd.getOptionValue("retry", "0")), null);
+ wlg.set_collection_for_load(
+         cmd.getOptionValue("bucket"),
+         cmd.getOptionValue("scope", "_default"),
+         cmd.getOptionValue("collection", "_default"));
Comment thread src/main/java/Loader.java
Comment on lines 340 to +342
  tm.getAllTaskResult();
  tm.shutdown();
- client.disconnectCluster();
- client.shutdownEnv();
+ clientPool.shutdown();
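
Because each pooled client now owns its own environment, the pool's shutdown has to cover both calls that this hunk removes, for every client. The sketch below shows one way to do that. The `Client` interface is a hypothetical stand-in for SDKClient, and `shutdownAll` is not the actual SDKClientPool.shutdown() implementation.

```java
import java.util.List;

final class PoolShutdownSketch {
    /** Hypothetical stand-in exposing the two calls removed in the hunk above. */
    interface Client {
        void disconnectCluster();
        void shutdownEnv();
    }

    /**
     * Shuts down every client and returns the number of calls that failed,
     * so one failing client does not leave the remaining ones open.
     */
    static int shutdownAll(List<? extends Client> clients) {
        int failedCalls = 0;
        for (Client c : clients) {
            try {
                c.disconnectCluster();
            } catch (RuntimeException e) {
                failedCalls++;
            } finally {
                // Always attempt to release the environment, even if the disconnect failed.
                try {
                    c.shutdownEnv();
                } catch (RuntimeException e) {
                    failedCalls++;
                }
            }
        }
        return failedCalls;
    }
}
```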
Comment on lines +71 to +83
.ioConfig(IoConfig.enableDnsSrv(true))
.ioConfig(IoConfig.numKvConnections(200)) // 200 per client instance
.ioConfig(IoConfig.configPollInterval(Duration.ofSeconds(10)))
.build();
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(this.environment);
} else {
this.environment = ClusterEnvironment.builder()
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
.ioConfig(IoConfig.enableDnsSrv(true))
.ioConfig(IoConfig.numKvConnections(200)) // 200 per client instance
.ioConfig(IoConfig.configPollInterval(Duration.ofSeconds(10)))
.build();
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(this.environment);
Comment on lines 224 to 228
  String cb = cmd.getOptionValue(skipCB.getOpt(), "false");
  if (!Boolean.parseBoolean(cb)) {
      try {
-         clientPool.create_clients(cmd.getOptionValue("bucket"), master, 2);
+         clientPool.create_clients(cmd.getOptionValue("bucket"), master, 32);
      } catch (Exception e) {
Comment on lines +41 to 52
+ // Now 4000 since each worker has its own connection pool
+ int concurrency = Math.min(documents.size(), 4000);
  return Flux.fromIterable(documents)
          .flatMap(documentToInsert -> {
              String k = documentToInsert.getT1();
              Object v = documentToInsert.getT2();

              return reactiveCollection.insert(k, v, insertOptions)
                      .then(Mono.<Result>empty())
                      .onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
-         })
+         }, concurrency)
          .collectList()
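
One edge case in the hunk above: `Math.min(documents.size(), 4000)` evaluates to 0 for an empty list, and to my knowledge Reactor's `flatMap(mapper, concurrency)` rejects a non-positive concurrency with an `IllegalArgumentException`. A minimal guard, using the hypothetical names `ConcurrencySketch` and `boundedConcurrency`, could look like this:

```java
final class ConcurrencySketch {
    /** Clamps the requested concurrency into the range [1, cap], so it is never 0. */
    static int boundedConcurrency(int itemCount, int cap) {
        return Math.max(1, Math.min(itemCount, cap));
    }
}
```

With this helper an empty batch gets a concurrency of 1, so the pipeline completes with an empty result instead of failing when it is assembled.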
Comment on lines +88 to +93
Object content = null;
try {
content = result.contentAs(JsonObject.class);
} catch (Exception e) {
throw new RuntimeException(e);
}

public List<ConcurrentHashMap<String, Object>> bulkReplace(Collection collection, List<Tuple2<String, Object>> documents,
ReplaceOptions replaceOptions) {
final ReactiveCollection reactiveCollection = collection.reactive();
Comment on lines 157 to 161
public List<ConcurrentHashMap<String, Object>> bulkTouch(Collection collection, List<String> keys, final int exp,
TouchOptions touchOptions, Duration exp_duration) {
final ReactiveCollection reactiveCollection = collection.reactive();
int concurrency = Math.min(keys.size(), 2000);
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(keys)

2 participants