Skip to content

Commit 60e2a77

Browse files
committed
perf: allow concurrent decompress away from network loop
We want to ensure the transcode passes everything that is expected to be run asynchronously over to the decode loop. In general, memcached calls us back with gotData, then receivedStatus, then complete. We use gotData to launch the work onto the transcode threadpool and return control to memcached, which would immediately then call receivedStatus. Previously, receivedStatus and complete were set up to interact and set a value on the underlying future but only by synchronously blocking for the transcode future. Given this callback is happening nearly immediately after the gotData callback, we were firing the transcode and nearly always performing a blocking get, which triggers a synchronous decompression on the network IO loop. This is of course very detrimental to evcache driver performance, since the driver cannot even accept new requests to issue to them to memcached backends, and must wait until decompression completes. In this fix, we rearrange things a little to ensure that if the async decode is requested, that we push the completion status updates to happen only after the async decode completes. This is a little ugly because of the current arrangement of the memcached decoder. A future change might be to overhaul this integration and pull it out of the memcached transcode framework and use something a bit more friendly. Also, add a property to control sync decode threading behavior, default to using the existing behaviors for now.
1 parent 454ae98 commit 60e2a77

File tree

1 file changed

+127
-44
lines changed

1 file changed

+127
-44
lines changed

evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java

Lines changed: 127 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@
1313
import java.util.Map;
1414
import java.util.concurrent.ConcurrentHashMap;
1515
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.ExecutionException;
1617
import java.util.concurrent.Future;
1718
import java.util.concurrent.TimeUnit;
1819
import java.util.concurrent.atomic.AtomicInteger;
1920
import java.util.concurrent.atomic.AtomicLong;
2021
import java.util.concurrent.atomic.AtomicReference;
2122
import java.util.function.BiPredicate;
23+
import java.util.function.Consumer;
2224

23-
import com.netflix.evcache.pool.EVCacheClientUtil;
25+
import com.netflix.archaius.api.PropertyRepository;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628

@@ -35,6 +37,7 @@
3537
import com.netflix.evcache.operation.EVCacheLatchImpl;
3638
import com.netflix.evcache.operation.EVCacheOperationFuture;
3739
import com.netflix.evcache.pool.EVCacheClient;
40+
import com.netflix.evcache.pool.EVCacheClientUtil;
3841
import com.netflix.evcache.util.EVCacheConfig;
3942
import com.netflix.spectator.api.BasicTag;
4043
import com.netflix.spectator.api.DistributionSummary;
@@ -80,16 +83,27 @@ public class EVCacheMemcachedClient extends MemcachedClient {
8083
private final Property<Integer> maxReadDuration, maxWriteDuration;
8184
private final Property<Boolean> enableDebugLogsOnWrongKey;
8285

86+
private volatile boolean alwaysDecodeSync;
87+
8388
public EVCacheMemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs,
8489
Property<Integer> readTimeout, EVCacheClient client) throws IOException {
8590
super(cf, addrs);
8691
this.connectionFactory = cf;
8792
this.readTimeout = readTimeout;
8893
this.client = client;
8994
this.appName = client.getAppName();
90-
this.maxWriteDuration = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".max.write.duration.metric", Integer.class).orElseGet("evcache.max.write.duration.metric").orElse(50);
91-
this.maxReadDuration = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".max.read.duration.metric", Integer.class).orElseGet("evcache.max.read.duration.metric").orElse(20);
92-
this.enableDebugLogsOnWrongKey = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".enable.debug.logs.on.wrongkey", Boolean.class).orElse(false);
95+
final PropertyRepository props = EVCacheConfig.getInstance().getPropertyRepository();
96+
this.maxWriteDuration = props.get(appName + ".max.write.duration.metric", Integer.class).orElseGet("evcache.max.write.duration.metric").orElse(50);
97+
this.maxReadDuration = props.get(appName + ".max.read.duration.metric", Integer.class).orElseGet("evcache.max.read.duration.metric").orElse(20);
98+
this.enableDebugLogsOnWrongKey = props.get(appName + ".enable.debug.logs.on.wrongkey", Boolean.class).orElse(false);
99+
100+
// TODO in future remove this flag so that decode does not block the IO loop
101+
// the default/legacy behavior (true) is effectively to decode on the IO loop, set to false to use the transcode threads
102+
this.alwaysDecodeSync = true;
103+
props.get(appName + ".get.alwaysDecodeSync", Boolean.class)
104+
.orElseGet("evcache.get.alwaysDecodeSync")
105+
.orElse(true)
106+
.subscribe(v -> alwaysDecodeSync = v);
93107
}
94108

