Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[segment replication] Add async publish checkpoint task #17619

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.action.termvectors.TermVectorsRequestBuilder;
import org.opensearch.action.termvectors.TermVectorsResponse;
import org.opensearch.action.update.UpdateResponse;
Expand All @@ -59,9 +60,11 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand All @@ -72,6 +75,7 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchService;
Expand All @@ -83,6 +87,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Requests;
import org.junit.Before;
Expand All @@ -98,6 +103,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -131,6 +137,53 @@ private static String indexOrAlias() {
return randomBoolean() ? INDEX_NAME : "alias";
}

/**
 * Verifies that a replica still becomes searchable after its publish-checkpoint requests were rejected.
 * <p>
 * Both data nodes are started with a replication retry timeout of zero and the index is created with
 * {@code index.refresh_interval} set to -1. While the flag below is {@code true}, the replica's transport
 * handler for the publish-checkpoint replica action throws a {@link RemoteTransportException}. A document is
 * then indexed with an immediate refresh, the flag is cleared, both publish-checkpoint settings are lowered
 * to one second, and the test waits until the primary and the replica each report one searchable document.
 */
public void testPublishCheckPointFail() throws Exception {
    Settings mockNodeSetting = Settings.builder()
        .put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0))
        .build();

    final String primaryNode = internalCluster().startDataOnlyNode(mockNodeSetting);
    createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build());
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    final String replicaNode = internalCluster().startDataOnlyNode(mockNodeSetting);
    ensureGreen(INDEX_NAME);

    MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
        TransportService.class,
        replicaNode
    ));
    // While true, every publish-checkpoint request reaching the replica is answered with an exception.
    AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true);
    replicaTransportService.addRequestHandlingBehavior(
        PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
        (handler, request, channel, task) -> {
            if (mockReplicaReceivePublishCheckpointException.get()) {
                logger.info("mock remote transport exception");
                throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
            }
            logger.info("replica receive publish checkpoint request");
            handler.messageReceived(request, channel, task);
        }
    );

    try {
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
        logger.info("ensure publish checkpoint request can be process");
        mockReplicaReceivePublishCheckpointException.set(false);

        // Shorten both intervals so the test does not have to wait for the 10 minute defaults.
        assertAcked(
            client().admin()
                .indices()
                .prepareUpdateSettings(INDEX_NAME)
                .setSettings(
                    Settings.builder()
                        .put(IndexSettings.INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
                        .put(IndexSettings.INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING.getKey(), TimeValue.timeValueSeconds(1))
                )
        );

        waitForSearchableDocs(1, primaryNode, replicaNode);
    } finally {
        // Always remove the throwing handler, even when an assertion above fails, so it cannot leak past this test.
        replicaTransportService.clearAllRules();
    }
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING,
IndexSettings.DEFAULT_SEARCH_PIPELINE,
IndexSettings.INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING,
IndexSettings.INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING,

// Settings for Searchable Snapshots
IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY,
Expand Down
65 changes: 64 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncPublishCheckpointTask publishCheckpointTask;
private volatile AsyncReplicationTask asyncReplicationTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
Expand Down Expand Up @@ -318,6 +319,9 @@
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
this.asyncReplicationTask = new AsyncReplicationTask(this);
}
if (indexSettings.isSegRepEnabledOrRemoteNode()) {
this.publishCheckpointTask = new AsyncPublishCheckpointTask(this);
}
this.translogFactorySupplier = translogFactorySupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
Expand Down Expand Up @@ -514,7 +518,8 @@
fsyncTask,
trimTranslogTask,
globalCheckpointTask,
retentionLeaseSyncTask
retentionLeaseSyncTask,
publishCheckpointTask
);
}
}
Expand Down Expand Up @@ -1108,6 +1113,9 @@
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
updateReplicationTask();
}
if (indexSettings.isSegRepEnabledOrRemoteNode()) {
onPublishCheckpointIntervalChange();
}
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
Expand Down Expand Up @@ -1155,6 +1163,13 @@
rescheduleRefreshTasks();
}

/**
 * Reschedules the periodic publish-checkpoint task when the configured publish-checkpoint interval
 * differs from the interval the current task was created with.
 * <p>
 * Does nothing when no task exists (the constructor only creates one when
 * {@code indexSettings.isSegRepEnabledOrRemoteNode()} is true) or when the interval is unchanged.
 */
public void onPublishCheckpointIntervalChange() {
    // Read the volatile field once so the null check and the interval comparison see the same task.
    final AsyncPublishCheckpointTask currentTask = publishCheckpointTask;
    if (currentTask == null) {
        // No task was ever scheduled for this index, so there is nothing to reschedule.
        return;
    }
    if (currentTask.getInterval().equals(indexSettings.getPublishCheckpointInterval())) {
        return;
    }
    reschedulePublishCheckpointTasks();
}

