From f6b7298a6aa4fe74ffd50ddb11858b2d1ac19460 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Tue, 25 Mar 2025 12:59:18 -0500 Subject: [PATCH 1/3] Fix logic in RFS when run exits without real progress Signed-off-by: Andre Kurait --- .../migrations/RfsMigrateDocuments.java | 71 +++++++++++-------- .../migrations/bulkload/SourceTestBase.java | 2 + 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 6a007ba30d..396007b4f6 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -429,37 +429,48 @@ private static void exitOnLeaseTimeout( log.atWarn().setMessage("Terminating RfsMigrateDocuments because the lease has expired for {}") .addArgument(workItemId) .log(); - if (progressCursorRef.get() != null) { - log.atWarn().setMessage("Progress cursor set, cancelling active doc migration").log(); - cancellationRunnable.run(); - // Get a new progressCursor after cancellation for most up-to-date checkpoint - var progressCursor = progressCursorRef.get(); - log.atWarn().setMessage("Progress cursor: {}") - .addArgument(progressCursor).log(); - var workItemAndDuration = workItemRef.get(); - if (workItemAndDuration == null) { - throw new IllegalStateException("Unexpected state with progressCursor set without a" + - "work item"); + try { + if (progressCursorRef.get() != null) { + log.atWarn().setMessage("Progress cursor set, cancelling active doc migration").log(); + cancellationRunnable.run(); + // Get a new progressCursor after cancellation for most up-to-date checkpoint + var progressCursor = progressCursorRef.get(); + log.atWarn().setMessage("Progress cursor: {}") + .addArgument(progressCursor).log(); + var 
workItemAndDuration = workItemRef.get(); + if (workItemAndDuration == null) { + throw new IllegalStateException("Unexpected state with progressCursor set without a " + + "work item"); + } + log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration) + .log(); + log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem()) + .log(); + var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor); + if (successorWorkItemIds.size() == 1 && workItemId.equals(successorWorkItemIds.get(0))) { + log.atWarn().setMessage("No real progress was made for work item: {}. Will retry with larger timeout").addArgument(workItemId).log(); + } else { + log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds)) + .log(); + var successorNextAcquisitionLeaseExponent = getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, workItemAndDuration.getLeaseExpirationTime()); + coordinator.createSuccessorWorkItemsAndMarkComplete( + workItemId, + successorWorkItemIds, + successorNextAcquisitionLeaseExponent, + contextSupplier + ); + } + } else { + log.atWarn().setMessage("No progress cursor to create successor work items from. 
This can happen when " + + "downloading and unpacking shard takes longer than the lease").log(); + log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time") + .log(); } - log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration) - .log(); - log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem()) - .log(); - var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor); - log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds)) - .log(); - var successorNextAcquisitionLeaseExponent = getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, workItemAndDuration.getLeaseExpirationTime()); - coordinator.createSuccessorWorkItemsAndMarkComplete( - workItemId, - successorWorkItemIds, - successorNextAcquisitionLeaseExponent, - contextSupplier - ); - } else { - log.atWarn().setMessage("No progress cursor to create successor work items from. 
This can happen when" + - "downloading and unpacking shard takes longer than the lease").log(); - log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time") - .log(); + } catch (Exception e) { + log.atError().setMessage("Exception during exit on lease timeout, clean shutdown failed") + .setCause(e).log(); + cleanShutdownCompleted.set(false); + System.exit(PROCESS_TIMED_OUT_EXIT_CODE); } cleanShutdownCompleted.set(true); System.exit(PROCESS_TIMED_OUT_EXIT_CODE); diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index 319951099a..b7ce610c84 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -76,6 +76,8 @@ public class SourceTestBase { @NotNull protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException { + processBuilder.redirectErrorStream(true); + processBuilder.redirectOutput(ProcessBuilder.Redirect.PIPE); var process = processBuilder.start(); log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log(); From ac40c38457b79ad5b39a695cd6eb279765b7ddfe Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Tue, 25 Mar 2025 13:28:25 -0500 Subject: [PATCH 2/3] Mark kafka tests as isolated Signed-off-by: Andre Kurait --- .../testcontainers/CaptureProxyContainer.java | 4 ++-- .../testcontainers/KafkaContainerTestBase.java | 9 +++++---- .../testcontainers/annotations/TestContainerTest.java | 2 +- .../e2etests/KafkaRestartingTrafficReplayerTest.java | 7 +++---- .../kafka/KafkaCommitsWorkBetweenLongPollsTest.java | 7 +++---- .../migrations/replay/kafka/KafkaKeepAliveTests.java | 8 +++----- 
.../kafka/KafkaTrafficCaptureSourceLongTermTest.java | 6 +++--- 7 files changed, 20 insertions(+), 23 deletions(-) diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java index 7d03179d38..39a2daf15a 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java @@ -16,9 +16,9 @@ import lombok.extern.slf4j.Slf4j; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; +import org.testcontainers.kafka.ConfluentKafkaContainer; import org.testcontainers.lifecycle.Startable; @Slf4j @@ -47,7 +47,7 @@ public CaptureProxyContainer(final String destinationUri, final String kafkaUri) this(() -> destinationUri, () -> kafkaUri); } - public CaptureProxyContainer(final Container destination, final KafkaContainer kafka) { + public CaptureProxyContainer(final Container destination, final ConfluentKafkaContainer kafka) { this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka)); } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java index 
862fc9f2c1..e04aa087a4 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java @@ -2,13 +2,14 @@ import org.opensearch.migrations.testutils.SharedDockerImageNames; -import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.kafka.ConfluentKafkaContainer; -public class KafkaContainerTestBase extends TestContainerTestBase { - private static final KafkaContainer kafka = new KafkaContainer(SharedDockerImageNames.KAFKA); +public class KafkaContainerTestBase extends TestContainerTestBase { - public KafkaContainer getContainer() { + private static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); + + public ConfluentKafkaContainer getContainer() { return kafka; } } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java index 7d001367c2..d0e111d0c6 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java @@ -12,7 +12,7 @@ @Inherited @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.TYPE }) -@Tag("longTest") +@Tag("isolatedTest") @Testcontainers(disabledWithoutDocker = true, parallel = true) public @interface TestContainerTest { } diff --git 
a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java index 0d139e7f66..9e2f09597c 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java @@ -40,14 +40,14 @@ import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; @Slf4j @Testcontainers(disabledWithoutDocker = true) @WrapWithNettyLeakDetection(disableLeakChecks = true) -@Tag("requiresDocker") +@Tag("isolatedTest") public class KafkaRestartingTrafficReplayerTest extends InstrumentationTest { public static final int INITIAL_STOP_REPLAYER_REQUEST_COUNT = 1; public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; @@ -64,7 +64,7 @@ public class KafkaRestartingTrafficReplayerTest extends InstrumentationTest { @Container // see // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA); + private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); private static class CounterLimitedReceiverFactory implements Supplier> { AtomicInteger nextStopPointRef = new AtomicInteger(INITIAL_STOP_REPLAYER_REQUEST_COUNT); @@ -87,7 +87,6 @@ public Consumer get() { 
@ParameterizedTest @CsvSource(value = { "3,false", "-1,false", "3,true", "-1,true", }) - @Tag("longTest") @ResourceLock("TrafficReplayerRunner") public void fullTest(int testSize, boolean randomize) throws Throwable { var random = new Random(1); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaCommitsWorkBetweenLongPollsTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaCommitsWorkBetweenLongPollsTest.java index 841e4e4e84..f04cf3af74 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaCommitsWorkBetweenLongPollsTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaCommitsWorkBetweenLongPollsTest.java @@ -18,13 +18,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; @Slf4j @Testcontainers(disabledWithoutDocker = true) -@Tag("requiresDocker") +@Tag("isolatedTest") public class KafkaCommitsWorkBetweenLongPollsTest extends InstrumentationTest { private static final long DEFAULT_POLL_INTERVAL_MS = 1000; private static final int NUM_RUNS = 5; @@ -32,7 +32,7 @@ public class KafkaCommitsWorkBetweenLongPollsTest extends InstrumentationTest { @Container // see // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA); + private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); @SneakyThrows private KafkaConsumer buildKafkaConsumer() { @@ -49,7 +49,6 @@ private 
KafkaConsumer buildKafkaConsumer() { } @Test - @Tag("longTest") public void testThatCommitsAndReadsKeepWorking() throws Exception { var kafkaSource = new KafkaTrafficCaptureSource( rootContext, diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java index a87c75c0ab..78eee01f1c 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java @@ -24,13 +24,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; @Slf4j @Testcontainers(disabledWithoutDocker = true) -@Tag("requiresDocker") +@Tag("isolatedTest") public class KafkaKeepAliveTests extends InstrumentationTest { public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat.interval.ms"; @@ -47,7 +47,7 @@ public class KafkaKeepAliveTests extends InstrumentationTest { @Container // see // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA); + private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); private KafkaTrafficCaptureSource kafkaSource; @@ -90,7 +90,6 @@ private void setupTestCase() throws Exception { } @Test - @Tag("longTest") public void testTimeoutsDontOccurForSlowPolls() 
throws Exception { var pollIntervalMs = Optional.ofNullable(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY)) .map(s -> Integer.valueOf((String) s)) @@ -118,7 +117,6 @@ public void testTimeoutsDontOccurForSlowPolls() throws Exception { } @Test - @Tag("longTest") public void testBlockedReadsAndBrokenCommitsDontCauseReordering() throws Exception { for (int i = 0; i < 2; ++i) { KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1 + i, sendCompleteCount).get(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java index b827b2d2c9..1dc2b9f769 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java @@ -14,13 +14,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; @Slf4j @Testcontainers(disabledWithoutDocker = true) -@Tag("requiresDocker") +@Tag("isolatedTest") public class KafkaTrafficCaptureSourceLongTermTest extends InstrumentationTest { public static final int TEST_RECORD_COUNT = 10; @@ -30,7 +30,7 @@ public class KafkaTrafficCaptureSourceLongTermTest extends InstrumentationTest { @Container // see // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA); + private 
final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); @Test @Tag("isolatedTest") From 23954bce933d84d7e205119dd878d146b5a561dd Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Tue, 25 Mar 2025 13:33:58 -0500 Subject: [PATCH 3/3] Extend timeout on KafkaTrafficCaptureSourceTest Signed-off-by: Andre Kurait --- .../migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index 263fb5f191..b41343f6a0 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -39,7 +39,7 @@ class KafkaTrafficCaptureSourceTest extends InstrumentationTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; - private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(30); @Test public void testRecordToString() {