Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void markWorkingIndexes(Set<String> updatedIndexPaths) {
if (meter != null) {
// indexes.size() gives us the number of remaining corrupt indices.
// meter.mark(indexes.size()) increments the current meter count by indexes.size(). We don't want that here.
// We actually want to set the the meter count to indexes.size(), the api doesn't seem to support that.
// We actually want to set the meter count to indexes.size(), the api doesn't seem to support that.
// So we instead add indexes.size() - meter.getCount() , which will always be <= 0. So this effectively will reduce the meter count
// by number of indexes fixed in this call.
meter.mark(indexes.size() - meter.getCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexerProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;

import java.io.IOException;
import java.util.List;
Expand All @@ -39,6 +40,7 @@ public class ElasticDocumentStoreIndexer extends DocumentStoreIndexerBase {
private final int port;
private final String apiKeyId;
private final String apiSecretId;
private final ElasticRetryPolicy retryPolicy;

public ElasticDocumentStoreIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport,
String indexPrefix, String scheme,
Expand All @@ -52,6 +54,7 @@ public ElasticDocumentStoreIndexer(IndexHelper indexHelper, IndexerSupport index
this.port = port;
this.apiKeyId = apiKeyId;
this.apiSecretId = apiSecretId;
this.retryPolicy = ElasticRetryPolicy.createRetryPolicyFromSystemProperties();
setProvider();
}

Expand Down Expand Up @@ -91,7 +94,7 @@ private NodeStateIndexerProvider createElasticIndexerProvider() {
connection = buildStep.build();
}
closer.register(connection);
return new ElasticIndexerProvider(indexHelper, connection);
return new ElasticIndexerProvider(indexHelper, connection, retryPolicy);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;

Expand All @@ -38,6 +39,7 @@ public class ElasticOutOfBandIndexer extends OutOfBandIndexerBase {
private final int port;
private final String apiKeyId;
private final String apiSecretId;
private final ElasticRetryPolicy retryPolicy;

public ElasticOutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport,
String indexPrefix, String scheme,
Expand All @@ -50,6 +52,7 @@ public ElasticOutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSu
this.port = port;
this.apiKeyId = apiKeyId;
this.apiSecretId = apiSecretId;
this.retryPolicy = ElasticRetryPolicy.createRetryPolicyFromSystemProperties();
}

@Override
Expand All @@ -76,6 +79,6 @@ private IndexEditorProvider createElasticEditorProvider() {
ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection,
new ElasticMetricHandler(StatisticsProvider.NOOP));
return new ElasticIndexEditorProvider(indexTracker, connection,
new ExtractedTextCache(10 * FileUtils.ONE_MB, 100));
new ExtractedTextCache(10 * FileUtils.ONE_MB, 100), retryPolicy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
Expand Down Expand Up @@ -55,13 +56,13 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ElasticIndexEditorProvider elasticIndexEditorProvider;

public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection connection) {
public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection connection, ElasticRetryPolicy retryPolicy) {
this.indexHelper = indexHelper;
this.connection = connection;
this.bulkProcessorHandler = new ElasticBulkProcessorHandler(connection);
ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection, new ElasticMetricHandler(StatisticsProvider.NOOP));
this.indexWriterFactory = new ElasticIndexWriterFactory(connection, indexTracker, bulkProcessorHandler);
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, connection, null, bulkProcessorHandler);
this.indexWriterFactory = new ElasticIndexWriterFactory(connection, indexTracker, bulkProcessorHandler, retryPolicy);
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, connection, null, bulkProcessorHandler, retryPolicy);

}

Expand Down
2 changes: 1 addition & 1 deletion oak-search-elastic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<description>Oak Elasticsearch integration subproject</description>

<properties>
<elasticsearch.java.client.version>8.18.1</elasticsearch.java.client.version>
<elasticsearch.java.client.version>8.18.2</elasticsearch.java.client.version>
<lucene.version>9.12.1</lucene.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ElasticConnection implements Closeable {
protected static final int DEFAULT_PORT = 9200;
protected static final String DEFAULT_API_KEY_ID = "";
protected static final String DEFAULT_API_KEY_SECRET = "";
protected static final int DEFAULT_MAX_RETRY_TIME = 0;
protected static final int ES_SOCKET_TIMEOUT = 120000;

private final String indexPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexInfoProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConfig;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConstants;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class ElasticIndexProviderService {
protected static final String PROP_ELASTIC_PORT = "elasticsearch.port";
protected static final String PROP_ELASTIC_API_KEY_ID = "elasticsearch.apiKeyId";
protected static final String PROP_ELASTIC_API_KEY_SECRET = "elasticsearch.apiKeySecret";
protected static final String PROP_ELASTIC_MAX_RETRY_TIME = "elasticsearch.maxRetryTime";
protected static final String PROP_LOCAL_TEXT_EXTRACTION_DIR = "localTextExtractionDir";
private static final boolean DEFAULT_IS_INFERENCE_ENABLED = false;
private static final String ENV_VAR_OAK_INFERENCE_STATISTICS_DISABLED = "OAK_INFERENCE_STATISTICS_DISABLED";
Expand Down Expand Up @@ -117,6 +119,11 @@ public class ElasticIndexProviderService {
@AttributeDefinition(name = "Elasticsearch API key secret", description = "Elasticsearch API key secret")
String elasticsearch_apiKeySecret() default ElasticConnection.DEFAULT_API_KEY_SECRET;

@AttributeDefinition(
name = "Elasticsearch Max Retry time",
description = "Time in seconds that Elasticsearch should retry failed operations. 0 means disabled, no retries. Default is 0 seconds (disabled).")
int elasticsearch_maxRetryTime() default ElasticConnection.DEFAULT_MAX_RETRY_TIME;

@AttributeDefinition(name = "Local text extraction cache path",
description = "Local file system path where text extraction cache stores/load entries to recover from timed out operation")
String localTextExtractionDir();
Expand Down Expand Up @@ -227,7 +234,8 @@ private void activate(BundleContext bundleContext, Config config) {
LOG.info("Registering Index and Editor providers with connection {}", elasticConnection);

registerIndexProvider(bundleContext);
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, elasticConnection, extractedTextCache);
ElasticRetryPolicy retryPolicy = new ElasticRetryPolicy(100, config.elasticsearch_maxRetryTime() * 1000L, 5, 100);
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, elasticConnection, extractedTextCache, retryPolicy);
registerIndexEditor(bundleContext, elasticIndexEditorProvider);
if (isElasticAvailable) {
registerIndexCleaner(config);
Expand Down
Loading
Loading