Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-11545 - Use a single long-lived bulk processor for Elastic reindexing #2183

Open
wants to merge 17 commits into
base: trunk
Choose a base branch
from

Conversation

nfsantos
Copy link
Contributor

@nfsantos nfsantos commented Mar 14, 2025

When writing to Elastic indexes, we use a BulkIngester (part of the Elastic Client Java API) to buffer operations before sending them to Elastic in a bulk request and to manage concurrent outgoing requests. This is essential to have good performance. In the current implementation, each instance of ElasticIndexWriter creates a new ElasticBulkProcessorHandler. Since index writers are created per-index, when indexing more than one Elastic index we are creating several BulkIngesters. This is wasteful because a BulkIngester can be used to write to several indexes, so a single instance can be shared by ElasticIndexWriters.

This PR moves the creation of the ElasticBulkProcessor into the ElasticIndexerProvider class so this single instance can be shared by all the index writers created by that indexer provider. This has the following advantages:

  • Decreases the memory usage. Each bulk ingester contains buffers to keep the operations before sending them to Elastic. These buffers can take up significant space, especially if they are increased to optimize the size of the bulk requests.
  • Potentially improve performance because the single BulkIngester would fill up faster with operations from several indexes, leading to fewer but bigger bulk requests.
  • In the incremental indexer, eliminates the cost of creating and destroying several BulkIngesters for every incremental cycle (every 5 seconds).

Technical details

Bulk ingester configuration is now global

The following properties used to configure the Bulk Ingester could be set per-index, in the index definition. Now they are global configuration properties:

Bulk Ingester property New system property Default
maxOperations oak.indexer.elastic.bulkProcessor.maxBulkOperations 8192
maxSize oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes 8MB
flushInterval oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs 2000 (2s)
maxConcurrentRequests oak.indexer.elastic.bulkProcessor.maxConcurrentRequests 1
failOnError oak.indexer.elastic.bulkProcessor.failOnError true

Changes in default values

The current defaults for the maximum number of operations per bulk and maximum bulk size are too small for good performance. This PR increases them to the following values:

  • oak.indexer.elastic.bulkProcessor.maxBulkOperations: 250 -> 8192
  • oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes: 1MB -> 8MB

Closing writers without closing bulk ingester

The main complexity of this PR is to manage closing a index writer without closing the underlying bulk ingester. Until now, since each index writer contained a bulk ingester, closing the index writer and bulk ingester had the same lifecycle, so they could be closed together. But to use a single long lived bulk ingester for several writers, closing a index writer becomes more complex.

When closing a writer, we want to flush all the operations for that index. However, the bulk ingester will have in its buffers operations for several indexes and cannot easily distinguish between operations of each index. The solution used is to force a flush of the bulk ingester whenever a index writer is closed, and then wait until all bulk requests lower or equal to the one created by the flush request are processed. This will also flush operations for other indexes that are not being closed, but this will only slow down a little the time to close an index, and has not other problems.

@nfsantos nfsantos marked this pull request as ready for review March 18, 2025 17:04
if (totalOperations == 0) { // no need to invoke phaser await if we already know there were no operations
LOG.debug("No operations executed in this processor. Close immediately");
return false;
public boolean closeIndex(String indexName) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method name is a bit misleading. No indexes are actually closed here. Should we call something like flushIndex?

LOG.debug("No operations executed in this processor. Close immediately");
return false;
public boolean closeIndex(String indexName) throws IOException {
LOG.info("Closing index: {}", indexName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is called at every async lane run. Should we lower this to debug/trace?

try {
LOG.debug("Bulk with id {} processed in {} ms", executionId, response.took());
LOG.debug("Bulk with id {} processed in {} ms", executionId, response.took() / 1_000_000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response.took() returns milliseconds. Dividing it by 1M would result in 0 most of the times.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants