@@ -57,40 +57,6 @@ public class ElasticBulkProcessorHandler {
5757
5858 private static final Logger LOG = LoggerFactory .getLogger (ElasticBulkProcessorHandler .class );
5959
60- public static final String BULK_ACTIONS_PROP = "oak.indexer.elastic.bulkProcessor.maxBulkOperations" ;
61- public static final int BULK_ACTIONS_DEFAULT = 250 ;
62- public static final String BULK_SIZE_BYTES_PROP = "oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes" ;
63- public static final int BULK_SIZE_BYTES_DEFAULT = 1024 * 1024 ; // 1MB
64- public static final String BULK_FLUSH_INTERVAL_MS_PROP = "oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs" ;
65- public static final int BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000 ;
66- public static final String BULK_PROCESSOR_CONCURRENCY_PROP = "oak.indexer.elastic.bulkProcessor.concurrency" ;
67- // when true, fails indexing in case of bulk failures
68- public static final String FAIL_ON_ERROR_PROP = "oak.indexer.elastic.bulkProcessor.failOnError" ;
69- public static final boolean FAIL_ON_ERROR_DEFAULT = true ;
70- private static final String SYNC_MODE_PROPERTY = "sync-mode" ;
71- private static final String SYNC_RT_MODE = "rt" ;
72-
73- private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = ConfigHelper .getSystemPropertyAsInt ("oak.failedDocStatusLimit" , 10000 );
74- private final int BULK_MAX_OPERATIONS = ConfigHelper .getSystemPropertyAsInt (BULK_ACTIONS_PROP , BULK_ACTIONS_DEFAULT );
75- private final int BULK_MAX_SIZE_BYTES = ConfigHelper .getSystemPropertyAsInt (BULK_SIZE_BYTES_PROP , BULK_SIZE_BYTES_DEFAULT );
76- private final int BULK_FLUSH_INTERVAL_MS = ConfigHelper .getSystemPropertyAsInt (BULK_FLUSH_INTERVAL_MS_PROP , BULK_FLUSH_INTERVAL_MS_DEFAULT );
77- private final int BULK_PROCESSOR_CONCURRENCY = ConfigHelper .getSystemPropertyAsInt (BULK_PROCESSOR_CONCURRENCY_PROP , 1 );
78- private final boolean FAIL_ON_ERROR = ConfigHelper .getSystemPropertyAsBoolean (FAIL_ON_ERROR_PROP , FAIL_ON_ERROR_DEFAULT );
79- protected final ElasticConnection elasticConnection ;
80- protected final BulkIngester <OperationContext > bulkIngester ;
81-
82- // Used to keep track of the sequence number of the batches that are currently being processed.
83- // This is used to wait until all operations for a writer are processed before closing it.
84- private final ReentrantLock lock = new ReentrantLock ();
85- private final Condition bulkProcessedCondition = lock .newCondition ();
86- private final HashSet <Long > pendingBulks = new HashSet <>();
87-
88- private final AtomicBoolean closed = new AtomicBoolean (false );
89- private final ConcurrentHashMap <String , IndexInfo > registeredIndexes = new ConcurrentHashMap <>();
90-
91- protected long totalOperations ;
92- private final ConcurrentLinkedQueue <ErrorCause > globalSuppressedErrorCauses = new ConcurrentLinkedQueue <>();
93-
9460 static class IndexInfo {
9561 public final String indexName ;
9662 public final ElasticIndexDefinition indexDefinition ;
@@ -102,7 +68,6 @@ static class IndexInfo {
10268 */
10369 public final ConcurrentLinkedQueue <ErrorCause > suppressedErrorCauses = new ConcurrentLinkedQueue <>();
10470
105-
10671 long indexOperations = 0 ;
10772 long deleteOperations = 0 ;
10873 long updateOperations = 0 ;
@@ -135,6 +100,39 @@ public String toString() {
135100 }
136101 }
137102
103+ public static final String BULK_ACTIONS_PROP = "oak.indexer.elastic.bulkProcessor.maxBulkOperations" ;
104+ public static final int BULK_ACTIONS_DEFAULT = 8192 ;
105+ public static final String BULK_SIZE_BYTES_PROP = "oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes" ;
106+ public static final int BULK_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 ; // 8MB
107+ public static final String BULK_FLUSH_INTERVAL_MS_PROP = "oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs" ;
108+ public static final int BULK_FLUSH_INTERVAL_MS_DEFAULT = 5000 ;
109+ public static final String BULK_PROCESSOR_CONCURRENCY_PROP = "oak.indexer.elastic.bulkProcessor.concurrency" ;
110+ // when true, fails indexing in case of bulk failures
111+ public static final String FAIL_ON_ERROR_PROP = "oak.indexer.elastic.bulkProcessor.failOnError" ;
112+ public static final boolean FAIL_ON_ERROR_DEFAULT = true ;
113+
114+ private static final String SYNC_MODE_PROPERTY = "sync-mode" ;
115+ private static final String SYNC_RT_MODE = "rt" ;
116+
117+ private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = ConfigHelper .getSystemPropertyAsInt ("oak.failedDocStatusLimit" , 10000 );
118+ private final int BULK_MAX_OPERATIONS = ConfigHelper .getSystemPropertyAsInt (BULK_ACTIONS_PROP , BULK_ACTIONS_DEFAULT );
119+ private final int BULK_MAX_SIZE_BYTES = ConfigHelper .getSystemPropertyAsInt (BULK_SIZE_BYTES_PROP , BULK_SIZE_BYTES_DEFAULT );
120+ private final int BULK_FLUSH_INTERVAL_MS = ConfigHelper .getSystemPropertyAsInt (BULK_FLUSH_INTERVAL_MS_PROP , BULK_FLUSH_INTERVAL_MS_DEFAULT );
121+ private final int BULK_PROCESSOR_CONCURRENCY = ConfigHelper .getSystemPropertyAsInt (BULK_PROCESSOR_CONCURRENCY_PROP , 1 );
122+ private final boolean FAIL_ON_ERROR = ConfigHelper .getSystemPropertyAsBoolean (FAIL_ON_ERROR_PROP , FAIL_ON_ERROR_DEFAULT );
123+ private final ElasticConnection elasticConnection ;
124+ private final BulkIngester <OperationContext > bulkIngester ;
125+ // Used to keep track of the sequence number of the batches that are currently being processed.
126+ // This is used to wait until all operations for a writer are processed before closing it.
127+ private final ReentrantLock lock = new ReentrantLock ();
128+ private final Condition bulkProcessedCondition = lock .newCondition ();
129+ private final HashSet <Long > pendingBulks = new HashSet <>();
130+
131+ private final AtomicBoolean closed = new AtomicBoolean (false );
132+ private final ConcurrentHashMap <String , IndexInfo > registeredIndexes = new ConcurrentHashMap <>();
133+
134+ private final ConcurrentLinkedQueue <ErrorCause > globalSuppressedErrorCauses = new ConcurrentLinkedQueue <>();
135+
138136 public ElasticBulkProcessorHandler (@ NotNull ElasticConnection elasticConnection ) {
139137 this .elasticConnection = elasticConnection ;
140138 // BulkIngester does not support retry policies. Some retries though are already implemented in the transport layer.
@@ -352,6 +350,7 @@ public void close() throws IOException {
352350 } else {
353351 LOG .info ("Closing bulk processor handler" );
354352 LOG .trace ("Calling close on bulk ingester {}" , bulkIngester );
353+ printStatistics ();
355354 // This blocks until all requests are processed
356355 // Fail is some of the indexes were not closed
357356 if (!registeredIndexes .isEmpty ()) {
@@ -383,14 +382,24 @@ private IndexInfo getIndexInfoOrFail(String indexName) {
383382 return indexInfo ;
384383 }
385384
385+ private long totalWaitTimeNanos = 0 ;
386386 private void add (BulkOperation operation , OperationContext context ) throws IOException {
387- if (totalOperations % 128 == 0 ) {
388- LOG .info ("Adding operation: [{}]: {}" , context .indexInfo .indexName , context .documentId );
389- }
387+ // if (totalOperations % 4096 == 0) {
388+ // LOG.info("Adding operation: [{}]: {}", context.indexInfo.indexName, context.documentId);
389+ // }
390390 // fail fast: we don't want to wait until the processor gets closed to fail
391391 checkFailuresForIndex (context .indexInfo );
392+ long start = System .nanoTime ();
392393 bulkIngester .add (operation , context );
393- totalOperations ++;
394+ long end = System .nanoTime ();
395+ totalWaitTimeNanos += end - start ;
396+ }
397+
398+ public void printStatistics () {
399+ LOG .info ("BulkIngester statistics: [operationsCount: {}, requestCount: {}, avgOperationsPerBulk: {}, operationContentionsCount: {}, requestContentionsCount: {}, waitTimeMs: {}]" ,
400+ bulkIngester .operationsCount (), bulkIngester .requestCount (), bulkIngester .operationsCount ()/bulkIngester .requestCount (),
401+ bulkIngester .operationContentionsCount (), bulkIngester .requestContentionsCount (),
402+ TimeUnit .NANOSECONDS .toMillis (totalWaitTimeNanos ));
394403 }
395404
396405 private class OakBulkListener implements BulkListener <OperationContext > {
@@ -403,14 +412,11 @@ public void beforeBulk(long executionId, BulkRequest request, List<OperationCont
403412 } finally {
404413 lock .unlock ();
405414 }
415+ if (bulkIngester .requestCount () % 32 == 0 ) {
416+ LOG .info ("Sending bulk with id {} -> #ops: {}" , executionId , contexts .size ());
417+ printStatistics ();
418+ }
406419
407-
408- LOG .info ("Sending bulk with id {} -> #ops: {}" , executionId , contexts .size ());
409- // LOG.info("Bulk Requests: \n{}", request.operations()
410- // .stream()
411- // .map(BulkOperation::toString)
412- // .collect(Collectors.joining("\n"))
413- // );
414420 if (LOG .isTraceEnabled ()) {
415421 LOG .trace ("Bulk Requests: \n {}" , request .operations ()
416422 .stream ()
@@ -461,7 +467,7 @@ public void saveFailedDocSets() {
461467 @ Override
462468 public void afterBulk (long executionId , BulkRequest request , List <OperationContext > contexts , BulkResponse response ) {
463469 try {
464- LOG .info ("Bulk with id {} processed in {} ms" , executionId , response .took () / 1_000_000 );
470+ LOG .debug ("Bulk with id {} processed in {} ms" , executionId , response .took () / 1_000_000 );
465471 if (LOG .isTraceEnabled ()) {
466472 LOG .trace (response .toString ());
467473 }
0 commit comments