Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion evcache-core/src/main/java/com/netflix/evcache/EVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@
*
* @author smadappa
*/
public interface EVCache {
public interface EVCache extends EVCacheMetaOperations {
// TODO: Remove Async methods (Project rx) and rename COMPLETABLE_* with ASYNC_*
public static enum Call {
GET, GETL, GET_AND_TOUCH, ASYNC_GET, BULK, SET, DELETE, INCR, DECR, TOUCH, APPEND, PREPEND, REPLACE, ADD, APPEND_OR_ADD, GET_ALL, META_GET, META_SET, META_DEBUG,
META_GET_BULK, META_DELETE,
COMPLETABLE_FUTURE_GET, COMPLETABLE_FUTURE_GET_BULK
};

Expand Down
253 changes: 253 additions & 0 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@

import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.protocol.ascii.MetaSetOperation;
import net.spy.memcached.protocol.ascii.MetaGetBulkOperation;
import net.spy.memcached.protocol.ascii.MetaDeleteOperation;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
Expand Down Expand Up @@ -3409,4 +3412,254 @@ protected List<Tag> getTags() {
return tags;
}

// Meta Protocol Operations Implementation

@Override
public EVCacheLatch metaSet(MetaSetOperation.Builder builder, Policy policy) throws EVCacheException {
if (builder == null) throw new IllegalArgumentException("Builder cannot be null");

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.META_SET);
if (throwExc) throw new EVCacheException("Could not find a client to perform meta set");
return new EVCacheLatchImpl(policy, 0, _appName);
}

final String key = builder.build().getKey();
final EVCacheKey evcKey = getEVCacheKey(key);
final EVCacheEvent event = createEVCacheEvent(Arrays.asList(clients), Call.META_SET);
if (event != null) {
event.setEVCacheKeys(Arrays.asList(evcKey));
try {
if (shouldThrottle(event)) {
incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.META_SET);
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + key);
return new EVCacheLatchImpl(policy, 0, _appName);
}
} catch(EVCacheException ex) {
if(throwExc) throw ex;
incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.META_SET);
return null;
}
startEvent(event);
}

final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
String status = EVCacheMetricsFactory.SUCCESS;
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);

try {
for (EVCacheClient client : clients) {
final String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient());

// Create builder with canonical key for this client
final MetaSetOperation.Builder clientBuilder = new MetaSetOperation.Builder()
.key(canonicalKey)
.value(builder.build().getValue())
.mode(builder.build().getMode())
.expiration(builder.build().getExpiration())
.cas(builder.build().getCas())
.returnCas(builder.build().isReturnCas())
.returnTtl(builder.build().isReturnTtl())
.markStale(builder.build().isMarkStale());

final EVCacheOperationFuture<Boolean> future = client.metaSet(clientBuilder, latch);
if (log.isDebugEnabled() && shouldLog()) log.debug("META_SET : APP " + _appName + ", key : " + canonicalKey + ", Future : " + future);
}
if (event != null) endEvent(event);
} catch (Exception ex) {
status = EVCacheMetricsFactory.ERROR;
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception setting the data for APP " + _appName + ", key : " + evcKey, ex);
if (event != null) eventError(event, ex);
if (!throwExc) return latch;
throw new EVCacheException("Exception setting data for APP " + _appName + ", key : " + evcKey, ex);
} finally {
final long end = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final long duration = end - start;
// Track meta set operation metrics
getTimer(Call.META_SET.name(), EVCacheMetricsFactory.WRITE, null, status, 1, maxWriteDuration.get().intValue(), null).record(duration, TimeUnit.MILLISECONDS);
}
return latch;
}

@Override
public <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, Transcoder<T> tc) throws EVCacheException {
return metaGetBulk(keys, new MetaGetBulkOperation.Config(keys), tc);
}

@Override
public <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, MetaGetBulkOperation.Config config, Transcoder<T> tc) throws EVCacheException {
if (null == keys) throw new IllegalArgumentException("Keys cannot be null");
if (keys.isEmpty()) return Collections.<String, EVCacheItem<T>>emptyMap();

final boolean throwExc = doThrowException();
final EVCacheClient client = _pool.getEVCacheClientForRead();
if (client == null) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.META_GET_BULK);
if (throwExc) throw new EVCacheException("Could not find a client to perform meta get bulk");
return Collections.<String, EVCacheItem<T>>emptyMap();
}

