
Commit 8c56a08

Chandra Kanth Peravelli authored and committed
ATLAS-4922: Atlas Async Import using Kafka [4] - Addressing few of the PR comments
1 parent d678555 commit 8c56a08

File tree: 16 files changed, +106 -160 lines changed

intg/src/main/java/org/apache/atlas/AtlasErrorCode.java

Lines changed: 5 additions & 5 deletions
@@ -205,7 +205,7 @@ public enum AtlasErrorCode {
     FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-014", "File name should not be blank"),
     NO_TYPE_NAME_ON_VERTEX(404, "ATLAS-404-00-015", "No typename found for given entity with guid: {0}"),
     NO_LINEAGE_CONSTRAINTS_FOR_GUID(404, "ATLAS-404-00-016", "No lineage constraints found for requested entity with guid : {0}"),
-    IMPORT_NOT_FOUND(404, "ATLAS-404-00-017", "Give import request {0} is not found"),
+    IMPORT_NOT_FOUND(404, "ATLAS-404-00-017", "Import {0} is not found"),
 
     METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request method {0} is inappropriate for the URL: {1}"),
 
@@ -224,7 +224,7 @@
     GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
     METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
     PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"),
-    IMPORT_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-016", "Given import request {0} is already in progress or completed"),
+    ABORT_IMPORT(409, "ATLAS-409-00-016", "Import id={0} is currently in state {1}, cannot be aborted"),
 
     // All internal errors go here
     INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),
@@ -252,9 +252,9 @@
     NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}"),
     IMPORT_UPDATE_FAILED(500, "ATLAS-500-00-019", "Failed to update import: {0}"),
     IMPORT_REGISTRATION_FAILED(500, "ATLAS-500-00-020", "Failed to register import request"),
-    IMPORT_FAILED(500, "ATLAS-500-00-021", "Given import {0} failed"),
-    ABORT_IMPORT_FAILED(500, "ATLAS-500-00-022", "Failed to abort given import {0}"),
-    IMPORT_QUEUEING_FAILED(500, "ATLAS-500-00-023", "Failed to add given import {0} to request queue, please try again later");
+    IMPORT_FAILED(500, "ATLAS-500-00-021", "Import with id={0} failed"),
+    ABORT_IMPORT_FAILED(500, "ATLAS-500-00-022", "Failed to abort import id={0}"),
+    IMPORT_QUEUEING_FAILED(500, "ATLAS-500-00-023", "Failed to add import id={0} to request queue, please try again later");
 
     private static final Logger LOG = LoggerFactory.getLogger(AtlasErrorCode.class);
     private final String errorCode;
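
The reworded messages keep their MessageFormat-style placeholders; the new ABORT_IMPORT entry is the only one that takes two arguments (the import id and its current state). A minimal standalone sketch of how such a template renders, using java.text.MessageFormat directly rather than Atlas's own formatting helpers, with made-up argument values:

    import java.text.MessageFormat;

    public class AbortImportMessageDemo {
        public static void main(String[] args) {
            // Template copied from the ABORT_IMPORT error code introduced above.
            String template = "Import id={0} is currently in state {1}, cannot be aborted";

            // Hypothetical import id and state, for illustration only.
            System.out.println(MessageFormat.format(template, "import-123", "PROCESSING"));
            // prints: Import id=import-123 is currently in state PROCESSING, cannot be aborted
        }
    }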

intg/src/main/java/org/apache/atlas/model/impexp/AtlasAsyncImportRequest.java

Lines changed: 6 additions & 23 deletions
@@ -21,8 +21,6 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.atlas.model.AtlasBaseModelObject;
 import org.apache.atlas.utils.AtlasEntityUtil;
 
@@ -42,7 +40,7 @@
 import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
 
 @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonInclude(JsonInclude.Include.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AtlasAsyncImportRequest extends AtlasBaseModelObject implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -77,35 +75,20 @@ public String toString() {
 
     @JsonIgnore
     private String requestId;
-
-    @JsonProperty("importId")
-    private String importId;
-
-    @JsonProperty("status")
-    private ImportStatus status;
-
     @JsonIgnore
     private int skipTo;
 
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty("atlasImportResult")
-    private AtlasImportResult atlasImportResult;
-
-    @JsonProperty("importDetails")
+    private String importId;
+    private ImportStatus status;
     private ImportDetails importDetails = new ImportDetails();
-
-    @JsonProperty("receivedAt")
     private long receivedAt;
-
-    @JsonProperty("stagedAt")
     private long stagedAt;
-
-    @JsonProperty("startedProcessingAt")
     private long startedProcessingAt;
-
-    @JsonProperty("completedAt")
     private long completedAt;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private AtlasImportResult atlasImportResult;
+
     public AtlasAsyncImportRequest() {}
 
     public AtlasAsyncImportRequest(String guid) {
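
For context on the annotation change in this file: the class-level Jackson 2 @JsonInclude replaces the deprecated include attribute of @JsonSerialize, and the per-field @JsonProperty markers are dropped because @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, ...) already exposes these properties through their public getters. A minimal sketch of the NON_NULL behaviour with a hypothetical DTO (not an Atlas class):

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonIncludeDemo {
        // Hypothetical DTO: with NON_NULL, null properties are omitted from the serialized JSON.
        @JsonInclude(JsonInclude.Include.NON_NULL)
        public static class ImportStatusView {
            public String importId = "import-123";
            public String status   = "WAITING";
            public String result;               // left null, so it is not serialized
        }

        public static void main(String[] args) throws Exception {
            // prints {"importId":"import-123","status":"WAITING"} - the null 'result' is dropped
            System.out.println(new ObjectMapper().writeValueAsString(new ImportStatusView()));
        }
    }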

intg/src/main/java/org/apache/atlas/model/notification/ImportNotification.java

Lines changed: 4 additions & 4 deletions
@@ -19,8 +19,8 @@
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 
@@ -37,7 +37,7 @@
  * Class representing atlas import notification, extending HookNotification.
  */
 @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
-@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
+@JsonInclude
 @JsonIgnoreProperties(ignoreUnknown = true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)
@@ -73,7 +73,7 @@ public StringBuilder toString(StringBuilder sb) {
      * Notification for type definitions import
      */
     @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     @JsonIgnoreProperties(ignoreUnknown = true)
     @XmlRootElement
     @XmlAccessorType(XmlAccessType.PROPERTY)
@@ -113,7 +113,7 @@ public String toString() {
      * Notification for entities import
      */
     @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
-    @JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
+    @JsonInclude
    @JsonIgnoreProperties(ignoreUnknown = true)
    @XmlRootElement
    @XmlAccessorType(XmlAccessType.PROPERTY)

notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java

Lines changed: 2 additions & 10 deletions
@@ -61,20 +61,12 @@ public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
 
     @Override
     public Set<TopicPartition> getTopicPartition() {
-        Set<TopicPartition> ret = null;
-        if (kafkaConsumer != null) {
-            ret = kafkaConsumer.assignment();
-        }
-        return ret;
+        return kafkaConsumer != null ? kafkaConsumer.assignment() : null;
     }
 
     @Override
     public Set<String> subscription() {
-        Set<String> ret = null;
-        if (kafkaConsumer != null) {
-            ret = kafkaConsumer.subscription();
-        }
-        return ret;
+        return kafkaConsumer != null ? kafkaConsumer.subscription() : null;
     }
 
     @Override

notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java

Lines changed: 38 additions & 52 deletions
@@ -45,14 +45,12 @@
 import javax.inject.Inject;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.stream.Stream;
 
 import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
 import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
@@ -83,7 +81,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
     private static final boolean SORT_NOT_NEEDED = false;
 
     private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<>();
-    private static Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<>();
+    private static final Map<NotificationType, List<String>> CONSUMER_TOPICS_MAP = new HashMap<>();
 
     private final Properties properties;
     private final Long pollTimeOutMs;
@@ -154,7 +152,7 @@ protected KafkaNotification(Properties properties) {
         LOG.info("<== KafkaNotification()");
     }
 
-    public static String[] trimAndPurge(String[] strings) {
+    public static List<String> trimAndPurge(String[] strings) {
         List<String> ret = new ArrayList<>();
 
         if (strings != null) {
@@ -167,7 +165,7 @@ public static String[] trimAndPurge(String[] strings) {
             }
         }
 
-        return ret.toArray(new String[ret.size()]);
+        return ret;
     }
 
     @Override
@@ -188,10 +186,7 @@ public void stop() {
 
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
-        boolean enableAutoCommit = Boolean.parseBoolean(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable", "false")));
-        if (notificationType.equals(NotificationType.ASYNC_IMPORT)) {
-            enableAutoCommit = true;
-        }
+        boolean enableAutoCommit = notificationType.equals(NotificationType.ASYNC_IMPORT) || Boolean.parseBoolean(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable", "false")));
         return createConsumers(notificationType, numConsumers, enableAutoCommit);
     }
 
@@ -215,13 +210,18 @@ public void close() {
     }
 
     @Override
-    public void closeConsumer(NotificationType notificationType) {
-        List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
-        for (final KafkaConsumer consumer : notificationConsumers) {
-            consumer.unsubscribe();
-            consumer.close();
-        }
-        this.consumers.remove(notificationType);
+    public void closeConsumer(NotificationType notificationTypeToClose, String topic) {
+        this.consumers.computeIfPresent(notificationTypeToClose, (notificationType, notificationConsumers) -> {
+            notificationConsumers.removeIf(consumer -> {
+                if (consumer.subscription().contains(topic)) {
+                    consumer.unsubscribe();
+                    consumer.close();
+                    return true;
+                }
+                return false;
+            });
+            return notificationConsumers.isEmpty() ? null : notificationConsumers;
+        });
     }
 
     // ----- NotificationInterface -------------------------------------------
@@ -243,16 +243,16 @@ public boolean isReady(NotificationType notificationType) {
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
         LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
 
-        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+        List<String> topics = CONSUMER_TOPICS_MAP.get(notificationType);
 
-        if (numConsumers < topics.length) {
-            LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length, topics.length);
+        if (numConsumers < topics.size()) {
+            LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.size(), topics.size());
 
-            numConsumers = topics.length;
-        } else if (numConsumers > topics.length) {
-            LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.length, topics.length);
+            numConsumers = topics.size();
+        } else if (numConsumers > topics.size()) {
+            LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.size(), topics.size());
 
-            numConsumers = topics.length;
+            numConsumers = topics.size();
         }
 
         List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
@@ -318,11 +318,7 @@ public Properties getConsumerProperties(NotificationType notificationType) {
         String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
 
         if (StringUtils.isEmpty(groupId)) {
-            if (!notificationType.equals(NotificationType.ASYNC_IMPORT)) {
-                groupId = "atlas";
-            } else {
-                groupId = "atlas-import";
-            }
+            groupId = notificationType.equals(NotificationType.ASYNC_IMPORT) ? "atlas-import" : "atlas";
         }
 
         if (StringUtils.isEmpty(groupId)) {
@@ -343,8 +339,8 @@ public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer existingConsumer, Pr
 
         try {
             if (ret == null || !isKafkaConsumerOpen(ret)) {
-                String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
-                String topic = topics[idxConsumer % topics.length];
+                List<String> topics = CONSUMER_TOPICS_MAP.get(notificationType);
+                String topic = topics.get(idxConsumer % topics.size());
 
                 LOG.debug("Creating new KafkaConsumer for topic : {}, index : {}", topic, idxConsumer);
 
@@ -452,39 +448,29 @@ private KafkaProducer getOrCreateProducerByCriteria(Object producerCriteria, Map
 
     @Override
     public void addTopicToNotificationType(NotificationType notificationType, String topic) {
-        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
-        String[] updatedTopics;
-        if (topics == null) {
-            updatedTopics = new String[] {topic};
-        } else {
-            updatedTopics = Stream.concat(Arrays.stream(topics), Stream.of(topic)).toArray(String[]::new);
-        }
-        CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
+        CONSUMER_TOPICS_MAP.computeIfAbsent(notificationType, k -> new ArrayList<>()).add(topic);
     }
 
     @Override
     public void closeProducer(NotificationType notificationType, String topic) {
-        KafkaProducer producerToClose = producersByTopic.get(topic);
-        if (producerToClose != null) {
-            producersByTopic.remove(topic);
-            producerToClose.close();
-        }
+        producersByTopic.computeIfPresent(topic, (key, producer) -> {
+            // Close the KafkaProducer before removal
+            producer.close();
+            // Returning null removes the key from the map
+            return null;
+        });
         PRODUCER_TOPIC_MAP.remove(notificationType, topic);
     }
 
     @Override
-    public void deleteTopics(NotificationType notificationType, String topicName) {
+    public void deleteTopic(NotificationType notificationType, String topicName) {
         try (AdminClient adminClient = AdminClient.create(this.properties)) {
             adminClient.deleteTopics(Collections.singleton(topicName));
         }
-        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
-        String[] updatedTopics;
-        if (topics == null) {
-            updatedTopics = new String[] {};
-        } else {
-            updatedTopics = Arrays.stream(topics).filter(topic -> !topic.equals(topicName)).toArray(String[]::new);
-        }
-        CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
+        CONSUMER_TOPICS_MAP.computeIfPresent(notificationType, (key, topics) -> {
+            topics.remove(topicName);
+            return topics.isEmpty() ? null : topics;
+        });
     }
 
     // kafka-client doesn't have method to check if consumer is open, hence checking list topics and catching exception
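
The rewritten addTopicToNotificationType, closeProducer, closeConsumer and deleteTopic above all lean on the Map.computeIfAbsent / computeIfPresent contract: the mapping function only runs when the key is absent (or present), and returning null from computeIfPresent removes the entry, so empty topic lists and closed producers drop out of the map in one step. A standalone sketch of that pattern with hypothetical topic names (on a ConcurrentHashMap the update is also atomic; on a plain HashMap, as used for CONSUMER_TOPICS_MAP, it is simply a compact get/modify/put):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ComputeIfPresentDemo {
        public static void main(String[] args) {
            Map<String, List<String>> topicsByType = new ConcurrentHashMap<>();

            // computeIfAbsent: create the list on first use, then append (addTopicToNotificationType pattern).
            topicsByType.computeIfAbsent("ASYNC_IMPORT", k -> new ArrayList<>()).add("ATLAS_IMPORT_1");
            topicsByType.computeIfAbsent("ASYNC_IMPORT", k -> new ArrayList<>()).add("ATLAS_IMPORT_2");

            // computeIfPresent: mutate the existing value; returning null drops the key (deleteTopic pattern).
            topicsByType.computeIfPresent("ASYNC_IMPORT", (key, topics) -> {
                topics.remove("ATLAS_IMPORT_1");
                return topics.isEmpty() ? null : topics;
            });
            System.out.println(topicsByType);                              // {ASYNC_IMPORT=[ATLAS_IMPORT_2]}

            topicsByType.computeIfPresent("ASYNC_IMPORT", (key, topics) -> {
                topics.remove("ATLAS_IMPORT_2");
                return topics.isEmpty() ? null : topics;
            });
            System.out.println(topicsByType.containsKey("ASYNC_IMPORT"));  // false: the empty list was removed
        }
    }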

notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java

Lines changed: 2 additions & 2 deletions
@@ -237,8 +237,8 @@ public <T> void send(NotificationType type, List<T> messages, MessageSource sour
     @Override
     public <T> void send(String topic, List<T> messages, MessageSource source) throws NotificationException {
         List<String> strMessages = new ArrayList<>(messages.size());
-        for (int index = 0; index < messages.size(); index++) {
-            createNotificationMessages(messages.get(index), strMessages, source);
+        for (T message : messages) {
+            createNotificationMessages(message, strMessages, source);
         }
         sendInternal(topic, strMessages);
     }

notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java

Lines changed: 8 additions & 6 deletions
@@ -131,14 +131,16 @@ default void closeProducer(NotificationType notificationType, String topic) {}
      * @param notificationType The type of notification related to the topic.
      * @param topicName The name of the topic to be deleted.
      */
-    default void deleteTopics(NotificationType notificationType, String topicName) {}
+    default void deleteTopic(NotificationType notificationType, String topicName) {}
 
     /**
      * Closes the consumer associated with the specified notification type.
      *
      * @param notificationType The type of notification for which the consumer is to be closed.
+     * @param topic The consumer to close with assignment.
+     *
      */
-    default void closeConsumer(NotificationType notificationType) {}
+    default void closeConsumer(NotificationType notificationType, String topic) {}
 
     /**
      * Atlas notification types.
@@ -150,11 +152,11 @@ enum NotificationType {
         // Notifications from the Atlas integration hooks - unsorted.
         HOOK_UNSORTED(new HookMessageDeserializer()),
 
-        // Notifications from Atlas async importer
-        ASYNC_IMPORT(new HookMessageDeserializer()),
-
         // Notifications to entity change consumers.
-        ENTITIES(new EntityMessageDeserializer());
+        ENTITIES(new EntityMessageDeserializer()),
+
+        // Notifications from Atlas async importer
+        ASYNC_IMPORT(new HookMessageDeserializer());
 
         private final AtlasNotificationMessageDeserializer deserializer;
 
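
Taken together with the KafkaNotification changes, the interface now supports tearing down per-import messaging resources topic by topic rather than per notification type. A hypothetical caller sketch; only the method signatures and the ASYNC_IMPORT type come from this commit, the cleanup flow and topic name are assumed:

    import org.apache.atlas.notification.NotificationInterface;
    import org.apache.atlas.notification.NotificationInterface.NotificationType;

    public class AsyncImportCleanup {
        private final NotificationInterface notification;

        public AsyncImportCleanup(NotificationInterface notification) {
            this.notification = notification;
        }

        // Hypothetical teardown once an async import finishes: close only the consumer and
        // producer bound to this import's topic, then delete the topic itself.
        public void onImportComplete(String importTopic) {
            notification.closeConsumer(NotificationType.ASYNC_IMPORT, importTopic);
            notification.closeProducer(NotificationType.ASYNC_IMPORT, importTopic);
            notification.deleteTopic(NotificationType.ASYNC_IMPORT, importTopic);
        }
    }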

notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ public class KafkaConsumerTest {
     private static final String TRAIT_NAME = "MyTrait";
 
     private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
-    private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = KafkaNotification.trimAndPurge(AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC));
+    private static final List<String> ATLAS_HOOK_CONSUMER_TOPICS = KafkaNotification.trimAndPurge(AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC));
 
     @Mock
     private KafkaConsumer<String, String> kafkaConsumer;
