@@ -429,37 +429,48 @@ private static void exitOnLeaseTimeout(
log.atWarn().setMessage("Terminating RfsMigrateDocuments because the lease has expired for {}")
.addArgument(workItemId)
.log();
if (progressCursorRef.get() != null) {
log.atWarn().setMessage("Progress cursor set, cancelling active doc migration").log();
cancellationRunnable.run();
// Get a new progressCursor after cancellation for most up-to-date checkpoint
var progressCursor = progressCursorRef.get();
log.atWarn().setMessage("Progress cursor: {}")
.addArgument(progressCursor).log();
var workItemAndDuration = workItemRef.get();
if (workItemAndDuration == null) {
throw new IllegalStateException("Unexpected state with progressCursor set without a " +
"work item");
try {
if (progressCursorRef.get() != null) {
log.atWarn().setMessage("Progress cursor set, cancelling active doc migration").log();
cancellationRunnable.run();
// Get a new progressCursor after cancellation for most up-to-date checkpoint
var progressCursor = progressCursorRef.get();
log.atWarn().setMessage("Progress cursor: {}")
.addArgument(progressCursor).log();
var workItemAndDuration = workItemRef.get();
if (workItemAndDuration == null) {
throw new IllegalStateException("Unexpected state with progressCursor set without a " +
"work item");
}
log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration)
.log();
log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem())
.log();
var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor);
if (successorWorkItemIds.size() == 1 && workItemId.equals(successorWorkItemIds.get(0))) {
log.atWarn().setMessage("No real progress was made for work item: {}. Will retry with larger timeout").addArgument(workItemId).log();
Comment on lines +450 to +451

Member: This is a strange case; it seems like getSuccessorWorkItemIds should error out internally before returning up to this level. Can we rework this?

Member Author: getSuccessorWorkItemIds does error out, but this should really be a warn instead of an error. The case here is that the lease is just long enough to send one request to the target cluster successfully, and getSuccessorWorkItemIds does throw. With the new try/catch this would be caught, but this isn't an "Error" case, more of a Warn, which is why we shouldn't rely on that exception in getSuccessorWorkItemIds.

} else {
log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds))
.log();
var successorNextAcquisitionLeaseExponent = getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, workItemAndDuration.getLeaseExpirationTime());
coordinator.createSuccessorWorkItemsAndMarkComplete(
workItemId,
successorWorkItemIds,
successorNextAcquisitionLeaseExponent,
contextSupplier
);
}
} else {
log.atWarn().setMessage("No progress cursor to create successor work items from. This can happen when" +
"downloading and unpacking shard takes longer than the lease").log();
log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time")
.log();
}
Comment on lines +463 to +468

Member: Nit: Can we invert the flow of control and return if the precondition fails right away?

Member: Ideally we'd have only one 'level' of if/else-if/else blocks for each function; it makes the code much cleaner to read.

log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration)
.log();
log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem())
.log();
var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor);
log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds))
.log();
var successorNextAcquisitionLeaseExponent = getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, workItemAndDuration.getLeaseExpirationTime());
coordinator.createSuccessorWorkItemsAndMarkComplete(
workItemId,
successorWorkItemIds,
successorNextAcquisitionLeaseExponent,
contextSupplier
);
} else {
log.atWarn().setMessage("No progress cursor to create successor work items from. This can happen when" +
"downloading and unpacking shard takes longer than the lease").log();
log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time")
.log();
} catch (Exception e) {
log.atError().setMessage("Exception during exit on lease timeout, clean shutdown failed")
.setCause(e).log();
cleanShutdownCompleted.set(false);
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}
cleanShutdownCompleted.set(true);
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
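
To make the reviewers' suggestion concrete: inverting the flow of control turns each precondition into an early return, flattening the nesting to a single level. The sketch below is a toy, self-contained illustration of that shape, not the merged code; computeSuccessors and markCompleteWithSuccessors are hypothetical stand-ins for getSuccessorWorkItemIds and coordinator.createSuccessorWorkItemsAndMarkComplete, and the try/catch plus the final cleanShutdownCompleted/System.exit calls would stay in the caller so every path still exits with PROCESS_TIMED_OUT_EXIT_CODE.

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Toy guard-clause version of the successor-creation logic; names are stand-ins.
final class LeaseTimeoutSketch {
    static void createSuccessorsIfProgressMade(String workItemId,
                                               AtomicReference<Object> progressCursorRef,
                                               AtomicReference<Object> workItemRef,
                                               Runnable cancellationRunnable) {
        if (progressCursorRef.get() == null) {
            // Precondition failed: no checkpoint exists, so return right away and
            // let the work item be retried later with more time.
            System.err.println("No progress cursor; skipping successor creation");
            return;
        }
        cancellationRunnable.run();
        var progressCursor = progressCursorRef.get(); // refresh for the latest checkpoint
        var workItemAndDuration = workItemRef.get();
        if (workItemAndDuration == null) {
            throw new IllegalStateException("Unexpected state with progressCursor set without a work item");
        }
        var successorWorkItemIds = computeSuccessors(workItemAndDuration, progressCursor);
        if (successorWorkItemIds.size() == 1 && workItemId.equals(successorWorkItemIds.get(0))) {
            // The only successor is the item itself: no real progress was made.
            System.err.println("No real progress for " + workItemId + "; will retry with a larger lease");
            return;
        }
        markCompleteWithSuccessors(workItemId, successorWorkItemIds);
    }

    // Hypothetical stand-ins for getSuccessorWorkItemIds and
    // coordinator.createSuccessorWorkItemsAndMarkComplete.
    static List<String> computeSuccessors(Object workItem, Object cursor) {
        return List.of("example-shard__1");
    }

    static void markCompleteWithSuccessors(String id, List<String> successors) {
    }
}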
@@ -76,6 +76,8 @@ public class SourceTestBase {

@NotNull
protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput(ProcessBuilder.Redirect.PIPE);
var process = processBuilder.start();

log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();
@@ -16,9 +16,9 @@
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.lifecycle.Startable;

@Slf4j
@@ -47,7 +47,7 @@ public CaptureProxyContainer(final String destinationUri, final String kafkaUri)
this(() -> destinationUri, () -> kafkaUri);
}

public CaptureProxyContainer(final Container<?> destination, final KafkaContainer kafka) {
public CaptureProxyContainer(final Container<?> destination, final ConfluentKafkaContainer kafka) {
this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka));
}

@@ -2,13 +2,14 @@

import org.opensearch.migrations.testutils.SharedDockerImageNames;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.kafka.ConfluentKafkaContainer;

public class KafkaContainerTestBase extends TestContainerTestBase<KafkaContainer> {

private static final KafkaContainer kafka = new KafkaContainer(SharedDockerImageNames.KAFKA);
public class KafkaContainerTestBase extends TestContainerTestBase<ConfluentKafkaContainer> {

public KafkaContainer getContainer() {
private static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA);

public ConfluentKafkaContainer getContainer() {
return kafka;
}
}
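
For context on the container swap running through these tests: recent Testcontainers releases moved Kafka support to the org.testcontainers.kafka package, where ConfluentKafkaContainer supersedes the deprecated org.testcontainers.containers.KafkaContainer. It exposes the same getBootstrapServers() accessor, so call sites change little. A minimal usage sketch, with an illustrative image tag standing in for SharedDockerImageNames.KAFKA:

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class ConfluentKafkaSketch {
    public static void main(String[] args) {
        // Image tag is illustrative; pick one compatible with your Testcontainers version.
        try (var kafka = new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))) {
            kafka.start();
            System.out.println("bootstrap servers: " + kafka.getBootstrapServers());
        }
    }
}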
@@ -12,7 +12,7 @@
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Tag("longTest")
@Tag("isolatedTest")
@Testcontainers(disabledWithoutDocker = true, parallel = true)
public @interface TestContainerTest {
}
@@ -40,14 +40,14 @@
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Slf4j
@Testcontainers(disabledWithoutDocker = true)
@WrapWithNettyLeakDetection(disableLeakChecks = true)
@Tag("requiresDocker")
@Tag("isolatedTest")
public class KafkaRestartingTrafficReplayerTest extends InstrumentationTest {
public static final int INITIAL_STOP_REPLAYER_REQUEST_COUNT = 1;
public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID";
@@ -64,7 +64,7 @@ public class KafkaRestartingTrafficReplayerTest extends InstrumentationTest {
@Container
// see
// https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA);
private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA);

private static class CounterLimitedReceiverFactory implements Supplier<Consumer<SourceTargetCaptureTuple>> {
AtomicInteger nextStopPointRef = new AtomicInteger(INITIAL_STOP_REPLAYER_REQUEST_COUNT);
@@ -87,7 +87,6 @@ public Consumer<SourceTargetCaptureTuple> get() {

@ParameterizedTest
@CsvSource(value = { "3,false", "-1,false", "3,true", "-1,true", })
@Tag("longTest")
@ResourceLock("TrafficReplayerRunner")
public void fullTest(int testSize, boolean randomize) throws Throwable {
var random = new Random(1);
@@ -18,21 +18,21 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Slf4j
@Testcontainers(disabledWithoutDocker = true)
@Tag("requiresDocker")
@Tag("isolatedTest")
public class KafkaCommitsWorkBetweenLongPollsTest extends InstrumentationTest {
private static final long DEFAULT_POLL_INTERVAL_MS = 1000;
private static final int NUM_RUNS = 5;
public static final String TEST_TOPIC_NAME = "test-topic";
@Container
// see
// https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA);
private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA);

@SneakyThrows
private KafkaConsumer<String, byte[]> buildKafkaConsumer() {
Expand All @@ -49,7 +49,6 @@ private KafkaConsumer<String, byte[]> buildKafkaConsumer() {
}

@Test
@Tag("longTest")
public void testThatCommitsAndReadsKeepWorking() throws Exception {
var kafkaSource = new KafkaTrafficCaptureSource(
rootContext,
@@ -24,13 +24,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Slf4j
@Testcontainers(disabledWithoutDocker = true)
@Tag("requiresDocker")
@Tag("isolatedTest")
public class KafkaKeepAliveTests extends InstrumentationTest {
public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID";
public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat.interval.ms";
Expand All @@ -47,7 +47,7 @@ public class KafkaKeepAliveTests extends InstrumentationTest {
@Container
// see
// https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA);
private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA);

private KafkaTrafficCaptureSource kafkaSource;

@@ -90,7 +90,6 @@ private void setupTestCase() throws Exception {
}

@Test
@Tag("longTest")
public void testTimeoutsDontOccurForSlowPolls() throws Exception {
var pollIntervalMs = Optional.ofNullable(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY))
.map(s -> Integer.valueOf((String) s))
@@ -118,7 +117,6 @@ public void testTimeoutsDontOccurForSlowPolls() throws Exception {
}

@Test
@Tag("longTest")
public void testBlockedReadsAndBrokenCommitsDontCauseReordering() throws Exception {
for (int i = 0; i < 2; ++i) {
KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1 + i, sendCompleteCount).get();
@@ -14,13 +14,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Slf4j
@Testcontainers(disabledWithoutDocker = true)
@Tag("requiresDocker")
@Tag("isolatedTest")
public class KafkaTrafficCaptureSourceLongTermTest extends InstrumentationTest {

public static final int TEST_RECORD_COUNT = 10;
Expand All @@ -30,7 +30,7 @@ public class KafkaTrafficCaptureSourceLongTermTest extends InstrumentationTest {
@Container
// see
// https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(SharedDockerImageNames.KAFKA);
private final ConfluentKafkaContainer embeddedKafkaBroker = new ConfluentKafkaContainer(SharedDockerImageNames.KAFKA);

@Test
@Tag("isolatedTest")
@@ -39,7 +39,7 @@ class KafkaTrafficCaptureSourceTest extends InstrumentationTest {
public static final int NUM_READ_ITEMS_BOUND = 1000;
public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME";

private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(30);

@Test
public void testRecordToString() {