2727import org .apache .jackrabbit .oak .api .PropertyState ;
2828import org .apache .jackrabbit .oak .api .Type ;
2929import org .apache .jackrabbit .oak .plugins .index .ConfigHelper ;
30+ import org .apache .jackrabbit .oak .plugins .index .FormattingUtils ;
3031import org .apache .jackrabbit .oak .plugins .index .elastic .ElasticConnection ;
3132import org .apache .jackrabbit .oak .plugins .index .elastic .ElasticIndexDefinition ;
3233import org .apache .jackrabbit .oak .plugins .index .search .IndexDefinition ;
@@ -57,6 +58,9 @@ public class ElasticBulkProcessorHandler {
5758
5859 private static final Logger LOG = LoggerFactory .getLogger (ElasticBulkProcessorHandler .class );
5960
61+ /**
62+ * Keeps information about an index that is being written by the bulk processor
63+ */
6064 static class IndexInfo {
6165 public final String indexName ;
6266 public final ElasticIndexDefinition indexDefinition ;
@@ -82,6 +86,9 @@ static class IndexInfo {
8286 }
8387 }
8488
89+ /**
90+ * Context object associated with each operation passed to the bulk processor
91+ */
8592 public final static class OperationContext {
8693 final IndexInfo indexInfo ;
8794 final String documentId ;
@@ -106,22 +113,26 @@ public String toString() {
106113 public static final int BULK_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 ; // 8MB
107114 public static final String BULK_FLUSH_INTERVAL_MS_PROP = "oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs" ;
108115 public static final int BULK_FLUSH_INTERVAL_MS_DEFAULT = 5000 ;
109- public static final String BULK_PROCESSOR_CONCURRENCY_PROP = "oak.indexer.elastic.bulkProcessor.concurrency" ;
116+ public static final String BULK_MAX_CONCURRENT_REQUESTS_PROP = "oak.indexer.elastic.bulkProcessor.maxConcurrentRequests" ;
117+ private static final int BULK_MAX_CONCURRENT_REQUESTS_DEFAULT = 1 ;
110118 // when true, fails indexing in case of bulk failures
111119 public static final String FAIL_ON_ERROR_PROP = "oak.indexer.elastic.bulkProcessor.failOnError" ;
112120 public static final boolean FAIL_ON_ERROR_DEFAULT = true ;
113121
114122 private static final String SYNC_MODE_PROPERTY = "sync-mode" ;
115123 private static final String SYNC_RT_MODE = "rt" ;
124+ private static final int MAX_SUPPRESSED_ERROR_CAUSES = 50 ;
116125
117126 private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = ConfigHelper .getSystemPropertyAsInt ("oak.failedDocStatusLimit" , 10000 );
118127 private final int BULK_MAX_OPERATIONS = ConfigHelper .getSystemPropertyAsInt (BULK_ACTIONS_PROP , BULK_ACTIONS_DEFAULT );
119128 private final int BULK_MAX_SIZE_BYTES = ConfigHelper .getSystemPropertyAsInt (BULK_SIZE_BYTES_PROP , BULK_SIZE_BYTES_DEFAULT );
120129 private final int BULK_FLUSH_INTERVAL_MS = ConfigHelper .getSystemPropertyAsInt (BULK_FLUSH_INTERVAL_MS_PROP , BULK_FLUSH_INTERVAL_MS_DEFAULT );
121- private final int BULK_PROCESSOR_CONCURRENCY = ConfigHelper .getSystemPropertyAsInt (BULK_PROCESSOR_CONCURRENCY_PROP , 1 );
130+ private final int BULK_MAX_CONCURRENT_REQUESTS = ConfigHelper .getSystemPropertyAsInt (BULK_MAX_CONCURRENT_REQUESTS_PROP , BULK_MAX_CONCURRENT_REQUESTS_DEFAULT );
122131 private final boolean FAIL_ON_ERROR = ConfigHelper .getSystemPropertyAsBoolean (FAIL_ON_ERROR_PROP , FAIL_ON_ERROR_DEFAULT );
132+
123133 private final ElasticConnection elasticConnection ;
124134 private final BulkIngester <OperationContext > bulkIngester ;
135+
125136 // Used to keep track of the sequence number of the batches that are currently being processed.
126137 // This is used to wait until all operations for a writer are processed before closing it.
127138 private final ReentrantLock lock = new ReentrantLock ();
@@ -138,7 +149,7 @@ public ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection)
138149 // BulkIngester does not support retry policies. Some retries though are already implemented in the transport layer.
139150 // More details here: https://github.com/elastic/elasticsearch-java/issues/478
140151 LOG .info ("Creating bulk ingester [maxActions: {}, maxSizeBytes: {} flushInterval {}, concurrency {}]" ,
141- BULK_MAX_OPERATIONS , BULK_MAX_SIZE_BYTES , BULK_FLUSH_INTERVAL_MS , BULK_PROCESSOR_CONCURRENCY_PROP );
152+ BULK_MAX_OPERATIONS , BULK_MAX_SIZE_BYTES , BULK_FLUSH_INTERVAL_MS , BULK_MAX_CONCURRENT_REQUESTS_PROP );
142153 this .bulkIngester = BulkIngester .of (b -> {
143154 b = b .client (elasticConnection .getAsyncClient ())
144155 .listener (new OakBulkListener ());
@@ -151,8 +162,8 @@ public ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection)
151162 if (BULK_FLUSH_INTERVAL_MS > 0 ) {
152163 b = b .flushInterval (BULK_FLUSH_INTERVAL_MS , TimeUnit .MILLISECONDS );
153164 }
154- if (BULK_PROCESSOR_CONCURRENCY > 0 ) {
155- b = b .maxConcurrentRequests (BULK_PROCESSOR_CONCURRENCY );
165+ if (BULK_MAX_CONCURRENT_REQUESTS > 0 ) {
166+ b = b .maxConcurrentRequests (BULK_MAX_CONCURRENT_REQUESTS );
156167 }
157168 return b ;
158169 });
@@ -368,7 +379,9 @@ public void close() throws IOException {
368379
369380 private void checkFailuresForIndex (IndexInfo indexInfo ) throws IOException {
370381 if (!indexInfo .suppressedErrorCauses .isEmpty ()) {
371- IOException ioe = new IOException ("Exception while indexing. See suppressed for details" );
382+ String overflowMessage = (indexInfo .suppressedErrorCauses .size () >= MAX_SUPPRESSED_ERROR_CAUSES ) ?
383+ ". (Too many failed operations in last bulk request, including only " + indexInfo .suppressedErrorCauses .size () + " errors)" : "" ;
384+ IOException ioe = new IOException ("Exception while indexing. See suppressed for details" + overflowMessage );
372385 indexInfo .suppressedErrorCauses .stream ().map (ec -> new IllegalStateException (ec .reason ())).forEach (ioe ::addSuppressed );
373386 throw ioe ;
374387 }
@@ -383,10 +396,9 @@ private IndexInfo getIndexInfoOrFail(String indexName) {
383396 }
384397
385398 private long totalWaitTimeNanos = 0 ;
399+
386400 private void add (BulkOperation operation , OperationContext context ) throws IOException {
387- // if (totalOperations % 4096 == 0) {
388- // LOG.info("Adding operation: [{}]: {}", context.indexInfo.indexName, context.documentId);
389- // }
401+ LOG .info ("Adding operation: [{}]: {}" , context .indexInfo .indexName , context .documentId );
390402 // fail fast: we don't want to wait until the processor gets closed to fail
391403 checkFailuresForIndex (context .indexInfo );
392404 long start = System .nanoTime ();
@@ -397,7 +409,8 @@ private void add(BulkOperation operation, OperationContext context) throws IOExc
397409
398410 public void printStatistics () {
399411 LOG .info ("BulkIngester statistics: [operationsCount: {}, requestCount: {}, avgOperationsPerBulk: {}, operationContentionsCount: {}, requestContentionsCount: {}, waitTimeMs: {}]" ,
400- bulkIngester .operationsCount (), bulkIngester .requestCount (), bulkIngester .operationsCount ()/bulkIngester .requestCount (),
412+ bulkIngester .operationsCount (), bulkIngester .requestCount (),
413+ FormattingUtils .safeComputeAverage (bulkIngester .operationsCount (), bulkIngester .requestCount ()),
401414 bulkIngester .operationContentionsCount (), bulkIngester .requestContentionsCount (),
402415 TimeUnit .NANOSECONDS .toMillis (totalWaitTimeNanos ));
403416 }
@@ -485,7 +498,7 @@ public void afterBulk(long executionId, BulkRequest request, List<OperationConte
485498 // However, this is not performance critical so we can use coarse grained locking
486499 k -> new FailedDocSetTracker (indexInfo .definitionBuilder ));
487500
488- if (FAIL_ON_ERROR ) {
501+ if (FAIL_ON_ERROR && indexInfo . suppressedErrorCauses . size () < MAX_SUPPRESSED_ERROR_CAUSES ) {
489502 indexInfo .suppressedErrorCauses .add (item .error ());
490503 }
491504 String documentId = contexts .get (i ).documentId ;
0 commit comments