Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions fdbclient/ClientKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ void ClientKnobs::initialize(Randomize randomize) {
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
init( LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD, 60 );
init( LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL, 60 );
// The interval in seconds to run the cache eviction logic. If enabled will iterate over the location cache entries and remove
// stale/failed entries.
init( LOCATION_CACHE_EVICTION_INTERVAL, 0.0 );
// The maximum entries per cache evition iteration to check if they are expired. If set to a negative number all entries will be validated.
init( LOCATION_CACHE_MAX_ENTRIES_PER_ITERATION, 1000.0 );

init( GET_RANGE_SHARD_LIMIT, 2 );
init( WARM_RANGE_SHARD_LIMIT, 100 );
Expand Down
115 changes: 95 additions & 20 deletions fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,6 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {

cx->cc.logToTraceEvent(ev);

ev.detail("LocationCacheEntryCount", cx->locationCache.size());
ev.detail("MeanLatency", cx->latencies.mean())
.detail("MedianLatency", cx->latencies.median())
.detail("Latency90", cx->latencies.percentile(0.90))
Expand Down Expand Up @@ -1077,6 +1076,89 @@ Reference<LocationInfo> addCaches(const Reference<LocationInfo>& loc,
return makeReference<LocationInfo>(interfaces, true);
}

// cleanupLocationCache is an actor that periodically cleans up stale/failed entries in the client's location cache by
// removing entries that point to failed storage servers. The cleanup of the location cache is required to ensure that
// the client is not connecting to old/stale storage servers.
ACTOR static Future<Void> cleanupLocationCache(DatabaseContext* cx) {
// Only if the LOCATION_CACHE_EVICTION_INTERVAL is set to a number greater than 0 we have to perform the location
// cache validation.
if (CLIENT_KNOBS->LOCATION_CACHE_EVICTION_INTERVAL <= 0.0) {
return Void();
}

// Track the current position by key to continue after we reached
// CLIENT_KNOBS->LOCATION_CACHE_MAX_ENTRIES_PER_ITERATION in an iteration. Storing the visited key, helps to perform
// the validation on all keys in a circular manner.
state Key currentValidationPosition;

// Iterate over the location caches and check if any of the storage servers have failed. In case that a storage
// server has failed, the location cache entry will be removed/invalidated.
loop {
wait(delay(CLIENT_KNOBS->LOCATION_CACHE_EVICTION_INTERVAL));

std::vector<KeyRangeRef> toRemove;
int checkedEntries = 0;
// Fetch the current ranges of the location cache.
auto ranges = cx->locationCache.ranges();
// Find where we left off using KEY (not iterator).
auto iter = ranges.begin();
if (currentValidationPosition.size() > 0) {
// Seek to last position
iter = cx->locationCache.rangeContaining(currentValidationPosition);
if (iter != ranges.end() && iter.range().begin == currentValidationPosition) {
++iter; // Move past the last processed entry, since we already checked that key range
}
}

for (; iter != ranges.end(); ++iter) {
// Avoid long blocking scans.
if (CLIENT_KNOBS->LOCATION_CACHE_MAX_ENTRIES_PER_ITERATION >= 0 &&
checkedEntries >= CLIENT_KNOBS->LOCATION_CACHE_MAX_ENTRIES_PER_ITERATION) {
break;
}

if (iter->value()) {
auto& locationInfo = iter->value();
// Iterate over all storage interfaces for this location (key range) cache.
for (int i = 0; i < locationInfo->size(); ++i) {
const auto& interf = (*locationInfo)[i];
// Check if the endpoint is marked as failed in the FailureMonitor. If so remove this key range and
// stop iterating over the other storage interfaces. A single failed storage interface is enough to
// remove the cached entry.
if (IFailureMonitor::failureMonitor().getState(interf->interf.getValue.getEndpoint()).isFailed()) {
toRemove.push_back(iter->range());
break;
}
}

// Update the current validated position (key) to the key that starts the current range.
currentValidationPosition = iter.range().begin;
}

checkedEntries++;
}

// If we completed a full scan we have to reset the validated position (key) to an empty key and
// start in the next iteration with the first range.
if (iter == ranges.end()) {
currentValidationPosition = Key();
}

// Remove expired entries
for (const auto& range : toRemove) {
cx->locationCache.insert(range, Reference<LocationInfo>());
}

if (!toRemove.empty()) {
CODE_PROBE(true, "LocationCacheCleanup removed some entries");
TraceEvent("LocationCacheCleanup")
.detail("NumRemovedRanges", toRemove.size())
.detail("NumCheckedEntries", checkedEntries)
.detail("NumLocalityCacheEntries", cx->locationCache.size());
}
}
}

ACTOR Future<Void> updateCachedRanges(DatabaseContext* self, std::map<UID, StorageServerInterface>* cacheServers) {
state Transaction tr;
state Value trueValue = storageCacheValue(std::vector<uint16_t>{ 0 });
Expand Down Expand Up @@ -1611,6 +1693,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection

clientDBInfoMonitor = monitorClientDBInfoChange(this, clientInfo, &proxiesChangeTrigger);
tssMismatchHandler = handleTssMismatches(this);
locationCacheCleanup = cleanupLocationCache(this);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);

Expand Down Expand Up @@ -1921,6 +2004,7 @@ DatabaseContext::~DatabaseContext() {
clientDBInfoMonitor.cancel();
monitorTssInfoChange.cancel();
tssMismatchHandler.cancel();
locationCacheCleanup.cancel();
if (grvUpdateHandler.isValid()) {
grvUpdateHandler.cancel();
}
Expand Down Expand Up @@ -1953,9 +2037,8 @@ Optional<KeyRangeLocationInfo> DatabaseContext::getCachedLocation(const TenantIn

auto range =
isBackward ? locationCache.rangeContainingKeyBefore(resolvedKey) : locationCache.rangeContaining(resolvedKey);
if (range->value()) {
if (range->value())
return KeyRangeLocationInfo(toPrefixRelativeRange(range->range(), tenant.prefix), range->value());
}

return Optional<KeyRangeLocationInfo>();
}
Expand Down Expand Up @@ -2008,6 +2091,7 @@ Reference<LocationInfo> DatabaseContext::setCachedLocation(const KeyRangeRef& ab

int maxEvictionAttempts = 100, attempts = 0;
auto loc = makeReference<LocationInfo>(serverRefs);
// TODO: ideally remove based on TTL expiration times, instead of random
while (locationCache.size() > locationCacheSize && attempts < maxEvictionAttempts) {
CODE_PROBE(true, "NativeAPI storage server locationCache entry evicted");
attempts++;
Expand Down Expand Up @@ -3090,6 +3174,7 @@ bool checkOnlyEndpointFailed(const Database& cx, const Endpoint& endpoint) {
} else {
cx->clearFailedEndpointOnHealthyServer(endpoint);
}

return false;
}

Expand All @@ -3110,21 +3195,15 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
}

bool onlyEndpointFailedAndNeedRefresh = false;
for (int i = 0; i < locationInfo.get().locations->size(); i++) {
if (checkOnlyEndpointFailed(cx, locationInfo.get().locations->get(i, member).getEndpoint())) {
onlyEndpointFailedAndNeedRefresh = true;
cx->invalidateCache(tenant.prefix, key);
// Refresh the cache with a new getKeyLocations made to proxies.
return getKeyLocation_internal(
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
}
}

if (onlyEndpointFailedAndNeedRefresh) {
cx->invalidateCache(tenant.prefix, key);

// Refresh the cache with a new getKeyLocations made to proxies.
return getKeyLocation_internal(
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
}

return locationInfo.get();
}

Expand Down Expand Up @@ -3262,17 +3341,13 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c

bool foundFailed = false;
for (const auto& locationInfo : locations) {
bool onlyEndpointFailedAndNeedRefresh = false;
for (int i = 0; i < locationInfo.locations->size(); i++) {
if (checkOnlyEndpointFailed(cx, locationInfo.locations->get(i, member).getEndpoint())) {
onlyEndpointFailedAndNeedRefresh = true;
cx->invalidateCache(tenant.prefix, locationInfo.range.begin);
foundFailed = true;
break;
}
}

if (onlyEndpointFailedAndNeedRefresh) {
cx->invalidateCache(tenant.prefix, locationInfo.range.begin);
foundFailed = true;
}
}

if (foundFailed) {
Expand Down
4 changes: 4 additions & 0 deletions fdbclient/include/fdbclient/ClientKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class ClientKnobs : public KnobsImpl<ClientKnobs> {
int LOCATION_CACHE_EVICTION_SIZE_SIM;
double LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD;
double LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL;
double LOCATION_CACHE_EVICTION_INTERVAL;
// The maximum entries per cache evition iteration to check if they are expired.
// If set to a negative number all entries will be validated.
double LOCATION_CACHE_MAX_ENTRIES_PER_ITERATION;

int GET_RANGE_SHARD_LIMIT;
int WARM_RANGE_SHARD_LIMIT;
Expand Down
4 changes: 3 additions & 1 deletion fdbclient/include/fdbclient/DatabaseContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ struct LocationInfo : MultiInterface<ReferencedInterface<StorageServerInterface>
LocationInfo(LocationInfo&&) = delete;
LocationInfo& operator=(const LocationInfo&) = delete;
LocationInfo& operator=(LocationInfo&&) = delete;
bool hasCaches = false;
Reference<Locations> locations() { return Reference<Locations>::addRef(this); }

bool hasCaches = false;
};

using CommitProxyInfo = ModelInterface<CommitProxyInterface>;
Expand Down Expand Up @@ -495,6 +496,7 @@ class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAll
Future<Void> tssMismatchHandler;
PromiseStream<std::pair<UID, std::vector<DetailedTSSMismatch>>> tssMismatchStream;
Future<Void> grvUpdateHandler;
Future<Void> locationCacheCleanup;
Reference<CommitProxyInfo> commitProxies;
Reference<GrvProxyInfo> grvProxies;
bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time.
Expand Down