|
17 | 17 |
|
18 | 18 | package org.apache.flink.connector.elasticsearch.sink; |
19 | 19 |
|
20 | | -import org.apache.flink.api.common.operators.MailboxExecutor; |
21 | 20 | import org.apache.flink.api.connector.sink2.SinkWriter; |
22 | 21 | import org.apache.flink.api.connector.sink2.SinkWriter.Context; |
23 | 22 | import org.apache.flink.api.java.tuple.Tuple2; |
|
31 | 30 | import org.apache.flink.metrics.groups.SinkWriterMetricGroup; |
32 | 31 | import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; |
33 | 32 | import org.apache.flink.metrics.testutils.MetricListener; |
| 33 | +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; |
34 | 34 | import org.apache.flink.runtime.metrics.MetricNames; |
35 | 35 | import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; |
36 | 36 | import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; |
37 | 37 | import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; |
38 | 38 | import org.apache.flink.test.junit5.MiniClusterExtension; |
39 | | -import org.apache.flink.util.FlinkRuntimeException; |
40 | 39 | import org.apache.flink.util.TestLoggerExtension; |
41 | | -import org.apache.flink.util.function.ThrowingRunnable; |
42 | 40 |
|
43 | 41 | import org.apache.http.HttpHost; |
44 | 42 | import org.elasticsearch.action.ActionListener; |
@@ -329,7 +327,7 @@ private static ElasticsearchWriter<Tuple2<Integer, String>> createWriter( |
329 | 327 | new DefaultBulkResponseInspector(), |
330 | 328 | new NetworkClientConfig(null, null, null, null, null, null, null, null), |
331 | 329 | metricGroup, |
332 | | - new TestMailbox()); |
| 330 | + new SyncMailboxExecutor()); |
333 | 331 | } |
334 | 332 |
|
335 | 333 | private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() { |
@@ -481,29 +479,4 @@ GetResponse getResponse(String index, int id) throws IOException { |
481 | 479 | return client.get(new GetRequest(index, Integer.toString(id)), RequestOptions.DEFAULT); |
482 | 480 | } |
483 | 481 | } |
484 | | - |
485 | | - private static class TestMailbox implements MailboxExecutor { |
486 | | - |
487 | | - @Override |
488 | | - public void execute( |
489 | | - ThrowingRunnable<? extends Exception> command, |
490 | | - String descriptionFormat, |
491 | | - Object... descriptionArgs) { |
492 | | - try { |
493 | | - command.run(); |
494 | | - } catch (Exception e) { |
495 | | - throw new RuntimeException("Unexpected error", e); |
496 | | - } |
497 | | - } |
498 | | - |
499 | | - @Override |
500 | | - public void yield() throws InterruptedException, FlinkRuntimeException { |
501 | | - Thread.sleep(100); |
502 | | - } |
503 | | - |
504 | | - @Override |
505 | | - public boolean tryYield() throws FlinkRuntimeException { |
506 | | - return false; |
507 | | - } |
508 | | - } |
509 | 482 | } |
0 commit comments