Skip to content

Commit d678555

Browse files
committed
ATLAS-4922: Atlas Async Import using Kafka [4] - Unit Tests
1 parent 16b5449 commit d678555

File tree

10 files changed

+1716
-16
lines changed

10 files changed

+1716
-16
lines changed

notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collections;
3434
import java.util.List;
3535
import java.util.Map;
36+
import java.util.Set;
3637

3738
/**
3839
* Kafka specific notification consumer.
@@ -58,6 +59,24 @@ public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
5859
this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
5960
}
6061

62+
@Override
63+
public Set<TopicPartition> getTopicPartition() {
64+
Set<TopicPartition> ret = null;
65+
if (kafkaConsumer != null) {
66+
ret = kafkaConsumer.assignment();
67+
}
68+
return ret;
69+
}
70+
71+
@Override
72+
public Set<String> subscription() {
73+
Set<String> ret = null;
74+
if (kafkaConsumer != null) {
75+
ret = kafkaConsumer.subscription();
76+
}
77+
return ret;
78+
}
79+
6180
@Override
6281
public void commit(TopicPartition partition, long offset) {
6382
if (!autoCommitEnabled) {

notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.Set;
2526

2627
/**
2728
* Atlas notification consumer. This consumer blocks until a notification can be read.
@@ -68,4 +69,8 @@ public interface NotificationConsumer<T> {
6869
* @return List containing kafka message and partitionId and offset.
6970
*/
7071
List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset);
72+
73+
Set<TopicPartition> getTopicPartition();
74+
75+
Set<String> subscription();
7176
}

notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import org.testng.annotations.Test;
3030

3131
import java.util.ArrayList;
32+
import java.util.Collections;
3233
import java.util.LinkedList;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.Objects;
37+
import java.util.Set;
3638

