Skip to content

Commit d6fe622

Browse files
committed
If a server group is in write only mode then it is not accounted for counting success by the latch.
1 parent 75bc08b commit d6fe622

File tree

6 files changed

+35
-27
lines changed

6 files changed

+35
-27
lines changed

evcache-client/src/main/java/com/netflix/evcache/EVCacheImpl.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
693693

694694
final String canonicalKey = getCanonicalizedKey(key);
695695
try {
696-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length, _appName);
696+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
697697
touchData(canonicalKey, key, timeToLive, clients, latch);
698698

699699
if (touchTTLSummary == null) this.touchTTLSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-TouchData-TTL", _appName, null);
@@ -1048,7 +1048,7 @@ public <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, int timeToLiv
10481048

10491049
final String canonicalKey = getCanonicalizedKey(key);
10501050
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.SET, stats, Operation.TYPE.MILLI);
1051-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length, _appName);
1051+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
10521052
try {
10531053
CachedData cd = null;
10541054
for (EVCacheClient client : clients) {
@@ -1233,8 +1233,7 @@ public <T> EVCacheLatch delete(String key, Policy policy) throws EVCacheExceptio
12331233
}
12341234

12351235
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.DELETE, stats);
1236-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
1237-
clients.length, _appName);
1236+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
12381237
try {
12391238
for (int i = 0; i < clients.length; i++) {
12401239
Future<Boolean> future = clients[i].delete(canonicalKey, latch);
@@ -1454,8 +1453,7 @@ public <T> EVCacheLatch replace(String key, T value, Transcoder<T> tc, int timeT
14541453

14551454
final String canonicalKey = getCanonicalizedKey(key);
14561455
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.REPLACE, stats, Operation.TYPE.MILLI);
1457-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
1458-
clients.length, _appName);
1456+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
14591457
try {
14601458
final EVCacheFuture[] futures = new EVCacheFuture[clients.length];
14611459
CachedData cd = null;
@@ -1543,8 +1541,7 @@ public <T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int t
15431541
}
15441542
final String canonicalKey = getCanonicalizedKey(key);
15451543
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.APPEND_OR_ADD, stats, Operation.TYPE.MILLI);
1546-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
1547-
clients.length, _appName);
1544+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
15481545
try {
15491546
CachedData cd = null;
15501547
for (EVCacheClient client : clients) {

evcache-client/src/main/java/com/netflix/evcache/operation/EVCacheFutures.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ public class EVCacheFutures implements ListenableFuture<Boolean, OperationComple
2828
private final AtomicInteger completionCounter;
2929
private final EVCacheLatch latch;
3030

31-
public EVCacheFutures(OperationFuture<Boolean>[] futures, String key, String app, ServerGroup serverGroup,
32-
EVCacheLatch latch) {
31+
public EVCacheFutures(OperationFuture<Boolean>[] futures, String key, String app, ServerGroup serverGroup, EVCacheLatch latch) {
3332
this.futures = futures;
3433
this.app = app;
3534
this.serverGroup = serverGroup;

evcache-client/src/main/java/com/netflix/evcache/pool/EVCacheClient.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public ConnectionFactory getConnectionFactory() {
109109
this.chunkingTranscoder = new ChunkTranscoder();
110110
this.maxWriteQueueSize = maxQueueSize;
111111

112-
this.evcacheMemcachedClient = new EVCacheMemcachedClient(connectionFactory, memcachedNodesInZone, readTimeout, appName, zone, id, serverGroup);
112+
this.evcacheMemcachedClient = new EVCacheMemcachedClient(connectionFactory, memcachedNodesInZone, readTimeout, appName, zone, id, serverGroup, this);
113113
this.connectionObserver = new EVCacheConnectionObserver(appName, serverGroup, id);
114114
this.evcacheMemcachedClient.addObserver(connectionObserver);
115115

@@ -872,8 +872,7 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch
872872
if (!ensureWriteQueueSize(node, key)) {
873873
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
874874
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
875-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
876-
.addFuture(defaultFuture);
875+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(defaultFuture);
877876
return defaultFuture;
878877
}
879878

@@ -916,7 +915,7 @@ public <T> Future<Boolean> appendOrAdd(String key, CachedData value, int timeToL
916915
if (!ensureWriteQueueSize(node, key)) {
917916
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
918917
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
919-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch).addFuture(defaultFuture);
918+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(defaultFuture);
920919
return defaultFuture;
921920
}
922921

@@ -935,8 +934,7 @@ public <T> Future<Boolean> replace(String key, T value, int timeToLive, EVCacheL
935934
if (log.isInfoEnabled()) log.info("Node : " + node
936935
+ " is not active. Failing fast and dropping the replace event.");
937936
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
938-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
939-
.addFuture(defaultFuture);
937+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(defaultFuture);
940938
return defaultFuture;
941939
}
942940

@@ -1033,7 +1031,7 @@ public <T> Future<Boolean> touch(String key, int timeToLive, EVCacheLatch latch)
10331031
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
10341032
if (!ensureWriteQueueSize(node, key)) {
10351033
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
1036-
if (latch != null && latch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) latch).addFuture(defaultFuture);
1034+
if (latch != null && latch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) latch).addFuture(defaultFuture);
10371035
return defaultFuture;
10381036
}
10391037

@@ -1074,7 +1072,7 @@ public Future<Boolean> delete(String key, EVCacheLatch latch) throws Exception {
10741072
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
10751073
if (!ensureWriteQueueSize(node, key)) {
10761074
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
1077-
if (latch != null && latch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) latch).addFuture(defaultFuture);
1075+
if (latch != null && latch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) latch).addFuture(defaultFuture);
10781076
return defaultFuture;
10791077
}
10801078

@@ -1142,6 +1140,10 @@ public String getServerGroupName() {
11421140
public boolean isShutdown() {
11431141
return this.shutdown;
11441142
}
1143+
1144+
public boolean isInWriteOnly(){
1145+
return pool.isInWriteOnly(getServerGroup());
1146+
}
11451147

11461148
public Map<SocketAddress, Map<String, String>> getStats(String cmd) {
11471149
if(config.isRendInstance()) {

evcache-client/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,16 @@ public List<EVCacheClient> getEVCacheClientsForReadExcluding(ServerGroup serverG
291291
}
292292
return Collections.<EVCacheClient> emptyList();
293293
}
294+
295+
public boolean isInWriteOnly(ServerGroup serverGroup) {
296+
if (memcachedReadInstancesByServerGroup.containsKey(serverGroup)) {
297+
return false;
298+
}
299+
if(memcachedWriteInstancesByServerGroup.containsKey(serverGroup)) {
300+
return true;
301+
}
302+
return false;
303+
}
294304

295305
public EVCacheClient[] getWriteOnlyEVCacheClients() {
296306
try {

evcache-client/src/main/java/com/netflix/evcache/pool/EVCacheClientUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public EVCacheLatch add(String canonicalKey, CachedData cd, int timeToLive, Poli
6565
addTTLSummary.record(timeToLive);
6666

6767
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
68-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, clients.length, _appName){
68+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName){
6969

7070
@Override
7171
public void onComplete(OperationFuture<?> operationFuture) throws Exception {

evcache-client/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import com.netflix.evcache.operation.EVCacheBulkGetFuture;
2626
import com.netflix.evcache.operation.EVCacheLatchImpl;
2727
import com.netflix.evcache.operation.EVCacheOperationFuture;
28+
import com.netflix.evcache.pool.EVCacheClient;
2829
import com.netflix.evcache.pool.ServerGroup;
29-
import com.netflix.evcache.util.EVCacheConfig;
3030
import com.netflix.servo.annotations.DataSourceType;
3131
import com.netflix.servo.monitor.Stopwatch;
3232
import com.netflix.servo.tag.BasicTag;
@@ -39,7 +39,6 @@
3939
import net.spy.memcached.ops.DeleteOperation;
4040
import net.spy.memcached.ops.GetAndTouchOperation;
4141
import net.spy.memcached.ops.GetOperation;
42-
import net.spy.memcached.ops.Mutator;
4342
import net.spy.memcached.ops.Operation;
4443
import net.spy.memcached.ops.OperationCallback;
4544
import net.spy.memcached.ops.OperationStatus;
@@ -60,17 +59,19 @@ public class EVCacheMemcachedClient extends MemcachedClient {
6059
private final String zone;
6160
private final ChainedDynamicProperty.IntProperty readTimeout;
6261
private final ServerGroup serverGroup;
62+
private final EVCacheClient client;
6363
private DistributionSummary getDataSize, bulkDataSize, getAndTouchDataSize;
6464

6565
public EVCacheMemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs,
6666
ChainedDynamicProperty.IntProperty readTimeout, String appName, String zone, int id,
67-
ServerGroup serverGroup) throws IOException {
67+
ServerGroup serverGroup, EVCacheClient client) throws IOException {
6868
super(cf, addrs);
6969
this.id = id;
7070
this.appName = appName;
7171
this.zone = zone;
7272
this.readTimeout = readTimeout;
7373
this.serverGroup = serverGroup;
74+
this.client = client;
7475
}
7576

7677
public NodeLocator getNodeLocator() {
@@ -299,7 +300,7 @@ public void complete() {
299300

300301
final DeleteOperation op = opFact.delete(key, callback);
301302
rv.setOperation(op);
302-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
303+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !client.isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
303304
mconn.enqueueOperation(key, op);
304305
return rv;
305306
}
@@ -332,7 +333,7 @@ public void complete() {
332333
}
333334
});
334335
rv.setOperation(op);
335-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
336+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !client.isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
336337
mconn.enqueueOperation(key, op);
337338
return rv;
338339
}
@@ -423,7 +424,7 @@ public void complete() {
423424
});
424425
rv.setOperation(op);
425426
mconn.enqueueOperation(key, op);
426-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
427+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !client.isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
427428
return rv;
428429
}
429430

@@ -489,8 +490,7 @@ public void complete() {
489490
}
490491
});
491492
rv.setOperation(op);
492-
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
493-
.addFuture(rv);
493+
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !client.isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
494494
mconn.enqueueOperation(key, op);
495495
return rv;
496496
}

0 commit comments

Comments
 (0)