Skip to content

Commit f3d302a

Browse files
ATLAS-4922: Atlas Async Import using Kafka (#307)
Co-authored-by: jackhalfalltrades <[email protected]>
1 parent fd29db1 commit f3d302a

File tree

60 files changed

+5144
-173
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+5144
-173
lines changed

addons/models/0000-Area0/0010-base_model.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,24 @@
568568
"serviceType": "atlas_core",
569569
"typeVersion": "1.0",
570570
"attributeDefs": []
571+
},
572+
{
573+
"name": "__AtlasAsyncImportRequest",
574+
"superTypes": [ "__internal" ],
575+
"serviceType": "atlas_core",
576+
"typeVersion": "1.0",
577+
"attributeDefs": [
578+
{ "name": "requestId", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": true, "isIndexable": true },
579+
{ "name": "importId", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true },
580+
{ "name": "status", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true },
581+
{ "name": "importDetails", "typeName": "string", "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
582+
{ "name": "startEntityPosition", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
583+
{ "name": "importResult", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
584+
{ "name": "receivedTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
585+
{ "name": "stagedTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
586+
{ "name": "processingStartTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
587+
{ "name": "completedTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false }
588+
]
571589
}
572590
],
573591
"relationshipDefs": [

atlas-examples/sample-app/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@
8484
<version>${project.version}</version>
8585
</dependency>
8686

87+
<dependency>
88+
<groupId>org.glassfish.jersey.inject</groupId>
89+
<artifactId>jersey-hk2</artifactId>
90+
<version>2.34</version>
91+
</dependency>
92+
8793
</dependencies>
8894

8995
<build>
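atlas-examples/sample-app/src/main/java/org/apache/atlas/examples/sampleapp/AsyncImportApiExample.java

Lines changed: 117 additions & 0 deletions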
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.atlas.examples.sampleapp;
19+
20+
import org.apache.atlas.AtlasClientV2;
21+
import org.apache.atlas.AtlasServiceException;
22+
import org.apache.atlas.model.PList;
23+
import org.apache.atlas.model.impexp.AsyncImportStatus;
24+
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
25+
import org.apache.atlas.model.impexp.AtlasImportRequest;
26+
27+
import java.io.File;
28+
import java.io.IOException;
29+
import java.io.InputStream;
30+
import java.net.URL;
31+
import java.nio.file.Files;
32+
33+
public class AsyncImportApiExample {
34+
private final AtlasClientV2 client;
35+
36+
public AsyncImportApiExample(AtlasClientV2 client) {
37+
this.client = client;
38+
}
39+
40+
public AtlasAsyncImportRequest testImportAsyncWithZip() throws Exception {
41+
URL url = AsyncImportApiExample.class.getClassLoader().getResource("importFile.zip");
42+
43+
if (url == null) {
44+
System.err.println("importFile.zip not found in classpath.");
45+
46+
return null;
47+
}
48+
49+
File zipFile = new File(url.toURI());
50+
AtlasImportRequest request = new AtlasImportRequest();
51+
52+
try (InputStream zipStream = Files.newInputStream(zipFile.toPath())) {
53+
System.out.println("Testing Async Import with ZIP file...");
54+
55+
try {
56+
AtlasAsyncImportRequest asyncRequest = client.importAsync(request, zipStream);
57+
58+
System.out.println("Async Import Request Created: " + asyncRequest);
59+
60+
return asyncRequest;
61+
} catch (AtlasServiceException e) {
62+
System.err.println("Async Import with ZIP file failed: " + e.getMessage());
63+
64+
throw e;
65+
}
66+
} catch (IOException e) {
67+
System.err.println("Failed to open ZIP file: " + e.getMessage());
68+
69+
throw e;
70+
}
71+
}
72+
73+
public void testGetAsyncImportStatus() throws Exception {
74+
System.out.println("Testing getAllAsyncImportStatus...");
75+
76+
try {
77+
PList<AsyncImportStatus> statuses = client.getAsyncImportStatus(null, null);
78+
79+
System.out.println("All Async Import Statuses:");
80+
for (AsyncImportStatus status : statuses.getList()) {
81+
System.out.println(status);
82+
}
83+
} catch (AtlasServiceException e) {
84+
System.err.println("Failed to fetch all async import statuses: " + e.getMessage());
85+
86+
throw e;
87+
}
88+
}
89+
90+
public void testGetAsyncImportStatusById(String importId) throws Exception {
91+
System.out.println("Testing getImportStatus for id=" + importId);
92+
93+
try {
94+
AtlasAsyncImportRequest importStatus = client.getAsyncImportStatusById(importId);
95+
96+
System.out.println("Import Status for ID (" + importId + "): " + importStatus);
97+
} catch (AtlasServiceException e) {
98+
System.err.println("Failed to fetch import status for id=" + importId + ": " + e.getMessage());
99+
100+
throw e;
101+
}
102+
}
103+
104+
public void testAbortAsyncImportById(String importId) throws Exception {
105+
System.out.println("Testing abortAsyncImport for id=" + importId);
106+
107+
try {
108+
client.abortAsyncImport(importId);
109+
110+
System.out.println("Successfully aborted async import with ID: " + importId);
111+
} catch (AtlasServiceException e) {
112+
System.err.println("Failed to abort async import for ID (" + importId + "): " + e.getMessage());
113+
114+
throw e;
115+
}
116+
}
117+
}

atlas-examples/sample-app/src/main/java/org/apache/atlas/examples/sampleapp/SampleApp.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.atlas.AtlasClientV2;
2121
import org.apache.atlas.AtlasException;
22+
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
2223
import org.apache.atlas.model.instance.AtlasEntity;
2324
import org.apache.atlas.utils.AuthenticationUtil;
2425

@@ -75,6 +76,18 @@ public static void main(String[] args) throws Exception {
7576
sampleApp.glossaryExample();
7677

7778
entityExample.deleteEntities();
79+
80+
// Async Import Examples
81+
AsyncImportApiExample asyncImportApiExample = new AsyncImportApiExample(sampleApp.getClient());
82+
AtlasAsyncImportRequest asyncRequest = asyncImportApiExample.testImportAsyncWithZip();
83+
84+
asyncImportApiExample.testGetAsyncImportStatus();
85+
86+
String testImportId = asyncRequest.getImportId();
87+
88+
asyncImportApiExample.testGetAsyncImportStatusById(testImportId);
89+
90+
asyncImportApiExample.testAbortAsyncImportById(testImportId);
7891
} finally {
7992
if (sampleApp != null && sampleApp.getClient() != null) {
8093
sampleApp.getClient().close();
Binary file not shown.

client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
import com.fasterxml.jackson.databind.node.ArrayNode;
2121
import com.fasterxml.jackson.databind.node.ObjectNode;
2222
import com.google.common.annotations.VisibleForTesting;
23+
import com.sun.jersey.api.client.GenericType;
2324
import com.sun.jersey.api.client.WebResource;
2425
import com.sun.jersey.core.header.FormDataContentDisposition;
2526
import com.sun.jersey.core.util.MultivaluedMapImpl;
27+
import com.sun.jersey.multipart.BodyPart;
2628
import com.sun.jersey.multipart.FormDataBodyPart;
2729
import com.sun.jersey.multipart.FormDataMultiPart;
2830
import com.sun.jersey.multipart.MultiPart;
2931
import com.sun.jersey.multipart.file.StreamDataBodyPart;
3032
import org.apache.atlas.bulkimport.BulkImportResponse;
33+
import org.apache.atlas.model.PList;
3134
import org.apache.atlas.model.SearchFilter;
3235
import org.apache.atlas.model.audit.AtlasAuditEntry;
3336
import org.apache.atlas.model.audit.AuditReductionCriteria;
@@ -43,6 +46,9 @@
4346
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
4447
import org.apache.atlas.model.glossary.relations.AtlasRelatedCategoryHeader;
4548
import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader;
49+
import org.apache.atlas.model.impexp.AsyncImportStatus;
50+
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
51+
import org.apache.atlas.model.impexp.AtlasImportRequest;
4652
import org.apache.atlas.model.instance.AtlasClassification;
4753
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
4854
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
@@ -82,6 +88,7 @@
8288
import java.io.File;
8389
import java.io.FileInputStream;
8490
import java.io.FileNotFoundException;
91+
import java.io.IOException;
8592
import java.io.InputStream;
8693
import java.io.InputStreamReader;
8794
import java.time.Instant;
@@ -140,6 +147,13 @@ public class AtlasClientV2 extends AtlasBaseClient {
140147
//IndexRecovery APIs
141148
private static final String INDEX_RECOVERY_URI = BASE_URI + "v2/indexrecovery";
142149

150+
// Async Import APIs
151+
private static final String ASYNC_IMPORT_URI = BASE_URI + "admin/async/import";
152+
private static final String ASYNC_IMPORT_STATUS_URI = BASE_URI + "admin/async/import/status";
153+
154+
private static final String IMPORT_REQUEST_PARAMTER = "request";
155+
private static final String IMPORT_DATA_PARAMETER = "data";
156+
143157
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
144158
super(baseUrl, basicAuthUserNamePassword);
145159
}
@@ -1039,6 +1053,38 @@ public API formatPathWithParameter(API api, String... params) {
10391053
return formatPathParameters(api, params);
10401054
}
10411055

1056+
public AtlasAsyncImportRequest importAsync(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
1057+
return performAsyncImport(getImportRequestBodyPart(request), new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
1058+
}
1059+
1060+
/**
1061+
* Retrieves a list of asynchronous import statuses.
1062+
* If offset or limit is null, defaults to offset = 0 and limit = 50.
1063+
*
1064+
* @param offset Starting index for the result set
1065+
* @param limit Maximum number of results to return
1066+
* @return A paginated list of asynchronous import statuses
1067+
* @throws AtlasServiceException if the request fails
1068+
*/
1069+
public PList<AsyncImportStatus> getAsyncImportStatus(Integer offset, Integer limit) throws AtlasServiceException {
1070+
int actualOffset = (offset != null) ? offset : 0;
1071+
int actualLimit = (limit != null) ? limit : 50;
1072+
1073+
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
1074+
queryParams.add("offset", String.valueOf(actualOffset));
1075+
queryParams.add("limit", String.valueOf(actualLimit));
1076+
1077+
return callAPI(API_V2.ASYNC_IMPORT_STATUS, new GenericType<PList<AsyncImportStatus>>() {}, queryParams);
1078+
}
1079+
1080+
public AtlasAsyncImportRequest getAsyncImportStatusById(String importId) throws AtlasServiceException {
1081+
return callAPI(formatPathParameters(API_V2.ASYNC_IMPORT_STATUS_BY_ID, importId), AtlasAsyncImportRequest.class, null);
1082+
}
1083+
1084+
public void abortAsyncImport(String importId) throws AtlasServiceException {
1085+
callAPI(formatPathParameters(API_V2.ABORT_ASYNC_IMPORT_BY_ID, importId), null, null);
1086+
}
1087+
10421088
@Override
10431089
protected API formatPathParameters(API api, String... params) {
10441090
return new API(String.format(api.getPath(), params), api.getMethod(), api.getExpectedStatus());
@@ -1152,6 +1198,20 @@ private <T> T getTypeDefByGuid(String guid, Class<T> typeDefClass) throws AtlasS
11521198
return callAPI(api, typeDefClass, null);
11531199
}
11541200

1201+
private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
1202+
return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
1203+
}
1204+
1205+
private AtlasAsyncImportRequest performAsyncImport(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
1206+
try (FormDataMultiPart formDataMultiPart = new FormDataMultiPart()) {
1207+
MultiPart multipartEntity = formDataMultiPart.bodyPart(requestPart).bodyPart(filePart);
1208+
1209+
return callAPI(API_V2.ASYNC_IMPORT, AtlasAsyncImportRequest.class, multipartEntity);
1210+
} catch (IOException e) {
1211+
throw new AtlasServiceException(e);
1212+
}
1213+
}
1214+
11551215
public static class API_V2 extends API {
11561216
// TypeDef APIs
11571217
public static final API_V2 GET_TYPEDEF_BY_NAME = new API_V2(TYPEDEF_BY_NAME, HttpMethod.GET, Response.Status.OK);
@@ -1249,6 +1309,12 @@ public static class API_V2 extends API {
12491309
public static final API_V2 GET_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API, HttpMethod.POST, Response.Status.OK);
12501310
public static final API_V2 AGEOUT_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API + "ageout/", HttpMethod.POST, Response.Status.OK);
12511311

1312+
// Async Import APIs
1313+
public static final API_V2 ASYNC_IMPORT = new API_V2(ASYNC_IMPORT_URI, HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
1314+
public static final API_V2 ASYNC_IMPORT_STATUS = new API_V2(ASYNC_IMPORT_STATUS_URI, HttpMethod.GET, Response.Status.OK);
1315+
public static final API_V2 ASYNC_IMPORT_STATUS_BY_ID = new API_V2(ASYNC_IMPORT_STATUS_URI + "/%s", HttpMethod.GET, Response.Status.OK);
1316+
public static final API_V2 ABORT_ASYNC_IMPORT_BY_ID = new API_V2(ASYNC_IMPORT_URI + "/%s", HttpMethod.DELETE, Response.Status.NO_CONTENT);
1317+
12521318
// Glossary APIs
12531319
public static final API_V2 GET_ALL_GLOSSARIES = new API_V2(GLOSSARY_URI, HttpMethod.GET, Response.Status.OK);
12541320
public static final API_V2 GET_GLOSSARY_BY_GUID = new API_V2(GLOSSARY_URI + "/%s", HttpMethod.GET, Response.Status.OK);

common/src/main/java/org/apache/atlas/repository/Constants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ public final class Constants {
229229
public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom");
230230
public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout");
231231

232+
/**
233+
* Atlas Async Import vertex property keys.
234+
*/
235+
public static final String ATLAS_ASYNC_IMPORT_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "AtlasAsyncImportRequest.";
236+
public static final String PROPERTY_KEY_RECEIVED_TIME = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "receivedTime");
237+
public static final String PROPERTY_KEY_ASYNC_IMPORT_STATUS = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "status");
238+
public static final String PROPERTY_KEY_ASYNC_IMPORT_ID = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "importId");
239+
232240
public static final String SQOOP_SOURCE = "sqoop";
233241
public static final String FALCON_SOURCE = "falcon";
234242
public static final String HBASE_SOURCE = "hbase";

intg/src/main/java/org/apache/atlas/AtlasConfiguration.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,10 @@ public enum AtlasConfiguration {
111111
ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", false),
112112
ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", false), //Only for test automation
113113
RELATIONSHIP_SEARCH_ENABLED("atlas.relationship.search.enabled", false),
114-
UI_TASKS_TAB_USE_ENABLED("atlas.tasks.ui.tab.enabled", false);
114+
UI_TASKS_TAB_USE_ENABLED("atlas.tasks.ui.tab.enabled", false),
115+
ATLAS_ASYNC_IMPORT_MIN_DURATION_OVERRIDE_TEST_AUTOMATION("atlas.async.import.min.duration.override.test.automation", false),
116+
ASYNC_IMPORT_TOPIC_PREFIX("atlas.async.import.topic.prefix", "ATLAS_IMPORT_"),
117+
ASYNC_IMPORT_REQUEST_ID_PREFIX("atlas.async.import.request_id.prefix", "async_import_");
115118

116119
private static final Configuration APPLICATION_PROPERTIES;
117120

intg/src/main/java/org/apache/atlas/AtlasErrorCode.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ public enum AtlasErrorCode {
208208
FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-014", "File name should not be blank"),
209209
NO_TYPE_NAME_ON_VERTEX(404, "ATLAS-404-00-015", "No typename found for given entity with guid: {0}"),
210210
NO_LINEAGE_CONSTRAINTS_FOR_GUID(404, "ATLAS-404-00-016", "No lineage constraints found for requested entity with guid : {0}"),
211+
IMPORT_NOT_FOUND(404, "ATLAS-404-00-017", "Import id {0} is not found"),
211212

212213
METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request method {0} is inappropriate for the URL: {1}"),
213214

@@ -226,6 +227,7 @@ public enum AtlasErrorCode {
226227
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
227228
METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
228229
PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"),
230+
IMPORT_ABORT_NOT_ALLOWED(409, "ATLAS-409-00-016", "Import id {0} is currently in state {1}, cannot be aborted"),
229231

230232
// All internal errors go here
231233
INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),
@@ -250,7 +252,12 @@ public enum AtlasErrorCode {
250252
FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading the file: {0}"),
251253
FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred while creating glossary term: {0}"),
252254
FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"),
253-
NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}");
255+
NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}"),
256+
IMPORT_UPDATE_FAILED(500, "ATLAS-500-00-019", "Failed to update import with id={0}"),
257+
IMPORT_REGISTRATION_FAILED(500, "ATLAS-500-00-020", "Failed to register import request"),
258+
IMPORT_FAILED(500, "ATLAS-500-00-021", "Import with id {0} failed"),
259+
ABORT_IMPORT_FAILED(500, "ATLAS-500-00-022", "Failed to abort import with id {0}"),
260+
IMPORT_QUEUEING_FAILED(500, "ATLAS-500-00-023", "Failed to add import with id {0} to request queue, please try again later");
254261

255262
private static final Logger LOG = LoggerFactory.getLogger(AtlasErrorCode.class);
256263
private final String errorCode;

0 commit comments

Comments
 (0)