Skip to content

Commit 1c85b3a

Browse files
authored
OAK-11765 - BulkProcessor unable to insert after a failure (#2343)
1 parent 6b6949a commit 1c85b3a

25 files changed

+787
-164
lines changed

oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void markWorkingIndexes(Set<String> updatedIndexPaths) {
9595
if (meter != null) {
9696
// indexes.size() gives us the number of remaining corrupt indices.
9797
// meter.mark(indexes.size()) increments the current meter count by indexes.size(). We don't want that here.
98-
// We actually want to set the the meter count to indexes.size(), the api doesn't seem to support that.
98+
// We actually want to set the meter count to indexes.size(), the api doesn't seem to support that.
9999
// So we instead add indexes.size() - meter.getCount() , which will always be <= 0. So this effectively will reduce the meter count
100100
// by number of indexes fixed in this call.
101101
meter.mark(indexes.size() - meter.getCount());

oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexer;
2525
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexerProvider;
2626
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
27+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
2728

2829
import java.io.IOException;
2930
import java.util.List;
@@ -39,6 +40,7 @@ public class ElasticDocumentStoreIndexer extends DocumentStoreIndexerBase {
3940
private final int port;
4041
private final String apiKeyId;
4142
private final String apiSecretId;
43+
private final ElasticRetryPolicy retryPolicy;
4244

4345
public ElasticDocumentStoreIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport,
4446
String indexPrefix, String scheme,
@@ -52,6 +54,7 @@ public ElasticDocumentStoreIndexer(IndexHelper indexHelper, IndexerSupport index
5254
this.port = port;
5355
this.apiKeyId = apiKeyId;
5456
this.apiSecretId = apiSecretId;
57+
this.retryPolicy = ElasticRetryPolicy.createRetryPolicyFromSystemProperties();
5558
setProvider();
5659
}
5760

@@ -91,7 +94,7 @@ private NodeStateIndexerProvider createElasticIndexerProvider() {
9194
connection = buildStep.build();
9295
}
9396
closer.register(connection);
94-
return new ElasticIndexerProvider(indexHelper, connection);
97+
return new ElasticIndexerProvider(indexHelper, connection, retryPolicy);
9598
}
9699

97100
}

oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticOutOfBandIndexer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
2626
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
2727
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
28+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
2829
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
2930
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
3031

@@ -38,6 +39,7 @@ public class ElasticOutOfBandIndexer extends OutOfBandIndexerBase {
3839
private final int port;
3940
private final String apiKeyId;
4041
private final String apiSecretId;
42+
private final ElasticRetryPolicy retryPolicy;
4143

4244
public ElasticOutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport,
4345
String indexPrefix, String scheme,
@@ -50,6 +52,7 @@ public ElasticOutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSu
5052
this.port = port;
5153
this.apiKeyId = apiKeyId;
5254
this.apiSecretId = apiSecretId;
55+
this.retryPolicy = ElasticRetryPolicy.createRetryPolicyFromSystemProperties();
5356
}
5457

5558
@Override
@@ -76,6 +79,6 @@ private IndexEditorProvider createElasticEditorProvider() {
7679
ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection,
7780
new ElasticMetricHandler(StatisticsProvider.NOOP));
7881
return new ElasticIndexEditorProvider(indexTracker, connection,
79-
new ExtractedTextCache(10 * FileUtils.ONE_MB, 100));
82+
new ExtractedTextCache(10 * FileUtils.ONE_MB, 100), retryPolicy);
8083
}
8184
}

oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
2929
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
3030
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
31+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
3132
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
3233
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
3334
import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
@@ -55,13 +56,13 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
5556
private final AtomicBoolean closed = new AtomicBoolean(false);
5657
private final ElasticIndexEditorProvider elasticIndexEditorProvider;
5758