95109
public NodeLocator getNodeLocator() {
@@ -126,62 +140,100 @@ private boolean isWrongKeyReturned(String original_key, String returned_key) {
126140
}
127141

128142
public <T> EVCacheOperationFuture<T> asyncGet(final String key, final Transcoder<T> tc, EVCacheGetOperationListener<T> listener) {
129-
final CountDownLatch latch = new CountDownLatch(1);
130-
final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<T>(key, latch, new AtomicReference<T>(null), readTimeout.get().intValue(), executorService, client);
143+
// we should only complete the latch when decode AND complete have completed
144+
final CountDownLatch latch = new CountDownLatch(2);
145+
146+
final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<>(
147+
key, latch, new AtomicReference<T>(null), readTimeout.get(), executorService, client);
148+
149+
final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
150+
EVCacheMetricsFactory.GET_OPERATION,
151+
EVCacheMetricsFactory.READ,
152+
EVCacheMetricsFactory.IPC_SIZE_INBOUND);
153+
154+
@SuppressWarnings("unchecked")
155+
final Transcoder<T> transcoder = (tc == null) ? (Transcoder<T>) getTranscoder() : tc;
156+
final boolean shouldLog = log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName);
157+
131158
final Operation op = opFact.get(key, new GetOperation.Callback() {
132-
private Future<T> val = null;
159+
// not volatile since only ever used from memcached loop callbacks
160+
private boolean asyncDecodeIssued = false;
133161

134-
public void receivedStatus(OperationStatus status) {
135-
if (log.isDebugEnabled()) log.debug("Getting Key : " + key + "; Status : " + status.getStatusCode().name() + (log.isTraceEnabled() ? " Node : " + getEVCacheNode(key) : "")
136-
+ "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
137-
try {
138-
if (val != null) {
139-
if (log.isTraceEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.trace("Key : " + key + "; val : " + val.get());
140-
rv.set(val.get(), status);
162+
// both volatile to ensure sync across transcode threads and memcached loop
163+
private volatile T value;
164+
private volatile OperationStatus status = null;
165+
166+
public void gotData(String k, int flags, byte[] data) {
167+
if (isWrongKeyReturned(key, k)) {
168+
return;
169+
}
170+
171+
if (shouldLog) {
172+
log.debug("Read data : key {}; flags : {}; data : {}", key, flags, data);
173+
if (data != null) {
174+
log.debug("Key : {}; val size : {}", key, data.length);
141175
} else {
142-
if (log.isTraceEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.trace("Key : " + key + "; val is null");
143-
rv.set(null, status);
176+
log.debug("Key : {}; val is null", key);
144177
}
145-
} catch (Exception e) {
146-
log.error(e.getMessage(), e);
147-
rv.set(null, status);
148178
}
149-
}
150179

151-
@SuppressWarnings("unchecked")
152-
public void gotData(String k, int flags, byte[] data) {
180+
if (data != null) {
181+
dataSizeDS.record(data.length);
153182

154-
if (isWrongKeyReturned(key, k)) return;
183+
if (tcService == null) {
184+
log.error("tcService is null, will not be able to decode");
185+
throw new RuntimeException("TranscoderSevice is null. Not able to decode");
186+
}
155187

156-
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("Read data : key " + key + "; flags : " + flags + "; data : " + data);
157-
if (data != null) {
158-
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("Key : " + key + "; val size : " + data.length);
159-
getDataSizeDistributionSummary(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, EVCacheMetricsFactory.IPC_SIZE_INBOUND).record(data.length);
160-
if (tc == null) {
161-
if (tcService == null) {
162-
log.error("tcService is null, will not be able to decode");
163-
throw new RuntimeException("TranscoderSevice is null. Not able to decode");
164-
} else {
165-
final Transcoder<T> t = (Transcoder<T>) getTranscoder();
166-
val = tcService.decode(t, new CachedData(flags, data, t.getMaxSize()));
167-
}
188+
CachedData chunk = new CachedData(flags, data, transcoder.getMaxSize());
189+
boolean doSync = alwaysDecodeSync || (!transcoder.asyncDecode(chunk));
190+
191+
if (doSync) {
192+
value = transcoder.decode(chunk);
193+
rv.set(value, status);
168194
} else {
169-
if (tcService == null) {
170-
log.error("tcService is null, will not be able to decode");
171-
throw new RuntimeException("TranscoderSevice is null. Not able to decode");
172-
} else {
173-
val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
174-
}
195+
asyncDecodeIssued = true;
196+
final Transcoder<T> wrappedTranscoder = decodeAndThen(transcoder, (decoded) -> {
197+
value = decoded;
198+
rv.set(decoded, status);
199+
latch.countDown();
200+
});
201+
tcService.decode(wrappedTranscoder, chunk);
175202
}
176-
} else {
177-
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("Key : " + key + "; val is null" );
203+
}
204+
}
205+
206+
public void receivedStatus(OperationStatus status) {
207+
this.status = status;
208+
209+
// On rare occasion, it might be possible that transcoder finishes and starts to call rv.set(),
210+
// at the exact time that receivedStatus here does a set(). This means that through unlucky timing
211+
// here we might drop the decoded value that was set by the transcoder.
212+
//
213+
// We add a simple if check to see if the transcode thread has changed the value after we did rv.set(),
214+
// and if it has, we will do it again. Since value is only set once (after decode), we need only a single
215+
// check here. It is important that it is a separate volatile read from value.
216+
217+
T before = value;
218+
rv.set(before, status);
219+
T after = value;
220+
221+
if (after != before) {
222+
rv.set(after, status);
178223
}
179224
}
180225

181226
public void complete() {
227+
// if an async decode was never issued, issue an extra countdown, since 2 latch values were set
228+
if (!asyncDecodeIssued) {
229+
latch.countDown();
230+
}
231+
182232
latch.countDown();
233+
234+
final String metricHit = (asyncDecodeIssued || value != null) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO;
183235
final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
184-
getTimer(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (val != null ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), host, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
236+
getTimer(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), metricHit, host, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
185237
rv.signalComplete();
186238
}
187239
});
@@ -191,6 +243,37 @@ public void complete() {
191243
return rv;
192244
}
193245

246+
// A Transcode wrapper to allow an action to be performed after decode has completed.
247+
static <T> Transcoder<T> decodeAndThen(Transcoder<T> transcoder, Consumer<T> completed) {
248+
return new Transcoder<T>() {
249+
@Override
250+
public boolean asyncDecode(CachedData d) {
251+
return transcoder.asyncDecode(d);
252+
}
253+
254+
@Override
255+
public CachedData encode(T o) {
256+
throw new UnsupportedOperationException("encode");
257+
}
258+
259+
@Override
260+
public T decode(CachedData d) {
261+
T decoded = null;
262+
try {
263+
decoded = transcoder.decode(d);
264+
return decoded;
265+
} finally {
266+
completed.accept(decoded);
267+
}
268+
}
269+
270+
@Override
271+
public int getMaxSize() {
272+
return transcoder.getMaxSize();
273+
}
274+
};
275+
}
276+
194277
public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
195278
final Transcoder<T> tc,
196279
EVCacheGetOperationListener<T> listener) {

0 commit comments

Comments
 (0)