diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java index 9a797212fa128..50a5d6d41c2f7 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java @@ -265,6 +265,10 @@ public void bulkIndex(Record record, Pair idAndDoc) throws Excep } } + public boolean indexDocumentWithRetry(Record record, Pair idAndDoc) { + return retry(() -> indexDocument(record, idAndDoc), "index document"); + } + /** * Index an elasticsearch document and ack the record. * @param record @@ -291,7 +295,7 @@ public boolean indexDocument(Record record, Pair return false; } } catch (final Exception ex) { - log.error("index failed id=" + idAndDoc.getLeft(), ex); + log.warn("index failed id=" + idAndDoc.getLeft(), ex); record.fail(); throw ex; } @@ -314,6 +318,10 @@ public void bulkDelete(Record record, String id) throws Exception } } + public boolean deleteDocumentWithRetry(Record record, String id) { + return retry(() -> deleteDocument(record, id), "delete document"); + } + /** * Delete an elasticsearch document and ack the record. * @param record diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java index 3a3ebf9f9d703..d7cbd3fafbe46 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java @@ -93,7 +93,7 @@ public void write(Record record) throws Exception { if (elasticSearchConfig.isBulkEnabled()) { elasticsearchClient.bulkDelete(record, idAndDoc.getLeft()); } else { - elasticsearchClient.deleteDocument(record, idAndDoc.getLeft()); + elasticsearchClient.deleteDocumentWithRetry(record, idAndDoc.getLeft()); } } break; @@ -107,7 +107,7 @@ public void write(Record record) throws Exception { if (elasticSearchConfig.isBulkEnabled()) { elasticsearchClient.bulkIndex(record, idAndDoc); } else { - elasticsearchClient.indexDocument(record, idAndDoc); + elasticsearchClient.indexDocumentWithRetry(record, idAndDoc); } } } catch (JsonProcessingException jsonProcessingException) { diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index d098a7eae1e45..460f4b746265f 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; @@ -95,7 +96,7 @@ public void fail() { @Test public void testIndexDelete() throws Exception { - client.createIndexIfNeeded(INDEX); + assertTrue(client.createIndexIfNeeded(INDEX)); MockRecord mockRecord = new MockRecord<>(); client.indexDocument(mockRecord, Pair.of("1","{ \"a\":1}")); assertEquals(mockRecord.acked, 1); @@ -108,6 +109,51 @@ public void testIndexDelete() throws Exception { assertEquals(client.totalHits(INDEX), 0); } + @Test + public void testIndexDeleteWithRetry() throws Exception { + try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) + { + toxiproxy.start(); + + final String index = "indexretry-" + UUID.randomUUID(); + ElasticSearchConfig config = new ElasticSearchConfig() + .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress()) + .setIndexName(index) + .setMaxRetries(1000); + + try (ElasticSearchClient client = new ElasticSearchClient(config)) { + try { + assertTrue(client.createIndexIfNeeded(index)); + + log.info("starting the toxic"); + toxiproxy.getProxy().setConnectionCut(false); + toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000); + toxiproxy.removeToxicAfterDelay("elasticpause", 15000); + + MockRecord mockRecord = new MockRecord<>(); + client.indexDocumentWithRetry(mockRecord, Pair.of("1", "{\"a\":1}")); + Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(mockRecord.acked, 1); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 1); + }); + + toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000); + toxiproxy.removeToxicAfterDelay("elasticpause", 15000); + + client.deleteDocument(mockRecord, "1"); + Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(mockRecord.acked, 2); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 0); + }); + } finally { + client.delete(index); + } + } + } + } + @Test public void testIndexExists() throws IOException { assertFalse(client.indexExists("mynewindex")); @@ -186,7 +232,7 @@ public void testBulkRetry() throws Exception { // disabled, we want to have full control over flush() method .setBulkFlushIntervalInMs(-1); - try (ElasticSearchClient client = new ElasticSearchClient(config);) { + try (ElasticSearchClient client = new ElasticSearchClient(config)) { try { assertTrue(client.createIndexIfNeeded(index)); MockRecord mockRecord = new MockRecord<>(); @@ -269,10 +315,11 @@ public void testBulkBlocking() throws Exception { log.info("elapsed = {}", elapsed); assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy - Thread.sleep(3000L); - assertEquals(mockRecord.acked, 15); - assertEquals(mockRecord.failed, 0); - assertEquals(client.records.size(), 0); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(mockRecord.acked, 15); + assertEquals(mockRecord.failed, 0); + assertEquals(client.records.size(), 0); + }); } finally { client.delete(index);