@@ -124,12 +124,12 @@ public String toString() {
124124 private static final String SYNC_RT_MODE = "rt" ;
125125 private static final int MAX_SUPPRESSED_ERROR_CAUSES = 50 ;
126126
127- private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = ConfigHelper .getSystemPropertyAsInt ("oak.failedDocStatusLimit" , 10000 );
128- private final int BULK_MAX_OPERATIONS = ConfigHelper .getSystemPropertyAsInt (BULK_ACTIONS_PROP , BULK_ACTIONS_DEFAULT );
129- private final int BULK_MAX_SIZE_BYTES = ConfigHelper .getSystemPropertyAsInt (BULK_SIZE_BYTES_PROP , BULK_SIZE_BYTES_DEFAULT );
130- private final int BULK_FLUSH_INTERVAL_MS = ConfigHelper .getSystemPropertyAsInt (BULK_FLUSH_INTERVAL_MS_PROP , BULK_FLUSH_INTERVAL_MS_DEFAULT );
131- private final int BULK_MAX_CONCURRENT_REQUESTS = ConfigHelper .getSystemPropertyAsInt (BULK_MAX_CONCURRENT_REQUESTS_PROP , BULK_MAX_CONCURRENT_REQUESTS_DEFAULT );
132- private final boolean FAIL_ON_ERROR = ConfigHelper .getSystemPropertyAsBoolean (FAIL_ON_ERROR_PROP , FAIL_ON_ERROR_DEFAULT );
127+ private final int failedDocCountForStatusNode = ConfigHelper .getSystemPropertyAsInt ("oak.failedDocStatusLimit" , 10000 );
128+ private final int bulkMaxOperations = ConfigHelper .getSystemPropertyAsInt (BULK_ACTIONS_PROP , BULK_ACTIONS_DEFAULT );
129+ private final int bulkMaxSizeBytes = ConfigHelper .getSystemPropertyAsInt (BULK_SIZE_BYTES_PROP , BULK_SIZE_BYTES_DEFAULT );
130+ private final int bulkFlushIntervalMs = ConfigHelper .getSystemPropertyAsInt (BULK_FLUSH_INTERVAL_MS_PROP , BULK_FLUSH_INTERVAL_MS_DEFAULT );
131+ private final int bulkMaxConcurrentRequests = ConfigHelper .getSystemPropertyAsInt (BULK_MAX_CONCURRENT_REQUESTS_PROP , BULK_MAX_CONCURRENT_REQUESTS_DEFAULT );
132+ private final boolean failOnError = ConfigHelper .getSystemPropertyAsBoolean (FAIL_ON_ERROR_PROP , FAIL_ON_ERROR_DEFAULT );
133133
134134 private final ElasticConnection elasticConnection ;
135135 private final BulkIngester <OperationContext > bulkIngester ;
@@ -153,21 +153,21 @@ public ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection)
153153 // BulkIngester does not support retry policies. Some retries though are already implemented in the transport layer.
154154 // More details here: https://github.com/elastic/elasticsearch-java/issues/478
155155 LOG .info ("Creating bulk ingester [maxActions: {}, maxSizeBytes: {} flushInterval {}, concurrency {}]" ,
156- BULK_MAX_OPERATIONS , BULK_MAX_SIZE_BYTES , BULK_FLUSH_INTERVAL_MS , BULK_MAX_CONCURRENT_REQUESTS_PROP );
156+ bulkMaxOperations , bulkMaxSizeBytes , bulkFlushIntervalMs , BULK_MAX_CONCURRENT_REQUESTS_PROP );
157157 this .bulkIngester = BulkIngester .of (b -> {
158158 b = b .client (elasticConnection .getAsyncClient ())
159159 .listener (new OakBulkListener ());
160- if (BULK_MAX_OPERATIONS > 0 ) {
161- b = b .maxOperations (BULK_MAX_OPERATIONS );
160+ if (bulkMaxOperations > 0 ) {
161+ b = b .maxOperations (bulkMaxOperations );
162162 }
163- if (BULK_MAX_SIZE_BYTES > 0 ) {
164- b = b .maxSize (BULK_MAX_SIZE_BYTES );
163+ if (bulkMaxSizeBytes > 0 ) {
164+ b = b .maxSize (bulkMaxSizeBytes );
165165 }
166- if (BULK_FLUSH_INTERVAL_MS > 0 ) {
167- b = b .flushInterval (BULK_FLUSH_INTERVAL_MS , TimeUnit .MILLISECONDS );
166+ if (bulkFlushIntervalMs > 0 ) {
167+ b = b .flushInterval (bulkFlushIntervalMs , TimeUnit .MILLISECONDS );
168168 }
169- if (BULK_MAX_CONCURRENT_REQUESTS > 0 ) {
170- b = b .maxConcurrentRequests (BULK_MAX_CONCURRENT_REQUESTS );
169+ if (bulkMaxConcurrentRequests > 0 ) {
170+ b = b .maxConcurrentRequests (bulkMaxConcurrentRequests );
171171 }
172172 return b ;
173173 });
@@ -309,7 +309,7 @@ public boolean closeIndex(String indexName) throws IOException {
309309 // we are closing. Wait until all requests lower or equal to this number are processed.
310310 OptionalLong lowestPendingBulkRequest = pendingBulks .stream ().mapToLong (Long ::longValue ).min ();
311311 // If there is no pending request, we return immediately
312- long remainingTimeoutNanos = TimeUnit .MILLISECONDS .toNanos (BULK_FLUSH_INTERVAL_MS * 5L );
312+ long remainingTimeoutNanos = TimeUnit .MILLISECONDS .toNanos (bulkFlushIntervalMs * 5L );
313313 while (lowestPendingBulkRequest .isPresent () && lowestPendingBulkRequest .getAsLong () <= highestBulkRequestSent ) {
314314 LOG .debug ("Waiting for request {} to be processed. Lowest pending request: {}" , highestBulkRequestSent , lowestPendingBulkRequest .getAsLong ());
315315 try {
@@ -442,7 +442,7 @@ public FailedDocSetTracker(NodeBuilder definitionBuilder) {
442442 }
443443
444444 public void addFailedDocument (String documentId ) {
445- if (failedDocSet .size () < FAILED_DOC_COUNT_FOR_STATUS_NODE ) {
445+ if (failedDocSet .size () < failedDocCountForStatusNode ) {
446446 failedDocSet .add (documentId );
447447 updated = true ;
448448 } else {
@@ -504,7 +504,7 @@ public void afterBulk(long executionId, BulkRequest request, List<OperationConte
504504 // However, this is not performance critical so we can use coarse grained locking
505505 k -> new FailedDocSetTracker (indexInfo .definitionBuilder ));
506506
507- if (FAIL_ON_ERROR && indexInfo .suppressedErrorCauses .size () < MAX_SUPPRESSED_ERROR_CAUSES ) {
507+ if (failOnError && indexInfo .suppressedErrorCauses .size () < MAX_SUPPRESSED_ERROR_CAUSES ) {
508508 indexInfo .suppressedErrorCauses .add (item .error ());
509509 }
510510 String documentId = contexts .get (i ).documentId ;
0 commit comments