private void updateFsyncTaskIfNecessary() {
if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) {
try {
Expand All @@ -1179,6 +1194,14 @@
}
}

/**
 * Closes the current publish-checkpoint task, if any, and replaces it with a new
 * {@link AsyncPublishCheckpointTask}, whose constructor reads the interval from the index settings.
 * The replacement is installed even if closing the previous task throws.
 */
private void reschedulePublishCheckpointTasks() {
    final AsyncPublishCheckpointTask previousTask = publishCheckpointTask;
    try {
        // The field is only assigned when segment replication applies, so it may still be null here.
        if (previousTask != null) {
            previousTask.close();
        }
    } finally {
        publishCheckpointTask = new AsyncPublishCheckpointTask(this);
    }
}

public CompositeIndexSettings getCompositeIndexSettings() {
return compositeIndexSettings;
}
Expand Down Expand Up @@ -1235,6 +1258,20 @@
}
}

/**
 * Asks every shard of this index that is currently in primary mode to run its scheduled
 * publish-checkpoint check. Does nothing unless {@code indexSettings.isSegRepEnabledOrRemoteNode()} is true.
 * Invoked from {@code AsyncPublishCheckpointTask#runInternal()}.
 */
private void maybePublishCheckpoint() {
if (indexSettings.isSegRepEnabledOrRemoteNode()) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.isPrimaryMode()) {
// The boolean result (whether a checkpoint was actually published) is intentionally not used here.
shard.scheduledPublishCheckpoint();

Check warning on line 1266 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1266

Added line #L1266 was not covered by tests
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {

Check warning on line 1268 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1268

Added line #L1268 was not covered by tests
// fine - the shard was closed in the meantime; continue with the next shard
}
}

Check warning on line 1271 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1270-L1271

Added lines #L1270 - L1271 were not covered by tests
}
}

Check warning on line 1273 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1273

Added line #L1273 was not covered by tests

private void maybeTrimTranslog() {
for (IndexShard shard : this.shards.values()) {
switch (shard.state()) {
Expand Down Expand Up @@ -1385,6 +1422,28 @@
}
}

/**
 * Async task whose interval is the index's publish-checkpoint interval and whose work is to call
 * {@code IndexService#maybePublishCheckpoint()}.
 */
final class AsyncPublishCheckpointTask extends BaseAsyncTask {

// The interval is read from the enclosing IndexService's indexSettings at construction time,
// so a changed interval only takes effect once a new task instance is created.
AsyncPublishCheckpointTask(IndexService indexService) {
super(indexService, indexSettings.getPublishCheckpointInterval());
}

// Delegates each run to the owning index service.
@Override
protected void runInternal() {
indexService.maybePublishCheckpoint();
}

Check warning on line 1434 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1433-L1434

Added lines #L1433 - L1434 were not covered by tests

// Names the GENERIC thread pool for this task.
@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "publishCheckpoint";
}
}

final class AsyncReplicationTask extends BaseAsyncTask {

AsyncReplicationTask(IndexService indexService) {
Expand Down Expand Up @@ -1534,6 +1593,10 @@
return refreshTask.getInterval();
}

/**
 * Returns the current publish-checkpoint task. May be {@code null}: the constructor only creates the task
 * when {@code indexSettings.isSegRepEnabledOrRemoteNode()} is true.
 * NOTE(review): package-private like the neighbouring "for tests" accessors - presumably test-only; confirm.
 */
AsyncPublishCheckpointTask getPublishCheckpointTask() {
return publishCheckpointTask;
}

AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}
Expand Down
38 changes: 38 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,22 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
 * Index-scoped, dynamic setting {@code index.publish_checkpoint_interval}: the interval used for the
 * index service's periodic publish-checkpoint task. Defaults to 10 minutes; the minimum is 1 second.
 */
public static final Setting<TimeValue> INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING = Setting.timeSetting(
"index.publish_checkpoint_interval",
TimeValue.timeValueMinutes(10),
TimeValue.timeValueSeconds(1),
Property.Dynamic,
Property.IndexScope
);

/**
 * Index-scoped, dynamic setting {@code index.lag_time_before_resend_checkpoint}: the threshold the
 * replication tracker compares a replica's timer for the latest replication checkpoint against when
 * deciding whether that replica is lagging. Defaults to 10 minutes; the minimum is 0 seconds.
 */
