2525import com .couchbase .client .java .kv .InsertOptions ;
2626import com .couchbase .client .java .kv .RemoveOptions ;
2727import com .couchbase .client .java .kv .UpsertOptions ;
28+ import com .couchbase .client .java .kv .LookupInSpec ;
29+ import com .couchbase .client .java .kv .LookupInOptions ;
30+ import com .couchbase .client .java .kv .MutateInOptions ;
31+ import com .couchbase .client .java .kv .MutateInSpec ;
2832
2933import couchbase .sdk .DocOps ;
3034import couchbase .sdk .Result ;
3135import couchbase .sdk .SDKClient ;
3236import couchbase .sdk .SDKClientPool ;
37+ import couchbase .sdk .SubDocOps ;
3338import elasticsearch .EsClient ;
3439import reactor .util .function .Tuple2 ;
3540import utils .docgen .DocumentGenerator ;
3641import utils .taskmanager .Task ;
3742
3843public class WorkLoadGenerate extends Task {
3944 DocumentGenerator dg ;
40- public SDKClient sdk = null ;
45+ public SDKClient sdk ;
4146 public DocOps docops ;
47+ public SubDocOps subDocOps ;
4248 public String durability ;
4349 public HashMap <String , List <Result >> failedMutations = new HashMap <String , List <Result >>();
4450 public boolean trackFailures = true ;
@@ -51,6 +57,8 @@ public class WorkLoadGenerate extends Task{
5157 public InsertOptions setOptions ;
5258 public RemoveOptions removeOptions ;
5359 public GetOptions getOptions ;
60+ public MutateInOptions mutateInOptions ;
61+ public LookupInOptions lookupInOptions ;
5462 public EsClient esClient = null ;
5563 private SDKClientPool sdkClientPool ;
5664 static Logger logger = LogManager .getLogger (WorkLoadGenerate .class );
@@ -59,10 +67,33 @@ public class WorkLoadGenerate extends Task{
5967 public String scope = "_default" ;
6068 public String collection = "_default" ;
6169
70+ private void update_subdoc_failed_mutation_result (
71+ String op_type ,
72+ HashMap <String , List <Result >> failed_mutations ,
73+ List <HashMap <String ,Object >> sd_results ) {
74+ if (!trackFailures )
75+ return ;
76+ List <Result > result_arr ;
77+ if (!failedMutations .containsKey (op_type )) {
78+ result_arr = new ArrayList <Result >();
79+ failedMutations .put (op_type , result_arr );
80+ } else {
81+ result_arr = failedMutations .get (op_type );
82+ }
83+
84+ for (HashMap <String , Object > sd_res : sd_results ) {
85+ result_arr .add (new Result ((String )sd_res .get ("id" ),
86+ sd_res .get ("value" ),
87+ (Throwable )sd_res .get ("error" ),
88+ (boolean )sd_res .get ("status" )));
89+ }
90+ }
91+
6292 public WorkLoadGenerate (String taskName , DocumentGenerator dg , SDKClient client , String durability ) {
6393 super (taskName );
6494 this .dg = dg ;
6595 this .docops = new DocOps ();
96+ this .subDocOps = new SubDocOps ();
6697 this .sdk = client ;
6798 this .durability = durability ;
6899 }
@@ -71,6 +102,7 @@ public WorkLoadGenerate(String taskName, DocumentGenerator dg, SDKClient client,
71102 super (taskName );
72103 this .dg = dg ;
73104 this .docops = new DocOps ();
105+ this .subDocOps = new SubDocOps ();
74106 this .sdk = client ;
75107 this .durability = durability ;
76108 this .trackFailures = trackFailures ;
@@ -84,6 +116,7 @@ public WorkLoadGenerate(String taskName, DocumentGenerator dg, SDKClient client,
84116 super (taskName );
85117 this .dg = dg ;
86118 this .docops = new DocOps ();
119+ this .subDocOps = new SubDocOps ();
87120 this .sdk = client ;
88121 this .durability = durability ;
89122 this .trackFailures = trackFailures ;
@@ -98,6 +131,7 @@ public WorkLoadGenerate(String taskName, DocumentGenerator dg, SDKClient client,
98131 super (taskName );
99132 this .dg = dg ;
100133 this .docops = new DocOps ();
134+ this .subDocOps = new SubDocOps ();
101135 this .sdk = client ;
102136 this .esClient = esClient ;
103137 this .durability = durability ;
@@ -113,6 +147,7 @@ public WorkLoadGenerate(String taskName, DocumentGenerator dg, SDKClientPool cli
113147 super (taskName );
114148 this .dg = dg ;
115149 this .docops = new DocOps ();
150+ this .subDocOps = new SubDocOps ();
116151 this .sdkClientPool = clientPool ;
117152 this .esClient = esClient ;
118153 this .durability = durability ;
@@ -123,14 +158,18 @@ public WorkLoadGenerate(String taskName, DocumentGenerator dg, SDKClientPool cli
123158 this .retryStrategy = retryStrategy ;
124159 }
125160
161+ public void stop_load () {
162+ this .stop_load = true ;
163+ }
164+
126165 public void set_collection_for_load (String bucket_name , String scope , String collection ) {
127166 this .bucket_name = bucket_name ;
128167 this .scope = scope ;
129168 this .collection = collection ;
130169 }
131170
132- @ Override
133- public void run () {
171+ public void actual_run () {
172+ this . result = true ;
134173 logger .info ("Starting " + this .taskName );
135174 // Set timeout in WorkLoadSettings
136175 this .dg .ws .setTimeoutDuration (60 , "seconds" );
@@ -158,12 +197,24 @@ public void run() {
158197 getOptions = GetOptions .getOptions ()
159198 .timeout (this .dg .ws .timeout )
160199 .retryStrategy (this .dg .ws .retryStrategy );
200+ mutateInOptions = MutateInOptions .mutateInOptions ()
201+ .expiry (this .dg .ws .getDuration (this .exp , this .exp_unit ))
202+ .timeout (this .dg .ws .timeout )
203+ .durability (this .dg .ws .durability )
204+ .retryStrategy (this .dg .ws .retryStrategy );
205+ lookupInOptions = LookupInOptions .lookupInOptions ();
206+
207+ if (dg .ws .expiry == 0 ) {
208+ // If expiry load is not set and we have exp value set,
209+ // then apply it for inserts and upserts
210+ setOptions = setOptions .expiry (this .dg .ws .getDuration (this .exp , this .exp_unit ));
211+ upsertOptions = upsertOptions .expiry (this .dg .ws .getDuration (this .exp , this .exp_unit ));
212+ }
213+
161214 int ops = 0 ;
162215 boolean flag = false ;
163216 Instant trackFailureTime_start = Instant .now ();
164- while (this .stop_load == false ) {
165- if (this .sdkClientPool != null )
166- this .sdk = this .sdkClientPool .get_client_for_bucket (this .bucket_name , this .scope , this .collection );
217+ while (! this .stop_load ) {
167218 Instant trackFailureTime_end = Instant .now ();
168219 Duration timeElapsed = Duration .between (trackFailureTime_start , trackFailureTime_end );
169220 if (timeElapsed .toMinutes () > 5 ) {
@@ -187,6 +238,7 @@ public void run() {
187238 result = docops .bulkInsert (this .sdk .connection , docs , setOptions );
188239 ops += dg .ws .batchSize *dg .ws .creates /100 ;
189240 if (trackFailures && result .size ()>0 )
241+ this .result = false ;
190242 try {
191243 failedMutations .get ("create" ).addAll (result );
192244 } catch (Exception e ) {
@@ -206,6 +258,7 @@ public void run() {
206258 result = docops .bulkUpsert (this .sdk .connection , docs , upsertOptions );
207259 ops += dg .ws .batchSize *dg .ws .updates /100 ;
208260 if (trackFailures && result .size ()>0 )
261+ this .result = false ;
209262 try {
210263 failedMutations .get ("update" ).addAll (result );
211264 } catch (Exception e ) {
@@ -222,6 +275,7 @@ public void run() {
222275 result = docops .bulkInsert (this .sdk .connection , docs , expiryOptions );
223276 ops += dg .ws .batchSize *dg .ws .expiry /100 ;
224277 if (trackFailures && result .size ()>0 )
278+ this .result = false ;
225279 try {
226280 failedMutations .get ("expiry" ).addAll (result );
227281 } catch (Exception e ) {
@@ -241,6 +295,7 @@ public void run() {
241295 }
242296 ops += dg .ws .batchSize *dg .ws .deletes /100 ;
243297 if (trackFailures && result .size ()>0 )
298+ this .result = false ;
244299 try {
245300 failedMutations .get ("delete" ).addAll (result );
246301 } catch (Exception e ) {
@@ -267,15 +322,13 @@ public void run() {
267322 System .out .println ("Validation failed for key: " + this .sdk .scope + ":" + this .sdk .collection + ":" + name );
268323 System .out .println ("Actual Value - " + a );
269324 System .out .println ("Expected Value - " + b );
270- this .sdk .disconnectCluster ();
271325 System .out .println (this .taskName + " is completed!" );
272326 return ;
273327 }
274328 } else if (!a .equals (b ) && !a .contains ("TimeoutException" )){
275329 System .out .println ("Validation failed for key: " + this .sdk .scope + ":" + this .sdk .collection + ":" + name );
276330 System .out .println ("Actual Value - " + a );
277331 System .out .println ("Expected Value - " + b );
278- this .sdk .disconnectCluster ();
279332 System .out .println (this .taskName + " is completed!" );
280333 return ;
281334 }
@@ -287,6 +340,41 @@ public void run() {
287340 ops += dg .ws .batchSize *dg .ws .reads /100 ;
288341 }
289342 }
343+ if (dg .ws .subdocs > 0 ) {
344+ List <Tuple2 <String ,List <MutateInSpec >>> docs ;
345+
346+ docs = dg .nextSubDocBatch ("insert" );
347+ if (docs .size ()>0 ) {
348+ flag = true ;
349+ List <HashMap <String ,Object >> result = subDocOps .bulkSubDocOperation (this .sdk .connection , docs , mutateInOptions );
350+ ops += dg .ws .batchSize *dg .ws .subdocs /100 ;
351+ this .update_subdoc_failed_mutation_result ("insert" , failedMutations , result );
352+ }
353+
354+ docs = dg .nextSubDocBatch ("upsert" );
355+ if (docs .size ()>0 ) {
356+ flag = true ;
357+ List <HashMap <String ,Object >> result = subDocOps .bulkSubDocOperation (this .sdk .connection , docs , mutateInOptions );
358+ ops += dg .ws .batchSize *dg .ws .subdocs /100 ;
359+ this .update_subdoc_failed_mutation_result ("upsert" , failedMutations , result );
360+ }
361+
362+ List <Tuple2 <String ,List <LookupInSpec >>> lookup_docs = dg .nextSubDocLookupBatch ();
363+ if (lookup_docs .size ()>0 ) {
364+ flag = true ;
365+ List <HashMap <String ,Object >> result = subDocOps .bulkGetSubDocOperation (this .sdk .connection , lookup_docs , lookupInOptions );
366+ ops += dg .ws .batchSize *dg .ws .subdocs /100 ;
367+ this .update_subdoc_failed_mutation_result ("lookup" , failedMutations , result );
368+ }
369+
370+ docs = dg .nextSubDocBatch ("remove" );
371+ if (docs .size ()>0 ) {
372+ flag = true ;
373+ List <HashMap <String ,Object >> result = subDocOps .bulkSubDocOperation (this .sdk .connection , docs , mutateInOptions );
374+ ops += dg .ws .batchSize *dg .ws .subdocs /100 ;
375+ this .update_subdoc_failed_mutation_result ("remove" , failedMutations , result );
376+ }
377+ }
290378 if (ops == 0 )
291379 break ;
292380 else if (ops < dg .ws .ops /dg .ws .workers && flag ) {
@@ -305,8 +393,8 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
305393 }
306394 }
307395 logger .info (this .taskName + " is completed!" );
308- this . result = true ;
309- if ( retryTimes > 0 && failedMutations . size () > 0 )
396+ if ( retryTimes > 0 && failedMutations . size () > 0 ) {
397+ this . result = true ;
310398 for (Entry <String , List <Result >> optype : failedMutations .entrySet ()) {
311399 for (Result r : optype .getValue ()) {
312400 System .out .println ("Loader Retrying: " + r .id () + " -> " + r .err ().getClass ().getSimpleName ());
@@ -320,6 +408,9 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
320408 this .result = false ;
321409 } catch (DocumentExistsException e ) {
322410 System .out .println ("Retry Create failed for key: " + r .id ());
411+ } catch (Exception e ) {
412+ System .out .println ("Exception during create'" + r .id () + "' :: " + e .toString ());
413+ this .result = false ;
323414 }
324415 case "update" :
325416 try {
@@ -330,6 +421,10 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
330421 this .result = false ;
331422 } catch (DocumentExistsException e ) {
332423 System .out .println ("Retry update failed for key: " + r .id ());
424+ this .result = false ;
425+ } catch (Exception e ) {
426+ System .out .println ("Exception during update'" + r .id () + "' :: " + e .toString ());
427+ this .result = false ;
333428 }
334429 case "delete" :
335430 try {
@@ -340,13 +435,25 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
340435 this .result = false ;
341436 } catch (DocumentNotFoundException e ) {
342437 System .out .println ("Retry delete failed for key: " + r .id ());
438+ this .result = false ;
343439 }
344440 }
345441 }
346442 }
443+ }
347444 }
348445
349- public void stop_load () {
350- this .stop_load = true ;
446+ @ Override
447+ public void run () {
448+ if (this .sdkClientPool != null )
449+ this .sdk = this .sdkClientPool .get_client_for_bucket (
450+ this .bucket_name , this .scope , this .collection );
451+ try {
452+ this .actual_run ();
453+ }
454+ finally {
455+ if (this .sdkClientPool != null )
456+ this .sdkClientPool .release_client (this .sdk );
457+ }
351458 }
352459}
0 commit comments