Skip to content

Commit e6892a0

Browse files
daschl authored and Michael Nitschinger committed
SPY-160: Redistribute ops when waiting for authentication to complete.
Motivation
----------
When an operation gets added and the node needs to (re)connect, it waits for a given amount of time (authWaitTime). If the auth does not complete within that period, the operation is cancelled. This is suboptimal: an operation that has not timed out yet could wait longer, until it either goes through or gets timed out/cancelled.

Modifications
-------------
This changeset makes sure that an operation gets redistributed (assuming FailureMode is NOT cancel) if the authLatch is not counted down within the wait period. Redistribution will only happen (further down the code stack) if the op is 1) not cancelled and 2) not timed out already.

Result
------
Operations that are still valid will be transparently rescheduled until they either go through successfully, eventually time out, or get cancelled.

Change-Id: I6f849314e11435a6ebc69a4bfc1eab705df850e8
Reviewed-on: http://review.couchbase.org/34852
Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
1 parent f82790d commit e6892a0

9 files changed

Lines changed: 113 additions & 43 deletions

File tree

src/main/java/net/spy/memcached/BinaryConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
7070
return new BinaryMemcachedNodeImpl(sa, c, bufSize,
7171
createReadOperationQueue(), createWriteOperationQueue(),
7272
createOperationQueue(), getOpQueueMaxBlockTime(), doAuth,
73-
getOperationTimeout(), getAuthWaitTime());
73+
getOperationTimeout(), getAuthWaitTime(), this);
7474
}
7575

7676
@Override

