Skip to content

Fix live version map unsafe flag #116053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/116053.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116053
summary: Fix live version map unsafe flag
area: Engine
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.atomic.AtomicLong;

/** Maps _uid value to its version information. */
public final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
public class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();

Expand Down Expand Up @@ -202,7 +202,12 @@ Maps invalidateOldMapForAssert() {
*/
Maps invalidateOldMap(LiveVersionMapArchive archive) {
archive.afterRefresh(old);
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
Maps result = new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
// not JMM compatible, similar to beforeRefresh
if (needsSafeAccess) {
result.needsSafeAccess = true;
}
return result;
}

void put(BytesRef uid, VersionValue version) {
Expand Down Expand Up @@ -282,7 +287,15 @@ public void beforeRefresh() throws IOException {
// map. While reopen is running, any lookup will first
// try this new map, then fallback to old, then to the
// current searcher:
maps = maps.buildTransitionMap();
Maps original = maps;
Maps transitionMap = original.buildTransitionMap();
maps = transitionMap;
// this is still not JMM safe, but makes the test pass. There are a few options:
// 1. Do read then modify instead of writing to it in enforceSafeAccess. The read can be non-volatile, the write volatile.
// 2. Make the field volatile (but the comment on it seems to indicate that it would be bad for perf).
if (original.needsSafeAccess) {
transitionMap.needsSafeAccess = true;
}
assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
// line and this one, but that should be minor, and the error won't accumulate over time:
Expand Down Expand Up @@ -345,7 +358,14 @@ boolean isUnsafe() {
}

void enforceSafeAccess() {
maps.needsSafeAccess = true;
Maps copy = maps;
copy.needsSafeAccess = true;
Maps nextCopy;
// loop until we have the same maps after the assignment
while ((nextCopy = maps) != copy) {
nextCopy.needsSafeAccess = true;
copy = nextCopy;
}
}

boolean isSafeAccessRequired() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;

Expand Down Expand Up @@ -510,4 +511,42 @@ public void testVersionMapReclaimableRamBytes() throws IOException {
assertEquals(map.reclaimableRefreshRamBytes(), 0L);
assertEquals(map.ramBytesUsedForRefresh(), 0L);
}

/**
* When we only do operations that enforce safe access, we expect to stay as a safe map.
*/
public void testNotUnsafeConcurrently() throws InterruptedException {
LiveVersionMap map = new LiveVersionMap();
AtomicBoolean running = new AtomicBoolean(true);
Thread refresher = new Thread(() -> {
while (running.get()) {
try {
map.beforeRefresh();
map.afterRefresh(true);
} catch (IOException e) {
fail(e);
throw new RuntimeException(e);
}
}
});

refresher.start();
try {
// 1000 is enough to provoke original version
for (int i = 0; i < 100000; ++i) {
BytesRef uid = Uid.encodeId(randomIdentifier());
try (Releasable releasable = map.acquireLock(uid)) {
map.enforceSafeAccess();
map.maybePutIndexUnderLock(uid, new IndexVersionValue(null, 0, 0, 0));
assertFalse(map.isUnsafe());
}
}

} finally {
running.set(false);
refresher.join(10000);

}
assertFalse(refresher.isAlive());
}
}