final Map<String, EVCacheItem<T>> decanonicalR = new HashMap<String, EVCacheItem<T>>((keys.size() * 4) / 3 + 1);
final Collection<EVCacheKey> evcKeys = new ArrayList<EVCacheKey>();
final Collection<String> canonicalKeys = new ArrayList<String>();

/* Canonicalize keys */
for (String k : keys) {
final EVCacheKey evcKey = getEVCacheKey(k);
evcKeys.add(evcKey);
canonicalKeys.add(evcKey.getCanonicalKey(client.isDuetClient()));
}

final EVCacheEvent event = createEVCacheEvent(Arrays.asList(client), Call.META_GET_BULK);
if (event != null) {
event.setEVCacheKeys(evcKeys);
try {
if (shouldThrottle(event)) {
incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.META_GET_BULK);
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + keys);
return Collections.<String, EVCacheItem<T>>emptyMap();
}
} catch(EVCacheException ex) {
if(throwExc) throw ex;
incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.META_GET_BULK);
return Collections.<String, EVCacheItem<T>>emptyMap();
}
startEvent(event);
}

final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
String status = EVCacheMetricsFactory.SUCCESS;

try {
// Update config with canonical keys
final MetaGetBulkOperation.Config canonicalConfig = new MetaGetBulkOperation.Config(canonicalKeys)
.includeCas(config.isIncludeCas())
.includeTtl(config.isIncludeTtl())
.includeSize(config.isIncludeSize())
.includeLastAccess(config.isIncludeLastAccess())
.serveStale(config.isServeStale())
.maxStaleTime(config.getMaxStaleTime());

final EVCacheOperationFuture<Map<String, EVCacheItem<Object>>> future = client.metaGetBulk(canonicalConfig);
final Map<String, EVCacheItem<Object>> canonicalResult = future.get();

// Convert canonical keys back to original keys and decode values
for (int i = 0; i < keys.size(); i++) {
final String originalKey = ((ArrayList<String>) keys).get(i);
final String canonicalKey = ((ArrayList<String>) canonicalKeys).get(i);

if (canonicalResult.containsKey(canonicalKey)) {
final EVCacheItem<Object> canonicalItem = canonicalResult.get(canonicalKey);
final EVCacheItem<T> item = new EVCacheItem<T>();

// Decode the data using transcoder
if (canonicalItem.getData() != null && canonicalItem.getData() instanceof CachedData) {
final CachedData cd = (CachedData) canonicalItem.getData();
final Transcoder<T> transcoder = (tc == null) ? (Transcoder<T>) _transcoder : tc;
item.setData(transcoder.decode(cd));
} else {
item.setData((T) canonicalItem.getData());
}

item.setFlag(canonicalItem.getFlag());
item.getItemMetaData().copyFrom(canonicalItem.getItemMetaData());
decanonicalR.put(originalKey, item);
}
}

if (event != null) endEvent(event);
} catch (Exception ex) {
status = EVCacheMetricsFactory.ERROR;
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception getting bulk data for APP " + _appName + ", keys : " + keys, ex);
if (event != null) eventError(event, ex);
if (throwExc) throw new EVCacheException("Exception getting bulk data for APP " + _appName + ", keys : " + keys, ex);
} finally {
final long end = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final long duration = end - start;
// Track meta get bulk operation metrics
getTimer(Call.META_GET_BULK.name(), EVCacheMetricsFactory.READ, EVCacheMetricsFactory.YES, status, 1, maxReadDuration.get().intValue(), client.getServerGroup()).record(duration, TimeUnit.MILLISECONDS);
}

return decanonicalR;
}

@Override
public <T> Map<String, EVCacheItem<T>> metaGetBulk(String... keys) throws EVCacheException {
return metaGetBulk(Arrays.asList(keys), (Transcoder<T>) _transcoder);
}

@Override
public <T> Map<String, EVCacheItem<T>> metaGetBulk(Transcoder<T> tc, String... keys) throws EVCacheException {
return metaGetBulk(Arrays.asList(keys), tc);
}

@Override
public EVCacheLatch metaDelete(MetaDeleteOperation.Builder builder, Policy policy) throws EVCacheException {
if (builder == null) throw new IllegalArgumentException("Builder cannot be null");

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.META_DELETE);
if (throwExc) throw new EVCacheException("Could not find a client to perform meta delete");
return new EVCacheLatchImpl(policy, 0, _appName);
}

