Skip to content

Commit 447bbc8

Browse files
committed
W-17312191: Make carbonj.env as part of KCL application name
1 parent 2b2c57c commit 447bbc8

File tree

3 files changed

+29
-26
lines changed

3 files changed

+29
-26
lines changed

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@ public class Consumers {
5050

5151
private final File indexNameSyncDir;
5252

53+
private final String carbonjEnv;
54+
5355
Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor, File rulesFile,
5456
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr, String kinesisConsumerRegion,
55-
NamespaceCounter namespaceCounter, File indexNameSyncDir) {
57+
NamespaceCounter namespaceCounter, File indexNameSyncDir, String carbonjEnv) {
5658

5759
this.metricRegistry = metricRegistry;
5860
this.pointProcessor = pointProcessor;
@@ -62,8 +64,9 @@ public class Consumers {
6264
this.kinesisConsumerRegion = kinesisConsumerRegion;
6365
this.namespaceCounter = namespaceCounter;
6466
this.indexNameSyncDir = indexNameSyncDir;
65-
consumers = new ConcurrentHashMap<>();
66-
consumerRules = new ConsumerRules(rulesFile);
67+
this.consumers = new ConcurrentHashMap<>();
68+
this.consumerRules = new ConsumerRules(rulesFile);
69+
this.carbonjEnv = carbonjEnv;
6770
reload();
6871
}
6972

@@ -119,7 +122,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
119122
if (consumerName.startsWith("kinesis:")) {
120123

121124
String kinesisStreamName = consumerName.substring(("kinesis:".length()));
122-
String kinesisApplicationName = getKinesisApplicationName(kinesisStreamName, hostName);
125+
String kinesisApplicationName = getKinesisApplicationName(kinesisStreamName, hostName, carbonjEnv);
123126
String consumerCfgFile = "config/kinesis-" + kinesisStreamName + "-consumer.conf";
124127

125128
try {
@@ -140,7 +143,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
140143

141144
Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter"));
142145
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName,
143-
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion);
146+
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion, carbonjEnv);
144147
log.info(String.format("New Consumer created with name %s", kinesisStreamName));
145148
newConsumers.add(consumerName);
146149
consumers.put(consumerName, kinesisConsumer);
@@ -170,8 +173,8 @@ private String getHostName() {
170173
}
171174
}
172175