58-
public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection connection) {
59+
public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection connection, ElasticRetryPolicy retryPolicy) {
5960
this.indexHelper = indexHelper;
6061
this.connection = connection;
6162
this.bulkProcessorHandler = new ElasticBulkProcessorHandler(connection);
6263
ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection, new ElasticMetricHandler(StatisticsProvider.NOOP));
63-
this.indexWriterFactory = new ElasticIndexWriterFactory(connection, indexTracker, bulkProcessorHandler);
64-
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, connection, null, bulkProcessorHandler);
64+
this.indexWriterFactory = new ElasticIndexWriterFactory(connection, indexTracker, bulkProcessorHandler, retryPolicy);
65+
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, connection, null, bulkProcessorHandler, retryPolicy);
6566

6667
}
6768

oak-search-elastic/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<description>Oak Elasticsearch integration subproject</description>
3434

3535
<properties>
36-
<elasticsearch.java.client.version>8.18.1</elasticsearch.java.client.version>
36+
<elasticsearch.java.client.version>8.18.2</elasticsearch.java.client.version>
3737
<lucene.version>9.12.1</lucene.version>
3838
</properties>
3939

oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class ElasticConnection implements Closeable {
5656
protected static final int DEFAULT_PORT = 9200;
5757
protected static final String DEFAULT_API_KEY_ID = "";
5858
protected static final String DEFAULT_API_KEY_SECRET = "";
59+
protected static final int DEFAULT_MAX_RETRY_TIME = 0;
5960
protected static final int ES_SOCKET_TIMEOUT = 120000;
6061

6162
private final String indexPrefix;

oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
2424
import org.apache.jackrabbit.oak.plugins.index.IndexInfoProvider;
2525
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
26+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
2627
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexProvider;
2728
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConfig;
2829
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConstants;
@@ -73,6 +74,7 @@ public class ElasticIndexProviderService {
7374
protected static final String PROP_ELASTIC_PORT = "elasticsearch.port";
7475
protected static final String PROP_ELASTIC_API_KEY_ID = "elasticsearch.apiKeyId";
7576
protected static final String PROP_ELASTIC_API_KEY_SECRET = "elasticsearch.apiKeySecret";
77+
protected static final String PROP_ELASTIC_MAX_RETRY_TIME = "elasticsearch.maxRetryTime";
7678
protected static final String PROP_LOCAL_TEXT_EXTRACTION_DIR = "localTextExtractionDir";
7779
private static final boolean DEFAULT_IS_INFERENCE_ENABLED = false;
7880
private static final String ENV_VAR_OAK_INFERENCE_STATISTICS_DISABLED = "OAK_INFERENCE_STATISTICS_DISABLED";
@@ -117,6 +119,11 @@ public class ElasticIndexProviderService {
117119
@AttributeDefinition(name = "Elasticsearch API key secret", description = "Elasticsearch API key secret")
118120
String elasticsearch_apiKeySecret() default ElasticConnection.DEFAULT_API_KEY_SECRET;
119121

122+
@AttributeDefinition(
123+
name = "Elasticsearch Max Retry time",
124+
description = "Time in seconds that Elasticsearch should retry failed operations. 0 means disabled, no retries. Default is 0 seconds (disabled).")
125+
int elasticsearch_maxRetryTime() default ElasticConnection.DEFAULT_MAX_RETRY_TIME;
126+
120127
@AttributeDefinition(name = "Local text extraction cache path",
121128
description = "Local file system path where text extraction cache stores/load entries to recover from timed out operation")
122129
String localTextExtractionDir();
@@ -227,7 +234,8 @@ private void activate(BundleContext bundleContext, Config config) {
227234
LOG.info("Registering Index and Editor providers with connection {}", elasticConnection);
228235

229236
registerIndexProvider(bundleContext);
230-
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, elasticConnection, extractedTextCache);
237+
ElasticRetryPolicy retryPolicy = new ElasticRetryPolicy(100, config.elasticsearch_maxRetryTime() * 1000L, 5, 100);
238+
this.elasticIndexEditorProvider = new ElasticIndexEditorProvider(indexTracker, elasticConnection, extractedTextCache, retryPolicy);
231239
registerIndexEditor(bundleContext, elasticIndexEditorProvider);
232240
if (isElasticAvailable) {
233241
registerIndexCleaner(config);

0 commit comments

Comments
 (0)