Skip to content

Remove commit information from refresh result #94259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand All @@ -20,22 +19,24 @@
*/
public class ShardRefreshReplicaRequest extends ReplicationRequest<ShardRefreshReplicaRequest> {

public static final long NO_FLUSH_PERFORMED = Long.MIN_VALUE;

/**
* Holds the refresh result of the primary shard. This will be used by {@link TransportShardRefreshAction} to construct a
* {@link UnpromotableShardRefreshRequest} to broadcast to the unpromotable replicas. The refresh result is not serialized to maintain
* backwards compatibility for the refresh requests to promotable replicas which do not need the refresh result. For this reason, the
* field is package-private.
* Holds the flushed generation of the primary shard. This will be used by {@link TransportShardRefreshAction} to construct a
* {@link UnpromotableShardRefreshRequest} to broadcast to the unpromotable replicas. The flushedGeneration is not serialized to
* maintain backwards compatibility for the refresh requests to promotable replicas which do not need the refresh result. For this
* reason, the field is package-private.
*/
final Engine.RefreshResult primaryRefreshResult;
final long flushedGeneration;

public ShardRefreshReplicaRequest(StreamInput in) throws IOException {
super(in);
primaryRefreshResult = Engine.RefreshResult.NO_REFRESH;
flushedGeneration = NO_FLUSH_PERFORMED;
}

public ShardRefreshReplicaRequest(ShardId shardId, Engine.RefreshResult primaryRefreshResult) {
public ShardRefreshReplicaRequest(ShardId shardId, long flushedGeneration) {
super(shardId);
this.primaryRefreshResult = primaryRefreshResult;
this.flushedGeneration = flushedGeneration;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ protected void shardOperationOnPrimary(
ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
) {
ActionListener.completeWith(listener, () -> {
ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), primary.refresh(SOURCE_API));
primary.refresh(SOURCE_API);
long lastCommitGeneration = primary.commitStats().getGeneration();
ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), lastCommitGeneration);
replicaRequest.setParentTask(shardRequest.getParentTask());
logger.trace("{} refresh request executed on primary", primary.shardId());
return new PrimaryResult<>(replicaRequest, new ReplicationResponse());
Expand Down Expand Up @@ -111,10 +113,9 @@ public void onPrimaryOperationComplete(
IndexShardRoutingTable indexShardRoutingTable,
ActionListener<Void> listener
) {
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.generation()
replicaRequest.flushedGeneration
);
transportService.sendRequest(
transportService.getLocalNode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.unpromotable.BroadcastUnpromotableRequest;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.engine.Engine;

import java.io.IOException;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class UnpromotableShardRefreshRequest extends BroadcastUnpromotableRequest {

private final long segmentGeneration;
Expand All @@ -33,15 +29,6 @@ public UnpromotableShardRefreshRequest(StreamInput in) throws IOException {
segmentGeneration = in.readVLong();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (segmentGeneration == Engine.RefreshResult.UNKNOWN_GENERATION) {
validationException = addValidationError("segment generation is unknown", validationException);
}
return validationException;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.refresh.ShardRefreshReplicaRequest;
import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction;
import org.elasticsearch.action.admin.indices.refresh.UnpromotableShardRefreshRequest;
import org.elasticsearch.action.support.WriteRequest;
Expand All @@ -37,12 +38,13 @@ public void refreshShard(
@Nullable Translog.Location location,
ActionListener<Boolean> listener
) {
final boolean hasUnpromotableReplicas = indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0;
switch (policy) {
case NONE -> listener.onResponse(false);
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
if (hasUnpromotableReplicas) {
refreshUnpromotables(indexShard, location, listener, forced);
} else {
listener.onResponse(forced);
Expand All @@ -56,9 +58,9 @@ public void onFailure(Exception e) {
});
case IMMEDIATE -> immediate(indexShard, new ActionListener<>() {
@Override
public void onResponse(Engine.RefreshResult refreshResult) {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
sendUnpromotableRequests(indexShard, refreshResult.generation(), true, listener);
public void onResponse(Void ignored) {
if (hasUnpromotableReplicas) {
sendUnpromotableRequests(indexShard, indexShard.commitStats().getGeneration(), true, listener);
} else {
listener.onResponse(true);
}
Expand Down Expand Up @@ -87,9 +89,9 @@ public static void refreshReplicaShard(
}
}

private static void immediate(IndexShard indexShard, ActionListener<Engine.RefreshResult> listener) {
Engine.RefreshResult refreshResult = indexShard.refresh(FORCED_REFRESH_AFTER_INDEX);
listener.onResponse(refreshResult);
private static void immediate(IndexShard indexShard, ActionListener<Void> listener) {
indexShard.refresh(FORCED_REFRESH_AFTER_INDEX);
listener.onResponse(null);
}

private static void waitUntil(IndexShard indexShard, Translog.Location location, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -121,6 +123,12 @@ public void onFailure(Exception e) {
}

private void sendUnpromotableRequests(IndexShard indexShard, long generation, boolean wasForced, ActionListener<Boolean> listener) {
if (generation == ShardRefreshReplicaRequest.NO_FLUSH_PERFORMED) {
listener.onFailure(
new IllegalArgumentException("Shard with unpromotable replicas require a flush to occur, but no flush occurred.")
);
return;
}
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShard.getReplicationGroup().getRoutingTable(),
generation
Expand Down
20 changes: 2 additions & 18 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1014,14 +1014,14 @@ public boolean refreshNeeded() {
* changes.
*/
@Nullable
public abstract RefreshResult refresh(String source) throws EngineException;
public abstract void refresh(String source) throws EngineException;

/**
* Synchronously refreshes the engine for new search operations to reflect the latest
* changes unless another thread is already refreshing the engine concurrently.
*/
@Nullable
public abstract RefreshResult maybeRefresh(String source) throws EngineException;
public abstract boolean maybeRefresh(String source) throws EngineException;

/**
* Called when our engine is using too much heap and should move buffered indexed/deleted documents to disk.
Expand Down Expand Up @@ -1966,20 +1966,4 @@ public void addSegmentGenerationListener(long minGeneration, ActionListener<Long
public void addFlushListener(Translog.Location location, ActionListener<Long> listener) {
listener.onFailure(new UnsupportedOperationException("Engine type " + this.getClass() + " does not support flush listeners."));
}

/**
* Captures the result of a refresh operation on the index shard.
* <p>
* <code>refreshed</code> is true if a refresh happened. If refreshed, <code>generation</code>
* contains the generation of the index commit that the reader has opened upon refresh.
*/
public record RefreshResult(boolean refreshed, long generation) {

public static final long UNKNOWN_GENERATION = -1L;
public static final RefreshResult NO_REFRESH = new RefreshResult(false);

public RefreshResult(boolean refreshed) {
this(refreshed, UNKNOWN_GENERATION);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1834,21 +1834,20 @@ protected Optional<Exception> preFlightCheckForNoOp(final NoOp noOp) throws IOEx
}

@Override
public RefreshResult refresh(String source) throws EngineException {
return refresh(source, SearcherScope.EXTERNAL, true);
public void refresh(String source) throws EngineException {
refresh(source, SearcherScope.EXTERNAL, true);
}

@Override
public RefreshResult maybeRefresh(String source) throws EngineException {
public boolean maybeRefresh(String source) throws EngineException {
return refresh(source, SearcherScope.EXTERNAL, false);
}

final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed;
long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
try {
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
if (store.tryIncRef()) {
Expand All @@ -1864,14 +1863,6 @@ final RefreshResult refresh(String source, SearcherScope scope, boolean block) t
} else {
refreshed = referenceManager.maybeRefresh();
}
if (refreshed) {
final ElasticsearchDirectoryReader current = referenceManager.acquire();
try {
segmentGeneration = current.getIndexCommit().getGeneration();
} finally {
referenceManager.release(current);
}
}
} finally {
store.decRef();
}
Expand Down Expand Up @@ -1903,7 +1894,7 @@ final RefreshResult refresh(String source, SearcherScope scope, boolean block) t
// for a long time:
maybePruneDeletes();
mergeScheduler.refreshConfig();
return new RefreshResult(refreshed, segmentGeneration);
return refreshed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,15 +421,14 @@ public List<Segment> segments() {
}

@Override
public RefreshResult refresh(String source) {
public void refresh(String source) {
// we could allow refreshes if we want down the road the reader manager will then reflect changes to a rw-engine
// opened side-by-side
return RefreshResult.NO_REFRESH;
}

@Override
public RefreshResult maybeRefresh(String source) throws EngineException {
return RefreshResult.NO_REFRESH;
public boolean maybeRefresh(String source) throws EngineException {
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,12 +1228,12 @@ public Engine.GetResult get(Engine.Get get) {
/**
* Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link AlreadyClosedException}.
*/
public Engine.RefreshResult refresh(String source) {
public void refresh(String source) {
verifyNotClosed();
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [{}]", source);
}
return getEngine().refresh(source);
getEngine().refresh(source);
}

/**
Expand Down Expand Up @@ -3823,7 +3823,7 @@ && isSearchIdle()
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [schedule]");
}
return getEngine().maybeRefresh("schedule").refreshed();
return getEngine().maybeRefresh("schedule");
}
}
final Engine engine = getEngine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -102,7 +101,6 @@ public void initCommonMocks() {
indexShard = mock(IndexShard.class);
location = mock(Translog.Location.class);
clusterService = createClusterService(threadPool);
when(indexShard.refresh(any())).thenReturn(new Engine.RefreshResult(true, 1));
ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
when(indexShard.getReplicationGroup()).thenReturn(replicationGroup);
when(replicationGroup.getReplicationTargets()).thenReturn(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5430,9 +5430,8 @@ public void testRefreshScopedSearcher() throws IOException {
engine.index(primaryResponse);
}
assertTrue(engine.refreshNeeded());
var refreshResult = engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
assertTrue(refreshResult.refreshed());
assertNotEquals(refreshResult.generation(), Engine.RefreshResult.UNKNOWN_GENERATION);
var refreshed = engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
assertTrue(refreshed);
try (
Engine.Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Engine.Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)
Expand Down Expand Up @@ -7589,30 +7588,6 @@ public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) {
}
}

public void testRefreshResult() throws IOException {
try (
Store store = createStore();
InternalEngine engine =
// disable merges to make sure that the reader doesn't change unexpectedly during the test
createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)
) {
var refresh1Result = engine.refresh("warm_up");
assertTrue(refresh1Result.refreshed());
assertNotEquals("when refreshed, generation must be set", refresh1Result.generation(), Engine.RefreshResult.UNKNOWN_GENERATION);
for (int i = 0; i < 10; i++) {
engine.index(indexForDoc(createParsedDoc(String.valueOf(i), EngineTestCase.randomIdFieldType(), null)));
}
assertTrue(engine.refreshNeeded());
var refresh2Result = engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
assertTrue(refresh2Result.refreshed());
assertThat(refresh2Result.generation(), greaterThanOrEqualTo(refresh1Result.generation()));
engine.flush(true, true);
var refresh3Result = engine.refresh("test");
assertTrue(refresh3Result.refreshed());
assertThat(refresh3Result.generation(), greaterThan(refresh2Result.generation()));
}
}

public void testFlushListener() throws Exception {
try (
Store store = createStore();
Expand Down