1313import java .util .Map ;
1414import java .util .concurrent .ConcurrentHashMap ;
1515import java .util .concurrent .CountDownLatch ;
16+ import java .util .concurrent .ExecutionException ;
1617import java .util .concurrent .Future ;
1718import java .util .concurrent .TimeUnit ;
1819import java .util .concurrent .atomic .AtomicInteger ;
1920import java .util .concurrent .atomic .AtomicLong ;
2021import java .util .concurrent .atomic .AtomicReference ;
2122import java .util .function .BiPredicate ;
23+ import java .util .function .Consumer ;
2224
23- import com .netflix .evcache . pool . EVCacheClientUtil ;
25+ import com .netflix .archaius . api . PropertyRepository ;
2426import org .slf4j .Logger ;
2527import org .slf4j .LoggerFactory ;
2628
3537import com .netflix .evcache .operation .EVCacheLatchImpl ;
3638import com .netflix .evcache .operation .EVCacheOperationFuture ;
3739import com .netflix .evcache .pool .EVCacheClient ;
40+ import com .netflix .evcache .pool .EVCacheClientUtil ;
3841import com .netflix .evcache .util .EVCacheConfig ;
3942import com .netflix .spectator .api .BasicTag ;
4043import com .netflix .spectator .api .DistributionSummary ;
@@ -80,16 +83,27 @@ public class EVCacheMemcachedClient extends MemcachedClient {
8083 private final Property <Integer > maxReadDuration , maxWriteDuration ;
8184 private final Property <Boolean > enableDebugLogsOnWrongKey ;
8285
86+ private volatile boolean alwaysDecodeSync ;
87+
8388 public EVCacheMemcachedClient (ConnectionFactory cf , List <InetSocketAddress > addrs ,
8489 Property <Integer > readTimeout , EVCacheClient client ) throws IOException {
8590 super (cf , addrs );
8691 this .connectionFactory = cf ;
8792 this .readTimeout = readTimeout ;
8893 this .client = client ;
8994 this .appName = client .getAppName ();
90- this .maxWriteDuration = EVCacheConfig .getInstance ().getPropertyRepository ().get (appName + ".max.write.duration.metric" , Integer .class ).orElseGet ("evcache.max.write.duration.metric" ).orElse (50 );
91- this .maxReadDuration = EVCacheConfig .getInstance ().getPropertyRepository ().get (appName + ".max.read.duration.metric" , Integer .class ).orElseGet ("evcache.max.read.duration.metric" ).orElse (20 );
92- this .enableDebugLogsOnWrongKey = EVCacheConfig .getInstance ().getPropertyRepository ().get (appName + ".enable.debug.logs.on.wrongkey" , Boolean .class ).orElse (false );
95+ final PropertyRepository props = EVCacheConfig .getInstance ().getPropertyRepository ();
96+ this .maxWriteDuration = props .get (appName + ".max.write.duration.metric" , Integer .class ).orElseGet ("evcache.max.write.duration.metric" ).orElse (50 );
97+ this .maxReadDuration = props .get (appName + ".max.read.duration.metric" , Integer .class ).orElseGet ("evcache.max.read.duration.metric" ).orElse (20 );
98+ this .enableDebugLogsOnWrongKey = props .get (appName + ".enable.debug.logs.on.wrongkey" , Boolean .class ).orElse (false );
99+
100+ // TODO in future remove this flag so that decode does not block the IO loop
101+ // the default/legacy behavior (true) is effectively to decode on the IO loop, set to false to use the transcode threads
102+ this .alwaysDecodeSync = true ;
103+ props .get (appName + ".get.alwaysDecodeSync" , Boolean .class )
104+ .orElseGet ("evcache.get.alwaysDecodeSync" )
105+ .orElse (true )
106+ .subscribe (v -> alwaysDecodeSync = v );
93107 }
94108
95109 public NodeLocator getNodeLocator () {
@@ -126,62 +140,100 @@ private boolean isWrongKeyReturned(String original_key, String returned_key) {
126140 }
127141
128142 public <T > EVCacheOperationFuture <T > asyncGet (final String key , final Transcoder <T > tc , EVCacheGetOperationListener <T > listener ) {
129- final CountDownLatch latch = new CountDownLatch (1 );
130- final EVCacheOperationFuture <T > rv = new EVCacheOperationFuture <T >(key , latch , new AtomicReference <T >(null ), readTimeout .get ().intValue (), executorService , client );
143+ // we should only complete the latch when decode AND complete have completed
144+ final CountDownLatch latch = new CountDownLatch (2 );
145+
146+ final EVCacheOperationFuture <T > rv = new EVCacheOperationFuture <>(
147+ key , latch , new AtomicReference <T >(null ), readTimeout .get (), executorService , client );
148+
149+ final DistributionSummary dataSizeDS = getDataSizeDistributionSummary (
150+ EVCacheMetricsFactory .GET_OPERATION ,
151+ EVCacheMetricsFactory .READ ,
152+ EVCacheMetricsFactory .IPC_SIZE_INBOUND );
153+
154+ @ SuppressWarnings ("unchecked" )
155+ final Transcoder <T > transcoder = (tc == null ) ? (Transcoder <T >) getTranscoder () : tc ;
156+ final boolean shouldLog = log .isDebugEnabled () && client .getPool ().getEVCacheClientPoolManager ().shouldLog (appName );
157+
131158 final Operation op = opFact .get (key , new GetOperation .Callback () {
132- private Future <T > val = null ;
159+ // not volatile since only ever used from memcached loop callbacks
160+ private boolean asyncDecodeIssued = false ;
133161
134- public void receivedStatus (OperationStatus status ) {
135- if (log .isDebugEnabled ()) log .debug ("Getting Key : " + key + "; Status : " + status .getStatusCode ().name () + (log .isTraceEnabled () ? " Node : " + getEVCacheNode (key ) : "" )
136- + "; Message : " + status .getMessage () + "; Elapsed Time - " + (System .currentTimeMillis () - rv .getStartTime ()));
137- try {
138- if (val != null ) {
139- if (log .isTraceEnabled () && client .getPool ().getEVCacheClientPoolManager ().shouldLog (appName )) log .trace ("Key : " + key + "; val : " + val .get ());
140- rv .set (val .get (), status );
162+ // both volatile to ensure sync across transcode threads and memcached loop
163+ private volatile T value ;
164+ private volatile OperationStatus status = null ;
165+
166+ public void gotData (String k , int flags , byte [] data ) {
167+ if (isWrongKeyReturned (key , k )) {
168+ return ;
169+ }
170+
171+ if (shouldLog ) {
172+ log .debug ("Read data : key {}; flags : {}; data : {}" , key , flags , data );
173+ if (data != null ) {
174+ log .debug ("Key : {}; val size : {}" , key , data .length );
141175 } else {
142- if (log .isTraceEnabled () && client .getPool ().getEVCacheClientPoolManager ().shouldLog (appName )) log .trace ("Key : " + key + "; val is null" );
143- rv .set (null , status );
176+ log .debug ("Key : {}; val is null" , key );
144177 }
145- } catch (Exception e ) {
146- log .error (e .getMessage (), e );
147- rv .set (null , status );
148178 }
149- }
150179
151- @ SuppressWarnings ( "unchecked" )
152- public void gotData ( String k , int flags , byte [] data ) {
180+ if ( data != null ) {
181+ dataSizeDS . record ( data . length );
153182
154- if (isWrongKeyReturned (key , k )) return ;
183+ if (tcService == null ) {
184+ log .error ("tcService is null, will not be able to decode" );
185+ throw new RuntimeException ("TranscoderSevice is null. Not able to decode" );
186+ }
155187
156- if (log .isDebugEnabled () && client .getPool ().getEVCacheClientPoolManager ().shouldLog (appName )) log .debug ("Read data : key " + key + "; flags : " + flags + "; data : " + data );
157- if (data != null ) {
158- if (log .isDebugEnabled () && client .getPool ().getEVCacheClientPoolManager ().shouldLog (appName )) log .debug ("Key : " + key + "; val size : " + data .length );
159- getDataSizeDistributionSummary (EVCacheMetricsFactory .GET_OPERATION , EVCacheMetricsFactory .READ , EVCacheMetricsFactory .IPC_SIZE_INBOUND ).record (data .length );
160- if (tc == null ) {
161- if (tcService == null ) {
162- log .error ("tcService is null, will not be able to decode" );
163- throw new RuntimeException ("TranscoderSevice is null. Not able to decode" );
164- } else {
165- final Transcoder <T > t = (Transcoder <T >) getTranscoder ();
166- val = tcService .decode (t , new CachedData (flags , data , t .getMaxSize ()));
167- }
188+ CachedData chunk = new CachedData (flags , data , transcoder .getMaxSize ());
189+ boolean doSync = alwaysDecodeSync || (!transcoder .asyncDecode (chunk ));
190+
191+ if (doSync ) {
192+ value = transcoder .decode (chunk );
193+ rv .set (value , status );
168194 } else {
169- if (tcService == null ) {
170- log .error ("tcService is null, will not be able to decode" );
171- throw new RuntimeException ("TranscoderSevice is null. Not able to decode" );
172- } else {
173- val = tcService .decode (tc , new CachedData (flags , data , tc .getMaxSize ()));
174- }
195+ asyncDecodeIssued = true ;
196+ final Transcoder <T > wrappedTranscoder = decodeAndThen (transcoder , (decoded ) -> {
197+ value = decoded ;
198+ rv .set (decoded , status );
199+ latch .countDown ();
200+ });
201+ tcService .decode (wrappedTranscoder , chunk );
175202 }
176- } else {
177- if (log .isDebugEnabled () && client .getPool ().getEVCacheClientPoolManager ().shouldLog (appName )) log .debug ("Key : " + key + "; val is null" );
203+ }
204+ }
205+
206+ public void receivedStatus (OperationStatus status ) {
207+ this .status = status ;
208+
209+ // On rare occasion, it might be possible that transcoder finishes and starts to call rv.set(),
210+ // at the exact time that receivedStatus here does a set(). This means that through unlucky timing
211+ // here we might drop the decoded value that was set by the transcoder.
212+ //
213+ // We add a simple if check to see if the transcode thread has changed the value after we did rv.set(),
214+ // and if it has, we will do it again. Since value is only set once (after decode), we need only a single
215+ // check here. It is important that it is a separate volatile read from value.
216+
217+ T before = value ;
218+ rv .set (before , status );
219+ T after = value ;
220+
221+ if (after != before ) {
222+ rv .set (after , status );
178223 }
179224 }
180225
181226 public void complete () {
227+ // if an async decode was never issued, issue an extra countdown, since 2 latch values were set
228+ if (!asyncDecodeIssued ) {
229+ latch .countDown ();
230+ }
231+
182232 latch .countDown ();
233+
234+ final String metricHit = (asyncDecodeIssued || value != null ) ? EVCacheMetricsFactory .YES : EVCacheMetricsFactory .NO ;
183235 final String host = ((rv .getStatus ().getStatusCode ().equals (StatusCode .TIMEDOUT ) && rv .getOperation () != null ) ? getHostName (rv .getOperation ().getHandlingNode ().getSocketAddress ()) : null );
184- getTimer (EVCacheMetricsFactory .GET_OPERATION , EVCacheMetricsFactory .READ , rv .getStatus (), ( val != null ? EVCacheMetricsFactory . YES : EVCacheMetricsFactory . NO ) , host , getReadMetricMaxValue ()).record ((System .currentTimeMillis () - rv .getStartTime ()), TimeUnit .MILLISECONDS );
236+ getTimer (EVCacheMetricsFactory .GET_OPERATION , EVCacheMetricsFactory .READ , rv .getStatus (), metricHit , host , getReadMetricMaxValue ()).record ((System .currentTimeMillis () - rv .getStartTime ()), TimeUnit .MILLISECONDS );
185237 rv .signalComplete ();
186238 }
187239 });
@@ -191,6 +243,37 @@ public void complete() {
191243 return rv ;
192244 }
193245
246+ // A Transcode wrapper to allow an action to be performed after decode has completed.
247+ static <T > Transcoder <T > decodeAndThen (Transcoder <T > transcoder , Consumer <T > completed ) {
248+ return new Transcoder <T >() {
249+ @ Override
250+ public boolean asyncDecode (CachedData d ) {
251+ return transcoder .asyncDecode (d );
252+ }
253+
254+ @ Override
255+ public CachedData encode (T o ) {
256+ throw new UnsupportedOperationException ("encode" );
257+ }
258+
259+ @ Override
260+ public T decode (CachedData d ) {
261+ T decoded = null ;
262+ try {
263+ decoded = transcoder .decode (d );
264+ return decoded ;
265+ } finally {
266+ completed .accept (decoded );
267+ }
268+ }
269+
270+ @ Override
271+ public int getMaxSize () {
272+ return transcoder .getMaxSize ();
273+ }
274+ };
275+ }
276+
194277 public <T > EVCacheBulkGetFuture <T > asyncGetBulk (Collection <String > keys ,
195278 final Transcoder <T > tc ,
196279 EVCacheGetOperationListener <T > listener ) {
0 commit comments