Skip to content

Commit 75bc08b

Browse files
committed
Fixed add method so it does not call clientUtil multiple times
Added completedFutureCount and pendingFutureCount to latch Added a thread pool which can process the latch in its own thread
1 parent 5cf36d9 commit 75bc08b

File tree

4 files changed

+163
-74
lines changed

4 files changed

+163
-74
lines changed

evcache-client/src/main/java/com/netflix/evcache/EVCacheImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ final public class EVCacheImpl implements EVCache {
7575
private final ChainedDynamicProperty.BooleanProperty _useInMemoryCache;
7676
private final Stats stats;
7777
private EVCacheInMemoryCache<?> cache;
78+
private EVCacheClientUtil clientUtil = null;
7879

7980
private final EVCacheClientPoolManager _poolManager;
80-
private DistributionSummary setTTLSummary, replaceTTLSummary, touchTTLSummary, setDataSizeSummary, replaceDataSizeSummary, appendDataSizeSummary, addDataSizeSummary;
81+
private DistributionSummary setTTLSummary, replaceTTLSummary, touchTTLSummary, setDataSizeSummary, replaceDataSizeSummary, appendDataSizeSummary;
8182
private Counter touchCounter;
8283

8384
EVCacheImpl(String appName, String cacheName, int timeToLive, Transcoder<?> transcoder, boolean enableZoneFallback,
@@ -1711,18 +1712,17 @@ public <T> EVCacheLatch add(String key, T value, Transcoder<T> tc, int timeToLiv
17111712
EVCacheLatch latch = null;
17121713
try {
17131714
CachedData cd = null;
1714-
for (EVCacheClient client : clients) {
1715-
if (cd == null) {
1716-
if (tc != null) {
1717-
cd = tc.encode(value);
1718-
} else if ( _transcoder != null) {
1719-
cd = ((Transcoder<Object>)_transcoder).encode(value);
1720-
} else {
1721-
cd = client.getTranscoder().encode(value);
1722-
}
1715+
if (cd == null) {
1716+
if (tc != null) {
1717+
cd = tc.encode(value);
1718+
} else if ( _transcoder != null) {
1719+
cd = ((Transcoder<Object>)_transcoder).encode(value);
1720+
} else {
1721+
cd = _pool.getEVCacheClientForRead().getTranscoder().encode(value);
17231722
}
1724-
latch = EVCacheClientUtil.add(canonicalKey, cd, timeToLive, _pool, policy);
17251723
}
1724+
if(clientUtil == null) clientUtil = new EVCacheClientUtil(_pool);
1725+
latch = clientUtil.add(canonicalKey, cd, timeToLive, policy);
17261726
if (event != null) {
17271727
event.setCanonicalKeys(Arrays.asList(canonicalKey));
17281728
event.setTTL(timeToLive);

evcache-client/src/main/java/com/netflix/evcache/EVCacheLatch.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,20 @@ public static enum Policy {
7979
*/
8080
List<Future<Boolean>> getCompletedFutures();
8181

82+
/**
83+
* Returns the number of Futures that are still Pending.
84+
*
85+
* @return the current outstanding Future task count
86+
*/
87+
int getPendingFutureCount();
88+
89+
/**
90+
* Returns the number of Future Tasks that are completed.
91+
*
92+
* @return the current completed future task count
93+
*/
94+
int getCompletedFutureCount();
95+
8296
/**
8397
* Returns the number of Tasks that are still Pending.
8498
*

evcache-client/src/main/java/com/netflix/evcache/operation/EVCacheLatchImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,4 +268,26 @@ public String toString() {
268268
return builder.toString();
269269
}
270270

271+
@Override
272+
public int getPendingFutureCount() {
273+
int count = 0;
274+
for (Future<Boolean> future : futures) {
275+
if (!future.isDone()) {
276+
count++;
277+
}
278+
}
279+
return count;
280+
}
281+
282+
@Override
283+
public int getCompletedFutureCount() {
284+
int count = 0;
285+
for (Future<Boolean> future : futures) {
286+
if (future.isDone()) {
287+
count++;
288+
}
289+
}
290+
return count;
291+
}
292+
271293
}
Lines changed: 116 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
package com.netflix.evcache.pool;
22

33
import java.util.List;
4+
import java.util.concurrent.BlockingQueue;
45
import java.util.concurrent.Future;
6+
import java.util.concurrent.LinkedBlockingQueue;
7+
import java.util.concurrent.RejectedExecutionHandler;
8+
import java.util.concurrent.ThreadPoolExecutor;
9+
import java.util.concurrent.TimeUnit;
510

611
import org.slf4j.Logger;
712
import org.slf4j.LoggerFactory;
@@ -21,88 +26,136 @@
2126

2227
public class EVCacheClientUtil {
2328
private static Logger log = LoggerFactory.getLogger(EVCacheClientUtil.class);
24-
private static final ChunkTranscoder ct = new ChunkTranscoder();
25-
public static EVCacheLatch add(String canonicalKey, CachedData cd, int timeToLive, EVCacheClientPool _pool, Policy policy) throws Exception {
29+
private final ChunkTranscoder ct = new ChunkTranscoder();
30+
private final String _appName;
31+
private final DistributionSummary addDataSizeSummary;
32+
private final DistributionSummary addTTLSummary;
33+
private final DynamicBooleanProperty fixup;
34+
private final EVCacheClientPool _pool;
35+
private ThreadPoolExecutor threadPool = null;
36+
37+
public EVCacheClientUtil(EVCacheClientPool pool) {
38+
this._pool = pool;
39+
this._appName = pool.getAppName();
40+
this.addDataSizeSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-AddData-Size", _appName, null);
41+
this.addTTLSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-AddData-TTL", _appName, null);
42+
this.fixup = EVCacheConfig.getInstance().getDynamicBooleanProperty(_appName + ".addOperation.fixup", Boolean.FALSE);
43+
int maxThreads = 10;
44+
45+
RejectedExecutionHandler block = new RejectedExecutionHandler() {
46+
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
47+
try {
48+
executor.getQueue().put( r );
49+
} catch (InterruptedException e) {
50+
e.printStackTrace();
51+
}
52+
}
53+
};
54+
55+
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1000);
56+
threadPool = new ThreadPoolExecutor(maxThreads, maxThreads * 2, 30, TimeUnit.SECONDS, queue);
57+
threadPool.setRejectedExecutionHandler(block);
58+
threadPool.prestartAllCoreThreads();
2659

60+
}
61+
62+
public EVCacheLatch add(String canonicalKey, CachedData cd, int timeToLive, Policy policy) throws Exception {
2763
if (cd == null) return null;
28-
final String _appName = _pool.getAppName();
29-
final DistributionSummary addDataSizeSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-AddData-Size", _appName, null);
30-
if (addDataSizeSummary != null) addDataSizeSummary.record(cd.getData().length);
31-
final DistributionSummary addTTLSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-AddData-TTL", _appName, null);
32-
if (addTTLSummary != null) addTTLSummary.record(timeToLive);
33-
final DynamicBooleanProperty fixup = EVCacheConfig.getInstance().getDynamicBooleanProperty(_appName + ".addOperation.fixup", Boolean.FALSE);
64+
addDataSizeSummary.record(cd.getData().length);
65+
addTTLSummary.record(timeToLive);
3466

3567
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
36-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, clients.length, _pool.getAppName()){
68+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, clients.length, _appName){
3769

3870
@Override
3971
public void onComplete(OperationFuture<?> operationFuture) throws Exception {
4072
super.onComplete(operationFuture);
41-
if (getPendingCount() == 0 && fixup.get()) {
42-
final List<Future<Boolean>> futures = getAllFutures();
43-
int successCount = 0, failCount = 0;
44-
for(int i = 0; i < futures.size() ; i++) {
45-
final Future<Boolean> future = futures.get(i);
46-
if(future instanceof EVCacheOperationFuture) {
47-
final EVCacheOperationFuture<Boolean> f = (EVCacheOperationFuture<Boolean>)future;
48-
if(f.getStatus().getStatusCode() == StatusCode.SUCCESS) {
49-
successCount++;
50-
if(log.isDebugEnabled()) log.debug("ADD : Success : APP " + _appName + ", key " + canonicalKey+ ", ServerGroup : " + f.getServerGroup().getName());
51-
} else {
52-
failCount++;
53-
if(log.isDebugEnabled()) log.debug("ADD : Fail : APP " + _appName + ", key : " + canonicalKey + ", ServerGroup : " + f.getServerGroup().getName());
54-
}
55-
}
73+
if (getPendingFutureCount() == 0 && fixup.get()) {
74+
final RemoteRequest req = new RemoteRequest(this, canonicalKey, timeToLive);
75+
threadPool.submit(req);
76+
}
77+
}
78+
};
79+
80+
for (EVCacheClient client : clients) {
81+
final Future<Boolean> future = client.add(canonicalKey, timeToLive, cd, ct, latch);
82+
if(log.isDebugEnabled()) log.debug("ADD : Op Submitted : APP " + _appName + ", key " + canonicalKey + "; future : " + future);
83+
}
84+
return latch;
85+
}
86+
87+
class RemoteRequest implements Runnable {
88+
private EVCacheLatchImpl latch;
89+
private String canonicalKey;
90+
private int timeToLive;
91+
public RemoteRequest(EVCacheLatchImpl latch, String canonicalKey, int timeToLive) {
92+
this.latch = latch;
93+
this.canonicalKey = canonicalKey;
94+
this.timeToLive = timeToLive;
95+
}
96+
public void run() {
97+
final List<Future<Boolean>> futures = latch.getAllFutures();
98+
int successCount = 0, failCount = 0;
99+
for(int i = 0; i < futures.size() ; i++) {
100+
final Future<Boolean> future = futures.get(i);
101+
if(future instanceof EVCacheOperationFuture) {
102+
final EVCacheOperationFuture<Boolean> f = (EVCacheOperationFuture<Boolean>)future;
103+
if(f.getStatus().getStatusCode() == StatusCode.SUCCESS) {
104+
successCount++;
105+
if(log.isDebugEnabled()) log.debug("ADD : Success : APP " + _appName + ", key " + canonicalKey+ ", ServerGroup : " + f.getServerGroup().getName());
106+
} else {
107+
failCount++;
108+
if(log.isDebugEnabled()) log.debug("ADD : Fail : APP " + _appName + ", key : " + canonicalKey + ", ServerGroup : " + f.getServerGroup().getName());
56109
}
57-
58-
if(successCount > 0 && failCount > 0) {
59-
CachedData readData = null;
60-
for(int i = 0; i < futures.size(); i++) {
61-
final Future<Boolean> evFuture = futures.get(i);
62-
if(evFuture instanceof EVCacheOperationFuture) {
63-
final EVCacheOperationFuture<Boolean> f = (EVCacheOperationFuture<Boolean>)evFuture;
64-
if(f.getStatus().getStatusCode() == StatusCode.ERR_EXISTS) {
65-
final EVCacheClient client = _pool.getEVCacheClient(f.getServerGroup());
66-
if(client != null) {
67-
readData = client.get(canonicalKey, ct, false, false);
68-
if(log.isDebugEnabled()) log.debug("Add : Read existing data for: APP " + _pool.getAppName() + ", key " + canonicalKey + "; ServerGroup : " + client.getServerGroupName());
69-
if(readData != null) {
70-
break;
71-
} else {
72-
73-
}
74-
}
110+
}
111+
}
112+
if(log.isDebugEnabled()) log.debug("ADD : Status: APP " + _appName + ", key : " + canonicalKey + ", failCount : " + failCount + "; successCount : " + successCount);
113+
114+
if(successCount > 0 && failCount > 0) {
115+
CachedData readData = null;
116+
for(int i = 0; i < futures.size(); i++) {
117+
final Future<Boolean> evFuture = futures.get(i);
118+
if(evFuture instanceof EVCacheOperationFuture) {
119+
final EVCacheOperationFuture<Boolean> f = (EVCacheOperationFuture<Boolean>)evFuture;
120+
if(f.getStatus().getStatusCode() == StatusCode.ERR_EXISTS) {
121+
final EVCacheClient client = _pool.getEVCacheClient(f.getServerGroup());
122+
if(client != null) {
123+
try {
124+
readData = client.get(canonicalKey, ct, false, false);
125+
} catch (Exception e) {
126+
log.error("Error readig the data", e);
127+
}
128+
if(log.isDebugEnabled()) log.debug("Add : Read existing data for: APP " + _appName + ", key " + canonicalKey + "; ServerGroup : " + client.getServerGroupName());
129+
if(readData != null) {
130+
break;
131+
} else {
132+
75133
}
76134
}
77135
}
78-
if(readData != null) {
79-
for(int i = 0; i < futures.size(); i++) {
80-
final Future<Boolean> evFuture = futures.get(i);
81-
if(evFuture instanceof OperationFuture) {
82-
final EVCacheOperationFuture<Boolean> f = (EVCacheOperationFuture<Boolean>)evFuture;
83-
if(f.getStatus().getStatusCode() == StatusCode.SUCCESS) {
84-
final EVCacheClient client = _pool.getEVCacheClient(f.getServerGroup());
85-
if(client != null) {
86-
futures.remove(i);
87-
client.set(canonicalKey, readData, timeToLive, this);
88-
if(log.isDebugEnabled()) log.debug("Add: Fixup for : APP " + _pool.getAppName() + ", key " + canonicalKey + "; ServerGroup : " + client.getServerGroupName());
89-
EVCacheMetricsFactory.increment(_appName , null, client.getServerGroupName(), _appName + "-AddCall-FixUp");
90-
}
136+
}
137+
}
138+
if(readData != null) {
139+
for(int i = 0; i < futures.size(); i++) {
140+
final Future<Boolean> evFuture = futures.get(i);
141+
if(evFuture instanceof OperationFuture) {
142+
final EVCacheOperationFuture<Boolean> f = (EVCacheOperationFuture<Boolean>)evFuture;
143+
if(f.getStatus().getStatusCode() == StatusCode.SUCCESS) {
144+
final EVCacheClient client = _pool.getEVCacheClient(f.getServerGroup());
145+
if(client != null) {
146+
try {
147+
client.set(canonicalKey, readData, timeToLive, latch);
148+
if(log.isDebugEnabled()) log.debug("Add: Fixup for : APP " + _appName + ", key " + canonicalKey + "; ServerGroup : " + client.getServerGroupName());
149+
EVCacheMetricsFactory.increment(_appName , null, client.getServerGroupName(), _appName + "-AddCall-FixUp");
150+
} catch (Exception e) {
151+
if(log.isDebugEnabled()) log.debug("Add: Fixup Error : APP " + _appName + ", key " + canonicalKey + "; ServerGroup : " + client.getServerGroupName(), e);
91152
}
92153
}
93154
}
94155
}
95156
}
96157
}
97158
}
98-
};
99-
100-
for (EVCacheClient client : clients) {
101-
final Future<Boolean> future = client.add(canonicalKey, timeToLive, cd, ct, latch);
102-
if(log.isDebugEnabled()) log.debug("ADD : Op Submitted : APP " + _pool.getAppName() + ", key " + canonicalKey + "; future : " + future);
103159
}
104-
return latch;
105160
}
106-
107-
108161
}

0 commit comments

Comments
 (0)