public static final Setting<TimeValue> INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING = Setting.timeSetting(
"index.lag_time_before_resend_checkpoint",
TimeValue.timeValueMinutes(10),
TimeValue.timeValueSeconds(0),
Property.Dynamic,
Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -831,6 +847,8 @@ public static IndexMergePolicy fromString(String text) {
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean isTranslogMetadataEnabled;
private volatile boolean allowDerivedField;
private volatile TimeValue publishCheckpointInterval;
private volatile TimeValue lagTimeBeforeResendCheckpoint;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -1063,6 +1081,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED);
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);
publishCheckpointInterval = scopedSettings.get(INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING);
lagTimeBeforeResendCheckpoint = scopedSettings.get(INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING);
/* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7).
* For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type
* to higher bytes size like integer to long. This behavior was changed from OpenSearch 2.7 version not to
Expand Down Expand Up @@ -1200,6 +1220,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
this::setRemoteStoreTranslogRepository
);
scopedSettings.addSettingsUpdateConsumer(INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING, this::setPublishCheckpointInterval);
scopedSettings.addSettingsUpdateConsumer(INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING, this::setLagTimeBeforeResendCheckpoint);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -2034,4 +2056,20 @@ public void setRemoteStoreRepository(String remoteStoreRepository) {
public void setRemoteStoreTranslogRepository(String remoteStoreTranslogRepository) {
this.remoteStoreTranslogRepository = remoteStoreTranslogRepository;
}

/**
 * Returns the current value of {@code index.publish_checkpoint_interval}.
 */
public TimeValue getPublishCheckpointInterval() {
return publishCheckpointInterval;
}

/**
 * Updates the publish-checkpoint interval; registered as the update consumer for
 * {@code INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING}.
 *
 * @param publishCheckpointInterval the new interval
 */
public void setPublishCheckpointInterval(TimeValue publishCheckpointInterval) {
this.publishCheckpointInterval = publishCheckpointInterval;
}

/**
 * Returns the current value of {@code index.lag_time_before_resend_checkpoint}.
 */
public TimeValue getLagTimeBeforeResendCheckpoint() {
return lagTimeBeforeResendCheckpoint;
}

/**
 * Updates the lag-time threshold; registered as the update consumer for
 * {@code INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING}.
 *
 * @param lagTimeBeforeResendCheckpoint the new threshold
 */
public void setLagTimeBeforeResendCheckpoint(TimeValue lagTimeBeforeResendCheckpoint) {
this.lagTimeBeforeResendCheckpoint = lagTimeBeforeResendCheckpoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,30 @@ && shouldSkipReplicationTimer(allocationId) == false
}
}

/**
 * In the scenario where segment replication is enabled, determine whether the primary shard has lagging replicas.
 * <p>
 * A replica counts as lagging when all of the following hold: it is in-sync, it is not among the replication
 * group's unavailable in-sync shards, {@code shouldSkipReplicationTimer} returns false for it, it has a timer
 * for the latest replication checkpoint, and that timer's {@code time()} is at least the configured
 * lag-time-before-resend threshold in milliseconds.
 *
 * @return {@code true} as soon as one lagging replica is found, {@code false} if there is none
 */
public synchronized boolean hasLaggingReplicas() {
    assert indexSettings.isSegRepEnabledOrRemoteNode();
    assert primaryMode;
    // Read the dynamic threshold once so every replica is compared against the same value.
    final long lagThresholdMillis = indexSettings.getLagTimeBeforeResendCheckpoint().millis();
    for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
        final String allocationId = entry.getKey();
        if (allocationId.equals(this.shardAllocationId)) {
            // Skip the entry that belongs to this tracker's own shard copy.
            continue;
        }
        final CheckpointState cps = entry.getValue();
        if (cps.inSync
            && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
            && shouldSkipReplicationTimer(allocationId) == false
            && cps.checkpointTimers.containsKey(latestReplicationCheckpoint)
            && cps.checkpointTimers.get(latestReplicationCheckpoint).time() >= lagThresholdMillis) {
            // One lagging replica is enough to answer the question; no need to inspect the rest.
            return true;
        }
    }
    return false;
}

/**
* After a new checkpoint is published, start a timer per replica for the checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
Expand All @@ -1307,7 +1331,8 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& shouldSkipReplicationTimer(e.getKey()) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)
&& cps.checkpointTimers.get(latestReplicationCheckpoint).startTime() == 0) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
}
});
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4716,6 +4716,16 @@ && isSearchIdle()
return false;
}

/**
 * Publishes this shard's latest replication checkpoint through the checkpoint publisher when the
 * replication tracker reports at least one lagging replica.
 *
 * @return {@code true} if a checkpoint was published, {@code false} if no replica was lagging
 */
public boolean scheduledPublishCheckpoint() {
    assert indexSettings.isSegRepEnabledOrRemoteNode();
    assert isPrimaryMode();
    final boolean anyReplicaLagging = replicationTracker.hasLaggingReplicas();
    if (anyReplicaLagging == false) {
        return false;
    }
    checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
    return true;
}

/**
* Returns true if this shards is search idle
*/
Expand Down
Loading