Skip to content

Pull IndigoRecord attribute as the document ID for index updates in bingo-elastic #1987

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -36,6 +38,7 @@
import java.security.cert.X509Certificate;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.epam.indigo.model.NamingConstants.*;
Expand All @@ -44,7 +47,7 @@
* Class responsible for all operations with Elasticsearch
* Provides the ability to index and delete records, and to produce streams for further operations such as similarity matching, filtering on extra textual fields, etc.
*/
public class ElasticRepository<T extends IndigoRecord> implements GenericRepository<T> {
public class ElasticRepository<T extends IndigoRecord> implements GenericRepository<T>, AutoCloseable {

private String indexName;
private List<String> hostsNames;
Expand All @@ -57,8 +60,11 @@ public class ElasticRepository<T extends IndigoRecord> implements GenericReposit
private int numShards = 1;
private int numReplicas = 0;
private String refreshInterval = "5m";
private Function<IndigoRecord, String> documentIdProvider = null;
private boolean autoFlush = false;

// Private on purpose: instances are created by ElasticRepositoryBuilder.build(),
// which then applies the queued configuration operations to the new instance.
private ElasticRepository() {

}

private boolean checkIfIndexExists() throws BingoElasticException {
Expand Down Expand Up @@ -136,6 +142,23 @@ public void indexRecords(Iterable<T> records, int batchSize) throws IOException
indexRecords(records, batchSize, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
if (autoFlush) {
flushRecords();
}
}

@Override
public void onFailure(Exception e) {
// do nothing
}
});
}

public void flushRecords() {
FlushRequest flushRequest = new FlushRequest();
this.elasticClient.indices().flushAsync(flushRequest, RequestOptions.DEFAULT, new ActionListener<>() {
@Override
public void onResponse(FlushResponse bulkResponse) {
// do nothing
}

Expand Down Expand Up @@ -190,14 +213,23 @@ public void indexRecords(Iterable<T> flatRecords, int batchSize, ActionListener<
}
}
builder.endObject();
request.add(new IndexRequest(this.indexName)
.source(builder));

IndexRequest indexRequest = new IndexRequest(this.indexName)
.source(builder);

if (this.documentIdProvider != null) {
String documentId = this.documentIdProvider.apply(t);

if (documentId != null && !documentId.isEmpty()) {
indexRequest = indexRequest
.id(documentId);
}
}

request.add(indexRequest);
this.elasticClient.bulkAsync(request, RequestOptions.DEFAULT, actionListener);
}
}
// TODO do we need it?
// FlushRequest flushRequest = new FlushRequest();
// this.elasticClient.indices().flushAsync(flushRequest, RequestOptions.DEFAULT);
}

@Override
Expand All @@ -217,6 +249,11 @@ public boolean deleteAllRecords() {
}
}

/**
 * Closes the Elasticsearch client held by this repository, so the repository
 * can be used in a try-with-resources statement.
 *
 * @throws Exception whatever the client's own {@code close()} throws
 */
@Override
public void close() throws Exception {
    this.elasticClient.close();
}

public static class ElasticRepositoryBuilder<T extends IndigoRecord> {
private final List<Consumer<ElasticRepository<T>>> operations;

Expand Down Expand Up @@ -279,6 +316,45 @@ public ElasticRepositoryBuilder<T> withRefreshInterval(String refreshInterval) {
return this;
}

/**
 * Registers a function that derives the Elasticsearch document {@code _id}
 * from an IndigoRecord, so that indexing a record whose id already exists
 * updates that document instead of adding a new one.
 * <p>
 * When the function returns {@code null} or an empty string for a record,
 * no explicit id is set on that record's index request.
 * <p>
 * This setting and {@link #useInternalIdAsDocumentId()} write the same
 * repository field; if both are called on one builder, the call made last
 * is the one that takes effect.
 *
 * @param documentIdProvider function that maps an IndigoRecord to a document id string
 * @return the repository builder
 */
public ElasticRepositoryBuilder<T> withDocumentIdProvider(Function<IndigoRecord, String> documentIdProvider) {
    operations.add(repository -> {
        repository.documentIdProvider = documentIdProvider;
    });
    return this;
}

/**
 * Uses {@code IndigoRecord.getInternalID()} as the Elasticsearch document
 * {@code _id}, so that indexing a record whose id already exists updates
 * that document instead of adding a new one.
 * <p>
 * This setting and {@link #withDocumentIdProvider(Function)} write the same
 * repository field; if both are called on one builder, the call made last
 * is the one that takes effect.
 *
 * @return the repository builder
 */
public ElasticRepositoryBuilder<T> useInternalIdAsDocumentId() {
    operations.add(repository -> {
        repository.documentIdProvider = IndigoRecord::getInternalID;
    });
    return this;
}

/**
 * Enables or disables flushing the index automatically after an index call:
 * when enabled, the repository issues a flush once a bulk indexing response
 * has been received.
 * Not recommended for production use or when inserting a large number of records.
 *
 * @param autoFlush {@code true} to flush after indexing, {@code false} to leave flushing to Elasticsearch
 * @return the repository builder
 */
public ElasticRepositoryBuilder<T> withAutoFlush(boolean autoFlush) {
    operations.add(repository -> {
        repository.autoFlush = autoFlush;
    });
    return this;
}

public ElasticRepository<T> build() {
ElasticRepository<T> repository = new ElasticRepository<>();
operations.forEach(operation -> operation.accept(repository));
Expand Down