Skip to content

Commit 086f030

Browse files
committed
W-17312191: Set initialPositionInStreamExtended back 1 hour
1 parent 7c3aee9 commit 086f030

File tree

5 files changed

+31
-18
lines changed

5 files changed

+31
-18
lines changed

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,11 @@ public class Consumers {
5252

5353
private final String carbonjEnv;
5454

55+
private final int kinesisConsumerTracebackSeconds;
56+
5557
Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor, File rulesFile,
5658
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr, String kinesisConsumerRegion,
57-
NamespaceCounter namespaceCounter, File indexNameSyncDir, String carbonjEnv) {
59+
NamespaceCounter namespaceCounter, File indexNameSyncDir, String carbonjEnv, int kinesisConsumerTracebackSeconds) {
5860

5961
this.metricRegistry = metricRegistry;
6062
this.pointProcessor = pointProcessor;
@@ -67,6 +69,7 @@ public class Consumers {
6769
this.consumers = new ConcurrentHashMap<>();
6870
this.consumerRules = new ConsumerRules(rulesFile);
6971
this.carbonjEnv = carbonjEnv;
72+
this.kinesisConsumerTracebackSeconds = kinesisConsumerTracebackSeconds;
7073
reload();
7174
}
7275

@@ -84,7 +87,7 @@ public synchronized void reload() {
8487
log.debug(" Check for consumer configuration update");
8588
}
8689
if (!currentRules.equals(newRules)) {
87-
log.info(String.format("Updating consumer Rules. Old consumer rules: [%s], New consumer rules: [%s]", currentRules, newRules));
90+
log.info("Updating consumer Rules. Old consumer rules: [{}], New consumer rules: [{}]", currentRules, newRules);
8891
}
8992
else {
9093
log.debug(" Consumer rules haven't changed.");
@@ -103,11 +106,11 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
103106
log.info (consumer);
104107

105108
if (newRules.contains(consumer)) {
106-
log.info(String.format("[%s] Reuse unchanged consumer", consumer));
109+
log.info("[{}] Reuse unchanged consumer", consumer);
107110
newConsumers.add(consumer);
108111
newRules.remove(consumer);
109112
} else {
110-
log.info(String.format("[%s] Consumer scheduled for removal", consumer));
113+
log.info("[{}] Consumer scheduled for removal", consumer);
111114
obsoleteConsumers.add(consumer);
112115
}
113116

@@ -117,7 +120,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
117120
// we use the host name to generate the kinesis application name as they are stable for stable set pods.
118121
String hostName = getHostName();
119122
for (String consumerName : newRules) {
120-
log.info(String.format("Creating new consumer with kinesis stream name: %s", consumerName));
123+
log.info("Creating new consumer with kinesis stream name: {}", consumerName);
121124

122125
if (consumerName.startsWith("kinesis:")) {
123126

@@ -128,23 +131,23 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
128131

129132
try {
130133
InputStream input = new FileInputStream(consumerCfgFile);
131-
log.info(" Loading values from " + consumerCfgFile);
134+
log.info(" Loading values from {}", consumerCfgFile);
132135
Properties consumerCfg = new Properties();
133136
consumerCfg.load(input);
134137
String kinesisApplicationNamePropValue = consumerCfg.getProperty("kinesis.application.name");
135138
if( kinesisApplicationNamePropValue != null ) {
136139
kinesisApplicationName = kinesisApplicationNamePropValue;
137140
}
138141
} catch (FileNotFoundException e) {
139-
log.warn(consumerCfgFile + " not found in the classpath ");
142+
log.warn("{} not found in the classpath ", consumerCfgFile);
140143
log.info(" Falling back to default values ");
141144
} catch (Throwable e) {
142145
log.error(e.getMessage(), e);
143146
}
144147

145148
Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter"));
146149
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName,
147-
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion);
150+
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion, kinesisConsumerTracebackSeconds);
148151
log.info("New Consumer created with name {}", kinesisStreamName);
149152
newConsumers.add(consumerName);
150153
consumers.put(consumerName, kinesisConsumer);

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisConsumer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import software.amazon.kinesis.coordinator.Scheduler;
2929
import software.amazon.kinesis.leases.LeaseManagementConfig;
3030
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
31-
import software.amazon.kinesis.common.InitialPositionInStream;
3231
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
3332
import software.amazon.kinesis.retrieval.polling.PollingConfig;
3433
import com.codahale.metrics.Counter;
@@ -40,6 +39,7 @@
4039
import org.slf4j.LoggerFactory;
4140

4241
import java.net.URI;
42+
import java.time.Clock;
4343
import java.util.Date;
4444
import java.util.List;
4545
import java.util.concurrent.TimeUnit;
@@ -70,18 +70,21 @@ public class KinesisConsumer extends Thread {
7070

7171
private final String overrideKinesisEndpoint;
7272

73+
private final int kinesisConsumerTracebackSeconds;
74+
7375
public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor,
7476
String kinesisStreamName, String kinesisApplicationName,
7577
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr,
76-
Counter noOfRestarts, String kinesisConsumerRegion) {
78+
Counter noOfRestarts, String kinesisConsumerRegion, int kinesisConsumerTracebackSeconds) {
7779
this(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName, kinesisApplicationName, kinesisConfig,
78-
checkPointMgr, noOfRestarts, kinesisConsumerRegion, null);
80+
checkPointMgr, noOfRestarts, kinesisConsumerRegion, kinesisConsumerTracebackSeconds, null);
7981
}
8082

8183
public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor,
8284
String kinesisStreamName, String kinesisApplicationName,
8385
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr,
8486
Counter noOfRestarts, String kinesisConsumerRegion,
87+
int kinesisConsumerTracebackSeconds,
8588
String overrideKinesisEndpoint) {
8689
this.metricRegistry = metricRegistry;
8790
this.pointProcessor = Preconditions.checkNotNull(pointProcessor);
@@ -92,6 +95,7 @@ public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProces
9295
this.checkPointMgr = checkPointMgr;
9396
this.noOfRestarts = noOfRestarts;
9497
this.kinesisConsumerRegion = kinesisConsumerRegion;
98+
this.kinesisConsumerTracebackSeconds = kinesisConsumerTracebackSeconds;
9599
this.overrideKinesisEndpoint = overrideKinesisEndpoint;
96100
log.info("Kinesis consumer started");
97101
this.start();
@@ -155,6 +159,10 @@ public void run () {
155159
// Map v1 withFailoverTimeMillis -> v2/3 failoverTimeMillis
156160
LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig()
157161
.failoverTimeMillis(kinesisConfig.getLeaseExpirationTimeInSecs() * 1000L);
162+
// Since v2 will create a new DynamoDB table, we need to trace back the initial position
163+
InitialPositionInStreamExtended initialPositionInStreamExtended =
164+
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(
165+
new Date(Clock.systemUTC().millis() - kinesisConsumerTracebackSeconds * 1000L));
158166
worker = new Scheduler(
159167
configsBuilder.checkpointConfig(),
160168
configsBuilder.coordinatorConfig(),
@@ -163,9 +171,8 @@ public void run () {
163171
configsBuilder.metricsConfig(),
164172
configsBuilder.processorConfig(),
165173
configsBuilder.retrievalConfig()
166-
.initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))
167-
.retrievalSpecificConfig(pollingConfig)
168-
);
174+
.initialPositionInStreamExtended(initialPositionInStreamExtended)
175+
.retrievalSpecificConfig(pollingConfig));
169176
log.info("KCL v2 Scheduler started with app {}, stream {}, workerId {}", kinesisApplicationName, kinesisStreamName, workerId);
170177
worker.run();
171178
} catch (Throwable t) {

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgCarbonJ.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ public class cfgCarbonJ
210210
@Value("${carbonj.env:dev}")
211211
private String carbonjEnv;
212212

213+
@Value("${kinesis.consumer.traceback.seconds:3600}")
214+
private int kinesisConsumerTracebackSeconds;
215+
213216
@Bean
214217
public RestTemplate restTemplate() {
215218
return new RestTemplate();
@@ -427,7 +430,7 @@ Consumers consumer( PointProcessor pointProcessor,
427430
File rulesFile = locateConfigFile( serviceDir, consumerRulesFile );
428431
Consumers consumer = new Consumers( metricRegistry, pointProcessor, recoveryPointProcessor, rulesFile,
429432
kinesisConfig, checkPointMgr, kinesisConsumerRegion,
430-
nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync"), carbonjEnv);
433+
nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync"), carbonjEnv, kinesisConsumerTracebackSeconds);
431434
s.scheduleWithFixedDelay( consumer::reload, 15, 30, TimeUnit.SECONDS );
432435
if (syncSecondaryDb) {
433436
s.scheduleWithFixedDelay( consumer::syncNamespaces, 60, 60, TimeUnit.SECONDS );

carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestConsumers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void testConsumers() throws Exception {
3838
1, checkPointDir, 60, 60, "recoveryProvider", 1, 1, true);
3939
FileCheckPointMgr checkPointMgr = new FileCheckPointMgr(checkPointDir, 5);
4040
Consumers consumers = new Consumers(metricRegistry, new PointProcessorMock(), new PointProcessorMock(),
41-
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"), "test");
41+
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"), "test", 60);
4242
new KinesisRecordProcessorFactory(metricRegistry, new PointProcessorMock(), kinesisConfig, "test-stream", checkPointMgr);
4343
consumers.dumpStats();
4444
consumers.syncNamespaces();
@@ -54,7 +54,7 @@ public void testConsumersNoAggregation() throws Exception {
5454
1, checkPointDir, 60, 60, "recoveryProvider", 1, 1, false);
5555
FileCheckPointMgr checkPointMgr = new FileCheckPointMgr(checkPointDir, 5);
5656
Consumers consumers = new Consumers(metricRegistry, new PointProcessorMock(), new PointProcessorMock(),
57-
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"), "test");
57+
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"), "test", 60);
5858
new KinesisRecordProcessorFactory(metricRegistry, new PointProcessorMock(), kinesisConfig, "test-stream", checkPointMgr);
5959
consumers.dumpStats();
6060
consumers.syncNamespaces();

carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestKinesisConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void test() throws Exception {
9393
PointProcessorMock pointProcessor = new PointProcessorMock();
9494
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, pointProcessor,
9595
STREAM_NAME, STREAM_NAME + "-app", kinesisConfig, checkPointMgr, metricRegistry.counter("kinesis-consumer-counter"),
96-
Region.US_EAST_1.id(), localstack.getEndpointOverride(KINESIS).toString());
96+
Region.US_EAST_1.id(), 60, localstack.getEndpointOverride(KINESIS).toString());
9797
Thread.sleep(40000);
9898
log.info("Start ingesting data points ...");
9999
int current = (int) (System.currentTimeMillis() / 1000);

0 commit comments

Comments
 (0)