Skip to content

Commit 22e255b

Browse files
committed
More improvements added.
1 parent 18becad commit 22e255b

4 files changed

Lines changed: 17 additions & 12 deletions

File tree

src/main/java/Loader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public static void main(String[] args) throws IOException {
145145
Option validate = new Option("validate", "validate", true, "Validate Data during Reads");
146146
options.addOption(validate);
147147

148-
Option gtm = new Option("gtm", "gtm", true, "Go for max doc ops");
148+
Option gtm = new Option("gtm", "gtm", true, "Go for max doc ops (disables rate limiting)");
149149
options.addOption(gtm);
150150

151151
Option deleted = new Option("deleted", "deleted", true, "To verify deleted docs");

src/main/java/couchbase/loadgen/WorkLoadGenerate.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,14 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
388388
ops = 0;
389389
Instant end = Instant.now();
390390
timeElapsed = Duration.between(start, end);
391-
if(!this.dg.ws.gtm && timeElapsed.toMillis() < 1000)
391+
// Smarter throttling: only sleep if significantly under time limit, and reduce sleep duration
392+
if(!this.dg.ws.gtm && timeElapsed.toMillis() < 900)
392393
try {
393-
long i = (long) ((1000-timeElapsed.toMillis()));
394-
TimeUnit.MILLISECONDS.sleep(i);
394+
// Reduce sleep to 70% of needed time to allow burst periods
395+
long sleepTime = (long) ((900-timeElapsed.toMillis()) * 0.7);
396+
if(sleepTime > 10) { // Only sleep if > 10ms
397+
TimeUnit.MILLISECONDS.sleep(sleepTime);
398+
}
395399
} catch (InterruptedException e) {
396400
e.printStackTrace();
397401
}

src/main/java/couchbase/sdk/DocOps.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object
3838

3939
// Emit error Results as part of the stream and collect at the end
4040
// This is thread-safe and avoids synchronization overhead
41-
int concurrency = Math.min(documents.size(), 1000);
41+
int concurrency = Math.min(documents.size(), 5000);
4242
return Flux.fromIterable(documents)
4343
.flatMap(documentToInsert -> {
4444
String k = documentToInsert.getT1();
@@ -58,7 +58,7 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
5858

5959
// Emit error Results as part of the stream and collect at the end
6060
// This is thread-safe and avoids synchronization overhead
61-
int concurrency = Math.min(documents.size(), 1000);
61+
int concurrency = Math.min(documents.size(), 5000);
6262
return Flux.fromIterable(documents)
6363
.flatMap(documentToInsert -> {
6464
String k = documentToInsert.getT1();
@@ -74,7 +74,7 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
7474

7575
public List<Tuple2<String, Object>> bulkGets(Collection collection, List<Tuple2<String, Object>> documents, GetOptions getOptions) {
7676
final ReactiveCollection reactiveCollection = collection.reactive();
77-
int concurrency = Math.min(documents.size(), 1000);
77+
int concurrency = Math.min(documents.size(), 5000);
7878
List<Tuple2<String, Object>> returnValue = Flux.fromIterable(documents)
7979
.flatMap(new Function<Tuple2<String, Object>, Publisher<Tuple2<String, Object>>>() {
8080
public Publisher<Tuple2<String, Object>> apply(Tuple2<String, Object> documentToInsert) {
@@ -105,7 +105,7 @@ public List<Result> bulkDelete(Collection collection, List<String> keys, RemoveO
105105

106106
// Emit error Results as part of the stream and collect at the end
107107
// This is thread-safe and avoids synchronization overhead
108-
int concurrency = Math.min(keys.size(), 1000);
108+
int concurrency = Math.min(keys.size(), 5000);
109109
return Flux.fromIterable(keys)
110110
.flatMap(key -> {
111111
return reactiveCollection.remove(key, removeOptions)
@@ -119,7 +119,7 @@ public List<Result> bulkDelete(Collection collection, List<String> keys, RemoveO
119119
public List<ConcurrentHashMap<String, Object>> bulkReplace(Collection collection, List<Tuple2<String, Object>> documents,
120120
ReplaceOptions replaceOptions) {
121121
final ReactiveCollection reactiveCollection = collection.reactive();
122-
int concurrency = Math.min(documents.size(), 1000);
122+
int concurrency = Math.min(documents.size(), 5000);
123123
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(documents)
124124
.flatMap(new Function<Tuple2<String, Object>, Publisher<ConcurrentHashMap<String, Object>>>() {
125125
public Publisher<ConcurrentHashMap<String, Object>> apply(Tuple2<String, Object> documentToInsert) {
@@ -153,7 +153,7 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
153153
public List<ConcurrentHashMap<String, Object>> bulkTouch(Collection collection, List<String> keys, final int exp,
154154
TouchOptions touchOptions, Duration exp_duration) {
155155
final ReactiveCollection reactiveCollection = collection.reactive();
156-
int concurrency = Math.min(keys.size(), 1000);
156+
int concurrency = Math.min(keys.size(), 5000);
157157
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(keys)
158158
.flatMap(new Function<String, Publisher<ConcurrentHashMap<String, Object>>>() {
159159
public Publisher<ConcurrentHashMap<String, Object>> apply(String key){

src/main/java/couchbase/sdk/SDKClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ public class SDKClient {
3535
.securityConfig(SecurityConfig.enableTls(true)
3636
.trustManagerFactory(InsecureTrustManagerFactory.INSTANCE))
3737
.ioConfig(IoConfig.enableDnsSrv(true))
38-
.ioConfig(IoConfig.numKvConnections(200))
38+
.ioConfig(IoConfig.numKvConnections(500))
3939
.build();
4040

4141
public static ClusterEnvironment env2 = ClusterEnvironment.builder()
4242
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
43-
.ioConfig(IoConfig.enableDnsSrv(true)).ioConfig(IoConfig.numKvConnections(200))
43+
.ioConfig(IoConfig.enableDnsSrv(true))
44+
.ioConfig(IoConfig.numKvConnections(500))
4445
.build();
4546

4647
public SDKClient(Server master, String bucket, String scope, String collection) {

0 commit comments

Comments
 (0)