 import javax.inject.Inject;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.stream.Stream;
 
 import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
 import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
@@ -83,7 +81,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
     private static final boolean SORT_NOT_NEEDED = false;
 
     private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<>();
-    private static Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<>();
+    private static final Map<NotificationType, List<String>> CONSUMER_TOPICS_MAP = new HashMap<>();
 
     private final Properties properties;
     private final Long pollTimeOutMs;
@@ -154,7 +152,7 @@ protected KafkaNotification(Properties properties) {
         LOG.info("<== KafkaNotification()");
     }
 
-    public static String[] trimAndPurge(String[] strings) {
+    public static List<String> trimAndPurge(String[] strings) {
         List<String> ret = new ArrayList<>();
 
         if (strings != null) {
@@ -167,7 +165,7 @@ public static String[] trimAndPurge(String[] strings) {
             }
         }
 
-        return ret.toArray(new String[ret.size()]);
+        return ret;
     }
 
     @Override
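
For reference, a minimal standalone sketch of the behavior implied by the new `List<String>`-returning `trimAndPurge`: trim each entry and drop blanks. The loop body is not visible in these hunks, so the trimming and filtering details below are assumptions, not the Atlas implementation itself.

```java
import java.util.ArrayList;
import java.util.List;

public class TrimAndPurgeSketch {
    // Assumed behavior: trim each entry, skip null/empty results, return a List instead of an array.
    public static List<String> trimAndPurge(String[] strings) {
        List<String> ret = new ArrayList<>();

        if (strings != null) {
            for (String s : strings) {
                String trimmed = (s == null) ? "" : s.trim();

                if (!trimmed.isEmpty()) {
                    ret.add(trimmed);
                }
            }
        }

        return ret;
    }

    public static void main(String[] args) {
        // Example topic list with padding and an empty entry
        System.out.println(trimAndPurge(new String[] {" ATLAS_HOOK ", "", "ATLAS_ENTITIES"}));
        // -> [ATLAS_HOOK, ATLAS_ENTITIES]
    }
}
```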
@@ -188,10 +186,7 @@ public void stop() {
 
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
-        boolean enableAutoCommit = Boolean.parseBoolean(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable", "false")));
-        if (notificationType.equals(NotificationType.ASYNC_IMPORT)) {
-            enableAutoCommit = true;
-        }
+        boolean enableAutoCommit = notificationType.equals(NotificationType.ASYNC_IMPORT) || Boolean.parseBoolean(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable", "false")));
 
         return createConsumers(notificationType, numConsumers, enableAutoCommit);
     }
 
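
A small self-contained sketch of the consolidated auto-commit decision above: ASYNC_IMPORT short-circuits to true, otherwise the Kafka-style properties decide, defaulting to false. The enum values other than ASYNC_IMPORT are placeholders, not the full Atlas NotificationType.

```java
import java.util.Properties;

public class AutoCommitSketch {
    // Placeholder enum; only ASYNC_IMPORT matters for this decision.
    enum NotificationType { HOOK, ENTITIES, ASYNC_IMPORT }

    static boolean enableAutoCommit(NotificationType notificationType, Properties properties) {
        // ASYNC_IMPORT always commits automatically; otherwise fall back to the properties,
        // checking "enable.auto.commit" first and the legacy "auto.commit.enable" second.
        return notificationType.equals(NotificationType.ASYNC_IMPORT)
                || Boolean.parseBoolean(properties.getProperty("enable.auto.commit",
                        properties.getProperty("auto.commit.enable", "false")));
    }

    public static void main(String[] args) {
        Properties props = new Properties();      // nothing configured
        System.out.println(enableAutoCommit(NotificationType.HOOK, props));         // false
        System.out.println(enableAutoCommit(NotificationType.ASYNC_IMPORT, props)); // true

        props.setProperty("auto.commit.enable", "true");
        System.out.println(enableAutoCommit(NotificationType.ENTITIES, props));     // true
    }
}
```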
@@ -215,13 +210,18 @@ public void close() {
     }
 
     @Override
-    public void closeConsumer(NotificationType notificationType) {
-        List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
-        for (final KafkaConsumer consumer : notificationConsumers) {
-            consumer.unsubscribe();
-            consumer.close();
-        }
-        this.consumers.remove(notificationType);
+    public void closeConsumer(NotificationType notificationTypeToClose, String topic) {
+        this.consumers.computeIfPresent(notificationTypeToClose, (notificationType, notificationConsumers) -> {
+            notificationConsumers.removeIf(consumer -> {
+                if (consumer.subscription().contains(topic)) {
+                    consumer.unsubscribe();
+                    consumer.close();
+                    return true;
+                }
+                return false;
+            });
+            return notificationConsumers.isEmpty() ? null : notificationConsumers;
+        });
     }
 
     // ----- NotificationInterface -------------------------------------------
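
The rewritten closeConsumer leans on `Map.computeIfPresent` plus `List.removeIf`: consumers subscribed to the given topic are unsubscribed, closed, and dropped, and returning null from the remapping function removes the whole map entry once the list is empty. A simplified sketch of that pattern, using a plain stand-in interface instead of KafkaConsumer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CloseConsumerSketch {
    // Stand-in for KafkaConsumer: just enough surface to show the pattern.
    interface Consumer {
        Set<String> subscription();
        void unsubscribe();
        void close();
    }

    static final Map<String, List<Consumer>> consumers = new HashMap<>();

    static void closeConsumer(String notificationTypeToClose, String topic) {
        consumers.computeIfPresent(notificationTypeToClose, (notificationType, notificationConsumers) -> {
            // Close and drop only the consumers subscribed to the given topic.
            notificationConsumers.removeIf(consumer -> {
                if (consumer.subscription().contains(topic)) {
                    consumer.unsubscribe();
                    consumer.close();
                    return true;
                }
                return false;
            });
            // Returning null removes the notification type from the map once no consumers remain.
            return notificationConsumers.isEmpty() ? null : notificationConsumers;
        });
    }

    static Consumer consumerOf(String topic) {
        return new Consumer() {
            public Set<String> subscription() { return Set.of(topic); }
            public void unsubscribe()         { }
            public void close()               { }
        };
    }

    public static void main(String[] args) {
        consumers.put("ASYNC_IMPORT", new ArrayList<>(List.of(consumerOf("ATLAS_IMPORT_1"))));
        closeConsumer("ASYNC_IMPORT", "ATLAS_IMPORT_1");
        System.out.println(consumers.containsKey("ASYNC_IMPORT")); // false: entry removed when empty
    }
}
```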
@@ -243,16 +243,16 @@ public boolean isReady(NotificationType notificationType) {
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
         LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
 
-        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+        List<String> topics = CONSUMER_TOPICS_MAP.get(notificationType);
 
-        if (numConsumers < topics.length) {
-            LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length, topics.length);
+        if (numConsumers < topics.size()) {
+            LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.size(), topics.size());
 
-            numConsumers = topics.length;
-        } else if (numConsumers > topics.length) {
-            LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.length, topics.length);
+            numConsumers = topics.size();
+        } else if (numConsumers > topics.size()) {
+            LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.size(), topics.size());
 
-            numConsumers = topics.length;
+            numConsumers = topics.size();
         }
 
         List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
@@ -318,11 +318,7 @@ public Properties getConsumerProperties(NotificationType notificationType) {
         String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
 
         if (StringUtils.isEmpty(groupId)) {
-            if (!notificationType.equals(NotificationType.ASYNC_IMPORT)) {
-                groupId = "atlas";
-            } else {
-                groupId = "atlas-import";
-            }
+            groupId = notificationType.equals(NotificationType.ASYNC_IMPORT) ? "atlas-import" : "atlas";
         }
 
         if (StringUtils.isEmpty(groupId)) {
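
The group-id fallback now reads as a single ternary: a per-type property wins, otherwise ASYNC_IMPORT defaults to "atlas-import" and every other type to "atlas". The sketch below mirrors that lookup; the "group.id" suffix is an assumption for the value of CONSUMER_GROUP_ID_PROPERTY, and the enum is an illustrative subset.

```java
import java.util.Properties;

public class ConsumerGroupIdSketch {
    enum NotificationType { HOOK, ENTITIES, ASYNC_IMPORT }   // illustrative subset

    // Assumption: the per-type group-id property suffix is "group.id".
    static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";

    static String groupId(NotificationType notificationType, Properties properties) {
        // e.g. "async_import.group.id" for ASYNC_IMPORT
        String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);

        if (groupId == null || groupId.isEmpty()) {
            groupId = notificationType.equals(NotificationType.ASYNC_IMPORT) ? "atlas-import" : "atlas";
        }

        return groupId;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(groupId(NotificationType.HOOK, props));          // atlas
        System.out.println(groupId(NotificationType.ASYNC_IMPORT, props));  // atlas-import

        props.setProperty("async_import.group.id", "my-import-group");
        System.out.println(groupId(NotificationType.ASYNC_IMPORT, props));  // my-import-group
    }
}
```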
@@ -343,8 +339,8 @@ public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer existingConsumer, Pr
 
         try {
             if (ret == null || !isKafkaConsumerOpen(ret)) {
-                String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
-                String topic = topics[idxConsumer % topics.length];
+                List<String> topics = CONSUMER_TOPICS_MAP.get(notificationType);
+                String topic = topics.get(idxConsumer % topics.size());
 
                 LOG.debug("Creating new KafkaConsumer for topic : {}, index : {}", topic, idxConsumer);
 
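
With CONSUMER_TOPICS_MAP now holding a `List<String>`, each consumer index is mapped onto a topic by simple modulo arithmetic. A tiny sketch of that round-robin assignment, with example topic names standing in for the real map contents:

```java
import java.util.List;

public class TopicAssignmentSketch {
    public static void main(String[] args) {
        // Example topic list; the real names come from CONSUMER_TOPICS_MAP.
        List<String> topics = List.of("ATLAS_HOOK", "ATLAS_IMPORT_EXAMPLE");

        for (int idxConsumer = 0; idxConsumer < 4; idxConsumer++) {
            // Consumer i is bound to topic (i % topics.size()), wrapping around the list.
            String topic = topics.get(idxConsumer % topics.size());
            System.out.println("consumer " + idxConsumer + " -> " + topic);
        }
    }
}
```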
@@ -452,39 +448,29 @@ private KafkaProducer getOrCreateProducerByCriteria(Object producerCriteria, Map
 
     @Override
     public void addTopicToNotificationType(NotificationType notificationType, String topic) {
-        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
-        String[] updatedTopics;
-        if (topics == null) {
-            updatedTopics = new String[] {topic};
-        } else {
-            updatedTopics = Stream.concat(Arrays.stream(topics), Stream.of(topic)).toArray(String[]::new);
-        }
-        CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
+        CONSUMER_TOPICS_MAP.computeIfAbsent(notificationType, k -> new ArrayList<>()).add(topic);
    }
 
     @Override
     public void closeProducer(NotificationType notificationType, String topic) {
-        KafkaProducer producerToClose = producersByTopic.get(topic);
-        if (producerToClose != null) {
-            producersByTopic.remove(topic);
-            producerToClose.close();
-        }
+        producersByTopic.computeIfPresent(topic, (key, producer) -> {
+            // Close the KafkaProducer before removal
+            producer.close();
+            // Returning null removes the key from the map
+            return null;
+        });
         PRODUCER_TOPIC_MAP.remove(notificationType, topic);
     }
 
     @Override
-    public void deleteTopics(NotificationType notificationType, String topicName) {
+    public void deleteTopic(NotificationType notificationType, String topicName) {
         try (AdminClient adminClient = AdminClient.create(this.properties)) {
             adminClient.deleteTopics(Collections.singleton(topicName));
         }
-        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
-        String[] updatedTopics;
-        if (topics == null) {
-            updatedTopics = new String[] {};
-        } else {
-            updatedTopics = Arrays.stream(topics).filter(topic -> !topic.equals(topicName)).toArray(String[]::new);
-        }
-        CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
+        CONSUMER_TOPICS_MAP.computeIfPresent(notificationType, (key, topics) -> {
+            topics.remove(topicName);
+            return topics.isEmpty() ? null : topics;
+        });
     }
 
     // kafka-client doesn't have method to check if consumer is open, hence checking list topics and catching exception
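
This last hunk replaces the array copy/concat/filter code with the standard `Map` idioms: `computeIfAbsent` creates the topic list on first use in addTopicToNotificationType, while `computeIfPresent` in closeProducer and deleteTopic removes the mapping outright by returning null, so the maps never hold closed producers or empty topic lists. A compact sketch of both idioms, with String keys standing in for NotificationType:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicMapIdiomsSketch {
    static final Map<String, List<String>> consumerTopics = new HashMap<>();

    // computeIfAbsent: create the list lazily on first use, then just append.
    static void addTopic(String notificationType, String topic) {
        consumerTopics.computeIfAbsent(notificationType, k -> new ArrayList<>()).add(topic);
    }

    // computeIfPresent: mutate the list in place; returning null drops the key entirely.
    static void deleteTopic(String notificationType, String topicName) {
        consumerTopics.computeIfPresent(notificationType, (key, topics) -> {
            topics.remove(topicName);
            return topics.isEmpty() ? null : topics;
        });
    }

    public static void main(String[] args) {
        addTopic("ASYNC_IMPORT", "ATLAS_IMPORT_1");
        addTopic("ASYNC_IMPORT", "ATLAS_IMPORT_2");
        System.out.println(consumerTopics);                              // {ASYNC_IMPORT=[ATLAS_IMPORT_1, ATLAS_IMPORT_2]}

        deleteTopic("ASYNC_IMPORT", "ATLAS_IMPORT_1");
        deleteTopic("ASYNC_IMPORT", "ATLAS_IMPORT_2");
        System.out.println(consumerTopics.containsKey("ASYNC_IMPORT"));  // false: entry removed when empty
    }
}
```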