4040
4141import java .net .URI ;
4242import java .time .Clock ;
43+ import java .time .Duration ;
44+ import java .time .Instant ;
4345import java .util .Date ;
4446import java .util .List ;
4547import java .util .concurrent .TimeUnit ;
@@ -70,21 +72,21 @@ public class KinesisConsumer extends Thread {
7072
7173 private final String overrideKinesisEndpoint ;
7274
73- private final int kinesisConsumerTracebackSeconds ;
75+ private final int kinesisConsumerTracebackMinutes ;
7476
7577 public KinesisConsumer (MetricRegistry metricRegistry , PointProcessor pointProcessor , PointProcessor recoveryPointProcessor ,
7678 String kinesisStreamName , String kinesisApplicationName ,
7779 KinesisConfig kinesisConfig , CheckPointMgr <Date > checkPointMgr ,
78- Counter noOfRestarts , String kinesisConsumerRegion , int kinesisConsumerTracebackSeconds ) {
80+ Counter noOfRestarts , String kinesisConsumerRegion , int kinesisConsumerTracebackMinutes ) {
7981 this (metricRegistry , pointProcessor , recoveryPointProcessor , kinesisStreamName , kinesisApplicationName , kinesisConfig ,
80- checkPointMgr , noOfRestarts , kinesisConsumerRegion , kinesisConsumerTracebackSeconds , null );
82+ checkPointMgr , noOfRestarts , kinesisConsumerRegion , kinesisConsumerTracebackMinutes , null );
8183 }
8284
8385 public KinesisConsumer (MetricRegistry metricRegistry , PointProcessor pointProcessor , PointProcessor recoveryPointProcessor ,
8486 String kinesisStreamName , String kinesisApplicationName ,
8587 KinesisConfig kinesisConfig , CheckPointMgr <Date > checkPointMgr ,
8688 Counter noOfRestarts , String kinesisConsumerRegion ,
87- int kinesisConsumerTracebackSeconds ,
89+ int kinesisConsumerTracebackMinutes ,
8890 String overrideKinesisEndpoint ) {
8991 this .metricRegistry = metricRegistry ;
9092 this .pointProcessor = Preconditions .checkNotNull (pointProcessor );
@@ -95,7 +97,7 @@ public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProces
9597 this .checkPointMgr = checkPointMgr ;
9698 this .noOfRestarts = noOfRestarts ;
9799 this .kinesisConsumerRegion = kinesisConsumerRegion ;
98- this .kinesisConsumerTracebackSeconds = kinesisConsumerTracebackSeconds ;
100+ this .kinesisConsumerTracebackMinutes = kinesisConsumerTracebackMinutes ;
99101 this .overrideKinesisEndpoint = overrideKinesisEndpoint ;
100102 log .info ("Kinesis consumer started" );
101103 this .start ();
@@ -160,9 +162,9 @@ public void run () {
160162 LeaseManagementConfig leaseManagementConfig = configsBuilder .leaseManagementConfig ()
161163 .failoverTimeMillis (kinesisConfig .getLeaseExpirationTimeInSecs () * 1000L );
162164 // Since v2 will create a new DynamoDB table, we need to trace back the initial position
165+ Instant startTime = Instant .now ().minus (Duration .ofMinutes (kinesisConsumerTracebackMinutes ));
163166 InitialPositionInStreamExtended initialPositionInStreamExtended =
164- InitialPositionInStreamExtended .newInitialPositionAtTimestamp (
165- new Date (Clock .systemUTC ().millis () - kinesisConsumerTracebackSeconds * 1000L ));
167+ InitialPositionInStreamExtended .newInitialPositionAtTimestamp (new Date (startTime .toEpochMilli ()));
166168 worker = new Scheduler (
167169 configsBuilder .checkpointConfig (),
168170 configsBuilder .coordinatorConfig (),
0 commit comments