Skip to content

Commit 34cd4a5

Browse files
committed
@W-19993179 enable dynamic app name for kinesis consumer
1 parent f6b64a8 commit 34cd4a5

File tree

5 files changed

+130
-3
lines changed

5 files changed

+130
-3
lines changed

carbonj.service/cj-deploy.sh

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/bin/bash
2+
3+
# Shards 18-31 are part of the v2-B sidecar migration.
4+
# We are deploying to them with a 11 min delay.
5+
for i in {18..31}
6+
do
7+
echo "Upgrading carbonj-p${i}..."
8+
helm upgrade -f ./values-prd-v2-shard${i}.yaml carbonj-p${i} ./ -n carbonj
9+
echo "Upgrade of carbonj-p${i} complete."
10+
if [ $i -lt 31 ]; then
11+
echo "Waiting for 11 minutes before next upgrade..."
12+
13+
POD1="carbonj-p${i}-0"
14+
POD2="carbonj-p${i}-1"
15+
16+
# Wait for approximately 11 minutes (22 * 30 seconds = 660 seconds), checking status periodically.
17+
for j in {1..22}; do
18+
echo "--- Status check ${j}/22 for shard p${i} ---"
19+
kubectl get pod ${POD1} -n carbonj
20+
kubectl get pod ${POD2} -n carbonj
21+
echo "------------------------------------------------"
22+
# If not the last check, sleep.
23+
if [ $j -lt 22 ]; then
24+
sleep 30
25+
fi
26+
done
27+
echo "Finished 11 minute wait for shard p${i}."
28+
fi
29+
done
30+
31+
echo "All upgrades complete."
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
foo=bar

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@
1919
import java.io.IOException;
2020
import java.io.InputStream;
2121
import java.net.InetAddress;
22-
import java.net.UnknownHostException;
2322
import java.util.Date;
2423
import java.util.HashSet;
2524
import java.util.Map;
2625
import java.util.Properties;
2726
import java.util.Set;
2827
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.TimeUnit;
31+
import java.net.UnknownHostException;
32+
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
33+
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
34+
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
35+
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
2936

3037
public class Consumers {
3138

@@ -149,6 +156,16 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
149156
log.error(e.getMessage(), e);
150157
}
151158

159+
// Optionally add dynamic suffix to the KCL application name
160+
if (kinesisConfig.isAppNameDynamicSuffixEnabled()) {
161+
String suffix = resolveDynamicSuffix(kinesisConfig.getAppNameDynamicSuffixFormat());
162+
kinesisApplicationName = kinesisApplicationName + suffix;
163+
// Optionally kick off async cleanup of old lease tables
164+
if (kinesisConfig.isCleanupOldLeaseTablesEnabled()) {
165+
asyncCleanupOldLeaseTables(getKinesisApplicationName(kinesisStreamName, hostName, carbonjEnv));
166+
}
167+
}
168+
152169
Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter"));
153170
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName,
154171
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion, kinesisConsumerTracebackMinutes);
@@ -185,6 +202,50 @@ private String getKinesisApplicationName(String streamName, String hostName, Str
185202
return streamName + "-" + hostName + "-" + carbonjEnv;
186203
}
187204