173-
private String getKinesisApplicationName(String streamName, String hostName) {
174-
return streamName + "-" + hostName;
176+
private String getKinesisApplicationName(String streamName, String hostName, String carbonjEnv) {
177+
return streamName + "-" + hostName + "-" + carbonjEnv;
175178
}
176179

177180
private void close(Set<String> consumerSet) {

carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgCarbonJ.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ public class cfgCarbonJ
207207
@Value("${metrics.store.sync.secondary.db:false}")
208208
private boolean syncSecondaryDb;
209209

210+
@Value("${carbonj.env:dev}")
211+
private String carbonjEnv;
212+
210213
@Bean
211214
public RestTemplate restTemplate() {
212215
return new RestTemplate();
@@ -424,7 +427,7 @@ Consumers consumer( PointProcessor pointProcessor,
424427
File rulesFile = locateConfigFile( serviceDir, consumerRulesFile );
425428
Consumers consumer = new Consumers( metricRegistry, pointProcessor, recoveryPointProcessor, rulesFile,
426429
kinesisConfig, checkPointMgr, kinesisConsumerRegion,
427-
nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync"));
430+
nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync"), carbonjEnv);
428431
s.scheduleWithFixedDelay( consumer::reload, 15, 30, TimeUnit.SECONDS );
429432
if (syncSecondaryDb) {
430433
s.scheduleWithFixedDelay( consumer::syncNamespaces, 60, 60, TimeUnit.SECONDS );
@@ -468,8 +471,7 @@ Void checkExpiredNamespaces( ScheduledExecutorService s, NamespaceCounter ns )
468471
{
469472
if ( runInactiveNamespaceCheckEverySeconds > 0 )
470473
{
471-
log.info( String.format( "scheduling removal of expired namespace counters to run every %s sec",
472-
runInactiveNamespaceCheckEverySeconds ) );
474+
log.info("scheduling removal of expired namespace counters to run every {} sec", runInactiveNamespaceCheckEverySeconds);
473475
s.scheduleWithFixedDelay( ns::removeInactive, 120, runInactiveNamespaceCheckEverySeconds,
474476
TimeUnit.SECONDS );
475477
}
@@ -506,16 +508,14 @@ Void checkExpiredNamespaces( ScheduledExecutorService s, NamespaceCounter ns )
506508
NettyChannel lineProtocolChannel( NettyServer netty, InputQueue r )
507509
{
508510
lineProtocolTcpPort = ( lineProtocolTcpPort == -1 ) ? jettyPort + 2 : lineProtocolTcpPort;
509-
return netty.bind( lineProtocolTcpHost, lineProtocolTcpPort, new ChannelInitializer<SocketChannel>()
510-
{
511-
@Override public void initChannel(@SuppressWarnings("NullableProblems") SocketChannel ch )
512-
{
513-
if ( log.isDebugEnabled() )
514-
{
515-
log.debug( "accepted TCP line protocol from " + ch );
511+
return netty.bind( lineProtocolTcpHost, lineProtocolTcpPort, new ChannelInitializer<>() {
512+
@Override
513+
public void initChannel(SocketChannel ch) {
514+
if (log.isDebugEnabled()) {
515+
log.debug("accepted TCP line protocol from {}", ch);
516516
}
517-
ch.pipeline().addLast( new DelimiterBasedFrameDecoder( tcpBuff, Delimiters.lineDelimiter() ),
518-
new LineProtocolHandler( metricRegistry, r ) );
517+
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(tcpBuff, Delimiters.lineDelimiter()),
518+
new LineProtocolHandler(metricRegistry, r));
519519
}
520520
} );
521521
}
@@ -536,7 +536,7 @@ NettyChannel udpLineProtocolChannel( NettyServer netty, InputQueue r )
536536
NettyServer.udpMsgsReceived.mark();
537537
if ( log.isDebugEnabled() )
538538
{
539-
log.debug( "accepted UDP line protocol from " + ctx.channel() );
539+
log.debug("accepted UDP line protocol from {}", ctx.channel());
540540
}
541541
lp.process( msg.content() );
542542
}
@@ -551,13 +551,13 @@ NettyChannel udpLineProtocolChannel( NettyServer netty, InputQueue r )
551551
@ConditionalOnProperty(name = "carbonj.relay", havingValue = "true", matchIfMissing = true)
552552
NettyChannel pickleProtocolChannel( NettyServer netty, InputQueue r )
553553
{
554-
return netty.bind( "0.0.0.0", jettyPort + 3, new ChannelInitializer<SocketChannel>()
554+
return netty.bind( "0.0.0.0", jettyPort + 3, new ChannelInitializer<>()
555555
{
556-
@Override public void initChannel(@SuppressWarnings("NullableProblems") SocketChannel ch )
556+
@Override public void initChannel(SocketChannel ch )
557557
{
558558
if ( log.isDebugEnabled() )
559559
{
560-
log.debug( "accepted pickle protocol from " + ch );
560+
log.debug("accepted pickle protocol from {}", ch);
561561
}
562562

563563
ch.pipeline().addLast( new LengthFieldBasedFrameDecoder( pickleBuff, 0, 4 ) )
@@ -625,7 +625,7 @@ class Store:
625625
{
626626
for ( String profileName : environment.getActiveProfiles() )
627627
{
628-
log.warn( "Currently active profile: " + profileName );
628+
log.warn("Currently active profile: {}", profileName);
629629
}
630630
}
631631

carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestConsumers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void testConsumers() throws Exception {
3838
1, checkPointDir, 60, 60, "recoveryProvider", 1, 1, true);
3939
FileCheckPointMgr checkPointMgr = new FileCheckPointMgr(checkPointDir, 5);
4040
Consumers consumers = new Consumers(metricRegistry, new PointProcessorMock(), new PointProcessorMock(),
41-
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"));
41+
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"), "test");
4242
new KinesisRecordProcessorFactory(metricRegistry, new PointProcessorMock(), kinesisConfig, "test-stream", checkPointMgr);
4343
consumers.dumpStats();
4444
consumers.syncNamespaces();
@@ -54,7 +54,7 @@ public void testConsumersNoAggregation() throws Exception {
5454
1, checkPointDir, 60, 60, "recoveryProvider", 1, 1, false);
5555
FileCheckPointMgr checkPointMgr = new FileCheckPointMgr(checkPointDir, 5);
5656
Consumers consumers = new Consumers(metricRegistry, new PointProcessorMock(), new PointProcessorMock(),
57-
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"));
57+
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"), "test");
5858
new KinesisRecordProcessorFactory(metricRegistry, new PointProcessorMock(), kinesisConfig, "test-stream", checkPointMgr);
5959
consumers.dumpStats();
6060
consumers.syncNamespaces();

0 commit comments

Comments
 (0)