Skip to content

Commit afb8f57

Browse files
committed
fix metacommands in client + tests
1 parent cc7b922 commit afb8f57

File tree

11 files changed

+2844
-162
lines changed

11 files changed

+2844
-162
lines changed

evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java

Lines changed: 142 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
8181

8282
private static final Logger log = LoggerFactory.getLogger(EVCacheImpl.class);
8383

84+
// Atomic counter to ensure E flag (recasid) uniqueness within the same millisecond
85+
// Format: timestamp (milliseconds) + sequence number
86+
// This prevents multiple threads from getting identical E flags when calling within the same ms
87+
private static final java.util.concurrent.atomic.AtomicLong recasidSequence = new java.util.concurrent.atomic.AtomicLong(0);
88+
8489
private final String _appName;
8590
private final String _cacheName;
8691
private final String _metricPrefix;
@@ -3414,10 +3419,61 @@ protected List<Tag> getTags() {
34143419

34153420
// Meta Protocol Operations Implementation
34163421

3422+
/**
3423+
* Generates a unique recasid (E flag) value for CAS synchronization across zones.
3424+
* Uses timestamp + atomic sequence to ensure uniqueness even when called within the same millisecond.
3425+
*
3426+
* Format: (timestamp_ms << 10) | sequence
3427+
* - Upper 54 bits: timestamp in milliseconds (supports ~570 years from epoch)
3428+
* - Lower 10 bits: sequence number (0-1023, wraps every 1024 operations)
3429+
*
3430+
* This allows up to 1024 unique CAS values per millisecond while keeping values compact.
3431+
* Fits within 64-bit CAS token used by memcached.
3432+
*/
3433+
private static long generateUniqueRecasid() {
3434+
long timestamp = System.currentTimeMillis();
3435+
long sequence = recasidSequence.incrementAndGet() & 0x3FF; // Mask to 10 bits (0-1023)
3436+
return (timestamp << 10) | sequence;
3437+
}
3438+
34173439
@Override
34183440
public EVCacheLatch metaSet(MetaSetOperation.Builder builder, Policy policy) throws EVCacheException {
34193441
if (builder == null) throw new IllegalArgumentException("Builder cannot be null");
3420-
3442+
3443+
// Policy enforcement based on operation type:
3444+
// 1. ADD mode (leases/locks) - REQUIRES Policy.ONE to avoid distributed race conditions
3445+
// 2. Regular SET - REQUIRES Policy.ALL for E flag synchronization
3446+
// 3. CAS validation - User chooses (depends on whether they hold a lease)
3447+
boolean isAddMode = builder.build().getMode() == MetaSetOperation.SetMode.ADD;
3448+
boolean hasCasValidation = builder.build().getCas() > 0;
3449+
3450+
if (isAddMode && policy != Policy.ONE) {
3451+
// ADD mode (leases/locks) requires Policy.ONE
3452+
// - Policy.QUORUM can result in 2+ winners (each client gets quorum in different zones)
3453+
// - Policy.ALL results in no winners (distributed race - each succeeds in 1 zone, fails in others)
3454+
// - Policy.ONE guarantees exactly 1 winner (first to any zone wins)
3455+
if (log.isInfoEnabled()) {
3456+
log.info("META_SET: ADD mode requires Policy.ONE for proper lease semantics. " +
3457+
"Overriding policy from {} to Policy.ONE for app: {}",
3458+
policy, _appName);
3459+
}
3460+
policy = Policy.ONE;
3461+
} else if (!isAddMode && !hasCasValidation && policy != Policy.ALL) {
3462+
// Regular SET (no ADD, no CAS) requires Policy.ALL
3463+
// E flag is ALWAYS auto-generated for multi-zone CAS synchronization
3464+
// Therefore, Policy.ALL is REQUIRED to guarantee all zones have the same CAS
3465+
if (log.isInfoEnabled()) {
3466+
log.info("META_SET: E flag requires Policy.ALL for CAS synchronization. " +
3467+
"Overriding policy from {} to Policy.ALL for app: {} (mode: {})",
3468+
policy, _appName, builder.build().getMode());
3469+
}
3470+
policy = Policy.ALL;
3471+
}
3472+
// CAS validation: No enforcement - user chooses Policy based on whether they hold a lease
3473+
// - WITH lease (mutual exclusion): Use Policy.ALL
3474+
// - WITHOUT lease (competitive): Use Policy.QUORUM
3475+
// We cannot detect if user has lease (it's a different key), so user must choose correctly
3476+
34213477
final boolean throwExc = doThrowException();
34223478
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
34233479
if (clients.length == 0) {
@@ -3447,26 +3503,52 @@ public EVCacheLatch metaSet(MetaSetOperation.Builder builder, Policy policy) thr
34473503

34483504
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
34493505
String status = EVCacheMetricsFactory.SUCCESS;
3450-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
3506+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
34513507
clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
3452-
3508+
3509+
// Auto-generate recasid (E flag) for multi-zone CAS synchronization
3510+
// E flag sets CAS explicitly (requires memcached 1.6.21+ with meta commands)
3511+
// Client ALWAYS generates CAS token to ensure all zones have identical CAS values
3512+
long recasidToUse = builder.build().getRecasid();
3513+
if (recasidToUse <= 0) {
3514+
// Auto-generate unique timestamp-based CAS token if not explicitly provided
3515+
// Format: (timestamp_ms << 10) | sequence
3516+
// This provides ~1000 unique values per millisecond for concurrent operations
3517+
long timestamp = System.currentTimeMillis();
3518+
long sequence = recasidSequence.incrementAndGet() & 0x3FF; // 10 bits (0-1023)
3519+
recasidToUse = (timestamp << 10) | sequence;
3520+
if (log.isDebugEnabled() && shouldLog()) {
3521+
log.debug("META_SET: Auto-generated recasid for multi-zone CAS sync: {} (ts={}, seq={}) for key: {}",
3522+
recasidToUse, timestamp, sequence, key);
3523+
}
3524+
} else {
3525+
if (log.isDebugEnabled() && shouldLog()) {
3526+
log.debug("META_SET: Using explicit recasid for multi-zone CAS sync: {} for key: {}",
3527+
recasidToUse, key);
3528+
}
3529+
}
3530+
34533531
try {
34543532
for (EVCacheClient client : clients) {
34553533
final String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient());
3456-
3534+
3535+
if (log.isDebugEnabled() && shouldLog()) {
3536+
log.debug("META_SET : APP " + _appName + ", key : " + canonicalKey);
3537+
}
3538+
34573539
// Create builder with canonical key for this client
34583540
final MetaSetOperation.Builder clientBuilder = new MetaSetOperation.Builder()
34593541
.key(canonicalKey)
34603542
.value(builder.build().getValue())
34613543
.mode(builder.build().getMode())
34623544
.expiration(builder.build().getExpiration())
34633545
.cas(builder.build().getCas())
3546+
.recasid(recasidToUse) // Use the SAME recasid for all zones!
34643547
.returnCas(builder.build().isReturnCas())
34653548
.returnTtl(builder.build().isReturnTtl())
34663549
.markStale(builder.build().isMarkStale());
34673550

34683551
final EVCacheOperationFuture<Boolean> future = client.metaSet(clientBuilder, latch);
3469-
if (log.isDebugEnabled() && shouldLog()) log.debug("META_SET : APP " + _appName + ", key : " + canonicalKey + ", Future : " + future);
34703552
}
34713553
if (event != null) endEvent(event);
34723554
} catch (Exception ex) {
@@ -3545,29 +3627,31 @@ public <T> Map<String, EVCacheItem<T>> metaGetBulk(Collection<String> keys, Meta
35453627

35463628
final EVCacheOperationFuture<Map<String, EVCacheItem<Object>>> future = client.metaGetBulk(canonicalConfig);
35473629
final Map<String, EVCacheItem<Object>> canonicalResult = future.get();
3548-
3630+
35493631
// Convert canonical keys back to original keys and decode values
3550-
for (int i = 0; i < keys.size(); i++) {
3551-
final String originalKey = ((ArrayList<String>) keys).get(i);
3552-
final String canonicalKey = ((ArrayList<String>) canonicalKeys).get(i);
3553-
3632+
int loopIndex = 0;
3633+
for (String originalKey : keys) {
3634+
final String canonicalKey = ((ArrayList<String>) canonicalKeys).get(loopIndex);
3635+
35543636
if (canonicalResult.containsKey(canonicalKey)) {
35553637
final EVCacheItem<Object> canonicalItem = canonicalResult.get(canonicalKey);
35563638
final EVCacheItem<T> item = new EVCacheItem<T>();
3557-
3639+
35583640
// Decode the data using transcoder
35593641
if (canonicalItem.getData() != null && canonicalItem.getData() instanceof CachedData) {
35603642
final CachedData cd = (CachedData) canonicalItem.getData();
3561-
final Transcoder<T> transcoder = (tc == null) ? (Transcoder<T>) _transcoder : tc;
3643+
// Use same transcoder fallback logic as regular get() method
3644+
final Transcoder<T> transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder<T>) client.getTranscoder() : (Transcoder<T>) _transcoder) : tc;
35623645
item.setData(transcoder.decode(cd));
35633646
} else {
35643647
item.setData((T) canonicalItem.getData());
35653648
}
3566-
3649+
35673650
item.setFlag(canonicalItem.getFlag());
35683651
item.getItemMetaData().copyFrom(canonicalItem.getItemMetaData());
35693652
decanonicalR.put(originalKey, item);
35703653
}
3654+
loopIndex++;
35713655
}
35723656

35733657
if (event != null) endEvent(event);
@@ -3599,7 +3683,18 @@ public <T> Map<String, EVCacheItem<T>> metaGetBulk(Transcoder<T> tc, String... k
35993683
@Override
36003684
public EVCacheLatch metaDelete(MetaDeleteOperation.Builder builder, Policy policy) throws EVCacheException {
36013685
if (builder == null) throw new IllegalArgumentException("Builder cannot be null");
3602-
3686+
3687+
// E flag is ALWAYS auto-generated for multi-zone CAS synchronization
3688+
// Therefore, Policy.ALL is REQUIRED to guarantee all zones have the same tombstone CAS
3689+
if (policy != Policy.ALL) {
3690+
if (log.isInfoEnabled()) {
3691+
log.info("META_DELETE: E flag requires Policy.ALL for CAS synchronization. " +
3692+
"Overriding policy from {} to Policy.ALL for app: {}",
3693+
policy, _appName);
3694+
}
3695+
policy = Policy.ALL;
3696+
}
3697+
36033698
final boolean throwExc = doThrowException();
36043699
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
36053700
if (clients.length == 0) {
@@ -3629,22 +3724,48 @@ public EVCacheLatch metaDelete(MetaDeleteOperation.Builder builder, Policy polic
36293724

36303725
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
36313726
String status = EVCacheMetricsFactory.SUCCESS;
3632-
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
3727+
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
36333728
clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
3634-
3729+
3730+
// Auto-generate recasid (E flag) for multi-zone tombstone CAS synchronization
3731+
// E flag sets tombstone CAS explicitly (requires memcached 1.6.21+ with meta commands)
3732+
// Client ALWAYS generates CAS token to ensure all zones have identical tombstone CAS values
3733+
long recasidToUse = builder.build().getRecasid();
3734+
if (recasidToUse <= 0) {
3735+
// Auto-generate unique timestamp-based CAS token if not explicitly provided
3736+
// Format: (timestamp_ms << 10) | sequence
3737+
// This provides ~1000 unique values per millisecond for concurrent operations
3738+
long timestamp = System.currentTimeMillis();
3739+
long sequence = recasidSequence.incrementAndGet() & 0x3FF; // 10 bits (0-1023)
3740+
recasidToUse = (timestamp << 10) | sequence;
3741+
if (log.isDebugEnabled() && shouldLog()) {
3742+
log.debug("META_DELETE: Auto-generated recasid for multi-zone tombstone CAS sync: {} (ts={}, seq={}) for key: {}",
3743+
recasidToUse, timestamp, sequence, key);
3744+
}
3745+
} else {
3746+
if (log.isDebugEnabled() && shouldLog()) {
3747+
log.debug("META_DELETE: Using explicit recasid for multi-zone tombstone CAS sync: {} for key: {}",
3748+
recasidToUse, key);
3749+
}
3750+
}
3751+
36353752
try {
36363753
for (EVCacheClient client : clients) {
36373754
final String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient());
3638-
3755+
3756+
if (log.isDebugEnabled() && shouldLog()) {
3757+
log.debug("META_DELETE : APP " + _appName + ", key : " + canonicalKey);
3758+
}
3759+
36393760
// Create builder with canonical key for this client
36403761
final MetaDeleteOperation.Builder clientBuilder = new MetaDeleteOperation.Builder()
36413762
.key(canonicalKey)
3642-
.mode(builder.getMode())
3643-
.cas(builder.getCas())
3644-
.returnTtl(builder.isReturnTtl());
3763+
.mode(builder.build().getMode())
3764+
.cas(builder.build().getCas())
3765+
.recasid(recasidToUse) // Use the SAME recasid for all zones!
3766+
.returnTtl(builder.build().isReturnTtl());
36453767

36463768
final EVCacheOperationFuture<Boolean> future = client.metaDelete(clientBuilder, latch);
3647-
if (log.isDebugEnabled() && shouldLog()) log.debug("META_DELETE : APP " + _appName + ", key : " + canonicalKey + ", Future : " + future);
36483769
}
36493770
if (event != null) endEvent(event);
36503771
} catch (Exception ex) {

evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheLatchImpl.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,31 @@ public EVCacheLatchImpl(Policy policy, int _count, String appName) {
6868
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
6969
if (log.isDebugEnabled()) log.debug("Current Latch Count = " + latch.getCount() + "; await for "+ timeout + " " + unit.name() + " appName : " + appName);
7070
final long start = log.isDebugEnabled() ? System.currentTimeMillis() : 0;
71-
final boolean awaitSuccess = latch.await(timeout, unit);
72-
if (log.isDebugEnabled()) log.debug("await success = " + awaitSuccess + " after " + (System.currentTimeMillis() - start) + " msec." + " appName : " + appName + ((evcacheEvent != null) ? " keys : " + evcacheEvent.getEVCacheKeys() : ""));
73-
return awaitSuccess;
71+
final boolean countdownFinished = latch.await(timeout, unit);
72+
if (log.isDebugEnabled()) log.debug("countdown finished = " + countdownFinished + " after " + (System.currentTimeMillis() - start) + " msec." + " appName : " + appName + ((evcacheEvent != null) ? " keys : " + evcacheEvent.getEVCacheKeys() : ""));
73+
74+
// Check if enough operations succeeded (not just completed)
75+
if (!countdownFinished) {
76+
return false; // Timed out
77+
}
78+
79+
// Count how many operations succeeded
80+
int successCount = 0;
81+
for (Future<Boolean> future : futures) {
82+
try {
83+
if (future.isDone() && future.get().equals(Boolean.TRUE)) {
84+
successCount++;
85+
}
86+
} catch (Exception e) {
87+
// Exception means failure
88+
if (log.isDebugEnabled()) log.debug("Future failed with exception", e);
89+
}
90+
}
91+
92+
// Return true only if enough operations succeeded according to policy
93+
final boolean policyMet = successCount >= expectedCompleteCount;
94+
if (log.isDebugEnabled()) log.debug("Policy check: successCount=" + successCount + ", required=" + expectedCompleteCount + ", policyMet=" + policyMet);
95+
return policyMet;
7496
}
7597

7698
/*
@@ -201,11 +223,15 @@ public void setEVCacheEvent(EVCacheEvent e) {
201223
@Override
202224
public void onComplete(OperationFuture<?> future) throws Exception {
203225
if (log.isDebugEnabled()) log.debug("BEGIN : onComplete - Calling Countdown. Completed Future = " + future + "; App : " + appName);
226+
204227
countDown();
205228
completeCount++;
229+
206230
if(evcacheEvent != null) {
207231
if (log.isDebugEnabled()) log.debug(";App : " + evcacheEvent.getAppName() + "; Call : " + evcacheEvent.getCall() + "; Keys : " + evcacheEvent.getEVCacheKeys() + "; completeCount : " + completeCount + "; totalFutureCount : " + totalFutureCount +"; failureCount : " + failureCount);
208232
try {
233+
Object result = future.isDone() ? future.get() : null;
234+
209235
if(future.isDone() && future.get().equals(Boolean.FALSE)) {
210236
failureCount++;
211237
if(failReason == null) failReason = EVCacheMetricsFactory.getInstance().getStatusCode(future.getStatus().getStatusCode());

evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,6 +1778,7 @@ public EVCacheOperationFuture<Boolean> metaDelete(net.spy.memcached.protocol.asc
17781778
public EVCacheOperationFuture<Boolean> metaSet(net.spy.memcached.protocol.ascii.MetaSetOperation.Builder builder, EVCacheLatchImpl latch) throws Exception {
17791779
final String key = builder.getKey();
17801780
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
1781+
17811782
if (!ensureWriteQueueSize(node, key, Call.SET)) {
17821783
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the meta set event.");
17831784
final net.spy.memcached.internal.ListenableFuture<Boolean, net.spy.memcached.internal.OperationCompletionListener> defaultFuture = (net.spy.memcached.internal.ListenableFuture<Boolean, net.spy.memcached.internal.OperationCompletionListener>) getDefaultFuture();

0 commit comments

Comments
 (0)