24
24
import com .google .protobuf .RpcCallback ;
25
25
import com .google .protobuf .RpcController ;
26
26
import com .google .protobuf .Service ;
27
+ import edu .umd .cs .findbugs .annotations .Nullable ;
27
28
import java .io .IOException ;
28
29
import java .lang .reflect .InvocationTargetException ;
29
30
import java .nio .ByteBuffer ;
32
33
import java .util .Collections ;
33
34
import java .util .List ;
34
35
import java .util .NavigableSet ;
36
+ import java .util .function .Function ;
35
37
import org .apache .commons .io .IOUtils ;
36
38
import org .apache .hadoop .hbase .Cell ;
37
39
import org .apache .hadoop .hbase .CoprocessorEnvironment ;
@@ -82,7 +84,7 @@ public void getMax(RpcController controller, AggregateRequest request,
82
84
AggregateResponse response = null ;
83
85
PartialResultContext partialResultContext = new PartialResultContext ();
84
86
T max = null ;
85
- boolean hasMoreRows = false ;
87
+ boolean hasMoreRows = true ;
86
88
try {
87
89
ColumnInterpreter <T , S , P , Q , R > ci = constructColumnInterpreterFromRequest (request );
88
90
T temp ;
@@ -109,12 +111,8 @@ public void getMax(RpcController controller, AggregateRequest request,
109
111
postScanPartialResultUpdate (results , partialResultContext );
110
112
results .clear ();
111
113
} while (hasMoreRows );
112
- if (max != null ) {
113
- AggregateResponse .Builder builder = AggregateResponse .newBuilder ();
114
- builder .addFirstPart (ci .getProtoForCellType (max ).toByteString ());
115
- setPartialResultResponse (builder , request , hasMoreRows , partialResultContext );
116
- response = builder .build ();
117
- }
114
+ response = singlePartResponse (request , hasMoreRows , partialResultContext , max ,
115
+ ci ::getProtoForCellType );
118
116
} catch (IOException e ) {
119
117
CoprocessorRpcUtils .setControllerException (controller , e );
120
118
} finally {
@@ -142,7 +140,7 @@ public void getMin(RpcController controller, AggregateRequest request,
142
140
InternalScanner scanner = null ;
143
141
PartialResultContext partialResultContext = new PartialResultContext ();
144
142
T min = null ;
145
- boolean hasMoreRows = false ;
143
+ boolean hasMoreRows = true ;
146
144
try {
147
145
ColumnInterpreter <T , S , P , Q , R > ci = constructColumnInterpreterFromRequest (request );
148
146
T temp ;
@@ -168,12 +166,8 @@ public void getMin(RpcController controller, AggregateRequest request,
168
166
postScanPartialResultUpdate (results , partialResultContext );
169
167
results .clear ();
170
168
} while (hasMoreRows );
171
- if (min != null ) {
172
- AggregateResponse .Builder responseBuilder =
173
- AggregateResponse .newBuilder ().addFirstPart (ci .getProtoForCellType (min ).toByteString ());
174
- setPartialResultResponse (responseBuilder , request , hasMoreRows , partialResultContext );
175
- response = responseBuilder .build ();
176
- }
169
+ response = singlePartResponse (request , hasMoreRows , partialResultContext , min ,
170
+ ci ::getProtoForCellType );
177
171
} catch (IOException e ) {
178
172
CoprocessorRpcUtils .setControllerException (controller , e );
179
173
} finally {
@@ -201,7 +195,7 @@ public void getSum(RpcController controller, AggregateRequest request,
201
195
InternalScanner scanner = null ;
202
196
PartialResultContext partialResultContext = new PartialResultContext ();
203
197
long sum = 0L ;
204
- boolean hasMoreRows = false ;
198
+ boolean hasMoreRows = true ;
205
199
try {
206
200
ColumnInterpreter <T , S , P , Q , R > ci = constructColumnInterpreterFromRequest (request );
207
201
S sumVal = null ;
@@ -230,12 +224,8 @@ public void getSum(RpcController controller, AggregateRequest request,
230
224
postScanPartialResultUpdate (results , partialResultContext );
231
225
results .clear ();
232
226
} while (hasMoreRows );
233
- if (sumVal != null ) {
234
- AggregateResponse .Builder responseBuilder = AggregateResponse .newBuilder ()
235
- .addFirstPart (ci .getProtoForPromotedType (sumVal ).toByteString ());
236
- setPartialResultResponse (responseBuilder , request , hasMoreRows , partialResultContext );
237
- response = responseBuilder .build ();
238
- }
227
+ response = singlePartResponse (request , hasMoreRows , partialResultContext , sumVal ,
228
+ ci ::getProtoForPromotedType );
239
229
} catch (IOException e ) {
240
230
CoprocessorRpcUtils .setControllerException (controller , e );
241
231
} finally {
@@ -262,7 +252,7 @@ public void getRowNum(RpcController controller, AggregateRequest request,
262
252
List <Cell > results = new ArrayList <>();
263
253
InternalScanner scanner = null ;
264
254
PartialResultContext partialResultContext = new PartialResultContext ();
265
- boolean hasMoreRows = false ;
255
+ boolean hasMoreRows = true ;
266
256
try {
267
257
Scan scan = ProtobufUtil .toScan (request .getScan ());
268
258
byte [][] colFamilies = scan .getFamilies ();
@@ -290,10 +280,8 @@ public void getRowNum(RpcController controller, AggregateRequest request,
290
280
} while (hasMoreRows );
291
281
ByteBuffer bb = ByteBuffer .allocate (8 ).putLong (counter );
292
282
bb .rewind ();
293
- AggregateResponse .Builder responseBuilder =
294
- AggregateResponse .newBuilder ().addFirstPart (ByteString .copyFrom (bb ));
295
- setPartialResultResponse (responseBuilder , request , hasMoreRows , partialResultContext );
296
- response = responseBuilder .build ();
283
+ response = responseBuilder (request , hasMoreRows , partialResultContext )
284
+ .addFirstPart (ByteString .copyFrom (bb )).build ();
297
285
} catch (IOException e ) {
298
286
CoprocessorRpcUtils .setControllerException (controller , e );
299
287
} finally {
@@ -337,7 +325,7 @@ public void getAvg(RpcController controller, AggregateRequest request,
337
325
qualifier = qualifiers .pollFirst ();
338
326
}
339
327
List <Cell > results = new ArrayList <>();
340
- boolean hasMoreRows = false ;
328
+ boolean hasMoreRows = true ;
341
329
342
330
do {
343
331
results .clear ();
@@ -353,14 +341,25 @@ public void getAvg(RpcController controller, AggregateRequest request,
353
341
rowCountVal ++;
354
342
postScanPartialResultUpdate (results , partialResultContext );
355
343
} while (hasMoreRows );
356
- if ( sumVal != null ) {
357
- ByteString first = ci . getProtoForPromotedType ( sumVal ). toByteString ();
344
+
345
+ if ( sumVal != null && ! request . getClientSupportsPartialResult ()) {
358
346
AggregateResponse .Builder pair = AggregateResponse .newBuilder ();
347
+ ByteString first = ci .getProtoForPromotedType (sumVal ).toByteString ();
359
348
pair .addFirstPart (first );
360
349
ByteBuffer bb = ByteBuffer .allocate (8 ).putLong (rowCountVal );
361
350
bb .rewind ();
362
351
pair .setSecondPart (ByteString .copyFrom (bb ));
363
- setPartialResultResponse (pair , request , hasMoreRows , partialResultContext );
352
+ response = pair .build ();
353
+ } else if (request .getClientSupportsPartialResult ()) {
354
+ AggregateResponse .Builder pair =
355
+ responseBuilder (request , hasMoreRows , partialResultContext );
356
+ if (sumVal != null ) {
357
+ ByteString first = ci .getProtoForPromotedType (sumVal ).toByteString ();
358
+ pair .addFirstPart (first );
359
+ ByteBuffer bb = ByteBuffer .allocate (8 ).putLong (rowCountVal );
360
+ bb .rewind ();
361
+ pair .setSecondPart (ByteString .copyFrom (bb ));
362
+ }
364
363
response = pair .build ();
365
364
}
366
365
} catch (IOException e ) {
@@ -402,7 +401,7 @@ public void getStd(RpcController controller, AggregateRequest request,
402
401
}
403
402
List <Cell > results = new ArrayList <>();
404
403
405
- boolean hasMoreRows = false ;
404
+ boolean hasMoreRows = true ;
406
405
407
406
do {
408
407
if (shouldBreakForThrottling (request , scan , partialResultContext )) {
@@ -421,16 +420,29 @@ public void getStd(RpcController controller, AggregateRequest request,
421
420
sumSqVal = ci .add (sumSqVal , ci .multiply (tempVal , tempVal ));
422
421
rowCountVal ++;
423
422
} while (hasMoreRows );
424
- if (sumVal != null ) {
423
+
424
+ if (sumVal != null && !request .getClientSupportsPartialResult ()) {
425
+ AggregateResponse .Builder pair = AggregateResponse .newBuilder ();
425
426
ByteString first_sumVal = ci .getProtoForPromotedType (sumVal ).toByteString ();
426
427
ByteString first_sumSqVal = ci .getProtoForPromotedType (sumSqVal ).toByteString ();
427
- AggregateResponse .Builder pair = AggregateResponse .newBuilder ();
428
428
pair .addFirstPart (first_sumVal );
429
429
pair .addFirstPart (first_sumSqVal );
430
430
ByteBuffer bb = ByteBuffer .allocate (8 ).putLong (rowCountVal );
431
431
bb .rewind ();
432
432
pair .setSecondPart (ByteString .copyFrom (bb ));
433
- setPartialResultResponse (pair , request , hasMoreRows , partialResultContext );
433
+ response = pair .build ();
434
+ } else if (request .getClientSupportsPartialResult ()) {
435
+ AggregateResponse .Builder pair =
436
+ responseBuilder (request , hasMoreRows , partialResultContext );
437
+ if (sumVal != null ) {
438
+ ByteString first_sumVal = ci .getProtoForPromotedType (sumVal ).toByteString ();
439
+ ByteString first_sumSqVal = ci .getProtoForPromotedType (sumSqVal ).toByteString ();
440
+ pair .addFirstPart (first_sumVal );
441
+ pair .addFirstPart (first_sumSqVal );
442
+ ByteBuffer bb = ByteBuffer .allocate (8 ).putLong (rowCountVal );
443
+ bb .rewind ();
444
+ pair .setSecondPart (ByteString .copyFrom (bb ));
445
+ }
434
446
response = pair .build ();
435
447
}
436
448
} catch (IOException e ) {
@@ -471,7 +483,7 @@ public void getMedian(RpcController controller, AggregateRequest request,
471
483
}
472
484
List <Cell > results = new ArrayList <>();
473
485
474
- boolean hasMoreRows = false ;
486
+ boolean hasMoreRows = true ;
475
487
do {
476
488
if (shouldBreakForThrottling (request , scan , partialResultContext )) {
477
489
break ;
@@ -493,14 +505,26 @@ public void getMedian(RpcController controller, AggregateRequest request,
493
505
sumVal = ci .add (sumVal , tempVal );
494
506
sumWeights = ci .add (sumWeights , tempWeight );
495
507
} while (hasMoreRows );
496
- ByteString first_sumVal = ci .getProtoForPromotedType (sumVal ).toByteString ();
497
- S s = sumWeights == null ? ci .castToReturnType (ci .getMinValue ()) : sumWeights ;
498
- ByteString first_sumWeights = ci .getProtoForPromotedType (s ).toByteString ();
499
- AggregateResponse .Builder pair = AggregateResponse .newBuilder ();
500
- pair .addFirstPart (first_sumVal );
501
- pair .addFirstPart (first_sumWeights );
502
- setPartialResultResponse (pair , request , hasMoreRows , partialResultContext );
503
- response = pair .build ();
508
+ if (sumVal != null && !request .getClientSupportsPartialResult ()) {
509
+ AggregateResponse .Builder pair = AggregateResponse .newBuilder ();
510
+ ByteString first_sumVal = ci .getProtoForPromotedType (sumVal ).toByteString ();
511
+ S s = sumWeights == null ? ci .castToReturnType (ci .getMinValue ()) : sumWeights ;
512
+ ByteString first_sumWeights = ci .getProtoForPromotedType (s ).toByteString ();
513
+ pair .addFirstPart (first_sumVal );
514
+ pair .addFirstPart (first_sumWeights );
515
+ response = pair .build ();
516
+ } else if (request .getClientSupportsPartialResult ()) {
517
+ AggregateResponse .Builder pair =
518
+ responseBuilder (request , hasMoreRows , partialResultContext );
519
+ if (sumVal != null ) {
520
+ ByteString first_sumVal = ci .getProtoForPromotedType (sumVal ).toByteString ();
521
+ S s = sumWeights == null ? ci .castToReturnType (ci .getMinValue ()) : sumWeights ;
522
+ ByteString first_sumWeights = ci .getProtoForPromotedType (s ).toByteString ();
523
+ pair .addFirstPart (first_sumVal );
524
+ pair .addFirstPart (first_sumWeights );
525
+ }
526
+ response = pair .build ();
527
+ }
504
528
} catch (IOException e ) {
505
529
CoprocessorRpcUtils .setControllerException (controller , e );
506
530
} finally {
@@ -558,20 +582,41 @@ private void postScanPartialResultUpdate(List<Cell> results, PartialResultContex
558
582
}
559
583
}
560
584
561
- private void setPartialResultResponse (AggregateResponse .Builder builder , AggregateRequest request ,
562
- boolean hasMoreRows , PartialResultContext context ) throws IOException {
563
- // If we encountered an RpcThrottlingException, tell the client the partial result we've
564
- // accumulated so far, and what row to start scanning at in order to finish the scan.
585
+ @ Nullable
586
+ private <ACC , RES extends Message > AggregateResponse singlePartResponse (AggregateRequest request ,
587
+ boolean hasMoreRows , PartialResultContext partialResultContext , ACC acc ,
588
+ Function <ACC , RES > toRes ) {
589
+ AggregateResponse response = null ;
590
+ if (acc != null && !request .getClientSupportsPartialResult ()) {
591
+ ByteString first = toRes .apply (acc ).toByteString ();
592
+ response = AggregateResponse .newBuilder ().addFirstPart (first ).build ();
593
+ } else if (request .getClientSupportsPartialResult ()) {
594
+ AggregateResponse .Builder responseBuilder =
595
+ responseBuilder (request , hasMoreRows , partialResultContext );
596
+ if (acc != null ) {
597
+ responseBuilder .addFirstPart (toRes .apply (acc ).toByteString ());
598
+ }
599
+ response = responseBuilder .build ();
600
+ }
601
+ return response ;
602
+ }
603
+
604
+ private AggregateResponse .Builder responseBuilder (AggregateRequest request , boolean hasMoreRows ,
605
+ PartialResultContext context ) {
606
+ AggregateResponse .Builder builder = AggregateResponse .newBuilder ();
565
607
if (request .getClientSupportsPartialResult () && hasMoreRows ) {
566
608
if (context .lastRowSuccessfullyProcessedArray != null ) {
567
609
byte [] lastRowSuccessfullyProcessed = Arrays .copyOfRange (
568
610
context .lastRowSuccessfullyProcessedArray , context .lastRowSuccessfullyProcessedOffset ,
569
611
context .lastRowSuccessfullyProcessedOffset + context .lastRowSuccessfullyProcessedLength );
570
612
builder .setNextChunkStartRow (ByteString .copyFrom (
571
613
ClientUtil .calculateTheClosestNextRowKeyForPrefix (lastRowSuccessfullyProcessed )));
614
+ } else {
615
+ builder .setNextChunkStartRow (request .getScan ().getStartRow ());
572
616
}
573
617
builder .setWaitIntervalMs (context .waitIntervalMs );
574
618
}
619
+ return builder ;
575
620
}
576
621
577
622
@ SuppressWarnings ("unchecked" )
0 commit comments