Skip to content

Commit

Permalink
add non-bulk index/delete with retry (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Royer authored Feb 3, 2022
1 parent ad512d6 commit 65269e1
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Excep
}
}

public boolean indexDocumentWithRetry(Record<GenericObject> record, Pair<String, String> idAndDoc) {
return retry(() -> indexDocument(record, idAndDoc), "index document");
}

/**
* Index an elasticsearch document and ack the record.
* @param record
Expand All @@ -291,7 +295,7 @@ public boolean indexDocument(Record<GenericObject> record, Pair<String, String>
return false;
}
} catch (final Exception ex) {
log.error("index failed id=" + idAndDoc.getLeft(), ex);
log.warn("index failed id=" + idAndDoc.getLeft(), ex);
record.fail();
throw ex;
}
Expand All @@ -314,6 +318,10 @@ public void bulkDelete(Record<GenericObject> record, String id) throws Exception
}
}

public boolean deleteDocumentWithRetry(Record<GenericObject> record, String id) {
return retry(() -> deleteDocument(record, id), "delete document");
}

/**
* Delete an elasticsearch document and ack the record.
* @param record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void write(Record<GenericObject> record) throws Exception {
if (elasticSearchConfig.isBulkEnabled()) {
elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
elasticsearchClient.deleteDocumentWithRetry(record, idAndDoc.getLeft());
}
}
break;
Expand All @@ -107,7 +107,7 @@ public void write(Record<GenericObject> record) throws Exception {
if (elasticSearchConfig.isBulkEnabled()) {
elasticsearchClient.bulkIndex(record, idAndDoc);
} else {
elasticsearchClient.indexDocument(record, idAndDoc);
elasticsearchClient.indexDocumentWithRetry(record, idAndDoc);
}
}
} catch (JsonProcessingException jsonProcessingException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void fail() {

@Test
public void testIndexDelete() throws Exception {
client.createIndexIfNeeded(INDEX);
assertTrue(client.createIndexIfNeeded(INDEX));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1","{ \"a\":1}"));
assertEquals(mockRecord.acked, 1);
Expand All @@ -108,6 +109,51 @@ public void testIndexDelete() throws Exception {
assertEquals(client.totalHits(INDEX), 0);
}

@Test
public void testIndexDeleteWithRetry() throws Exception {
try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network))
{
toxiproxy.start();

final String index = "indexretry-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
.setIndexName(index)
.setMaxRetries(1000);

try (ElasticSearchClient client = new ElasticSearchClient(config)) {
try {
assertTrue(client.createIndexIfNeeded(index));

log.info("starting the toxic");
toxiproxy.getProxy().setConnectionCut(false);
toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000);
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);

MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocumentWithRetry(mockRecord, Pair.of("1", "{\"a\":1}"));
Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 1);
});

toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000);
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);

client.deleteDocument(mockRecord, "1");
Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 0);
});
} finally {
client.delete(index);
}
}
}
}

@Test
public void testIndexExists() throws IOException {
assertFalse(client.indexExists("mynewindex"));
Expand Down Expand Up @@ -186,7 +232,7 @@ public void testBulkRetry() throws Exception {
// disabled, we want to have full control over flush() method
.setBulkFlushIntervalInMs(-1);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config)) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
Expand Down Expand Up @@ -269,10 +315,11 @@ public void testBulkBlocking() throws Exception {
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(3000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);
});

} finally {
client.delete(index);
Expand Down

0 comments on commit 65269e1

Please sign in to comment.