3131import org .apache .nifi .components .PropertyDescriptor ;
3232import org .apache .nifi .controller .NodeTypeProvider ;
3333import org .apache .nifi .flowfile .FlowFile ;
34+ import org .apache .nifi .logging .ComponentLog ;
3435import org .apache .nifi .processor .AbstractProcessor ;
3536import org .apache .nifi .processor .DataUnit ;
3637import org .apache .nifi .processor .ProcessContext ;
6667import software .amazon .kinesis .common .InitialPositionInStream ;
6768import software .amazon .kinesis .common .InitialPositionInStreamExtended ;
6869import software .amazon .kinesis .coordinator .Scheduler ;
70+ import software .amazon .kinesis .coordinator .WorkerStateChangeListener ;
6971import software .amazon .kinesis .lifecycle .events .InitializationInput ;
7072import software .amazon .kinesis .lifecycle .events .LeaseLostInput ;
7173import software .amazon .kinesis .lifecycle .events .ProcessRecordsInput ;
8991import java .util .Optional ;
9092import java .util .Set ;
9193import java .util .UUID ;
94+ import java .util .concurrent .CompletableFuture ;
9295import java .util .concurrent .ExecutionException ;
96+ import java .util .concurrent .Future ;
9397import java .util .concurrent .TimeoutException ;
9498
9599import static java .util .concurrent .TimeUnit .NANOSECONDS ;
113117 Consumes data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw)
114118 or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured.
115119 The processor may take a few minutes on the first start and several seconds on subsequent starts
116- to initialise before starting to fetch data.
120+ to initialize before starting to fetch data.
117121 Uses DynamoDB for check pointing and coordination, and (optional) CloudWatch for metrics.
118122 """ )
119123@ WritesAttributes ({
@@ -163,6 +167,11 @@ public class ConsumeKinesis extends AbstractProcessor {
163167 private static final int KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES = 512 * 1024 ; // 512 KiB
164168 private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration .ofMinutes (1 );
165169
/**
 * Maximum time to wait for the Kinesis Scheduler to finish initializing.
 * The value is deliberately generous: it exists only to guarantee that the wait is bounded,
 * and the actual initialization is not expected to take this long.
 */
174+ private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration .ofMinutes (15 );
166175 private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration .ofMinutes (3 );
167176
168177 static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor .Builder ()
@@ -393,9 +402,11 @@ public void setup(final ProcessContext context) {
393402
394403 final MetricsFactory metricsFactory = configureMetricsFactory (context );
395404
405+ final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener (getLogger ());
406+
396407 kinesisScheduler = new Scheduler (
397408 configsBuilder .checkpointConfig (),
398- configsBuilder .coordinatorConfig (),
409+ configsBuilder .coordinatorConfig (). workerStateChangeListener ( initializationListener ) ,
399410 configsBuilder .leaseManagementConfig (),
400411 configsBuilder .lifecycleConfig (),
401412 configsBuilder .metricsConfig ().metricsFactory (metricsFactory ),
@@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
409420 schedulerThread .start ();
410421 // The thread is stopped when kinesisScheduler is shutdown in the onStopped method.
411422
412- getLogger ().info ("Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]" ,
413- streamName , applicationName , workerId );
423+ final InitializationResult result ;
424+ try {
425+ result = initializationListener .result ().get (KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT .getSeconds (), SECONDS );
426+ } catch (final InterruptedException | ExecutionException | TimeoutException e ) {
427+ if (e instanceof InterruptedException ) {
428+ Thread .currentThread ().interrupt ();
429+ }
430+ cleanUpState ();
431+ throw new ProcessException (e );
432+ }
433+
434+ switch (result ) {
435+ case InitializationResult .Success ignored ->
436+ getLogger ().info (
437+ "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]" ,
438+ streamName , applicationName , workerId );
439+ case InitializationResult .Failure failure -> {
440+ cleanUpState ();
441+
442+ final ProcessException ex = failure .error ()
443+ .map (err -> new ProcessException ("Failed to initialize the processor." , err ))
444+ .orElseGet (() -> new ProcessException ("Failed to initialize the processor due to an unknown failure. Check application logs for more details." ));
445+
446+ throw ex ;
447+ }
448+ }
414449 }
415450
416451 /**
@@ -510,10 +545,15 @@ private String generateWorkerId() {
510545
511546 @ OnStopped
512547 public void onStopped () {
548+ cleanUpState ();
549+ }
550+
551+ private void cleanUpState () {
513552 if (kinesisScheduler != null ) {
514553 shutdownScheduler ();
515554 kinesisScheduler = null ;
516555 }
556+
517557 if (kinesisClient != null ) {
518558 kinesisClient .close ();
519559 kinesisClient = null ;
@@ -532,6 +572,10 @@ public void onStopped() {
532572 }
533573
534574 private void shutdownScheduler () {
575+ if (kinesisScheduler .shutdownComplete ()) {
576+ return ;
577+ }
578+
535579 final long start = System .nanoTime ();
536580 getLogger ().debug ("Shutting down Kinesis Scheduler" );
537581
@@ -684,6 +728,49 @@ public void shutdownRequested(final ShutdownRequestedInput shutdownRequestedInpu
684728 }
685729 }
686730
731+ private static final class InitializationStateChangeListener implements WorkerStateChangeListener {
732+
733+ private final ComponentLog logger ;
734+
735+ private final CompletableFuture <InitializationResult > resultFuture = new CompletableFuture <>();
736+
737+ private volatile @ Nullable Throwable initializationFailure ;
738+
739+ InitializationStateChangeListener (final ComponentLog logger ) {
740+ this .logger = logger ;
741+ }
742+
743+ @ Override
744+ public void onWorkerStateChange (final WorkerState newState ) {
745+ logger .info ("Processor state changed to: {}" , newState );
746+
747+ if (newState == WorkerState .STARTED ) {
748+ resultFuture .complete (new InitializationResult .Success ());
749+ } else if (newState == WorkerState .SHUT_DOWN ) {
750+ resultFuture .complete (new InitializationResult .Failure (Optional .ofNullable (initializationFailure )));
751+ }
752+ }
753+
754+ @ Override
755+ public void onAllInitializationAttemptsFailed (final Throwable e ) {
756+ // This method is called before the SHUT_DOWN_STARTED phase.
757+ // Memorizing the error until the Scheduler is SHUT_DOWN.
758+ initializationFailure = e ;
759+ }
760+
761+ Future <InitializationResult > result () {
762+ return resultFuture ;
763+ }
764+ }
765+
766+ private sealed interface InitializationResult {
767+ record Success () implements InitializationResult {
768+ }
769+
770+ record Failure (Optional <Throwable > error ) implements InitializationResult {
771+ }
772+ }
773+
687774 enum ProcessingStrategy implements DescribedValue {
688775 FLOW_FILE ("Write one FlowFile for each consumed Kinesis Record" ),
689776 RECORD ("Write one FlowFile containing multiple consumed Kinesis Records processed with Record Reader and Record Writer" );
0 commit comments