final String key = builder.build().getKey();
final EVCacheKey evcKey = getEVCacheKey(key);
final EVCacheEvent event = createEVCacheEvent(Arrays.asList(clients), Call.META_DELETE);
if (event != null) {
event.setEVCacheKeys(Arrays.asList(evcKey));
try {
if (shouldThrottle(event)) {
incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.META_DELETE);
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + key);
return new EVCacheLatchImpl(policy, 0, _appName);
}
} catch(EVCacheException ex) {
if(throwExc) throw ex;
incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.META_DELETE);
return null;
}
startEvent(event);
}

final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
String status = EVCacheMetricsFactory.SUCCESS;
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);

try {
for (EVCacheClient client : clients) {
final String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient());

// Create builder with canonical key for this client
final MetaDeleteOperation.Builder clientBuilder = new MetaDeleteOperation.Builder()
.key(canonicalKey)
.mode(builder.getMode())
.cas(builder.getCas())
.returnTtl(builder.isReturnTtl());

final EVCacheOperationFuture<Boolean> future = client.metaDelete(clientBuilder, latch);
if (log.isDebugEnabled() && shouldLog()) log.debug("META_DELETE : APP " + _appName + ", key : " + canonicalKey + ", Future : " + future);
}
if (event != null) endEvent(event);
} catch (Exception ex) {
status = EVCacheMetricsFactory.ERROR;
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception deleting the data for APP " + _appName + ", key : " + evcKey, ex);
if (event != null) eventError(event, ex);
if (!throwExc) return latch;
throw new EVCacheException("Exception deleting data for APP " + _appName + ", key : " + evcKey, ex);
} finally {
final long end = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final long duration = end - start;
incrementFastFail(status, Call.META_DELETE);
getTimer(Call.META_DELETE.name(), EVCacheMetricsFactory.WRITE, null, status, 1, maxWriteDuration.get().intValue(), null).record(duration, TimeUnit.MILLISECONDS);
}
return latch;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.netflix.evcache;

import java.util.Collection;
import java.util.Map;

import com.netflix.evcache.EVCacheLatch.Policy;
import com.netflix.evcache.operation.EVCacheItem;
import net.spy.memcached.protocol.ascii.MetaGetBulkOperation;
import net.spy.memcached.protocol.ascii.MetaSetOperation;
import net.spy.memcached.protocol.ascii.MetaDeleteOperation;
import net.spy.memcached.transcoders.Transcoder;

/**
* Additional meta protocol operations for EVCache.
* These methods leverage the advanced capabilities of memcached's meta protocol.
*/
public interface EVCacheMetaOperations {

/**
* Advanced set operation using meta protocol with CAS, conditional operations,
* and atomic features across all replicas.
*
* @param builder Meta set configuration builder
* @param policy Latch policy for coordinating across replicas
* @return EVCacheLatch for tracking operation completion
* @throws EVCacheException if operation fails
*/
default EVCacheLatch metaSet(MetaSetOperation.Builder builder, Policy policy) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol.
* Following EVCache bulk operation conventions.
*
* @param keys Collection of keys to retrieve
* @param tc Transcoder for deserialization
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, Transcoder<T> tc) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol with custom configuration.
*
* @param keys Collection of keys to retrieve
* @param config Configuration for meta get bulk behavior
* @param tc Transcoder for deserialization
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, MetaGetBulkOperation.Config config, Transcoder<T> tc) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol.
* Varargs convenience method.
*
* @param keys Keys to retrieve
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(String... keys) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Retrieve values and metadata for multiple keys using meta protocol with custom transcoder.
* Varargs convenience method.
*
* @param tc Transcoder for deserialization
* @param keys Keys to retrieve
* @return Map of key to EVCacheItem containing data and metadata
* @throws EVCacheException if operation fails
*/
default <T> Map<String, EVCacheItem<T>> metaGetBulk(Transcoder<T> tc, String... keys) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

/**
* Advanced delete operation using meta protocol with CAS and conditional operations.
* Supports both deletion and invalidation (marking as stale).
*
* @param builder Meta delete configuration builder
* @param policy Latch policy for coordinating across replicas
* @return EVCacheLatch for tracking operation completion
* @throws EVCacheException if operation fails
*/
default EVCacheLatch metaDelete(MetaDeleteOperation.Builder builder, Policy policy) throws EVCacheException {
throw new EVCacheException("Default implementation. If you are implementing EVCache interface you need to implement this method.");
}

}
Loading