3739
import static org.mockito.Mockito.mock;
3840
import static org.testng.Assert.assertEquals;
@@ -184,6 +186,16 @@ public void close() {
184186
public void wakeup() {
185187
}
186188

189+
@Override
190+
public Set<TopicPartition> getTopicPartition() {
191+
return Collections.emptySet();
192+
}
193+
194+
@Override
195+
public Set<String> subscription() {
196+
return Collections.emptySet();
197+
}
198+
187199
@Override
188200
public List<AtlasKafkaMessage<TestMessage>> receive() {
189201
return receive(1000L);

repository/src/test/java/org/apache/atlas/TestModules.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryCategoryDTO;
5454
import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryDTO;
5555
import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryTermDTO;
56+
import org.apache.atlas.repository.ogm.impexp.AtlasAsyncImportRequestDTO;
5657
import org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO;
5758
import org.apache.atlas.repository.ogm.profiles.AtlasSavedSearchDTO;
5859
import org.apache.atlas.repository.ogm.profiles.AtlasUserProfileDTO;
@@ -66,6 +67,7 @@
6667
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
6768
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
6869
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
70+
import org.apache.atlas.repository.store.graph.v2.asyncimport.ImportTaskListener;
6971
import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory;
7072
import org.apache.atlas.runner.LocalSolrRunner;
7173
import org.apache.atlas.service.Service;
@@ -131,6 +133,7 @@ protected void configure() {
131133
bind(ExportService.class).asEagerSingleton();
132134

133135
bind(SearchTracker.class).asEagerSingleton();
136+
bind(ImportTaskListener.class).toInstance(Mockito.mock(ImportTaskListener.class));
134137

135138
bind(AtlasEntityStore.class).to(AtlasEntityStoreV2.class);
136139
bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV2.class);
@@ -164,6 +167,7 @@ protected void configure() {
164167
availableDTOs.addBinding().to(ExportImportAuditEntryDTO.class);
165168
availableDTOs.addBinding().to(AtlasAuditEntryDTO.class);
166169
availableDTOs.addBinding().to(AtlasMetricsStatDTO.class);
170+
availableDTOs.addBinding().to(AtlasAsyncImportRequestDTO.class);
167171

168172
bind(DTORegistry.class).asEagerSingleton();
169173
bind(DataAccess.class).asEagerSingleton();
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.atlas.repository.impexp;
19+
20+
import org.apache.atlas.SortOrder;
21+
import org.apache.atlas.exception.AtlasBaseException;
22+
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
23+
import org.apache.atlas.model.impexp.AtlasImportResult;
24+
import org.apache.atlas.repository.ogm.DataAccess;
25+
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
26+
import org.mockito.Mock;
27+
import org.mockito.MockedStatic;
28+
import org.mockito.Mockito;
29+
import org.mockito.MockitoAnnotations;
30+
import org.testng.annotations.AfterMethod;
31+
import org.testng.annotations.BeforeMethod;
32+
import org.testng.annotations.Test;
33+
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.HashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
40+
import static org.mockito.ArgumentMatchers.anyString;
41+
import static org.mockito.Mockito.any;
42+
import static org.mockito.Mockito.anyList;
43+
import static org.mockito.Mockito.anyListOf;
44+
import static org.mockito.Mockito.doReturn;
45+
import static org.mockito.Mockito.doThrow;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.mockStatic;
48+
import static org.mockito.Mockito.spy;
49+
import static org.mockito.Mockito.times;
50+
import static org.mockito.Mockito.verify;
51+
import static org.mockito.Mockito.when;
52+
import static org.testng.Assert.assertEquals;
53+
import static org.testng.Assert.assertNotNull;
54+
import static org.testng.Assert.assertNull;
55+
import static org.testng.Assert.assertTrue;
56+
57+
public class AsyncImportServiceTest {
58+
private DataAccess dataAccess;
59+
private AsyncImportService asyncImportService;
60+
@Mock
61+
private AtlasGraphUtilsV2 atlasGraphUtilsV2;
62+
63+
@BeforeMethod
64+
public void setup() {
65+
MockitoAnnotations.openMocks(this);
66+
dataAccess = mock(DataAccess.class);
67+
asyncImportService = new AsyncImportService(dataAccess);
68+
}
69+
70+
@Test
71+
public void testFetchImportRequestByImportId() throws Exception {
72+
String importId = "import123";
73+
AtlasAsyncImportRequest mockRequest = new AtlasAsyncImportRequest();
74+
mockRequest.setImportId(importId);
75+
76+
when(dataAccess.load(any(AtlasAsyncImportRequest.class))).thenReturn(mockRequest);
77+
78+
AtlasAsyncImportRequest result = asyncImportService.fetchImportRequestByImportId(importId);
79+
80+
assertNotNull(result);
81+
assertEquals(result.getImportId(), importId);
82+
verify(dataAccess, times(1)).load(any(AtlasAsyncImportRequest.class));
83+
}
84+
85+
@Test
86+
public void testFetchImportRequestByImportIdError() throws AtlasBaseException {
87+
String importId = "import123";
88+
89+
when(dataAccess.load(any(AtlasAsyncImportRequest.class))).thenThrow(new RuntimeException("Test Exception"));
90+
91+
AtlasAsyncImportRequest result = asyncImportService.fetchImportRequestByImportId(importId);
92+
93+
assertNull(result);
94+
verify(dataAccess, times(1)).load(any(AtlasAsyncImportRequest.class));
95+
}
96+
97+
@Test
98+
public void testSaveImportRequest() throws AtlasBaseException {
99+
AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest();
100+
importRequest.setImportId("import123");
101+
102+
asyncImportService.saveImportRequest(importRequest);
103+
104+
verify(dataAccess, times(1)).save(importRequest);
105+
}
106+
107+
@Test
108+
public void testUpdateImportRequest() throws AtlasBaseException {
109+
AtlasAsyncImportRequest importRequest = new AtlasAsyncImportRequest();
110+
importRequest.setImportId("import123");
111+
112+
doThrow(new AtlasBaseException("Save failed")).when(dataAccess).save(importRequest);
113+
114+
asyncImportService.updateImportRequest(importRequest);
115+
116+
verify(dataAccess, times(1)).save(importRequest);
117+
}
118+
119+
@Test
120+
public void testFetchInProgressImportIds() throws AtlasBaseException {
121+
List<String> guids = Arrays.asList("guid1", "guid2");
122+
AtlasAsyncImportRequest request1 = new AtlasAsyncImportRequest();
123+
request1.setImportId("guid1");
124+
request1.setStatus(AtlasAsyncImportRequest.ImportStatus.PROCESSING);
125+
126+
AtlasAsyncImportRequest request2 = new AtlasAsyncImportRequest();
127+
request2.setImportId("guid2");
128+
request2.setStatus(AtlasAsyncImportRequest.ImportStatus.SUCCESSFUL);
129+
130+
try (MockedStatic<AtlasGraphUtilsV2> mockedStatic = mockStatic(AtlasGraphUtilsV2.class)) {
131+
mockedStatic.when(() -> AtlasGraphUtilsV2.findEntityGUIDsByType(anyString(), any(SortOrder.class)))
132+
.thenReturn(guids);
133+
134+
when(dataAccess.load(anyListOf(AtlasAsyncImportRequest.class))).thenReturn(Arrays.asList(request1, request2));
135+
136+
List<String> result = asyncImportService.fetchInProgressImportIds();
137+
138+
assertEquals(result.size(), 1);
139+
assertTrue(result.contains("guid1"));
140+
141+
mockedStatic.verify(() -> AtlasGraphUtilsV2.findEntityGUIDsByType(anyString(), any(SortOrder.class)));
142+
verify(dataAccess, times(1)).load(anyListOf(AtlasAsyncImportRequest.class));
143+
}
144+
}
145+
146+
@Test
147+
public void testFetchQueuedImportRequests() throws AtlasBaseException {
148+
List<String> guids = Arrays.asList("guid1", "guid2");
149+
AtlasAsyncImportRequest request1 = new AtlasAsyncImportRequest();
150+
request1.setImportId("guid1");
151+
request1.setStatus(AtlasAsyncImportRequest.ImportStatus.WAITING);
152+
153+
AtlasAsyncImportRequest request2 = new AtlasAsyncImportRequest();
154+
request2.setImportId("guid2");
155+
request2.setStatus(AtlasAsyncImportRequest.ImportStatus.PROCESSING);
156+
157+
try (MockedStatic<AtlasGraphUtilsV2> mockStatic = mockStatic(AtlasGraphUtilsV2.class)) {
158+
mockStatic.when(() -> AtlasGraphUtilsV2.findEntityGUIDsByType(anyString(), any(SortOrder.class)))
159+
.thenReturn(guids);
160+
161+
when(dataAccess.load(anyList())).thenReturn(Arrays.asList(request1, request2));
162+
163+
List<String> result = asyncImportService.fetchQueuedImportRequests();
164+
165+
assertEquals(result.size(), 1);
166+
assertTrue(result.contains("guid1"));
167+
verify(dataAccess, times(1)).load(anyList());
168+
}
169+
}
170+
171+
@Test
172+
public void testDeleteRequests() throws AtlasBaseException {
173+
List<String> guids = Arrays.asList("guid1", "guid2");
174+
175+
try (MockedStatic<AtlasGraphUtilsV2> mockStatic = mockStatic(AtlasGraphUtilsV2.class)) {
176+
mockStatic.when(() -> AtlasGraphUtilsV2.findEntityGUIDsByType(anyString(), any()))
177+
.thenReturn(guids);
178+
179+
asyncImportService.deleteRequests();
180+
181+
verify(dataAccess, times(1)).delete(guids);
182+
}
183+
}
184+
185+
@Test
186+
public void testGetAllImports() throws AtlasBaseException {
187+
List<String> guids = Arrays.asList("guid1", "guid2");
188+
189+
AtlasAsyncImportRequest request1 = spy(new AtlasAsyncImportRequest());
190+
request1.setImportId("guid1");
191+
request1.setStatus(AtlasAsyncImportRequest.ImportStatus.PROCESSING);
192+
193+
AtlasImportResult mockImportResult = mock(AtlasImportResult.class);
194+
doReturn("admin").when(mockImportResult).getUserName();
195+
request1.setAtlasImportResult(mockImportResult);
196+
197+
Map<String, Object> mockMinInfo = new HashMap<>();
198+
mockMinInfo.put("importId", "24cbff65a7ed60e02d099ce78cb06efd");
199+
mockMinInfo.put("importRequestReceivedBy", "admin");
200+
mockMinInfo.put("status", "SUCCESSFUL");
201+
mockMinInfo.put("importRequestReceivedAt", "2025-01-23T00:37:08.634Z");
202+
203+
doReturn(mockMinInfo).when(request1).getImportMinInfo();
204+
205+
try (MockedStatic<AtlasGraphUtilsV2> mockStatic = mockStatic(AtlasGraphUtilsV2.class)) {
206+
mockStatic.when(() -> AtlasGraphUtilsV2.findEntityGUIDsByType(anyString(), any()))
207+
.thenReturn(guids);
208+
when(dataAccess.load(anyList())).thenReturn(Collections.singletonList(request1));
209+
210+
List<Map<String, Object>> result = asyncImportService.getAllImports();
211+
212+
assertEquals(result.size(), 1);
213+
assertTrue(result.get(0).containsKey("importId"));
214+
verify(dataAccess, times(1)).load(anyList());
215+
}
216+
}
217+
218+
@Test
219+
public void testGetImportStatusById() throws AtlasBaseException {
220+
String importId = "import123";
221+
AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
222+
request.setImportId(importId);
223+
224+
when(dataAccess.load(any(AtlasAsyncImportRequest.class))).thenReturn(request);
225+
226+
AtlasAsyncImportRequest result = asyncImportService.getImportStatusById(importId);
227+
228+
assertNotNull(result);
229+
assertEquals(result.getImportId(), importId);
230+
verify(dataAccess, times(1)).load(any(AtlasAsyncImportRequest.class));
231+
}
232+
233+
@AfterMethod
234+
public void tearDown() {
235+
Mockito.reset(dataAccess);
236+
}
237+
}

0 commit comments

Comments
 (0)