src/main/java/net/spy/memcached/DefaultConnectionFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
177177
createOperationQueue(),
178178
getOpQueueMaxBlockTime(),
179179
getOperationTimeout(),
180-
getAuthWaitTime());
180+
getAuthWaitTime(),
181+
this);
181182
} else if (of instanceof BinaryOperationFactory) {
182183
boolean doAuth = false;
183184
if (getAuthDescriptor() != null) {
@@ -190,7 +191,8 @@ public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
190191
getOpQueueMaxBlockTime(),
191192
doAuth,
192193
getOperationTimeout(),
193-
getAuthWaitTime());
194+
getAuthWaitTime(),
195+
this);
194196
} else {
195197
throw new IllegalStateException("Unhandled operation factory type " + of);
196198
}

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -313,10 +313,10 @@ protected List<MemcachedNode> createConnections(
313313
for (SocketAddress sa : addrs) {
314314
SocketChannel ch = SocketChannel.open();
315315
ch.configureBlocking(false);
316-
MemcachedNode qa =
317-
this.connectionFactory.createMemcachedNode(sa, ch, bufSize);
316+
MemcachedNode qa = connectionFactory.createMemcachedNode(sa, ch, bufSize);
317+
qa.setConnection(this);
318318
int ops = 0;
319-
ch.socket().setTcpNoDelay(!this.connectionFactory.useNagleAlgorithm());
319+
ch.socket().setTcpNoDelay(!connectionFactory.useNagleAlgorithm());
320320

321321
try {
322322
if (ch.connect(sa)) {
@@ -968,37 +968,56 @@ private void cancelOperations(final Collection<Operation> ops) {
968968
*
969969
* @param ops the operations to redistribute.
970970
*/
971-
private void redistributeOperations(final Collection<Operation> ops) {
971+
public void redistributeOperations(final Collection<Operation> ops) {
972972
for (Operation op : ops) {
973-
if (op.isCancelled() || op.isTimedOut()) {
974-
continue;
975-
}
973+
redistributeOperation(op);
974+
}
975+
}
976976

977-
if (op instanceof MultiGetOperationImpl) {
978-
for (String key : ((MultiGetOperationImpl) op).getRetryKeys()) {
979-
addOperation(key, opFact.get(key,
980-
(GetOperation.Callback) op.getCallback()));
981-
}
982-
} else if (op instanceof KeyedOperation) {
983-
KeyedOperation ko = (KeyedOperation) op;
984-
int added = 0;
985-
for (Operation newop : opFact.clone(ko)) {
986-
if (newop instanceof KeyedOperation) {
987-
KeyedOperation newKeyedOp = (KeyedOperation) newop;
988-
for (String k : newKeyedOp.getKeys()) {
989-
addOperation(k, newop);
990-
}
991-
} else {
992-
newop.cancel();
993-
getLogger().warn("Could not redistribute cloned non-keyed " +
994-
"operation", newop);
977+
/**
978+
* Redistribute the given operation to (potentially) other nodes.
979+
*
980+
* Note that operations can only be redistributed if they have not been
981+
* cancelled or timed out already, and if they have a definite target
982+
* (a key).
983+
*
984+
* @param op the operation to redistribute.
985+
*/
986+
public void redistributeOperation(Operation op) {
987+
if (op.isCancelled() || op.isTimedOut()) {
988+
return;
989+
}
990+
991+
// The operation gets redistributed but has never been actually written,
992+
// so we just re-add it directly without cloning.
993+
if (op.getState() == OperationState.WRITE_QUEUED) {
994+
addOperation(op.getHandlingNode(), op);
995+
}
996+
997+
if (op instanceof MultiGetOperationImpl) {
998+
for (String key : ((MultiGetOperationImpl) op).getRetryKeys()) {
999+
addOperation(key, opFact.get(key,
1000+
(GetOperation.Callback) op.getCallback()));
1001+
}
1002+
} else if (op instanceof KeyedOperation) {
1003+
KeyedOperation ko = (KeyedOperation) op;
1004+
int added = 0;
1005+
for (Operation newop : opFact.clone(ko)) {
1006+
if (newop instanceof KeyedOperation) {
1007+
KeyedOperation newKeyedOp = (KeyedOperation) newop;
1008+
for (String k : newKeyedOp.getKeys()) {
1009+
addOperation(k, newop);
9951010
}
996-
added++;
1011+
} else {
1012+
newop.cancel();
1013+
getLogger().warn("Could not redistribute cloned non-keyed " +
1014+
"operation", newop);
9971015
}
998-
assert added > 0 : "Didn't add any new operations when redistributing";
999-
} else {
1000-
op.cancel();
1016+
added++;
10011017
}
1018+
assert added > 0 : "Didn't add any new operations when redistributing";
1019+
} else {
1020+
op.cancel();
10021021
}
10031022
}
10041023

src/main/java/net/spy/memcached/MemcachedNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,8 @@ public interface MemcachedNode {
238238
void setContinuousTimeout(boolean timedOut);
239239

240240
int getContinuousTimeout();
241+
242+
MemcachedConnection getConnection();
243+
244+
void setConnection(MemcachedConnection connection);
241245
}

src/main/java/net/spy/memcached/MemcachedNodeROImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,14 @@ public void completedRead() {
190190
throw new UnsupportedOperationException();
191191
}
192192

193+
@Override
194+
public MemcachedConnection getConnection() {
195+
throw new UnsupportedOperationException();
196+
}
197+
198+
@Override
199+
public void setConnection(MemcachedConnection connection) {
200+
throw new UnsupportedOperationException();
201+
}
202+
193203
}

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

38+
import net.spy.memcached.ConnectionFactory;
39+
import net.spy.memcached.FailureMode;
40+
import net.spy.memcached.MemcachedConnection;
3841
import net.spy.memcached.MemcachedNode;
3942
import net.spy.memcached.compat.SpyObject;
4043
import net.spy.memcached.ops.Operation;
@@ -56,6 +59,7 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements
5659
private final BlockingQueue<Operation> inputQueue;
5760
private final long opQueueMaxBlockTime;
5861
private final long authWaitTime;
62+
private final ConnectionFactory connectionFactory;
5963
private AtomicInteger reconnectAttempt = new AtomicInteger(1);
6064
private SocketChannel channel;
6165
private int toWrite = 0;
@@ -66,14 +70,15 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements
6670
private ArrayList<Operation> reconnectBlocked;
6771
private long defaultOpTimeout;
6872
private long lastReadTimestamp = System.currentTimeMillis();
73+
private MemcachedConnection connection;
6974

7075
// operation Future.get timeout counter
7176
private final AtomicInteger continuousTimeout = new AtomicInteger(0);
7277

7378
public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize,
7479
BlockingQueue<Operation> rq, BlockingQueue<Operation> wq,
7580
BlockingQueue<Operation> iq, long opQueueMaxBlockTime,
76-
boolean waitForAuth, long dt, long authWaitTime) {
81+
boolean waitForAuth, long dt, long authWaitTime, ConnectionFactory fact) {
7782
super();
7883
assert sa != null : "No SocketAddress";
7984
assert c != null : "No SocketChannel";
@@ -82,6 +87,7 @@ public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize,
8287
assert wq != null : "No operation write queue";
8388
assert iq != null : "No input queue";
8489
socketAddress = sa;
90+
connectionFactory = fact;
8591
this.authWaitTime = authWaitTime;
8692
setChannel(c);
8793
// Since these buffers are allocated rarely (only on client creation
@@ -336,12 +342,20 @@ public final boolean hasWriteOp() {
336342
public final void addOp(Operation op) {
337343
try {
338344
if (!authLatch.await(authWaitTime, TimeUnit.MILLISECONDS)) {
339-
op.cancel();
340-
getLogger().warn("Operation canceled because authentication "
341-
+ "or reconnection and authentication has "
342-
+ "taken more than " + authWaitTime + " milliseconds to "
343-
+ "complete.");
344-
getLogger().debug("Canceled operation %s", op.toString());
345+
FailureMode mode = connectionFactory.getFailureMode();
346+
if (mode == FailureMode.Redistribute || mode == FailureMode.Retry) {
347+
getLogger().debug("Redistributing Operation " + op + " because auth "
348+
+ "latch taken longer than " + authWaitTime + " milliseconds to "
349+
+ "complete on node " + getSocketAddress());
350+
connection.redistributeOperation(op);
351+
} else {
352+
op.cancel();
353+
getLogger().warn("Operation canceled because authentication "
354+
+ "or reconnection and authentication has "
355+
+ "taken more than " + authWaitTime + " milliseconds to "
356+
+ "complete on node " + this);
357+
getLogger().debug("Canceled operation %s", op.toString());
358+
}
345359
return;
346360
}
347361
if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) {
@@ -633,4 +647,13 @@ public void completedRead() {
633647
lastReadTimestamp = System.currentTimeMillis();
634648
}
635649

650+
@Override
651+
public MemcachedConnection getConnection() {
652+
return connection;
653+
}
654+
655+
@Override
656+
public void setConnection(MemcachedConnection connection) {
657+
this.connection = connection;
658+
}
636659
}

src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.channels.SocketChannel;
2828
import java.util.concurrent.BlockingQueue;
2929

30+
import net.spy.memcached.ConnectionFactory;
3031
import net.spy.memcached.ops.GetOperation;
3132
import net.spy.memcached.ops.Operation;
3233
import net.spy.memcached.ops.OperationState;
@@ -41,9 +42,9 @@ public final class AsciiMemcachedNodeImpl extends TCPMemcachedNodeImpl {
4142
public AsciiMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize,
4243
BlockingQueue<Operation> rq, BlockingQueue<Operation> wq,
4344
BlockingQueue<Operation> iq, Long opQueueMaxBlockTimeNs, long dt,
44-
long at) {
45+
long at, ConnectionFactory fa) {
4546
// ASCII never does auth
46-
super(sa, c, bufSize, rq, wq, iq, opQueueMaxBlockTimeNs, false, dt, at);
47+
super(sa, c, bufSize, rq, wq, iq, opQueueMaxBlockTimeNs, false, dt, at, fa);
4748
}
4849

4950
@Override

src/main/java/net/spy/memcached/protocol/binary/BinaryMemcachedNodeImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.channels.SocketChannel;
2828
import java.util.concurrent.BlockingQueue;
2929

30+
import net.spy.memcached.ConnectionFactory;
3031
import net.spy.memcached.ops.CASOperation;
3132
import net.spy.memcached.ops.GetOperation;
3233
import net.spy.memcached.ops.Operation;
@@ -47,9 +48,9 @@ public class BinaryMemcachedNodeImpl extends TCPMemcachedNodeImpl {
4748
public BinaryMemcachedNodeImpl(SocketAddress sa, SocketChannel c,
4849
int bufSize, BlockingQueue<Operation> rq, BlockingQueue<Operation> wq,
4950
BlockingQueue<Operation> iq, Long opQueueMaxBlockTimeNs,
50-
boolean waitForAuth, long dt, long at) {
51+
boolean waitForAuth, long dt, long at, ConnectionFactory fa) {
5152
super(sa, c, bufSize, rq, wq, iq, opQueueMaxBlockTimeNs, waitForAuth, dt,
52-
at);
53+
at, fa);
5354
}
5455

5556
@Override

src/test/java/net/spy/memcached/MockMemcachedNode.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,14 @@ public void completedRead() {
211211
// noop
212212
}
213213

214+
@Override
215+
public MemcachedConnection getConnection() {
216+
return null;
217+
}
218+
219+
@Override
220+
public void setConnection(MemcachedConnection connection) {
221+
222+
}
223+
214224
}

0 commit comments

Comments
 (0)