Skip to content

Commit 3288baf

Browse files
committed
Initial working implementation with new tests. Needs more testing and polish
1 parent 4c8b070 commit 3288baf

File tree

16 files changed

+610
-338
lines changed

16 files changed

+610
-338
lines changed

oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
2525
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
2626
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
27+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticBulkProcessorHandler;
2728
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
2829
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
2930
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
@@ -39,6 +40,7 @@
3940

4041
import org.jetbrains.annotations.Nullable;
4142

43+
import java.io.IOException;
4244
import java.util.concurrent.TimeUnit;
4345

4446
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
@@ -49,12 +51,15 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
4951
private final IndexHelper indexHelper;
5052
private final ElasticIndexWriterFactory indexWriterFactory;
5153
private final ElasticConnection connection;
54+
private final ElasticBulkProcessorHandler bulkProcessorHandler;
5255

5356
public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection connection) {
5457
this.indexHelper = indexHelper;
55-
this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
56-
new ElasticIndexTracker(connection, new ElasticMetricHandler(StatisticsProvider.NOOP)));
5758
this.connection = connection;
59+
this.bulkProcessorHandler = new ElasticBulkProcessorHandler(connection);
60+
this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
61+
new ElasticIndexTracker(connection, new ElasticMetricHandler(StatisticsProvider.NOOP)), bulkProcessorHandler);
62+
5863
}
5964

6065
@Override
@@ -79,5 +84,11 @@ public ExtractedTextCache getTextCache() {
7984
}
8085

8186
@Override
82-
public void close() {}
87+
public void close() {
88+
try {
89+
this.bulkProcessorHandler.close();
90+
} catch (IOException e) {
91+
throw new RuntimeException(e);
92+
}
93+
}
8394
}

oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
2727
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
2828
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
29+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticBulkProcessorHandler;
30+
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
2931
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
3032
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
3133
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexDefinitionBuilder;
@@ -71,8 +73,8 @@ public void nodeIndexed_WithIncludedPaths() throws Exception {
7173
when(elasticsearchAsyncClientMock._jsonpMapper()).thenReturn(jsonMapperMock);
7274
when(elasticConnectionMock.getAsyncClient()).thenReturn(elasticsearchAsyncClientMock);
7375

74-
FulltextIndexWriter indexWriter = new ElasticIndexWriterFactory(elasticConnectionMock,
75-
mock(ElasticIndexTracker.class)).newInstance(idxDefn, defn.builder(), CommitInfo.EMPTY, false);
76+
FulltextIndexWriter<ElasticDocument> indexWriter = new ElasticIndexWriterFactory(elasticConnectionMock,
77+
mock(ElasticIndexTracker.class), mock(ElasticBulkProcessorHandler.class)).newInstance(idxDefn, defn.builder(), CommitInfo.EMPTY, false);
7678
ElasticIndexer indexer = new ElasticIndexer(idxDefn, mock(FulltextBinaryTextExtractor.class), builder,
7779
mock(IndexingProgressReporter.class), indexWriter, mock(ElasticIndexEditorProvider.class), mock(IndexHelper.class));
7880

oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,6 @@ public class ElasticIndexDefinition extends IndexDefinition {
4444

4545
public static final String TYPE_ELASTICSEARCH = "elasticsearch";
4646

47-
public static final String BULK_ACTIONS = "bulkActions";
48-
public static final int BULK_ACTIONS_DEFAULT = 250;
49-
50-
public static final String BULK_SIZE_BYTES = "bulkSizeBytes";
51-
public static final long BULK_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1MB
52-
53-
public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs";
54-
public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
55-
5647
public static final String NUMBER_OF_SHARDS = "numberOfShards";
5748
public static final int NUMBER_OF_SHARDS_DEFAULT = 1;
5849

@@ -82,10 +73,6 @@ public class ElasticIndexDefinition extends IndexDefinition {
8273
public static final String LIMIT_TOTAL_FIELDS = "limitTotalFields";
8374
public static final long LIMIT_TOTAL_FIELDS_DEFAULT = 1000L;
8475

85-
// when true, fails indexing in case of bulk failures
86-
public static final String FAIL_ON_ERROR = "failOnError";
87-
public static final boolean FAIL_ON_ERROR_DEFAULT = true;
88-
8976
/**
9077
* When 0, the index name gets dynamically generated by adding a random suffix to the index name.
9178
*/
@@ -178,9 +165,6 @@ public class ElasticIndexDefinition extends IndexDefinition {
178165

179166
private final String indexPrefix;
180167
private final String indexAlias;
181-
public final int bulkActions;
182-
public final long bulkSizeBytes;
183-
public final long bulkFlushIntervalMs;
184168
private final boolean similarityTagsEnabled;
185169
private final float similarityTagsBoost;
186170
public final int numberOfShards;
@@ -189,7 +173,6 @@ public class ElasticIndexDefinition extends IndexDefinition {
189173
public final long queryTimeoutMs;
190174
public final Integer trackTotalHits;
191175
public final String dynamicMapping;
192-
public final boolean failOnError;
193176
public final long indexNameSeed;
194177
public final InferenceDefinition inferenceDefinition;
195178
public final long limitTotalFields;
@@ -211,9 +194,6 @@ public ElasticIndexDefinition(NodeState root, NodeState defn, String indexPath,
211194
} else {
212195
this.indexAlias = ElasticIndexNameHelper.getElasticSafeIndexName(indexPrefix, getIndexPath());
213196
}
214-
this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT);
215-
this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT);
216-
this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
217197
this.numberOfShards = getOptionalValue(defn, NUMBER_OF_SHARDS, NUMBER_OF_SHARDS_DEFAULT);
218198
this.numberOfReplicas = getOptionalValue(defn, NUMBER_OF_REPLICAS, NUMBER_OF_REPLICAS_DEFAULT);
219199
this.similarityTagsEnabled = getOptionalValue(defn, SIMILARITY_TAGS_ENABLED, SIMILARITY_TAGS_ENABLED_DEFAULT);
@@ -223,9 +203,6 @@ public ElasticIndexDefinition(NodeState root, NodeState defn, String indexPath,
223203
this.queryTimeoutMs = getOptionalValue(defn, QUERY_TIMEOUT_MS, QUERY_TIMEOUT_MS_DEFAULT);
224204
this.trackTotalHits = getOptionalValue(defn, TRACK_TOTAL_HITS, TRACK_TOTAL_HITS_DEFAULT);
225205
this.dynamicMapping = getOptionalValue(defn, DYNAMIC_MAPPING, DYNAMIC_MAPPING_DEFAULT);
226-
this.failOnError = getOptionalValue(defn, FAIL_ON_ERROR,
227-
Boolean.parseBoolean(System.getProperty(TYPE_ELASTICSEARCH + "." + FAIL_ON_ERROR, Boolean.toString(FAIL_ON_ERROR_DEFAULT)))
228-
);
229206
this.indexNameSeed = getOptionalValue(defn, INDEX_NAME_SEED, INDEX_NAME_SEED_DEFAULT);
230207
this.similarityTagsFields = getOptionalValues(defn, SIMILARITY_TAGS_FIELDS, Type.STRINGS, String.class, SIMILARITY_TAGS_FIELDS_DEFAULT);
231208
this.limitTotalFields = getOptionalValue(defn, LIMIT_TOTAL_FIELDS, LIMIT_TOTAL_FIELDS_DEFAULT);

0 commit comments

Comments
 (0)