205+
private String resolveDynamicSuffix(String format) {
206+
String suffix = format;
207+
try {
208+
suffix = suffix.replace("{epoch}", Long.toString(System.currentTimeMillis()))
209+
.replace("{hostname}", InetAddress.getLocalHost().getHostName())
210+
.replace("{uuid}", java.util.UUID.randomUUID().toString());
211+
} catch (Throwable t) {
212+
// fallback minimal suffix
213+
suffix = "-" + System.currentTimeMillis();
214+
}
215+
if (!suffix.startsWith("-")) {
216+
suffix = "-" + suffix;
217+
}
218+
return suffix;
219+
}
220+
221+
private void asyncCleanupOldLeaseTables(String baseApplicationName) {
222+
ExecutorService es = Executors.newSingleThreadExecutor(r -> {
223+
Thread t = new Thread(r, "kcl-lease-cleanup");
224+
t.setDaemon(true);
225+
return t;
226+
});
227+
es.submit(() -> {
228+
try (DynamoDbClient ddb = DynamoDbClient.builder().build()) {
229+
ListTablesResponse resp = ddb.listTables(ListTablesRequest.builder().build());
230+
for (String table : resp.tableNames()) {
231+
// KCL v2/v3 table name prefix is usually application name; be conservative
232+
if (table.startsWith(baseApplicationName) && !table.equals(baseApplicationName)) {
233+
try {
234+
ddb.deleteTable(DeleteTableRequest.builder().tableName(table).build());
235+
log.info("Submitted async deletion for old KCL lease/checkpoint table {}", table);
236+
} catch (Throwable t) {
237+
log.warn("Failed to delete table {}: {}", table, t.getMessage());
238+
}
239+
}
240+
}
241+
} catch (Throwable t) {
242+
log.warn("KCL lease cleanup encountered an error: {}", t.getMessage());
243+
}
244+
});
245+
es.shutdown();
246+
try { es.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException ignored) {}
247+
}
248+
188249
private void close(Set<String> consumerSet) {
189250
if (null == consumerSet) {
190251
return;

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisConfig.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,18 @@ public class KinesisConfig {
2929
private final int maxRecords;
3030
private final boolean aggregationEnabled;
3131

32+
// Controls dynamic application name behavior and cleanup of old lease tables
33+
private final boolean appNameDynamicSuffixEnabled;
34+
private final String appNameDynamicSuffixFormat;
35+
private final boolean cleanupOldLeaseTablesEnabled;
36+
3237
public KinesisConfig(boolean kinesisConsumerEnabled, boolean recoveryEnabled,
3338
long recoveryIdleTimeMillis, long checkPointIntervalMillis, long retryTimeInMillis,
3439
int recoveryThreads, Path checkPointDir, int initRetryTimeInSecs,
3540
int leaseExpirationTimeInSecs, String recoveryProvider, int gapsTableProvThroughput,
36-
int maxRecords, boolean aggregationEnabled) {
41+
int maxRecords, boolean aggregationEnabled,
42+
boolean appNameDynamicSuffixEnabled, String appNameDynamicSuffixFormat,
43+
boolean cleanupOldLeaseTablesEnabled) {
3744
this.kinesisConsumerEnabled = kinesisConsumerEnabled;
3845
this.recoveryEnabled = recoveryEnabled;
3946
this.recoveryIdleTimeMillis = recoveryIdleTimeMillis;
@@ -46,6 +53,9 @@ public KinesisConfig(boolean kinesisConsumerEnabled, boolean recoveryEnabled,
4653
this.gapsTableProvisionedThroughput = gapsTableProvThroughput;
4754
this.maxRecords = maxRecords;
4855
this.aggregationEnabled = aggregationEnabled;
56+
this.appNameDynamicSuffixEnabled = appNameDynamicSuffixEnabled;
57+
this.appNameDynamicSuffixFormat = appNameDynamicSuffixFormat;
58+
this.cleanupOldLeaseTablesEnabled = cleanupOldLeaseTablesEnabled;
4959

5060
if( recoveryProvider.equalsIgnoreCase("dynamodb")) {
5161
this.recoveryProvider = KinesisRecoveryProvider.DYNAMODB;
@@ -102,4 +112,16 @@ public int getMaxRecords() {
102112
public boolean isAggregationEnabled() {
103113
return aggregationEnabled;
104114
}
115+
116+
public boolean isAppNameDynamicSuffixEnabled() {
117+
return appNameDynamicSuffixEnabled;
118+
}
119+
120+
public String getAppNameDynamicSuffixFormat() {
121+
return appNameDynamicSuffixFormat;
122+
}
123+
124+
public boolean isCleanupOldLeaseTablesEnabled() {
125+
return cleanupOldLeaseTablesEnabled;
126+
}
105127
}

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgKinesis.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,24 @@ public class cfgKinesis
5555
@Value( "${aggregation.enabled:true}" )
5656
private boolean aggregationEnabled;
5757

58+
// Optional: dynamically suffix the KCL application name and cleanup old lease tables
59+
@Value( "${kinesis.consumer.appName.dynamicSuffix.enabled:false}" )
60+
private boolean appNameDynamicSuffixEnabled;
61+
62+
// Supports tokens: {epoch}, {uuid}, {hostname}
63+
@Value( "${kinesis.consumer.appName.dynamicSuffix.format:-{epoch}}" )
64+
private String appNameDynamicSuffixFormat;
65+
66+
@Value( "${kinesis.consumer.cleanupOldLeaseTables.enabled:false}" )
67+
private boolean cleanupOldLeaseTablesEnabled;
68+
5869
@Bean
5970
KinesisConfig kinesisConfig()
6071
{
6172
return new KinesisConfig(kinesisConsumerEnabled, recoveryEnabled, recoveryIdleTimeInMillis,
6273
checkPointIntervalMillis, retryTimeInMillis, recoveryThreads, Paths.get(checkPointDir),
6374
initRetryTimeInSecs, leaseExpirationTimeInSecs, recoveryProvider, gapsTableProvisionedThroughput,
64-
maxRecords, aggregationEnabled);
75+
maxRecords, aggregationEnabled,
76+
appNameDynamicSuffixEnabled, appNameDynamicSuffixFormat, cleanupOldLeaseTablesEnabled);
6577
}
6678
}

0 commit comments

Comments
 (0)