Skip to content

Commit 0f215e7

Browse files
committed
ATLAS-4922: Atlas Async Import using Kafka [8] - Addressed some PR comments and made improvements
1 parent 88ada02 commit 0f215e7

File tree

16 files changed

+266
-249
lines changed

16 files changed

+266
-249
lines changed

atlas-examples/sample-app/src/main/java/org/apache/atlas/examples/sampleapp/AsyncImportApiExample.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.atlas.AtlasClientV2;
2121
import org.apache.atlas.AtlasServiceException;
22+
import org.apache.atlas.model.PList;
23+
import org.apache.atlas.model.impexp.AsyncImportStatus;
2224
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
2325
import org.apache.atlas.model.impexp.AtlasImportRequest;
2426

@@ -27,8 +29,6 @@
2729
import java.io.IOException;
2830
import java.io.InputStream;
2931
import java.net.URL;
30-
import java.util.List;
31-
import java.util.Map;
3232

3333
public class AsyncImportApiExample {
3434
private final AtlasClientV2 client;
@@ -37,12 +37,12 @@ public AsyncImportApiExample(AtlasClientV2 client) {
3737
this.client = client;
3838
}
3939

40-
public void testImportAsyncWithZip() throws Exception {
40+
public AtlasAsyncImportRequest testImportAsyncWithZip() throws Exception {
4141
URL url = AsyncImportApiExample.class.getClassLoader().getResource("importFile.zip");
4242

4343
if (url == null) {
4444
System.err.println("importFile.zip not found in classpath.");
45-
return;
45+
return null;
4646
}
4747

4848
File zipFile = new File(url.toURI());
@@ -54,6 +54,7 @@ public void testImportAsyncWithZip() throws Exception {
5454
try {
5555
AtlasAsyncImportRequest asyncRequest = client.importAsync(request, zipStream);
5656
System.out.println("Import Data Async Request Created: " + asyncRequest);
57+
return asyncRequest;
5758
} catch (AtlasServiceException e) {
5859
System.err.println("Failed to execute importDataAsync with ZIP file: " + e.getMessage());
5960
throw e;
@@ -67,7 +68,7 @@ public void testImportAsyncWithZip() throws Exception {
6768
public void testGetAsyncImportStatus() throws Exception {
6869
System.out.println("Testing getAllAsyncImportStatus...");
6970
try {
70-
List<Map<String, Object>> statuses = client.getAsyncImportStatus();
71+
PList<AsyncImportStatus> statuses = client.getAsyncImportStatus();
7172
System.out.println("All Async Import Statuses: " + statuses);
7273
} catch (AtlasServiceException e) {
7374
System.err.println("Failed to fetch all async import statuses: " + e.getMessage());

atlas-examples/sample-app/src/main/java/org/apache/atlas/examples/sampleapp/SampleApp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.atlas.AtlasClientV2;
2121
import org.apache.atlas.AtlasException;
22+
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
2223
import org.apache.atlas.model.instance.AtlasEntity;
2324
import org.apache.atlas.utils.AuthenticationUtil;
2425

@@ -79,11 +80,11 @@ public static void main(String[] args) throws Exception {
7980
//Async Import Examples
8081
AsyncImportApiExample asyncImportApiExample = new AsyncImportApiExample(sampleApp.getClient());
8182

82-
asyncImportApiExample.testImportAsyncWithZip();
83+
AtlasAsyncImportRequest asyncRequest = asyncImportApiExample.testImportAsyncWithZip();
8384

8485
asyncImportApiExample.testGetAsyncImportStatus();
8586

86-
String testImportId = "24cbff65a7ed60e02d099ce78cb06efd";
87+
String testImportId = asyncRequest.getImportId();
8788
asyncImportApiExample.testGetAsyncImportStatusById(testImportId);
8889
asyncImportApiExample.testDeleteAsyncImportById(testImportId);
8990
} finally {

client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.sun.jersey.multipart.MultiPart;
3131
import com.sun.jersey.multipart.file.StreamDataBodyPart;
3232
import org.apache.atlas.bulkimport.BulkImportResponse;
33+
import org.apache.atlas.model.PList;
3334
import org.apache.atlas.model.SearchFilter;
3435
import org.apache.atlas.model.audit.AtlasAuditEntry;
3536
import org.apache.atlas.model.audit.AuditReductionCriteria;
@@ -45,6 +46,7 @@
4546
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
4647
import org.apache.atlas.model.glossary.relations.AtlasRelatedCategoryHeader;
4748
import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader;
49+
import org.apache.atlas.model.impexp.AsyncImportStatus;
4850
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
4951
import org.apache.atlas.model.impexp.AtlasImportRequest;
5052
import org.apache.atlas.model.instance.AtlasClassification;
@@ -145,10 +147,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
145147
private static final String INDEX_RECOVERY_URI = BASE_URI + "v2/indexrecovery";
146148

147149
// Async Import APIs
148-
private static final String ASYNC_IMPORT_URI = BASE_URI + "admin/asyncImport";
149-
private static final String ASYNC_IMPORT_STATUS_URI = BASE_URI + "admin/asyncImport/status";
150-
private static final String ASYNC_IMPORT_STATUS_BY_ID_URI = BASE_URI + "admin/asyncImport/status/";
151-
private static final String ASYNC_IMPORT_BY_ID_URI = BASE_URI + "admin/asyncImport/";
150+
private static final String ASYNC_IMPORT_URI = BASE_URI + "admin/async/import";
151+
private static final String ASYNC_IMPORT_STATUS_URI = BASE_URI + "admin/async/import/status";
152+
private static final String ASYNC_IMPORT_STATUS_BY_ID_URI = BASE_URI + "admin/async/import/status/";
153+
private static final String ASYNC_IMPORT_BY_ID_URI = BASE_URI + "admin/async/import/";
152154

153155
private static final String IMPORT_REQUEST_PARAMTER = "request";
154156
private static final String IMPORT_DATA_PARAMETER = "data";
@@ -1052,27 +1054,15 @@ public API formatPathWithParameter(API api, String... params) {
10521054
return formatPathParameters(api, params);
10531055
}
10541056

1055-
private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
1056-
return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
1057-
}
1058-
10591057
public AtlasAsyncImportRequest importAsync(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
10601058
return performAsyncImport(getImportRequestBodyPart(request),
10611059
new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
10621060
}
10631061

1064-
private AtlasAsyncImportRequest performAsyncImport(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
1065-
MultiPart multipartEntity = new FormDataMultiPart()
1066-
.bodyPart(requestPart)
1067-
.bodyPart(filePart);
1068-
1069-
return callAPI(API_V2.ASYNC_IMPORT, AtlasAsyncImportRequest.class, multipartEntity);
1070-
}
1071-
1072-
public List<Map<String, Object>> getAsyncImportStatus() throws AtlasServiceException {
1062+
public PList<AsyncImportStatus> getAsyncImportStatus() throws AtlasServiceException {
10731063
return callAPI(
10741064
API_V2.ASYNC_IMPORT_STATUS_ALL,
1075-
new GenericType<List<Map<String, Object>>>() {},
1065+
new GenericType<PList<AsyncImportStatus>>() {},
10761066
null);
10771067
}
10781068

@@ -1200,6 +1190,18 @@ private <T> T getTypeDefByGuid(String guid, Class<T> typeDefClass) throws AtlasS
12001190
return callAPI(api, typeDefClass, null);
12011191
}
12021192

1193+
private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
1194+
return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
1195+
}
1196+
1197+
private AtlasAsyncImportRequest performAsyncImport(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
1198+
MultiPart multipartEntity = new FormDataMultiPart()
1199+
.bodyPart(requestPart)
1200+
.bodyPart(filePart);
1201+
1202+
return callAPI(API_V2.ASYNC_IMPORT, AtlasAsyncImportRequest.class, multipartEntity);
1203+
}
1204+
12031205
public static class API_V2 extends API {
12041206
// TypeDef APIs
12051207
public static final API_V2 GET_TYPEDEF_BY_NAME = new API_V2(TYPEDEF_BY_NAME, HttpMethod.GET, Response.Status.OK);

common/src/main/java/org/apache/atlas/repository/Constants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ public final class Constants {
229229
public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom");
230230
public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout");
231231

232+
/**
233+
* Atlas Async Import vertex property keys.
234+
*/
235+
public static final String ATLAS_ASYNC_IMPORT_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "AtlasAsyncImportRequest.";
236+
public static final String PROPERTY_KEY_RECEIVED_AT = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "receivedAt");
237+
public static final String PROPERTY_KEY_ASYNC_IMPORT_STATUS = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "status");
238+
public static final String PROPERTY_KEY_ASYNC_IMPORT_ID = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "importId");
239+
232240
public static final String SQOOP_SOURCE = "sqoop";
233241
public static final String FALCON_SOURCE = "falcon";
234242
public static final String HBASE_SOURCE = "hbase";
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.atlas.model.impexp;
20+
21+
public class AsyncImportStatus {
22+
private String importId;
23+
private AtlasAsyncImportRequest.ImportStatus status;
24+
private String importRequestReceivedAt;
25+
private String importRequestReceivedBy;
26+
27+
public AsyncImportStatus() {}
28+
29+
public AsyncImportStatus(String importId, AtlasAsyncImportRequest.ImportStatus status, String importRequestReceivedAt, String importRequestReceivedBy) {
30+
this.importId = importId;
31+
this.status = status;
32+
this.importRequestReceivedAt = importRequestReceivedAt;
33+
this.importRequestReceivedBy = importRequestReceivedBy;
34+
}
35+
36+
public String getImportId() {
37+
return importId;
38+
}
39+
40+
public AtlasAsyncImportRequest.ImportStatus getStatus() {
41+
return status;
42+
}
43+
44+
public String getImportRequestReceivedAt() {
45+
return importRequestReceivedAt;
46+
}
47+
48+
public String getImportRequestReceivedBy() {
49+
return importRequestReceivedBy;
50+
}
51+
}

intg/src/main/java/org/apache/atlas/model/impexp/AtlasAsyncImportRequest.java

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,6 @@ public String toString() {
7272
}
7373
}
7474

75-
@JsonIgnore
76-
private String requestId;
77-
@JsonIgnore
78-
private int skipTo;
79-
8075
private String importId;
8176
private ImportStatus status;
8277
private ImportDetails importDetails = new ImportDetails();
@@ -88,6 +83,17 @@ public String toString() {
8883
@JsonInclude(JsonInclude.Include.NON_NULL)
8984
private AtlasImportResult importResult;
9085

86+
@JsonIgnore
87+
private ImportTrackingInfo importTrackingInfo;
88+
89+
public ImportTrackingInfo getImportTrackingInfo() {
90+
return importTrackingInfo;
91+
}
92+
93+
public void setImportTrackingInfo(ImportTrackingInfo importTrackingInfo) {
94+
this.importTrackingInfo = importTrackingInfo;
95+
}
96+
9197
public AtlasAsyncImportRequest() {}
9298

9399
public AtlasAsyncImportRequest(String guid) {
@@ -97,30 +103,24 @@ public AtlasAsyncImportRequest(String guid) {
97103
public AtlasAsyncImportRequest(AtlasImportResult result) {
98104
this.importResult = result;
99105
this.status = ImportStatus.STAGING;
100-
this.skipTo = 0;
101106
this.receivedAt = 0L;
102107
this.stagedAt = 0L;
103108
this.startedProcessingAt = 0L;
104109
this.completedAt = 0L;
105110
this.importDetails = new ImportDetails();
111+
this.importTrackingInfo = new ImportTrackingInfo(null, 0);
106112
setGuid(getGuid());
107113
}
108114

109-
public String getRequestId() {
110-
return requestId;
111-
}
112-
113-
public void setRequestId(String requestId) {
114-
this.requestId = requestId;
115-
}
116-
117115
public String getImportId() {
118116
return importId;
119117
}
120118

121119
public void setImportId(String importId) {
122120
this.importId = importId;
123-
setRequestId(REQUEST_ID_PREFIX_PROPERTY + importId + "@" + AtlasEntityUtil.getMetadataNamespace());
121+
if (importTrackingInfo != null) {
122+
importTrackingInfo.setRequestId(REQUEST_ID_PREFIX_PROPERTY + importId + "@" + AtlasEntityUtil.getMetadataNamespace());
123+
}
124124
}
125125

126126
public ImportStatus getStatus() {
@@ -144,14 +144,6 @@ public String getTopicName() {
144144
return ASYNC_IMPORT_TOPIC_PREFIX + importId;
145145
}
146146

147-
public int getSkipTo() {
148-
return skipTo;
149-
}
150-
151-
public void setSkipTo(int skipTo) {
152-
this.skipTo = skipTo;
153-
}
154-
155147
public AtlasImportResult getImportResult() {
156148
return importResult;
157149
}
@@ -193,19 +185,13 @@ public void setCompletedAt(long completedAt) {
193185
}
194186

195187
@JsonIgnore
196-
public Map<String, Object> getImportMinInfo() {
197-
Map<String, Object> minInfoResponse = new HashMap<>();
198-
199-
String importId = this.getImportId();
200-
long timestamp = this.receivedAt;
201-
String isoDate = toIsoDate(new Date(timestamp));
202-
203-
minInfoResponse.put("importId", importId);
204-
minInfoResponse.put("status", status);
205-
minInfoResponse.put("importRequestReceivedAt", isoDate);
206-
minInfoResponse.put("importRequestReceivedBy", importResult.getUserName());
207-
208-
return minInfoResponse;
188+
public AsyncImportStatus toImportMinInfo() {
189+
AsyncImportStatus asyncImportStatus = new AsyncImportStatus(
190+
this.getImportId(),
191+
status,
192+
toIsoDate(new Date(this.receivedAt)),
193+
importResult.getUserName());
194+
return asyncImportStatus;
209195
}
210196

211197
private String toIsoDate(Date value) {
@@ -236,7 +222,7 @@ public boolean equals(Object o) {
236222
Objects.equals(importId, that.importId) &&
237223
Objects.equals(status, that.status) &&
238224
Objects.equals(importDetails, that.importDetails) &&
239-
Objects.equals(requestId, that.requestId) &&
225+
Objects.equals(importTrackingInfo.getRequestId(), that.importTrackingInfo.getRequestId()) &&
240226
Objects.equals(receivedAt, that.receivedAt) &&
241227
Objects.equals(stagedAt, that.stagedAt) &&
242228
Objects.equals(startedProcessingAt, that.startedProcessingAt) &&
@@ -245,7 +231,7 @@ public boolean equals(Object o) {
245231

246232
@Override
247233
public int hashCode() {
248-
return Objects.hash(super.hashCode(), requestId, importResult,
234+
return Objects.hash(super.hashCode(), importTrackingInfo.getRequestId(), importResult,
249235
importId, status, importDetails, receivedAt, stagedAt, startedProcessingAt, completedAt);
250236
}
251237

@@ -257,7 +243,7 @@ protected StringBuilder toString(StringBuilder sb) {
257243
} else {
258244
sb.append(importResult);
259245
}
260-
sb.append(", requestId=").append(requestId);
246+
sb.append(", requestId=").append(importTrackingInfo.getRequestId());
261247
sb.append(", importId=").append(importId);
262248
sb.append(", status=").append(status);
263249
sb.append(", receivedAt=").append(receivedAt);
@@ -387,4 +373,41 @@ public int hashCode() {
387373
return Objects.hash(publishedEntityCount, totalEntitiesCount, importedEntitiesCount, failedEntitiesCount, importProgress, failures);
388374
}
389375
}
376+
377+
public static class ImportTrackingInfo {
378+
private String requestId;
379+
private int skipTo;
380+
381+
public ImportTrackingInfo() {
382+
}
383+
384+
public ImportTrackingInfo(String requestId, int skipTo) {
385+
this.requestId = requestId;
386+
this.skipTo = skipTo;
387+
}
388+
389+
public String getRequestId() {
390+
return requestId;
391+
}
392+
393+
public void setRequestId(String requestId) {
394+
this.requestId = requestId;
395+
}
396+
397+
public int getSkipTo() {
398+
return skipTo;
399+
}
400+
401+
public void setSkipTo(int skipTo) {
402+
this.skipTo = skipTo;
403+
}
404+
405+
@Override
406+
public String toString() {
407+
return "ImportTrackingInfo{" +
408+
"requestId='" + requestId + '\'' +
409+
", skipTo=" + skipTo +
410+
'}';
411+
}
412+
}
390413
}

0 commit comments

Comments
 (0)