Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
Expand Down Expand Up @@ -66,6 +67,7 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
Expand All @@ -89,7 +91,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand All @@ -113,7 +117,7 @@
Consumes data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw)
or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured.
The processor may take a few minutes on the first start and several seconds on subsequent starts
to initialise before starting to fetch data.
to initialize before starting to fetch data.
Uses DynamoDB for check pointing and coordination, and (optional) CloudWatch for metrics.
""")
@WritesAttributes({
Expand Down Expand Up @@ -163,6 +167,11 @@ public class ConsumeKinesis extends AbstractProcessor {
private static final int KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KiB
private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1);

/**
* Using a large enough value to ensure we don't wait infinitely for the initialization.
* Actual initialization shouldn't take that long.
*/
private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration.ofMinutes(15);
private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofMinutes(3);

static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -393,9 +402,11 @@ public void setup(final ProcessContext context) {

final MetricsFactory metricsFactory = configureMetricsFactory(context);

final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger());

kinesisScheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.coordinatorConfig().workerStateChangeListener(initializationListener),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig().metricsFactory(metricsFactory),
Expand All @@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the onStopped method.

getLogger().info("Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]",
streamName, applicationName, workerId);
final InitializationResult result;
try {
result = initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
cleanUpState();
throw new ProcessException(e);
}

switch (result) {
case InitializationResult.Success ignored ->
getLogger().info(
"Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]",
streamName, applicationName, workerId);
case InitializationResult.Failure failure -> {
cleanUpState();

final ProcessException ex = failure.error()
.map(err -> new ProcessException("Failed to initialize the processor.", err))
.orElseGet(() -> new ProcessException("Failed to initialize the processor due to an unknown failure. Check application logs for more details."));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is active only when a scheduler was shutdown, but no initialization error was provided. However, I didn't observe this behavior while testing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that is the case, it seems worth mentioning. Mentioning "check the application logs for more details" is not helpful because it already appears in the logs, so the message should be shortened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason behind "check the application logs for more details" is that the error text is visible in a bulletin.
When an initialization error is present, it's logged and, therefore available in the bulletin too.

When error is not available for some reason, the bulletin will just mention that initialization failed. However, additional details may be present in application logs, which aren't available in NiFi canvas.

Is it expected for the NiFi users to inspect application logs as well? If so, then the "check the application logs for more details" part is not needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bulletin messages are limited in nature, and often require checking logs as a general practice. More recent adjustments have introduced stack trace visibility for Bulletins, but checking the logs is almost always necessary, so including the statement in a message is redundant.


throw ex;
}
}
}

/**
Expand Down Expand Up @@ -510,10 +545,15 @@ private String generateWorkerId() {

@OnStopped
public void onStopped() {
cleanUpState();
}

private void cleanUpState() {
if (kinesisScheduler != null) {
shutdownScheduler();
kinesisScheduler = null;
}

if (kinesisClient != null) {
kinesisClient.close();
kinesisClient = null;
Expand All @@ -532,6 +572,10 @@ public void onStopped() {
}

private void shutdownScheduler() {
if (kinesisScheduler.shutdownComplete()) {
return;
}

final long start = System.nanoTime();
getLogger().debug("Shutting down Kinesis Scheduler");

Expand Down Expand Up @@ -684,6 +728,49 @@ public void shutdownRequested(final ShutdownRequestedInput shutdownRequestedInpu
}
}

private static final class InitializationStateChangeListener implements WorkerStateChangeListener {

private final ComponentLog logger;

private final CompletableFuture<InitializationResult> resultFuture = new CompletableFuture<>();

private volatile @Nullable Throwable initializationFailure;

InitializationStateChangeListener(final ComponentLog logger) {
this.logger = logger;
}

@Override
public void onWorkerStateChange(final WorkerState newState) {
logger.info("Processor state changed to: {}", newState);

if (newState == WorkerState.STARTED) {
resultFuture.complete(new InitializationResult.Success());
} else if (newState == WorkerState.SHUT_DOWN) {
resultFuture.complete(new InitializationResult.Failure(Optional.ofNullable(initializationFailure)));
}
}

@Override
public void onAllInitializationAttemptsFailed(final Throwable e) {
// This method is called before the SHUT_DOWN_STARTED phase.
// Memorizing the error until the Scheduler is SHUT_DOWN.
initializationFailure = e;
}

Future<InitializationResult> result() {
return resultFuture;
}
}

private sealed interface InitializationResult {
record Success() implements InitializationResult {
}

record Failure(Optional<Throwable> error) implements InitializationResult {
}
}

enum ProcessingStrategy implements DescribedValue {
FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
RECORD("Write one FlowFile containing multiple consumed Kinesis Records processed with Record Reader and Record Writer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package org.apache.nifi.processors.aws.kinesis;

import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.region.RegionUtil;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -30,6 +34,8 @@
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConsumeKinesisTest {

Expand Down Expand Up @@ -57,4 +63,42 @@ void getRelationshipsForRecordProcessingStrategy() {

assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
}

@Test
void failInitializationWithInvalidValues() {
// With dummy values KCL Scheduler initialization will fail.
setDummyValues(testRunner);

// Using the processor object to avoid error wrapping by testRunner.
final ConsumeKinesis consumeKinesis = (ConsumeKinesis) testRunner.getProcessor();
final ProcessException ex = assertThrows(
ProcessException.class,
() -> consumeKinesis.setup(testRunner.getProcessContext()));

assertEquals("Failed to initialize the processor.", ex.getMessage());
assertNotNull(ex.getCause());
}

private static void setDummyValues(final TestRunner runner) {
final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
try {
runner.addControllerService("credentials", credentialsService);
} catch (final InitializationException e) {
throw new RuntimeException(e);
}
runner.setProperty(credentialsService, AWSCredentialsProviderControllerService.ACCESS_KEY_ID, "123");
runner.setProperty(credentialsService, AWSCredentialsProviderControllerService.SECRET_KEY, "123");
runner.enableControllerService(credentialsService);

runner.setProperty(ConsumeKinesis.AWS_CREDENTIALS_PROVIDER_SERVICE, "credentials");
runner.setProperty(ConsumeKinesis.STREAM_NAME, "stream");
runner.setProperty(ConsumeKinesis.APPLICATION_NAME, "application");
runner.setProperty(RegionUtil.REGION, "us-west-2");
runner.setProperty(ConsumeKinesis.INITIAL_STREAM_POSITION, ConsumeKinesis.InitialPosition.TRIM_HORIZON);
runner.setProperty(ConsumeKinesis.PROCESSING_STRATEGY, ConsumeKinesis.ProcessingStrategy.FLOW_FILE);

runner.setProperty(ConsumeKinesis.METRICS_PUBLISHING, ConsumeKinesis.MetricsPublishing.CLOUDWATCH);

runner.setProperty(ConsumeKinesis.MAX_BYTES_TO_BUFFER, "10 MB");
}
}
Loading