33package org .odpi .openmetadata .adapters .eventbus .topic .kafka ;
44
55import java .time .Duration ;
6- import java .util .Collection ;
7- import java .util .Collections ;
8- import java .util .HashMap ;
9- import java .util .Map ;
10- import java .util .Properties ;
11- import java .util .Queue ;
6+ import java .util .*;
127import java .util .concurrent .BlockingDeque ;
138import java .util .concurrent .LinkedBlockingDeque ;
149import java .util .concurrent .ConcurrentHashMap ;
1510import java .util .concurrent .TimeUnit ;
1611import java .util .concurrent .atomic .AtomicBoolean ;
1712
18- import org .apache .kafka .clients .consumer .CommitFailedException ;
19- import org .apache .kafka .clients .consumer .ConsumerRebalanceListener ;
20- import org .apache .kafka .clients .consumer .ConsumerRecord ;
21- import org .apache .kafka .clients .consumer .ConsumerRecords ;
22- import org .apache .kafka .clients .consumer .KafkaConsumer ;
23- import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
13+ import org .apache .kafka .clients .consumer .*;
2414import org .apache .kafka .common .TopicPartition ;
2515import org .apache .kafka .common .errors .WakeupException ;
2616import org .odpi .openmetadata .frameworks .auditlog .AuditLog ;
2717import org .slf4j .Logger ;
2818import org .slf4j .LoggerFactory ;
2919
20+ import static java .lang .Thread .sleep ;
3021
3122
3223/**
@@ -52,7 +43,10 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
5243 private long nextMessageProcessingStatusCheckTime = System .currentTimeMillis ();
5344 private long maxNextPollTimestampToAvoidConsumerTimeout = 0 ;
5445 private final long maxMsBetweenPolls ;
55-
46+
47+ // Keep track of when an initial rebalance is done
48+ private boolean initialPartitionAssignment = true ;
49+
5650
5751 //If we get close enough to the consumer timeout timestamp, force a poll so that
5852 //we do not exceed the timeout. This parameter controls how close we can get
@@ -68,6 +62,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
6862 private final AtomicBoolean running = new AtomicBoolean (true );
6963
7064 private final boolean isAutoCommitEnabled ;
65+ private final long startTime = System .currentTimeMillis ();
7166
7267 /**
7368 * Constructor for the event consumer.
@@ -112,6 +107,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
112107 this .messageProcessingStatusCheckIntervalMs = config .getLongProperty (KafkaOpenMetadataEventConsumerProperty .COMMIT_CHECK_INTERVAL_MS );
113108 long messageTimeoutMins = config .getLongProperty (KafkaOpenMetadataEventConsumerProperty .CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS );
114109 this .messageProcessingTimeoutMs = messageTimeoutMins < 0 ? messageTimeoutMins : TimeUnit .MILLISECONDS .convert (messageTimeoutMins , TimeUnit .MINUTES );
110+
115111 }
116112
117113
@@ -135,8 +131,7 @@ private void updateNextMaxPollTimestamp() {
135131 public void run ()
136132 {
137133 final String actionDescription = "run" ;
138- KafkaOpenMetadataTopicConnectorAuditCode auditCode ;
139-
134+
140135 while (isRunning ())
141136 {
142137 try
@@ -452,7 +447,7 @@ private int getNumberOfUnprocessedMessages() {
452447 private void awaitNextPollingTime () {
453448 try
454449 {
455- Thread . sleep (1000 );
450+ sleep (1000 );
456451 }
457452 catch (InterruptedException e )
458453 {
@@ -468,7 +463,7 @@ private void recoverAfterError()
468463
469464 try
470465 {
471- Thread . sleep (recoverySleepTimeSec * 1000L );
466+ sleep (recoverySleepTimeSec * 1000L );
472467 }
473468 catch (InterruptedException e1 )
474469 {
@@ -514,17 +509,68 @@ private void stopRunning()
514509 }
515510
516511
517- private class HandleRebalance implements ConsumerRebalanceListener
518- {
512+ private class HandleRebalance implements ConsumerRebalanceListener {
519513 AuditLog auditLog = null ;
514+
520515 public HandleRebalance (AuditLog auditLog ) {
521516 this .auditLog = auditLog ;
522517 }
523518
524- public void onPartitionsAssigned (Collection <TopicPartition > partitions )
525- {
519+ @ Override
520+ public void onPartitionsAssigned (Collection <TopicPartition > partitions ) {
521+
522+ // Check if we need to rewind to handle initial startup case -- but only on first assignment
523+ try {
524+ if (initialPartitionAssignment ) {
525+ log .info ("Received initial PartitionsAssigned event" );
526+
527+ long partitionCount = partitions .size ();
528+
529+ if (partitionCount != 1 ) {
530+ log .info ("Received PartitionsAssigned event with {} partitions. This is not supported." ,partitionCount );
531+ } else {
532+ // there is only one partition, so we can just grab the first one - and we'll try this once only
533+ initialPartitionAssignment = false ;
534+ long maxOffsetWanted = 0 ; // same as 'beginning'
535+
536+ TopicPartition partition = partitions .iterator ().next ();
537+
538+ // query offset by timestamp (when we started connector) - NULL if there are no messages later than this offset
539+ long reqStartTime =KafkaOpenMetadataEventConsumer .this .startTime ;
540+ log .info ("Querying for offset by timestamp: {}" ,reqStartTime );
541+ OffsetAndTimestamp otByStartTime = consumer .offsetsForTimes (Collections .singletonMap (partition ,
542+ reqStartTime )).get (partition );
543+
544+ // If null, then we don't have any earlier messages - ie there is no offset found
545+ if (otByStartTime != null ) {
546+ // where we want to scoll to - the messages sent since we thought we started
547+ maxOffsetWanted = otByStartTime .offset ();
548+ log .info ("Earliest offset found for {} is {}" ,reqStartTime ,otByStartTime .timestamp ());
549+
550+ // get the current offset
551+ long currentOffset = consumer .position (partition );
552+
553+ // if the current offset is later than the start time we want, rewind to the start time
554+ if (currentOffset > maxOffsetWanted ) {
555+
556+ log .info ("Seeking to {} for partition {} and topic {} as current offset {} is too late" , maxOffsetWanted , partition .partition (),
557+ partition .topic (), currentOffset );
558+ consumer .seek (partition , maxOffsetWanted );
559+ } else
560+ log .info ("Not Seeking to {} for partition {} and topic {} as current offset {} is older" , maxOffsetWanted , partition .partition (),
561+ partition .topic (), currentOffset );
562+ }
563+ else
564+ log .info ("No missed events found for partition {} and topic {}" , partition .partition (), partition .topic ());
565+ }
566+ }
567+ } catch (Exception e ) {
568+ // We leave the offset as-is if anything goes wrong. Eventually other messages will cause the effective state to be updated
569+ log .info ("Error correcting seek position, continuing with defaults" , e );
570+ }
526571 }
527572
573+ @ Override
528574 public void onPartitionsRevoked (Collection <TopicPartition > partitions )
529575 {
530576 final String methodName = "onPartitionsRevoked.commitSync" ;
0 commit comments