@@ -399,21 +399,7 @@ public void handleIO() throws IOException {
399
399
Set <SelectionKey > selectedKeys = selector .selectedKeys ();
400
400
401
401
if (selectedKeys .isEmpty () && !shutDown ) {
402
- getLogger ().debug ("No selectors ready, interrupted: "
403
- + Thread .interrupted ());
404
- if (++emptySelects > DOUBLE_CHECK_EMPTY ) {
405
- for (SelectionKey sk : selector .keys ()) {
406
- getLogger ().debug ("%s has %s, interested in %s" , sk , sk .readyOps (),
407
- sk .interestOps ());
408
- if (sk .readyOps () != 0 ) {
409
- getLogger ().debug ("%s has a ready op, handling IO" , sk );
410
- handleIO (sk );
411
- } else {
412
- lostConnection ((MemcachedNode ) sk .attachment ());
413
- }
414
- }
415
- assert emptySelects < EXCESSIVE_EMPTY : "Too many empty selects" ;
416
- }
402
+ handleEmptySelects ();
417
403
} else {
418
404
getLogger ().debug ("Selected %d, selected %d keys" , selected ,
419
405
selectedKeys .size ());
@@ -422,10 +408,20 @@ public void handleIO() throws IOException {
422
408
for (SelectionKey sk : selectedKeys ) {
423
409
handleIO (sk );
424
410
}
425
-
426
411
selectedKeys .clear ();
427
412
}
428
413
414
+ handleOperationalTasks ();
415
+ }
416
+
417
+ /**
418
+ * Helper method for {@link #handleIO()} to encapsulate everything that
419
+ * needs to be checked on a regular basis that has nothing to do directly
420
+ * with reading and writing data.
421
+ *
422
+ * @throws IOException if an error happens during shutdown queue handling.
423
+ */
424
+ private void handleOperationalTasks () throws IOException {
429
425
checkPotentiallyTimedOutConnection ();
430
426
431
427
if (!shutDown && !reconnectQueue .isEmpty ()) {
@@ -437,6 +433,28 @@ public void handleIO() throws IOException {
437
433
handleShutdownQueue ();
438
434
}
439
435
436
+ /**
437
+ * Helper method for {@link #handleIO()} to handle empty select calls.
438
+ */
439
+ private void handleEmptySelects () {
440
+ getLogger ().debug ("No selectors ready, interrupted: "
441
+ + Thread .interrupted ());
442
+
443
+ if (++emptySelects > DOUBLE_CHECK_EMPTY ) {
444
+ for (SelectionKey sk : selector .keys ()) {
445
+ getLogger ().debug ("%s has %s, interested in %s" , sk , sk .readyOps (),
446
+ sk .interestOps ());
447
+ if (sk .readyOps () != 0 ) {
448
+ getLogger ().debug ("%s has a ready op, handling IO" , sk );
449
+ handleIO (sk );
450
+ } else {
451
+ lostConnection ((MemcachedNode ) sk .attachment ());
452
+ }
453
+ }
454
+ assert emptySelects < EXCESSIVE_EMPTY : "Too many empty selects" ;
455
+ }
456
+ }
457
+
440
458
/**
441
459
* Check if nodes need to be shut down and do so if needed.
442
460
*
@@ -614,12 +632,7 @@ private void handleIO(final SelectionKey sk) {
614
632
assert !channel .isConnected () : "connected" ;
615
633
}
616
634
} else {
617
- if (sk .isValid () && sk .isReadable ()) {
618
- handleReads (node );
619
- }
620
- if (sk .isValid () && sk .isWritable ()) {
621
- handleWrites (node );
622
- }
635
+ handleReadsAndWrites (sk , node );
623
636
}
624
637
} catch (ClosedChannelException e ) {
625
638
if (!shutDown ) {
@@ -644,6 +657,26 @@ private void handleIO(final SelectionKey sk) {
644
657
node .fixupOps ();
645
658
}
646
659
660
+ /**
661
+ * A helper method for {@link #handleIO(java.nio.channels.SelectionKey)} to
662
+ * handle reads and writes if appropriate.
663
+ *
664
+ * @param sk the selection key to use.
665
+ * @param node th enode to read write from.
666
+ * @throws IOException if an error occurs during read/write.
667
+ */
668
+ private void handleReadsAndWrites (final SelectionKey sk ,
669
+ final MemcachedNode node ) throws IOException {
670
+ if (sk .isValid ()) {
671
+ if (sk .isReadable ()) {
672
+ handleReads (node );
673
+ }
674
+ if (sk .isWritable ()) {
675
+ handleWrites (node );
676
+ }
677
+ }
678
+ }
679
+
647
680
/**
648
681
* Finish the connect phase and potentially verify its liveness.
649
682
*
@@ -756,32 +789,7 @@ private void handleReads(final MemcachedNode node) throws IOException {
756
789
(int )(timeOnWire / 1000 ));
757
790
metrics .markMeter (OVERALL_RESPONSE_METRIC );
758
791
synchronized (currentOp ) {
759
- currentOp .readFromBuffer (rbuf );
760
-
761
- if (currentOp .getState () == OperationState .COMPLETE ) {
762
- getLogger ().debug ("Completed read op: %s and giving the next %d "
763
- + "bytes" , currentOp , rbuf .remaining ());
764
- Operation op = node .removeCurrentReadOp ();
765
- assert op == currentOp : "Expected to pop " + currentOp + " got "
766
- + op ;
767
-
768
- if (op .hasErrored ()) {
769
- metrics .markMeter (OVERALL_RESPONSE_FAIL_METRIC );
770
- } else {
771
- metrics .markMeter (OVERALL_RESPONSE_SUCC_METRIC );
772
- }
773
- } else if (currentOp .getState () == OperationState .RETRY ) {
774
- getLogger ().debug ("Reschedule read op due to NOT_MY_VBUCKET error: "
775
- + "%s " , currentOp );
776
- ((VBucketAware ) currentOp ).addNotMyVbucketNode (
777
- currentOp .getHandlingNode ());
778
- Operation op = node .removeCurrentReadOp ();
779
- assert op == currentOp : "Expected to pop " + currentOp + " got "
780
- + op ;
781
-
782
- retryOps .add (currentOp );
783
- metrics .markMeter (OVERALL_RESPONSE_RETRY_METRIC );
784
- }
792
+ readBufferAndLogMetrics (currentOp , rbuf , node );
785
793
}
786
794
787
795
currentOp = node .getCurrentReadOp ();
@@ -792,6 +800,43 @@ private void handleReads(final MemcachedNode node) throws IOException {
792
800
}
793
801
}
794
802
803
+ /**
804
+ * Read from the buffer and add metrics information.
805
+ *
806
+ * @param currentOp the current operation to read.
807
+ * @param rbuf the read buffer to read from.
808
+ * @param node the node to read from.
809
+ * @throws IOException if reading was not successful.
810
+ */
811
+ private void readBufferAndLogMetrics (final Operation currentOp ,
812
+ final ByteBuffer rbuf , final MemcachedNode node ) throws IOException {
813
+ currentOp .readFromBuffer (rbuf );
814
+ if (currentOp .getState () == OperationState .COMPLETE ) {
815
+ getLogger ().debug ("Completed read op: %s and giving the next %d "
816
+ + "bytes" , currentOp , rbuf .remaining ());
817
+ Operation op = node .removeCurrentReadOp ();
818
+ assert op == currentOp : "Expected to pop " + currentOp + " got "
819
+ + op ;
820
+
821
+ if (op .hasErrored ()) {
822
+ metrics .markMeter (OVERALL_RESPONSE_FAIL_METRIC );
823
+ } else {
824
+ metrics .markMeter (OVERALL_RESPONSE_SUCC_METRIC );
825
+ }
826
+ } else if (currentOp .getState () == OperationState .RETRY ) {
827
+ getLogger ().debug ("Reschedule read op due to NOT_MY_VBUCKET error: "
828
+ + "%s " , currentOp );
829
+ ((VBucketAware ) currentOp ).addNotMyVbucketNode (
830
+ currentOp .getHandlingNode ());
831
+ Operation op = node .removeCurrentReadOp ();
832
+ assert op == currentOp : "Expected to pop " + currentOp + " got "
833
+ + op ;
834
+
835
+ retryOps .add (currentOp );
836
+ metrics .markMeter (OVERALL_RESPONSE_RETRY_METRIC );
837
+ }
838
+ }
839
+
795
840
/**
796
841
* Deal with an operation where the channel reached the end of a stream.
797
842
*
0 commit comments