diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 0d6b750fbb..44ecbdc071 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -143,7 +143,7 @@ jobs: gradle-version: ${{ env.gradle-version }} gradle-home-cache-cleanup: true - name: Run Gradle tests with striping - run: gradle allTests -Dtest.striping.total=${{ env.gradle-test-parallelization }} -Dtest.striping.index=${{ matrix.index }} -x spotlessCheck --stacktrace --continue + run: gradle allTests -Dtest.striping.total=${{ env.gradle-test-parallelization }} -Dtest.striping.index=${{ matrix.index }} --max-workers 3 -x spotlessCheck --stacktrace --continue env: OS_MIGRATIONS_GRADLE_SCAN_TOS_AGREE_AND_ENABLED: '' - name: Detect Memory Dumps diff --git a/.github/workflows/full_pr_e2e_test.yml b/.github/workflows/full_pr_e2e_test.yml index c57719cd14..c7b03a10ed 100644 --- a/.github/workflows/full_pr_e2e_test.yml +++ b/.github/workflows/full_pr_e2e_test.yml @@ -19,6 +19,7 @@ jobs: uses: ./.github/workflows/require-approval.yml full-es68-e2e-aws-test: + if: github.repository == 'opensearch-project/opensearch-migrations' needs: [get-require-approval] environment: ${{ needs.get-require-approval.outputs.is-require-approval }} runs-on: ubuntu-latest diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java index aad7506fce..b7f47430a0 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java @@ -25,9 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.params.ParameterizedTest; @@ -52,423 +50,387 @@ @Tag("isolatedTest") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class WorkCoordinatorTest { - private SearchClusterContainer container; - private WorkCoordinatorFactory factory; - static Stream containerVersions() { return SupportedClusters.supportedSources(true).stream(); } public static final String DUMMY_FINISHED_DOC_ID = "dummy_finished_doc"; - private Supplier httpClientSupplier; - - @BeforeEach - void setupHttpClientSupplier() { - if (container != null) { - httpClientSupplier = () -> new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() - .host(container.getUrl()) - .build() - .toConnectionContext()); - } - } - - @AfterEach - void tearDown() { - if (container != null) { - container.close(); - container = null; - } - if (httpClientSupplier != null) { - httpClientSupplier = null; + private record TestSetup( + SearchClusterContainer container, + WorkCoordinatorFactory factory, + Supplier httpClientSupplier + ) implements AutoCloseable { + @Override + public void close() { + if (container != null) { + container.close(); + } } - } - - void setupOpenSearchContainer(SearchClusterContainer.ContainerVersion version) throws Exception { - // Create a new container with the specified version - container = new SearchClusterContainer(version); - factory = new WorkCoordinatorFactory(container.getContainerVersion().getVersion()); - var testContext = WorkCoordinationTestContext.factory().noOtelTracking(); - // Start the container. This step might take some time... - container.start(); - httpClientSupplier = () -> new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() - .host(container.getUrl()) - .build() - .toConnectionContext()); - try (var workCoordinator = factory.get(httpClientSupplier.get(), 2, "testWorker")) { - workCoordinator.setup(testContext::createCoordinationInitializationStateContext); + + static TestSetup create(SearchClusterContainer.ContainerVersion version) throws Exception { + var container = new SearchClusterContainer(version); + var factory = new WorkCoordinatorFactory(container.getContainerVersion().getVersion()); + var testContext = WorkCoordinationTestContext.factory().noOtelTracking(); + + container.start(); + + var httpClientSupplier = (Supplier) () -> + new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() + .host(container.getUrl()) + .build() + .toConnectionContext()); + + try (var workCoordinator = factory.get(httpClientSupplier.get(), 2, "testWorker")) { + workCoordinator.setup(testContext::createCoordinationInitializationStateContext); + } + + return new TestSetup(container, factory, httpClientSupplier); } - } - - @SneakyThrows - private JsonNode searchForExpiredDocs(long expirationEpochSeconds) { - final var body = "{" - + OpenSearchWorkCoordinator.QUERY_INCOMPLETE_EXPIRED_ITEMS_STR.replace( - OpenSearchWorkCoordinator.OLD_EXPIRATION_THRESHOLD_TEMPLATE, - "" + expirationEpochSeconds - ) - + "}"; - log.atInfo().setMessage("Searching with... {}").addArgument(body).log(); - var response = httpClientSupplier.get() - .makeJsonRequest( - AbstractedHttpClient.GET_METHOD, - OpenSearchWorkCoordinator.INDEX_NAME + "/_search", - null, - body - ); + + @SneakyThrows + JsonNode searchForExpiredDocs(long expirationEpochSeconds) { + final var body = "{" + + OpenSearchWorkCoordinator.QUERY_INCOMPLETE_EXPIRED_ITEMS_STR.replace( + OpenSearchWorkCoordinator.OLD_EXPIRATION_THRESHOLD_TEMPLATE, + "" + expirationEpochSeconds + ) + + "}"; + log.atInfo().setMessage("Searching with... {}").addArgument(body).log(); + var response = httpClientSupplier.get() + .makeJsonRequest( + AbstractedHttpClient.GET_METHOD, + OpenSearchWorkCoordinator.INDEX_NAME + "/_search", + null, + body + ); - var objectMapper = new ObjectMapper(); - return objectMapper.readTree(response.getPayloadBytes()).path("hits"); + var objectMapper = new ObjectMapper(); + return objectMapper.readTree(response.getPayloadBytes()).path("hits"); + } } @ParameterizedTest @MethodSource("containerVersions") public void testAcquireLeaseHasNoUnnecessaryConflicts(SearchClusterContainer.ContainerVersion version) throws Exception { - setupOpenSearchContainer(version); - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - final var NUM_DOCS = 100; - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - for (var i = 0; i < NUM_DOCS; ++i) { - final var docId = "R" + i; - var newWorkItem = IWorkCoordinator.WorkItemAndDuration.WorkItem.valueFromWorkItemString(docId + "__0__0"); - workCoordinator.createUnassignedWorkItem(newWorkItem.toString(), testContext::createUnassignedWorkContext); + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 100; + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R" + i; + var newWorkItem = IWorkCoordinator.WorkItemAndDuration.WorkItem.valueFromWorkItemString(docId + "__0__0"); + workCoordinator.createUnassignedWorkItem(newWorkItem.toString(), testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - } - final var seenWorkerItems = new ConcurrentHashMap(); - final var expiration = Duration.ofSeconds(60); - for (int i = 0; i < NUM_DOCS; ++i) { - var label = "" + i; - getWorkItemAndVerify(testContext, label, seenWorkerItems, expiration, false, false); - } - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "finalPass")) { - var rval = workCoordinator.acquireNextWorkItem(expiration, testContext::createAcquireNextItemContext); - Assertions.assertInstanceOf(IWorkCoordinator.NoAvailableWorkToBeDone.class, rval); + final var seenWorkerItems = new ConcurrentHashMap(); + final var expiration = Duration.ofSeconds(60); + for (int i = 0; i < NUM_DOCS; ++i) { + var label = "" + i; + getWorkItemAndVerify(testContext, label, seenWorkerItems, expiration, false, false, setup); + } + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "finalPass")) { + var rval = workCoordinator.acquireNextWorkItem(expiration, testContext::createAcquireNextItemContext); + Assertions.assertInstanceOf(IWorkCoordinator.NoAvailableWorkToBeDone.class, rval); + } + var metrics = testContext.inMemoryInstrumentationBundle.getFinishedMetrics(); + Assertions.assertEquals(1, + InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "noNextWorkAvailableCount")); + Assertions.assertEquals(0, + InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "acquireNextWorkItemRetries")); + Assertions.assertEquals(NUM_DOCS, + InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "nextWorkAssignedCount")); + + Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); } - var metrics = testContext.inMemoryInstrumentationBundle.getFinishedMetrics(); - Assertions.assertEquals(1, - InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "noNextWorkAvailableCount")); - Assertions.assertEquals(0, - InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "acquireNextWorkItemRetries")); - Assertions.assertEquals(NUM_DOCS, - InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "nextWorkAssignedCount")); - - Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); } @ParameterizedTest @MethodSource("containerVersions") public void testAcquireLeaseForQueryInParallel(SearchClusterContainer.ContainerVersion version) throws Exception { - // Setup test container and context - setupOpenSearchContainer(version); - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - final int NUM_DOCS = 25; - final int MAX_RUNS = 2; - final Duration EXPIRATION = Duration.ofSeconds(10); - - // Make lease acquire calls in parallel across this many requests - var executor = Executors.newFixedThreadPool(5); - - // Create unassigned work items - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - List> creationFutures = - IntStream.range(0, NUM_DOCS) - .mapToObj(i -> CompletableFuture.supplyAsync(() -> { - try { - return workCoordinator.createUnassignedWorkItem("R__0__" + i, testContext::createUnassignedWorkContext); - } catch (IOException e) { - throw new RuntimeException(e); - } - })) - .toList(); - CompletableFuture.allOf(creationFutures.toArray(new CompletableFuture[0])).join(); - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - } + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final int NUM_DOCS = 25; + final int MAX_RUNS = 2; + final Duration EXPIRATION = Duration.ofSeconds(10); + + var executor = Executors.newFixedThreadPool(5); + + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + List> creationFutures = + IntStream.range(0, NUM_DOCS) + .mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + return workCoordinator.createUnassignedWorkItem("R__0__" + i, testContext::createUnassignedWorkContext); + } catch (IOException e) { + throw new RuntimeException(e); + } + })) + .toList(); + CompletableFuture.allOf(creationFutures.toArray(new CompletableFuture[0])).join(); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + } - // Process work items in multiple runs - for (int run = 0; run < MAX_RUNS; run++) { - var seenWorkerItems = new ConcurrentHashMap(); - List> acquisitionFutures = new ArrayList<>(); - boolean markAsComplete = (run == MAX_RUNS - 1); - Instant runStart = Instant.now(); - - for (int i = 0; i < NUM_DOCS; i++) { - String label = run + "-" + i; - acquisitionFutures.add( - CompletableFuture.supplyAsync(() -> - getWorkItemAndVerify(testContext, label, seenWorkerItems, EXPIRATION, true, markAsComplete), - executor - ) + for (int run = 0; run < MAX_RUNS; run++) { + var seenWorkerItems = new ConcurrentHashMap(); + List> acquisitionFutures = new ArrayList<>(); + boolean markAsComplete = (run == MAX_RUNS - 1); + Instant runStart = Instant.now(); + + for (int i = 0; i < NUM_DOCS; i++) { + String label = run + "-" + i; + acquisitionFutures.add( + CompletableFuture.supplyAsync(() -> + getWorkItemAndVerify(testContext, label, seenWorkerItems, EXPIRATION, true, markAsComplete, setup), + executor + ) + ); + } + CompletableFuture.allOf(acquisitionFutures.toArray(new CompletableFuture[0])).get( + EXPIRATION.toMillis(), TimeUnit.MILLISECONDS ); - } - // Complete future failing if it takes longer than our expiration - // If a timeout occurs, the expiration may need to be increased to run on this setup - CompletableFuture.allOf(acquisitionFutures.toArray(new CompletableFuture[0])).get( - EXPIRATION.toMillis(), TimeUnit.MILLISECONDS - ); - Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size(), "Not all work items were processed"); - - // Validate that no further work is available - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "NONE")) { - var nextWorkItem = workCoordinator.acquireNextWorkItem(EXPIRATION, testContext::createAcquireNextItemContext); - log.atInfo().setMessage("Next work item picked={}").addArgument(nextWorkItem).log(); - Assertions.assertInstanceOf(IWorkCoordinator.NoAvailableWorkToBeDone.class, nextWorkItem); - } catch (OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e) { - log.atError().setCause(e) - .setMessage("Unexpected clock drift error. Got response: {}") - .addArgument(() -> searchForExpiredDocs(e.getTimestampEpochSeconds())) - .log(); - throw new AssertionError("Unexpected clock drift error.", e); + Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size(), "Not all work items were processed"); + + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "NONE")) { + var nextWorkItem = workCoordinator.acquireNextWorkItem(EXPIRATION, testContext::createAcquireNextItemContext); + log.atInfo().setMessage("Next work item picked={}").addArgument(nextWorkItem).log(); + Assertions.assertInstanceOf(IWorkCoordinator.NoAvailableWorkToBeDone.class, nextWorkItem); + } catch (OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e) { + log.atError().setCause(e) + .setMessage("Unexpected clock drift error. Got response: {}") + .addArgument(() -> setup.searchForExpiredDocs(e.getTimestampEpochSeconds())) + .log(); + throw new AssertionError("Unexpected clock drift error.", e); + } + + Instant runEnd = Instant.now(); + Duration elapsed = Duration.between(runStart, runEnd); + Assertions.assertFalse(elapsed.compareTo(EXPIRATION) > 0, + String.format("Test run elapsed duration %s exceeded EXPIRATION %s. Increase expiration duration.", elapsed, EXPIRATION)); + log.atInfo().setMessage("Test run duration {} with Expiration {}").addArgument(elapsed).addArgument(EXPIRATION).log(); + if (run < MAX_RUNS - 1) { + var sleepBetweenRuns = EXPIRATION.plus(Duration.ofSeconds(1)); + log.atInfo().setMessage("Sleeping for {}").addArgument(sleepBetweenRuns).log(); + Thread.sleep(sleepBetweenRuns.toMillis()); + } } - // Check elapsed time does not exceed the expiration duration - Instant runEnd = Instant.now(); - Duration elapsed = Duration.between(runStart, runEnd); - Assertions.assertFalse(elapsed.compareTo(EXPIRATION) > 0, - String.format("Test run elapsed duration %s exceeded EXPIRATION %s. Increase expiration duration.", elapsed, EXPIRATION)); - log.atInfo().setMessage("Test run duration {} with Expiration {}").addArgument(elapsed).addArgument(EXPIRATION).log(); - // Sleep between runs if needed, to elapse expiration - if (run < MAX_RUNS - 1) { - var sleepBetweenRuns = EXPIRATION.plus(Duration.ofSeconds(1)); - log.atInfo().setMessage("Sleeping for {}").addArgument(sleepBetweenRuns).log(); - Thread.sleep(sleepBetweenRuns.toMillis()); + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } - } - // Final verification: all work items should be complete - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + var metrics = testContext.inMemoryInstrumentationBundle.getFinishedMetrics(); + Assertions.assertNotEquals(0, + InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "acquireNextWorkItemRetries")); } - - var metrics = testContext.inMemoryInstrumentationBundle.getFinishedMetrics(); - Assertions.assertNotEquals(0, - InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "acquireNextWorkItemRetries")); } @ParameterizedTest @MethodSource("containerVersions") public void testAddSuccessorWorkItems(SearchClusterContainer.ContainerVersion version) throws Exception { - setupOpenSearchContainer(version); - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - final var NUM_DOCS = 20; - final var NUM_SUCCESSOR_ITEMS = 3; - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - for (var i = 0; i < NUM_DOCS; ++i) { - final var docId = "R__0__" + i; - workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 20; + final var NUM_SUCCESSOR_ITEMS = 3; + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R__0__" + i; + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - } - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "claimItemWorker")) { - for (var i = 0; i < NUM_DOCS; ++i) { - String workItemId = getWorkItemAndVerify( - testContext, - "claimItemWorker", - new ConcurrentHashMap<>(), - Duration.ofSeconds(10), - false, - false - ); - var currentNumPendingItems = workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext); + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "claimItemWorker")) { + for (var i = 0; i < NUM_DOCS; ++i) { + String workItemId = getWorkItemAndVerify( + testContext, + "claimItemWorker", + new ConcurrentHashMap<>(), + Duration.ofSeconds(10), + false, + false, + setup + ); + var currentNumPendingItems = workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext); + + var successorWorkItems = new ArrayList(); + for (int j = 0; j < NUM_SUCCESSOR_ITEMS; j++) { + successorWorkItems.add("successor__" + i + "__" + j); + } - var successorWorkItems = new ArrayList(); - for (int j = 0; j < NUM_SUCCESSOR_ITEMS; j++) { - successorWorkItems.add("successor__" + i + "__" + j); + workCoordinator.createSuccessorWorkItemsAndMarkComplete( + workItemId, + successorWorkItems, + 0, + testContext::createSuccessorWorkItemsContext + ); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + + Assertions.assertEquals( + currentNumPendingItems + NUM_SUCCESSOR_ITEMS - 1, + workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext) + ); } - - workCoordinator.createSuccessorWorkItemsAndMarkComplete( - workItemId, - successorWorkItems, - 0, - testContext::createSuccessorWorkItemsContext - ); - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - Assertions.assertEquals( - currentNumPendingItems + NUM_SUCCESSOR_ITEMS - 1, + NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext) ); } - Assertions.assertEquals( - NUM_SUCCESSOR_ITEMS * NUM_DOCS, - workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext) - ); - } - // Now go claim NUM_DOCS * NUM_SUCCESSOR_ITEMS items to verify all were created and are claimable. - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "claimItemWorker")) { - for (var i = 0; i < NUM_DOCS * NUM_SUCCESSOR_ITEMS; ++i) { - getWorkItemAndVerify( - testContext, - "claimWorker_" + i, - new ConcurrentHashMap<>(), - Duration.ofSeconds(10), - false, - true - ); + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "claimItemWorker")) { + for (var i = 0; i < NUM_DOCS * NUM_SUCCESSOR_ITEMS; ++i) { + getWorkItemAndVerify( + testContext, + "claimWorker_" + i, + new ConcurrentHashMap<>(), + Duration.ofSeconds(10), + false, + true, + setup + ); + } + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } } @ParameterizedTest @MethodSource("containerVersions") public void testAddSuccessorWorkItemsSimultaneous(SearchClusterContainer.ContainerVersion version) throws Exception { - setupOpenSearchContainer(version); - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - final var NUM_DOCS = 20; - final var NUM_SUCCESSOR_ITEMS = 3; - var executorService = Executors.newFixedThreadPool(NUM_DOCS); - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - for (var i = 0; i < NUM_DOCS; ++i) { - final var docId = "R__0__" + i; - workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 20; + final var NUM_SUCCESSOR_ITEMS = 3; + var executorService = Executors.newFixedThreadPool(NUM_DOCS); + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R__0__" + i; + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - } - final var seenWorkerItems = new ConcurrentHashMap(); - var allFutures = new ArrayList>(); - final var expiration = Duration.ofSeconds(5); - for (int i = 0; i < NUM_DOCS; ++i) { - int finalI = i; - allFutures.add( - CompletableFuture.supplyAsync( - () -> getWorkItemAndCompleteWithSuccessors(testContext, "successor__0__" + finalI, seenWorkerItems, expiration, true, NUM_SUCCESSOR_ITEMS), - executorService - ) - ); - } - CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).join(); - Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "checkResults")) { - Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + final var seenWorkerItems = new ConcurrentHashMap(); + var allFutures = new ArrayList>(); + final var expiration = Duration.ofSeconds(5); + for (int i = 0; i < NUM_DOCS; ++i) { + int finalI = i; + allFutures.add( + CompletableFuture.supplyAsync( + () -> getWorkItemAndCompleteWithSuccessors(testContext, "successor__0__" + finalI, seenWorkerItems, expiration, true, NUM_SUCCESSOR_ITEMS, setup), + executorService + ) + ); + } + CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).join(); + Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "checkResults")) { + Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } } } @ParameterizedTest @MethodSource("containerVersions") public void testAddSuccessorWorkItemsPartiallyCompleted(SearchClusterContainer.ContainerVersion version) throws Exception { - setupOpenSearchContainer(version); - // A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created - // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items. - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - var docId = "R0"; - var initialWorkItem = docId + "__0__0"; - var N_SUCCESSOR_ITEMS = 3; - var successorItems = (ArrayList) IntStream.range(1, N_SUCCESSOR_ITEMS + 1).mapToObj(i -> docId + "__0__" + i).collect(Collectors.toList()); - - var originalWorkItemExpiration = Duration.ofSeconds(5); - final var seenWorkerItems = new ConcurrentHashMap(); - - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "successorTest")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - // Claim the work item - getWorkItemAndVerify(testContext, "successorTest", seenWorkerItems, originalWorkItemExpiration, false, false); - var client = httpClientSupplier.get(); - // Add the list of successors to the work item - var body = "{\"doc\": {\"successor_items\": \"" + String.join(",", successorItems) + "\"}}"; - var updatePath = workCoordinator.getPathForUpdates(initialWorkItem); - var response = client.makeJsonRequest("POST", updatePath, null, body); - Assertions.assertEquals(200, response.getStatusCode(), "Unexpected response " + response.toDiagnosticString()); - // Create a successor item and then claim it with a long lease. - workCoordinator.createUnassignedWorkItem(successorItems.get(0), testContext::createUnassignedWorkContext); - // Now, we should be able to claim the first successor item with a different worker id - // We should NOT be able to claim the other successor items yet (since they haven't been created yet) or the original item - String workItemId = getWorkItemAndVerify(testContext, "claimSuccessorItem", seenWorkerItems, Duration.ofSeconds(600), false, true); - Assertions.assertEquals(successorItems.get(0), workItemId); // We need to ensure that the item we just claimed is the expected one. - - // Sleep for the remainder of the original work item's lease so that it becomes available. - Thread.sleep(originalWorkItemExpiration.toMillis() + 1000); - - // Okay, we're now in a state where the only document available is the original, incomplete one. - // We need to make sure that if we try to acquire this work item, it will jump into `createSuccessorWorkItemsAndMarkComplete`, - // which we can verify because it should be completed successfully and have created the two missing items. - // After cleaning up the original, acquireNewWorkItem will re-run to claim a valid work item (one of the newly created successor items). - var nextSuccessorWorkItem = getWorkItemAndVerify(testContext, "cleanupOriginalAndClaimNextSuccessor", seenWorkerItems, originalWorkItemExpiration, false, true); - Assertions.assertTrue(successorItems.contains(nextSuccessorWorkItem)); - // Now: the original work item is completed, the first successor item is completed (a few lines above) and the second successor is completed (immediately above) - Assertions.assertEquals(N_SUCCESSOR_ITEMS - 2, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); - - // Now, we should be able to claim the remaining successor items but the _next_ call should fail because there are no available items - for (int i = 1; i < (N_SUCCESSOR_ITEMS - 1); i++) { - workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, seenWorkerItems, originalWorkItemExpiration, false, true); - Assertions.assertTrue(successorItems.contains(workItemId)); + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var docId = "R0"; + var initialWorkItem = docId + "__0__0"; + var N_SUCCESSOR_ITEMS = 3; + var successorItems = (ArrayList) IntStream.range(1, N_SUCCESSOR_ITEMS + 1).mapToObj(i -> docId + "__0__" + i).collect(Collectors.toList()); + + var originalWorkItemExpiration = Duration.ofSeconds(5); + final var seenWorkerItems = new ConcurrentHashMap(); + + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + getWorkItemAndVerify(testContext, "successorTest", seenWorkerItems, originalWorkItemExpiration, false, false, setup); + var client = setup.httpClientSupplier.get(); + var body = "{\"doc\": {\"successor_items\": \"" + String.join(",", successorItems) + "\"}}"; + var updatePath = workCoordinator.getPathForUpdates(initialWorkItem); + var response = client.makeJsonRequest("POST", updatePath, null, body); + Assertions.assertEquals(200, response.getStatusCode(), "Unexpected response " + response.toDiagnosticString()); + workCoordinator.createUnassignedWorkItem(successorItems.get(0), testContext::createUnassignedWorkContext); + String workItemId = getWorkItemAndVerify(testContext, "claimSuccessorItem", seenWorkerItems, Duration.ofSeconds(600), false, true, setup); + Assertions.assertEquals(successorItems.get(0), workItemId); + + Thread.sleep(originalWorkItemExpiration.toMillis() + 1000); + + var nextSuccessorWorkItem = getWorkItemAndVerify(testContext, "cleanupOriginalAndClaimNextSuccessor", seenWorkerItems, originalWorkItemExpiration, false, true, setup); + Assertions.assertTrue(successorItems.contains(nextSuccessorWorkItem)); + Assertions.assertEquals(N_SUCCESSOR_ITEMS - 2, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + + for (int i = 1; i < (N_SUCCESSOR_ITEMS - 1); i++) { + workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, seenWorkerItems, originalWorkItemExpiration, false, true, setup); + Assertions.assertTrue(successorItems.contains(workItemId)); + } + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + Assertions.assertThrows(NoWorkToBeDoneException.class, () -> { + getWorkItemAndVerify(testContext, "finalClaimItem", seenWorkerItems, originalWorkItemExpiration, false, false, setup); + }); + Assertions.assertEquals(N_SUCCESSOR_ITEMS + 1, seenWorkerItems.size()); } - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - Assertions.assertThrows(NoWorkToBeDoneException.class, () -> { - getWorkItemAndVerify(testContext, "finalClaimItem", seenWorkerItems, originalWorkItemExpiration, false, false); - }); - Assertions.assertEquals(N_SUCCESSOR_ITEMS + 1, seenWorkerItems.size()); } } - @ParameterizedTest @MethodSource("containerVersions") public void testAddSuccessorItemsFailsIfAlreadyDifferentSuccessorItems(SearchClusterContainer.ContainerVersion version) throws Exception { - setupOpenSearchContainer(version); - // A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created - // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items. - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - var docId = "R0"; - var initialWorkItem = docId + "__0__0"; - var N_SUCCESSOR_ITEMS = 3; - var successorItems = (ArrayList) IntStream.range(0, N_SUCCESSOR_ITEMS).mapToObj(i -> docId + "_successor_" + i).collect(Collectors.toList()); - - var originalWorkItemExpiration = Duration.ofSeconds(5); - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "successorTest")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - // Claim the work item - getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); - var client = httpClientSupplier.get(); - // Add an INCORRECT list of successors to the work item - var incorrectSuccessors = "successor_99,successor_98,successor_97"; - var body = "{\"doc\": {\"successor_items\": \"" + incorrectSuccessors + "\"}}"; - var updatePath = workCoordinator.getPathForUpdates(initialWorkItem); - var response = client.makeJsonRequest("POST", updatePath, null, body); - Assertions.assertEquals(200, response.getStatusCode(), "Unexpected response " + response.toDiagnosticString()); - - // Now attempt to go through with the correct successor item list - Assertions.assertThrows(IllegalStateException.class, - () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, 0, - testContext::createSuccessorWorkItemsContext)); + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var docId = "R0"; + var initialWorkItem = docId + "__0__0"; + var N_SUCCESSOR_ITEMS = 3; + var successorItems = (ArrayList) IntStream.range(0, N_SUCCESSOR_ITEMS).mapToObj(i -> docId + "_successor_" + i).collect(Collectors.toList()); + + var originalWorkItemExpiration = Duration.ofSeconds(5); + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false, setup); + var client = setup.httpClientSupplier.get(); + var incorrectSuccessors = "successor_99,successor_98,successor_97"; + var body = "{\"doc\": {\"successor_items\": \"" + incorrectSuccessors + "\"}}"; + var updatePath = workCoordinator.getPathForUpdates(initialWorkItem); + var response = client.makeJsonRequest("POST", updatePath, null, body); + Assertions.assertEquals(200, response.getStatusCode(), "Unexpected response " + response.toDiagnosticString()); + + Assertions.assertThrows(IllegalStateException.class, + () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, 0, + testContext::createSuccessorWorkItemsContext)); + } } } - // Create a test where a work item tries to create itself as a successor -- it should fail and NOT be marked as complete. Another worker should pick it up and double the lease time. @ParameterizedTest @MethodSource("containerVersions") public void testCreatingSelfAsSuccessorWorkItemFails(SearchClusterContainer.ContainerVersion version) throws Exception { - setupOpenSearchContainer(version); - // A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created - // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items. - var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - var initialWorkItem = "R0__0__0"; - var successorItems = new ArrayList<>(List.of("R0__0__0", "R1__0__0", "R2__0__0")); - - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, "successorTest")) { - Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); - Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - // Claim the work item - getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false); - - // Now attempt to go through with the correct successor item list - Assertions.assertThrows(IllegalArgumentException.class, - () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(initialWorkItem, successorItems, - 0, - testContext::createSuccessorWorkItemsContext)); + try (var setup = TestSetup.create(version)) { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var initialWorkItem = "R0__0__0"; + var successorItems = new ArrayList<>(List.of("R0__0__0", "R1__0__0", "R2__0__0")); + + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false, setup); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(initialWorkItem, successorItems, + 0, + testContext::createSuccessorWorkItemsContext)); + } } } @@ -479,7 +441,8 @@ private String getWorkItemAndCompleteWithSuccessors( ConcurrentHashMap seenWorkerItems, Duration expirationWindow, boolean placeFinishedDoc, - int numSuccessorItems + int numSuccessorItems, + TestSetup setup ) { var workItemId = getWorkItemAndVerify( testContext, @@ -487,14 +450,15 @@ private String getWorkItemAndCompleteWithSuccessors( seenWorkerItems, expirationWindow, placeFinishedDoc, - false + false, + setup ); ArrayList successorWorkItems = new ArrayList<>(); for (int j = 0; j < numSuccessorItems; j++) { // Replace "__" with "_" in workerId to create a unique name successorWorkItems.add(workItemId.replace("__", "_") + "__0__" + j); } - try (var workCoordinator = factory.get(httpClientSupplier.get(), 3600, workerName)) { + try (var workCoordinator = setup.factory.get(setup.httpClientSupplier.get(), 3600, workerName)) { workCoordinator.createSuccessorWorkItemsAndMarkComplete( workItemId, successorWorkItems, 0, @@ -506,7 +470,6 @@ private String getWorkItemAndCompleteWithSuccessors( return workItemId; } - class NoWorkToBeDoneException extends Exception { public NoWorkToBeDoneException() { super(); @@ -522,11 +485,12 @@ private String getWorkItemAndVerify( ConcurrentHashMap seenWorkerItems, Duration expirationWindow, boolean placeFinishedDoc, - boolean markCompleted + boolean markCompleted, + TestSetup setup ) { try ( - var workCoordinator = factory.get( - httpClientSupplier.get(), + var workCoordinator = setup.factory.get( + setup.httpClientSupplier.get(), 3600, workerName ) ) { @@ -536,7 +500,7 @@ private String getWorkItemAndVerify( workCoordinator.completeWorkItem(doneId, testContext::createCompleteWorkContext); } - final var oldNow = workCoordinator.getClock().instant(); // + final var oldNow = workCoordinator.getClock().instant(); return workCoordinator.acquireNextWorkItem(expirationWindow, testContext::createAcquireNextItemContext) .visit(new IWorkCoordinator.WorkAcquisitionOutcomeVisitor<>() { @Override @@ -573,10 +537,9 @@ public String onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) thro log.atError() .setCause(e) .setMessage("Unexpected clock drift error. Got response: {}") - .addArgument(() -> searchForExpiredDocs(e.getTimestampEpochSeconds())) + .addArgument(() -> setup.searchForExpiredDocs(e.getTimestampEpochSeconds())) .log(); throw e; } } - } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java index 0d139e7f66..d627370ed5 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/KafkaRestartingTrafficReplayerTest.java @@ -40,14 +40,14 @@ import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; @Slf4j @Testcontainers(disabledWithoutDocker = true) @WrapWithNettyLeakDetection(disableLeakChecks = true) @Tag("requiresDocker") +@Tag("isolatedTest") public class KafkaRestartingTrafficReplayerTest extends InstrumentationTest { public static final int INITIAL_STOP_REPLAYER_REQUEST_COUNT = 1; public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; @@ -61,10 +61,139 @@ public class KafkaRestartingTrafficReplayerTest extends InstrumentationTest { public static final Duration MAX_WAIT_TIME_FOR_TOPIC = Duration.ofMillis(PRODUCER_SLEEP_INTERVAL_MS * 2); public static final long DEFAULT_POLL_INTERVAL_MS = 5000; - @Container - // see - // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA); + private record TestSetup( + ConfluentKafkaContainer kafkaContainer + ) implements AutoCloseable { + @Override + public void close() throws Exception { + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } + + static TestSetup create() { + ConfluentKafkaContainer kafkaContainer = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); + kafkaContainer.start(); + return new TestSetup(kafkaContainer); + } + + @SneakyThrows + KafkaConsumer buildKafkaConsumer() { + var kafkaConsumerProps = KafkaTrafficCaptureSource.buildKafkaProperties( + kafkaContainer.getBootstrapServers(), + TEST_GROUP_CONSUMER_ID, + false, + null + ); + kafkaConsumerProps.setProperty("max.poll.interval.ms", DEFAULT_POLL_INTERVAL_MS + ""); + var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps); + log.atInfo().setMessage("Just built KafkaConsumer={}").addArgument(kafkaConsumer).log(); + return kafkaConsumer; + } + + Producer buildKafkaProducer() { + var kafkaProps = new Properties(); + kafkaProps.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer" + ); + kafkaProps.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer" + ); + // Property details: + // https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms + kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000); + kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); + kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + try { + return new KafkaProducer(kafkaProps); + } catch (Exception e) { + log.atError().setCause(e).log(); + System.exit(1); + throw e; + } + } + + void loadStreamsToKafka(KafkaConsumer kafkaConsumer, Stream streams) + throws Exception { + var kafkaProducer = buildKafkaProducer(); + var counter = new AtomicInteger(); + loadStreamsAsynchronouslyWithCloseableResource( + kafkaConsumer, + streams, + s -> s.forEach( + trafficStream -> KafkaTestUtils.writeTrafficStreamRecord( + kafkaProducer, + trafficStream, + TEST_TOPIC_NAME, + "KEY_" + counter.incrementAndGet() + ) + ) + ); + Thread.sleep(PRODUCER_SLEEP_INTERVAL_MS); + } + + void loadStreamsAsynchronouslyWithCloseableResource( + KafkaConsumer kafkaConsumer, + R closeableResource, + Consumer loader + ) throws Exception { + try { + new Thread(() -> loader.accept(closeableResource)).start(); + var startTime = Instant.now(); + while (!kafkaConsumer.listTopics().isEmpty()) { + Thread.sleep(10); + Assertions.assertTrue( + Duration.between(startTime, Instant.now()).compareTo(MAX_WAIT_TIME_FOR_TOPIC) < 0 + ); + } + } finally { + closeableResource.close(); + } + } + + Supplier loadStreamsToKafkaFromCompressedFile( + TestContext rootCtx, + KafkaConsumer kafkaConsumer, + String filename, + int recordCount + ) throws Exception { + var kafkaProducer = buildKafkaProducer(); + loadStreamsAsynchronouslyWithCloseableResource( + kafkaConsumer, + new V0_1TrafficCaptureSource(rootCtx, filename), + originalTrafficSource -> { + try { + for (int i = 0; i < recordCount; ++i) { + List chunks = null; + chunks = originalTrafficSource.readNextTrafficStreamChunk(rootCtx::createReadChunkContext) + .get(); + for (int j = 0; j < chunks.size(); ++j) { + KafkaTestUtils.writeTrafficStreamRecord( + kafkaProducer, + chunks.get(j).getStream(), + TEST_TOPIC_NAME, + "KEY_" + i + "_" + j + ); + Thread.sleep(PRODUCER_SLEEP_INTERVAL_MS); + } + } + } catch (Exception e) { + throw Lombok.sneakyThrow(e); + } + } + ); + return () -> new KafkaTrafficCaptureSource( + rootCtx, + kafkaConsumer, + TEST_TOPIC_NAME, + Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS) + ); + } + } private static class CounterLimitedReceiverFactory implements Supplier> { AtomicInteger nextStopPointRef = new AtomicInteger(INITIAL_STOP_REPLAYER_REQUEST_COUNT); @@ -90,165 +219,50 @@ public Consumer get() { @Tag("longTest") @ResourceLock("TrafficReplayerRunner") public void fullTest(int testSize, boolean randomize) throws Throwable { - var random = new Random(1); - try ( - var httpServer = SimpleNettyHttpServer.makeServer( - false, - Duration.ofMillis(2), - response -> TestHttpServerContext.makeResponse(random, response) - ) - ) { - var streamAndConsumer = ExhaustiveTrafficStreamGenerator.generateStreamAndSumOfItsTransactions( - TestContext.noOtelTracking(), - testSize, - randomize - ); - var trafficStreams = streamAndConsumer.stream.collect(Collectors.toList()); - log.atInfo().setMessage("{}") - .addArgument(() -> trafficStreams.stream() - .map(TrafficStreamUtils::summarizeTrafficStream) - .collect(Collectors.joining("\n")) + try (var setup = TestSetup.create()) { + var random = new Random(1); + try ( + var httpServer = SimpleNettyHttpServer.makeServer( + false, + Duration.ofMillis(2), + response -> TestHttpServerContext.makeResponse(random, response) ) - .log(); - - loadStreamsToKafka( - buildKafkaConsumer(), - Streams.concat(trafficStreams.stream(), Stream.of(SENTINEL_TRAFFIC_STREAM)) - ); - TrafficReplayerRunner.runReplayer( - streamAndConsumer.numHttpTransactions, - httpServer.localhostEndpoint(), - new CounterLimitedReceiverFactory(), - () -> TestContext.noOtelTracking(), - rootContext -> new SentinelSensingTrafficSource( - new KafkaTrafficCaptureSource( - rootContext, - buildKafkaConsumer(), - TEST_TOPIC_NAME, - Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS) + ) { + var streamAndConsumer = ExhaustiveTrafficStreamGenerator.generateStreamAndSumOfItsTransactions( + TestContext.noOtelTracking(), + testSize, + randomize + ); + var trafficStreams = streamAndConsumer.stream.collect(Collectors.toList()); + log.atInfo().setMessage("{}") + .addArgument(() -> trafficStreams.stream() + .map(TrafficStreamUtils::summarizeTrafficStream) + .collect(Collectors.joining("\n")) ) - ) - ); - httpServer.close(); - log.info("done"); - } - } + .log(); - @SneakyThrows - private KafkaConsumer buildKafkaConsumer() { - var kafkaConsumerProps = KafkaTrafficCaptureSource.buildKafkaProperties( - embeddedKafkaBroker.getBootstrapServers(), - TEST_GROUP_CONSUMER_ID, - false, - null - ); - kafkaConsumerProps.setProperty("max.poll.interval.ms", DEFAULT_POLL_INTERVAL_MS + ""); - var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps); - log.atInfo().setMessage("Just built KafkaConsumer={}").addArgument(kafkaConsumer).log(); - return kafkaConsumer; - } - - private void loadStreamsToKafka(KafkaConsumer kafkaConsumer, Stream streams) - throws Exception { - var kafkaProducer = buildKafkaProducer(); - var counter = new AtomicInteger(); - loadStreamsAsynchronouslyWithCloseableResource( - kafkaConsumer, - streams, - s -> s.forEach( - trafficStream -> KafkaTestUtils.writeTrafficStreamRecord( - kafkaProducer, - trafficStream, - TEST_TOPIC_NAME, - "KEY_" + counter.incrementAndGet() - ) - ) - ); - Thread.sleep(PRODUCER_SLEEP_INTERVAL_MS); - } - - private void loadStreamsAsynchronouslyWithCloseableResource( - KafkaConsumer kafkaConsumer, - R closeableResource, - Consumer loader - ) throws Exception { - try { - new Thread(() -> loader.accept(closeableResource)).start(); - var startTime = Instant.now(); - while (!kafkaConsumer.listTopics().isEmpty()) { - Thread.sleep(10); - Assertions.assertTrue( - Duration.between(startTime, Instant.now()).compareTo(MAX_WAIT_TIME_FOR_TOPIC) < 0 + var kafkaConsumer = setup.buildKafkaConsumer(); + setup.loadStreamsToKafka( + kafkaConsumer, + Streams.concat(trafficStreams.stream(), Stream.of(SENTINEL_TRAFFIC_STREAM)) + ); + TrafficReplayerRunner.runReplayer( + streamAndConsumer.numHttpTransactions, + httpServer.localhostEndpoint(), + new CounterLimitedReceiverFactory(), + () -> TestContext.noOtelTracking(), + rootContext -> new SentinelSensingTrafficSource( + new KafkaTrafficCaptureSource( + rootContext, + setup.buildKafkaConsumer(), + TEST_TOPIC_NAME, + Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS) + ) + ) ); + httpServer.close(); + log.info("done"); } - } finally { - closeableResource.close(); } } - - Producer buildKafkaProducer() { - var kafkaProps = new Properties(); - kafkaProps.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer" - ); - kafkaProps.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer" - ); - // Property details: - // https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms - kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000); - kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); - kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); - kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID); - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBootstrapServers()); - try { - return new KafkaProducer(kafkaProps); - } catch (Exception e) { - log.atError().setCause(e).log(); - System.exit(1); - throw e; - } - } - - private Supplier loadStreamsToKafkaFromCompressedFile( - TestContext rootCtx, - KafkaConsumer kafkaConsumer, - String filename, - int recordCount - ) throws Exception { - var kafkaProducer = buildKafkaProducer(); - loadStreamsAsynchronouslyWithCloseableResource( - kafkaConsumer, - new V0_1TrafficCaptureSource(rootCtx, filename), - originalTrafficSource -> { - try { - for (int i = 0; i < recordCount; ++i) { - List chunks = null; - chunks = originalTrafficSource.readNextTrafficStreamChunk(rootCtx::createReadChunkContext) - .get(); - for (int j = 0; j < chunks.size(); ++j) { - KafkaTestUtils.writeTrafficStreamRecord( - kafkaProducer, - chunks.get(j).getStream(), - TEST_TOPIC_NAME, - "KEY_" + i + "_" + j - ); - Thread.sleep(PRODUCER_SLEEP_INTERVAL_MS); - } - } - } catch (Exception e) { - throw Lombok.sneakyThrow(e); - } - } - ); - return () -> new KafkaTrafficCaptureSource( - rootCtx, - kafkaConsumer, - TEST_TOPIC_NAME, - Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS) - ); - } - } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java index a87c75c0ab..2f8fd3abf8 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java @@ -21,16 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; @Slf4j @Testcontainers(disabledWithoutDocker = true) @Tag("requiresDocker") +@Tag("isolatedTest") public class KafkaKeepAliveTests extends InstrumentationTest { public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat.interval.ms"; @@ -38,34 +37,41 @@ public class KafkaKeepAliveTests extends InstrumentationTest { public static final long HEARTBEAT_INTERVAL_MS = 300; public static final String testTopicName = "TEST_TOPIC"; - Producer kafkaProducer; - AtomicInteger sendCompleteCount; - Properties kafkaProperties; - BlockingTrafficSource trafficSource; - ArrayList keysReceived; - - @Container - // see - // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA); - - private KafkaTrafficCaptureSource kafkaSource; - - /** - * Set up the test case where we've produced and received 1 message, but have not yet committed it. - * Another message is in the process of being produced. - * The BlockingTrafficSource is blocked on everything after a point before the beginning of the test. - * @throws Exception - */ - @BeforeEach - private void setupTestCase() throws Exception { - kafkaProducer = KafkaTestUtils.buildKafkaProducer(embeddedKafkaBroker.getBootstrapServers()); - this.sendCompleteCount = new AtomicInteger(0); + private record TestSetup( + ConfluentKafkaContainer kafkaContainer, + Producer kafkaProducer, + AtomicInteger sendCompleteCount, + Properties kafkaProperties, + KafkaTrafficCaptureSource kafkaSource, + BlockingTrafficSource trafficSource, + ArrayList keysReceived + ) implements AutoCloseable { + @Override + public void close() throws Exception { + if (trafficSource != null) { + trafficSource.close(); + } + if (kafkaProducer != null) { + kafkaProducer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } + } + + private TestSetup setupTestCase() throws Exception { + // Create and start a new Kafka container for this test + ConfluentKafkaContainer kafkaContainer = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA); + kafkaContainer.start(); + + Producer kafkaProducer = KafkaTestUtils.buildKafkaProducer(kafkaContainer.getBootstrapServers()); + AtomicInteger sendCompleteCount = new AtomicInteger(0); KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 0, sendCompleteCount).get(); Assertions.assertEquals(1, sendCompleteCount.get()); - this.kafkaProperties = KafkaTrafficCaptureSource.buildKafkaProperties( - embeddedKafkaBroker.getBootstrapServers(), + Properties kafkaProperties = KafkaTrafficCaptureSource.buildKafkaProperties( + kafkaContainer.getBootstrapServers(), TEST_GROUP_CONSUMER_ID, false, null @@ -76,97 +82,103 @@ private void setupTestCase() throws Exception { kafkaProperties.put(HEARTBEAT_INTERVAL_MS_KEY, HEARTBEAT_INTERVAL_MS + ""); kafkaProperties.put("max.poll.records", 1); var kafkaConsumer = new KafkaConsumer(kafkaProperties); - this.kafkaSource = new KafkaTrafficCaptureSource( + KafkaTrafficCaptureSource kafkaSource = new KafkaTrafficCaptureSource( rootContext, kafkaConsumer, testTopicName, Duration.ofMillis(MAX_POLL_INTERVAL_MS) ); - this.trafficSource = new BlockingTrafficSource(kafkaSource, Duration.ZERO); - this.keysReceived = new ArrayList<>(); + BlockingTrafficSource trafficSource = new BlockingTrafficSource(kafkaSource, Duration.ZERO); + ArrayList keysReceived = new ArrayList<>(); readNextNStreams(rootContext, trafficSource, keysReceived, 0, 1); KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1, sendCompleteCount); + + return new TestSetup(kafkaContainer, kafkaProducer, sendCompleteCount, kafkaProperties, kafkaSource, trafficSource, keysReceived); } + @Test @Tag("longTest") public void testTimeoutsDontOccurForSlowPolls() throws Exception { - var pollIntervalMs = Optional.ofNullable(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY)) - .map(s -> Integer.valueOf((String) s)) - .orElseThrow(); - var executor = Executors.newSingleThreadScheduledExecutor(); - executor.schedule(() -> { + try (TestSetup setup = setupTestCase()) { + var pollIntervalMs = Optional.ofNullable(setup.kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY)) + .map(s -> Integer.valueOf((String) s)) + .orElseThrow(); + var executor = Executors.newSingleThreadScheduledExecutor(); try { - var k = keysReceived.get(0); - log.info("Calling commit traffic stream for " + k); - trafficSource.commitTrafficStream(k); - log.info("finished committing traffic stream"); - log.info("Stop reads to infinity"); - // this is a way to signal back to the main thread that this thread is done - KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 2, sendCompleteCount); - } catch (Exception e) { - throw Lombok.sneakyThrow(e); + executor.schedule(() -> { + try { + var k = setup.keysReceived.get(0); + log.info("Calling commit traffic stream for " + k); + setup.trafficSource.commitTrafficStream(k); + log.info("finished committing traffic stream"); + log.info("Stop reads to infinity"); + // this is a way to signal back to the main thread that this thread is done + KafkaTestUtils.produceKafkaRecord(testTopicName, setup.kafkaProducer, 2, setup.sendCompleteCount); + } catch (Exception e) { + throw Lombok.sneakyThrow(e); + } + }, pollIntervalMs, TimeUnit.MILLISECONDS); + + readNextNStreams(rootContext, setup.trafficSource, setup.keysReceived, 1, 2); + Assertions.assertEquals(3, setup.keysReceived.size()); + } finally { + executor.shutdownNow(); } - }, pollIntervalMs, TimeUnit.MILLISECONDS); - - // wait for 2 messages so that they include the last one produced by the async schedule call previously - readNextNStreams(rootContext, trafficSource, keysReceived, 1, 2); - Assertions.assertEquals(3, keysReceived.size()); - // At this point, we've read all (3) messages produced , committed the first one - // (all the way through to Kafka), and no commits are in-flight yet for the last two messages. + } } @Test @Tag("longTest") public void testBlockedReadsAndBrokenCommitsDontCauseReordering() throws Exception { - for (int i = 0; i < 2; ++i) { - KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1 + i, sendCompleteCount).get(); + try (TestSetup setup = setupTestCase()) { + for (int i = 0; i < 2; ++i) { + KafkaTestUtils.produceKafkaRecord(testTopicName, setup.kafkaProducer, 1 + i, setup.sendCompleteCount).get(); + } + readNextNStreams(rootContext, setup.trafficSource, setup.keysReceived, 1, 1); + + setup.trafficSource.commitTrafficStream(setup.keysReceived.get(0)); + log.info( + "Called commitTrafficStream but waiting long enough for the client to leave the group. " + + "That will make the previous commit a 'zombie-commit' that should easily be dropped." + ); + + log.info( + "1 message was committed, but not synced, 1 message is being processed." + + "wait long enough to fall out of the group before we can commit" + ); + Thread.sleep(2 * MAX_POLL_INTERVAL_MS); + + // Save the first key for later use + var firstKey = setup.keysReceived.get(0); + setup.keysReceived.clear(); + + log.info("re-establish a client connection so that the following commit will work"); + log.atInfo().setMessage("1 ...{}").addArgument(() -> setup.kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); + readNextNStreams(rootContext, setup.trafficSource, setup.keysReceived, 0, 1); + log.atInfo().setMessage("2 ...{}").addArgument(() -> setup.kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); + + log.info("wait long enough to fall out of the group again"); + Thread.sleep(2 * MAX_POLL_INTERVAL_MS); + + var keysReceivedUntilDrop2 = new ArrayList<>(setup.keysReceived); + setup.keysReceived.clear(); + log.atInfo().setMessage("re-establish... 3 ...{}").addArgument(() -> setup.kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); + readNextNStreams(rootContext, setup.trafficSource, setup.keysReceived, 0, 1); + // Use the second key we've read + var secondKey = setup.keysReceived.get(0); + setup.trafficSource.commitTrafficStream(secondKey); + log.atInfo().setMessage("re-establish... 4 ...{}").addArgument(() -> setup.kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); + readNextNStreams(rootContext, setup.trafficSource, setup.keysReceived, 1, 1); + log.atInfo().setMessage("5 ...{}").addArgument(() -> setup.kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); + + Thread.sleep(2 * MAX_POLL_INTERVAL_MS); + var keysReceivedUntilDrop3 = new ArrayList<>(setup.keysReceived); + setup.keysReceived.clear(); + readNextNStreams(rootContext, setup.trafficSource, setup.keysReceived, 0, 3); + log.atInfo().setMessage("6 ...{}").addArgument(() -> setup.kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); } - readNextNStreams(rootContext, trafficSource, keysReceived, 1, 1); - - trafficSource.commitTrafficStream(keysReceived.get(0)); - log.info( - "Called commitTrafficStream but waiting long enough for the client to leave the group. " - + "That will make the previous commit a 'zombie-commit' that should easily be dropped." - ); - - log.info( - "1 message was committed, but not synced, 1 message is being processed." - + "wait long enough to fall out of the group before we can commit" - ); - Thread.sleep(2 * MAX_POLL_INTERVAL_MS); - - var keysReceivedUntilDrop1 = keysReceived; - keysReceived = new ArrayList<>(); - - log.info("re-establish a client connection so that the following commit will work"); - log.atInfo().setMessage("1 ...{}").addArgument(this::renderNextCommitsAsString).log(); - readNextNStreams(rootContext, trafficSource, keysReceived, 0, 1); - log.atInfo().setMessage("2 ...{}").addArgument(this::renderNextCommitsAsString).log(); - - log.info("wait long enough to fall out of the group again"); - Thread.sleep(2 * MAX_POLL_INTERVAL_MS); - - var keysReceivedUntilDrop2 = keysReceived; - keysReceived = new ArrayList<>(); - log.atInfo().setMessage("re-establish... 3 ...{}").addArgument(this::renderNextCommitsAsString).log(); - readNextNStreams(rootContext, trafficSource, keysReceived, 0, 1); - trafficSource.commitTrafficStream(keysReceivedUntilDrop1.get(1)); - log.atInfo().setMessage("re-establish... 4 ...{}").addArgument(this::renderNextCommitsAsString).log(); - readNextNStreams(rootContext, trafficSource, keysReceived, 1, 1); - log.atInfo().setMessage("5 ...{}").addArgument(this::renderNextCommitsAsString).log(); - - Thread.sleep(2 * MAX_POLL_INTERVAL_MS); - var keysReceivedUntilDrop3 = keysReceived; - keysReceived = new ArrayList<>(); - readNextNStreams(rootContext, trafficSource, keysReceived, 0, 3); - log.atInfo().setMessage("6 ...{}").addArgument(kafkaSource.trackingKafkaConsumer::nextCommitsToString).log(); - trafficSource.close(); - } - - private String renderNextCommitsAsString() { - return kafkaSource.trackingKafkaConsumer.nextCommitsToString(); } @SneakyThrows diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index 263fb5f191..9fbeb6b421 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -39,7 +39,7 @@ class KafkaTrafficCaptureSourceTest extends InstrumentationTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; - private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(15); @Test public void testRecordToString() {