ATLAS-4922: Atlas Async Import using Kafka #307

Merged
merged 33 commits into from
May 21, 2025
33 commits
2e9fba9
ATLAS-4922: Atlas Async Import using Kafka [1] - POJO & Data Access
DishaTalreja3 Feb 26, 2025
3584a20
ATLAS-4922: Atlas Async Import using Kafka [2] - Client-side changes
DishaTalreja3 Feb 27, 2025
16b5449
ATLAS-4922: Atlas Async Import using Kafka [3] - Implementation and N…
Mar 6, 2025
d678555
ATLAS-4922: Atlas Async Import using Kafka [4] - Unit Tests
DishaTalreja3 Mar 12, 2025
69ecd1b
ATLAS-4922: Atlas Async Import using Kafka [5] - Addressing few of th…
Mar 14, 2025
6ae366a
ATLAS-4922: Atlas Async Import using Kafka [6] - Addressing few of th…
Mar 17, 2025
88ada02
ATLAS-4922: Atlas Async Import using Kafka [7] - Adding support to te…
Mar 23, 2025
0f215e7
ATLAS-4922: Atlas Async Import using Kafka [8] - Addressed some PR co…
DishaTalreja3 Mar 24, 2025
8fe6be6
addressed review comments
mneethiraj Mar 25, 2025
a2c731f
addressed review comments
mneethiraj Mar 25, 2025
94ff706
addressed review comments
mneethiraj Mar 25, 2025
bfc3c15
addressed review comments
mneethiraj Mar 25, 2025
06507ee
fixed unit tests
mneethiraj Mar 25, 2025
3d66b3f
code readability improvements
mneethiraj Mar 25, 2025
ff7c0ec
fixed graph query
DishaTalreja3 Mar 25, 2025
83ed723
ATLAS-4922: Ensuring executors are not overwritten and tests to prove…
Mar 27, 2025
128bf1b
ATLAS-4922: Fix failing tests
Mar 28, 2025
3a4c62a
Fix getTopicName()
DishaTalreja3 Mar 31, 2025
6ccc2ae
ATLAS-4922: Fix flaky tests
Apr 1, 2025
580a0c2
ATLAS-4922: Fix failing tests
Apr 6, 2025
484f27d
ATLAS-4922: Fix stuck in processing state in case of no topic found
Apr 7, 2025
21d62ab
ATLAS-4922: Fix enunciate build errors
Apr 9, 2025
a65a5f8
ATLAS-4922: Fix test failures in NotificationHookConsumerTest and Imp…
Apr 10, 2025
594765e
ATLAS-4922: Add new urls to notFilterd list for HA fix concurrent mod…
Apr 23, 2025
666e7f2
ATLAS-4922: Rename deleteImport to abortImport
DishaTalreja3 Apr 23, 2025
a43629b
Merge branch 'apache:master' into ASYNC_IMPORT
jackhalfalltrades May 5, 2025
854ce59
ATLAS-4922: PR Comments
May 6, 2025
3e84ea7
ATLAS-4922: PR Comments updating the request fields
May 6, 2025
73d1ac1
ATLAS-4922: Addressed PR comments for Client-side Changes & Renamed C…
DishaTalreja3 May 6, 2025
431bc90
ATLAS-4922: Revert to manual commit and handle consumer and topic cle…
May 12, 2025
b141395
Renamed ASYNC_IMPORT_STATUS_ALL to ASYNC_IMPORT_STATUS
DishaTalreja3 May 16, 2025
12fced1
ATLAS-4922: Remove getAsyncImportStatus() overload
DishaTalreja3 May 19, 2025
2bc64d4
ATLAS-4922: Fix failing tests in ImportServiceTest
DishaTalreja3 May 20, 2025
18 changes: 18 additions & 0 deletions addons/models/0000-Area0/0010-base_model.json
@@ -568,6 +568,24 @@
"serviceType": "atlas_core",
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "__AtlasAsyncImportRequest",
"superTypes": [ "__internal" ],
"serviceType": "atlas_core",
"typeVersion": "1.0",
"attributeDefs": [
{ "name": "requestId", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": true, "isIndexable": true },
{ "name": "importId", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true },
{ "name": "status", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true },
{ "name": "importDetails", "typeName": "string", "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
{ "name": "startEntityPosition", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
{ "name": "importResult", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
{ "name": "receivedTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
{ "name": "stagedTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
{ "name": "processingStartTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
{ "name": "completedTime", "typeName": "string", "isOptional": false, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false }
]
}
],
"relationshipDefs": [
6 changes: 6 additions & 0 deletions atlas-examples/sample-app/pom.xml
@@ -84,6 +84,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.34</version>
</dependency>

</dependencies>

<build>
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.examples.sampleapp;

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.PList;
import org.apache.atlas.model.impexp.AsyncImportStatus;
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;

public class AsyncImportApiExample {
private final AtlasClientV2 client;

public AsyncImportApiExample(AtlasClientV2 client) {
this.client = client;
}

public AtlasAsyncImportRequest testImportAsyncWithZip() throws Exception {
URL url = AsyncImportApiExample.class.getClassLoader().getResource("importFile.zip");

if (url == null) {
System.err.println("importFile.zip not found in classpath.");

return null;
}

File zipFile = new File(url.toURI());
AtlasImportRequest request = new AtlasImportRequest();

try (InputStream zipStream = Files.newInputStream(zipFile.toPath())) {
System.out.println("Testing Async Import with ZIP file...");

try {
AtlasAsyncImportRequest asyncRequest = client.importAsync(request, zipStream);

System.out.println("Async Import Request Created: " + asyncRequest);

return asyncRequest;
} catch (AtlasServiceException e) {
System.err.println("Async Import with ZIP file failed: " + e.getMessage());

throw e;
}
} catch (IOException e) {
System.err.println("Failed to open ZIP file: " + e.getMessage());

throw e;
}
}

public void testGetAsyncImportStatus() throws Exception {
System.out.println("Testing getAsyncImportStatus...");

try {
PList<AsyncImportStatus> statuses = client.getAsyncImportStatus(null, null);

System.out.println("All Async Import Statuses:");
for (AsyncImportStatus status : statuses.getList()) {
System.out.println(status);
}
} catch (AtlasServiceException e) {
System.err.println("Failed to fetch all async import statuses: " + e.getMessage());

throw e;
}
}

public void testGetAsyncImportStatusById(String importId) throws Exception {
System.out.println("Testing getImportStatus for id=" + importId);

try {
AtlasAsyncImportRequest importStatus = client.getAsyncImportStatusById(importId);

System.out.println("Import Status for ID (" + importId + "): " + importStatus);
} catch (AtlasServiceException e) {
System.err.println("Failed to fetch import status for id=" + importId + ": " + e.getMessage());

throw e;
}
}

public void testAbortAsyncImportById(String importId) throws Exception {
System.out.println("Testing abortAsyncImport for id=" + importId);

try {
client.abortAsyncImport(importId);

System.out.println("Successfully aborted async import with ID: " + importId);
} catch (AtlasServiceException e) {
System.err.println("Failed to abort async import for ID (" + importId + "): " + e.getMessage());

throw e;
}
}
}
@@ -19,6 +19,7 @@

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.AuthenticationUtil;

@@ -75,6 +76,18 @@ public static void main(String[] args) throws Exception {
sampleApp.glossaryExample();

entityExample.deleteEntities();

// Async Import Examples
AsyncImportApiExample asyncImportApiExample = new AsyncImportApiExample(sampleApp.getClient());
AtlasAsyncImportRequest asyncRequest = asyncImportApiExample.testImportAsyncWithZip();

asyncImportApiExample.testGetAsyncImportStatus();

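// testImportAsyncWithZip() returns null when importFile.zip is not on the classpath
if (asyncRequest != null) {
String testImportId = asyncRequest.getImportId();

asyncImportApiExample.testGetAsyncImportStatusById(testImportId);

asyncImportApiExample.testAbortAsyncImportById(testImportId);
}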
} finally {
if (sampleApp != null && sampleApp.getClient() != null) {
sampleApp.getClient().close();
Binary file not shown.
66 changes: 66 additions & 0 deletions client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -20,14 +20,17 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.multipart.BodyPart;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataMultiPart;
import com.sun.jersey.multipart.MultiPart;
import com.sun.jersey.multipart.file.StreamDataBodyPart;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.model.PList;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.model.audit.AuditReductionCriteria;
@@ -43,6 +46,9 @@
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.glossary.relations.AtlasRelatedCategoryHeader;
import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader;
import org.apache.atlas.model.impexp.AsyncImportStatus;
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
@@ -82,6 +88,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
@@ -140,6 +147,13 @@ public class AtlasClientV2 extends AtlasBaseClient {
//IndexRecovery APIs
private static final String INDEX_RECOVERY_URI = BASE_URI + "v2/indexrecovery";

// Async Import APIs
private static final String ASYNC_IMPORT_URI = BASE_URI + "admin/async/import";
private static final String ASYNC_IMPORT_STATUS_URI = BASE_URI + "admin/async/import/status";

private static final String IMPORT_REQUEST_PARAMTER = "request";
private static final String IMPORT_DATA_PARAMETER = "data";

public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
super(baseUrl, basicAuthUserNamePassword);
}
@@ -1039,6 +1053,38 @@ public API formatPathWithParameter(API api, String... params) {
return formatPathParameters(api, params);
}

public AtlasAsyncImportRequest importAsync(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
return performAsyncImport(getImportRequestBodyPart(request), new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
}

/**
* Retrieves a list of asynchronous import statuses.
* A null offset defaults to 0 and a null limit defaults to 50; each is defaulted independently.
*
* @param offset Starting index for the result set
* @param limit Maximum number of results to return
* @return A paginated list of asynchronous import statuses
* @throws AtlasServiceException if the request fails
*/
public PList<AsyncImportStatus> getAsyncImportStatus(Integer offset, Integer limit) throws AtlasServiceException {
int actualOffset = (offset != null) ? offset : 0;
int actualLimit = (limit != null) ? limit : 50;

MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
queryParams.add("offset", String.valueOf(actualOffset));
queryParams.add("limit", String.valueOf(actualLimit));

return callAPI(API_V2.ASYNC_IMPORT_STATUS, new GenericType<PList<AsyncImportStatus>>() {}, queryParams);
}

public AtlasAsyncImportRequest getAsyncImportStatusById(String importId) throws AtlasServiceException {
return callAPI(formatPathParameters(API_V2.ASYNC_IMPORT_STATUS_BY_ID, importId), AtlasAsyncImportRequest.class, null);
}

public void abortAsyncImport(String importId) throws AtlasServiceException {
callAPI(formatPathParameters(API_V2.ABORT_ASYNC_IMPORT_BY_ID, importId), null, null);
}

@Override
protected API formatPathParameters(API api, String... params) {
return new API(String.format(api.getPath(), params), api.getMethod(), api.getExpectedStatus());
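The defaulting in `getAsyncImportStatus` can be tried in isolation. What follows is a minimal sketch, not the client code: the class `AsyncImportPaging` and the helper `toQuery` are hypothetical names, and the query string is assembled by hand instead of through Jersey's `MultivaluedMap`. It only illustrates that a null offset or limit falls back to 0 and 50 respectively.

```java
final class AsyncImportPaging {
    // Defaults taken from the Javadoc of getAsyncImportStatus in this diff.
    static final int DEFAULT_OFFSET = 0;
    static final int DEFAULT_LIMIT  = 50;

    // Hypothetical helper: returns the query string the two parameters would produce.
    static String toQuery(Integer offset, Integer limit) {
        int actualOffset = (offset != null) ? offset : DEFAULT_OFFSET;
        int actualLimit  = (limit != null) ? limit : DEFAULT_LIMIT;

        return "offset=" + actualOffset + "&limit=" + actualLimit;
    }

    public static void main(String[] args) {
        System.out.println(toQuery(null, null)); // offset=0&limit=50
        System.out.println(toQuery(100, null));  // offset=100&limit=50
        System.out.println(toQuery(null, 10));   // offset=0&limit=10
    }
}
```

Because each parameter is defaulted on its own, a caller can page with a custom offset while keeping the default page size of 50.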
@@ -1152,6 +1198,20 @@ private <T> T getTypeDefByGuid(String guid, Class<T> typeDefClass) throws AtlasS
return callAPI(api, typeDefClass, null);
}

private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
}

private AtlasAsyncImportRequest performAsyncImport(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
try (FormDataMultiPart formDataMultiPart = new FormDataMultiPart()) {
MultiPart multipartEntity = formDataMultiPart.bodyPart(requestPart).bodyPart(filePart);

return callAPI(API_V2.ASYNC_IMPORT, AtlasAsyncImportRequest.class, multipartEntity);
} catch (IOException e) {
throw new AtlasServiceException(e);
}
}

public static class API_V2 extends API {
// TypeDef APIs
public static final API_V2 GET_TYPEDEF_BY_NAME = new API_V2(TYPEDEF_BY_NAME, HttpMethod.GET, Response.Status.OK);
@@ -1249,6 +1309,12 @@ public static class API_V2 extends API {
public static final API_V2 GET_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 AGEOUT_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API + "ageout/", HttpMethod.POST, Response.Status.OK);

// Async Import APIs
public static final API_V2 ASYNC_IMPORT = new API_V2(ASYNC_IMPORT_URI, HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
public static final API_V2 ASYNC_IMPORT_STATUS = new API_V2(ASYNC_IMPORT_STATUS_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 ASYNC_IMPORT_STATUS_BY_ID = new API_V2(ASYNC_IMPORT_STATUS_URI + "/%s", HttpMethod.GET, Response.Status.OK);
public static final API_V2 ABORT_ASYNC_IMPORT_BY_ID = new API_V2(ASYNC_IMPORT_URI + "/%s", HttpMethod.DELETE, Response.Status.NO_CONTENT);

// Glossary APIs
public static final API_V2 GET_ALL_GLOSSARIES = new API_V2(GLOSSARY_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 GET_GLOSSARY_BY_GUID = new API_V2(GLOSSARY_URI + "/%s", HttpMethod.GET, Response.Status.OK);
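The `/%s` templates in `ASYNC_IMPORT_STATUS_BY_ID` and `ABORT_ASYNC_IMPORT_BY_ID` are filled in by `formatPathParameters`, which applies `String.format` to the path. The sketch below reproduces only that substitution so the resulting request paths can be seen. It assumes `BASE_URI` is `"api/atlas/"` (its value is defined outside this diff), and the class name `AsyncImportPaths` and the import id are placeholders.

```java
final class AsyncImportPaths {
    // Assumption: BASE_URI is "api/atlas/"; it is not shown in this diff.
    static final String BASE_URI                = "api/atlas/";
    static final String ASYNC_IMPORT_URI        = BASE_URI + "admin/async/import";
    static final String ASYNC_IMPORT_STATUS_URI = BASE_URI + "admin/async/import/status";

    // Same substitution as formatPathParameters: String.format over the path template.
    static String format(String template, String... params) {
        return String.format(template, (Object[]) params);
    }

    public static void main(String[] args) {
        String importId = "example-import-id"; // placeholder value

        // GET for the status of one import, DELETE to abort it.
        System.out.println("GET    " + format(ASYNC_IMPORT_STATUS_URI + "/%s", importId));
        System.out.println("DELETE " + format(ASYNC_IMPORT_URI + "/%s", importId));
    }
}
```

Note that the abort call is declared with `Response.Status.NO_CONTENT`, so a successful abort returns no body, while the two status calls expect `OK`.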
@@ -229,6 +229,14 @@ public final class Constants {
public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom");
public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout");

/**
* Atlas Async Import vertex property keys.
*/
public static final String ATLAS_ASYNC_IMPORT_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "AtlasAsyncImportRequest.";
public static final String PROPERTY_KEY_RECEIVED_TIME = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "receivedTime");
public static final String PROPERTY_KEY_ASYNC_IMPORT_STATUS = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "status");
public static final String PROPERTY_KEY_ASYNC_IMPORT_ID = encodePropertyKey(ATLAS_ASYNC_IMPORT_PREFIX + "importId");

public static final String SQOOP_SOURCE = "sqoop";
public static final String FALCON_SOURCE = "falcon";
public static final String HBASE_SOURCE = "hbase";
5 changes: 4 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -111,7 +111,10 @@ public enum AtlasConfiguration {
ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", false),
ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", false), //Only for test automation
RELATIONSHIP_SEARCH_ENABLED("atlas.relationship.search.enabled", false),
- UI_TASKS_TAB_USE_ENABLED("atlas.tasks.ui.tab.enabled", false);
+ UI_TASKS_TAB_USE_ENABLED("atlas.tasks.ui.tab.enabled", false),
+ ATLAS_ASYNC_IMPORT_MIN_DURATION_OVERRIDE_TEST_AUTOMATION("atlas.async.import.min.duration.override.test.automation", false),
+ ASYNC_IMPORT_TOPIC_PREFIX("atlas.async.import.topic.prefix", "ATLAS_IMPORT_"),
+ ASYNC_IMPORT_REQUEST_ID_PREFIX("atlas.async.import.request_id.prefix", "async_import_");

private static final Configuration APPLICATION_PROPERTIES;
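The two prefix entries added to `AtlasConfiguration`, `ASYNC_IMPORT_TOPIC_PREFIX` and `ASYNC_IMPORT_REQUEST_ID_PREFIX`, are overridable defaults. The sketch below shows how such a prefix might be resolved and combined with an import id. It is an illustration under stated assumptions: the diff does not show how the names are built, so the `prefix + importId` concatenation, the class `AsyncImportNames`, and the use of `java.util.Properties` in place of the Atlas configuration object are all hypothetical.

```java
import java.util.Properties;

final class AsyncImportNames {
    // Property keys and defaults copied from the AtlasConfiguration entries in this diff.
    static final String TOPIC_PREFIX_KEY          = "atlas.async.import.topic.prefix";
    static final String TOPIC_PREFIX_DEFAULT      = "ATLAS_IMPORT_";
    static final String REQUEST_ID_PREFIX_KEY     = "atlas.async.import.request_id.prefix";
    static final String REQUEST_ID_PREFIX_DEFAULT = "async_import_";

    // Assumption: the topic name is prefix + importId; the diff does not show this.
    static String topicName(Properties conf, String importId) {
        return conf.getProperty(TOPIC_PREFIX_KEY, TOPIC_PREFIX_DEFAULT) + importId;
    }

    // Assumption: the request id also starts with its prefix followed by the import id.
    static String requestId(Properties conf, String importId) {
        return conf.getProperty(REQUEST_ID_PREFIX_KEY, REQUEST_ID_PREFIX_DEFAULT) + importId;
    }

    public static void main(String[] args) {
        Properties conf = new Properties(); // empty, so the defaults apply

        System.out.println(topicName(conf, "1234"));
        System.out.println(requestId(conf, "1234"));

        // Overriding the prefix changes only the part before the import id.
        conf.setProperty(TOPIC_PREFIX_KEY, "STAGING_IMPORT_");
        System.out.println(topicName(conf, "1234"));
    }
}
```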

9 changes: 8 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -208,6 +208,7 @@ public enum AtlasErrorCode {
FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-014", "File name should not be blank"),
NO_TYPE_NAME_ON_VERTEX(404, "ATLAS-404-00-015", "No typename found for given entity with guid: {0}"),
NO_LINEAGE_CONSTRAINTS_FOR_GUID(404, "ATLAS-404-00-016", "No lineage constraints found for requested entity with guid : {0}"),
IMPORT_NOT_FOUND(404, "ATLAS-404-00-017", "Import id {0} is not found"),

METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request method {0} is inappropriate for the URL: {1}"),

@@ -226,6 +227,7 @@ public enum AtlasErrorCode {
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"),
IMPORT_ABORT_NOT_ALLOWED(409, "ATLAS-409-00-016", "Import id {0} is currently in state {1}, cannot be aborted"),

// All internal errors go here
INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),
@@ -250,7 +252,12 @@ public enum AtlasErrorCode {
FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading the file: {0}"),
FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred while creating glossary term: {0}"),
FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"),
- NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}");
+ NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}"),
+ IMPORT_UPDATE_FAILED(500, "ATLAS-500-00-019", "Failed to update import with id={0}"),
+ IMPORT_REGISTRATION_FAILED(500, "ATLAS-500-00-020", "Failed to register import request"),
+ IMPORT_FAILED(500, "ATLAS-500-00-021", "Import with id {0} failed"),
+ ABORT_IMPORT_FAILED(500, "ATLAS-500-00-022", "Failed to abort import with id {0}"),
+ IMPORT_QUEUEING_FAILED(500, "ATLAS-500-00-023", "Failed to add import with id {0} to request queue, please try again later");

private static final Logger LOG = LoggerFactory.getLogger(AtlasErrorCode.class);
private final String errorCode;
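The new error messages use positional placeholders such as `{0}` and `{1}`. The sketch below renders two of them with `java.text.MessageFormat`, on the assumption that `AtlasErrorCode` formats its messages in that style; the formatting code itself is not part of this diff. The class name `AsyncImportErrorMessages`, the import id, and the state name are placeholders.

```java
import java.text.MessageFormat;

final class AsyncImportErrorMessages {
    // Templates copied from the AtlasErrorCode entries added in this change.
    static final String IMPORT_NOT_FOUND         = "Import id {0} is not found";
    static final String IMPORT_ABORT_NOT_ALLOWED = "Import id {0} is currently in state {1}, cannot be aborted";

    // Substitutes the positional placeholders {0}, {1}, ... with the given parameters.
    static String render(String template, Object... params) {
        return MessageFormat.format(template, params);
    }

    public static void main(String[] args) {
        // "example-import-id" and "SUCCESSFUL" are placeholder values.
        System.out.println(render(IMPORT_NOT_FOUND, "example-import-id"));
        System.out.println(render(IMPORT_ABORT_NOT_ALLOWED, "example-import-id", "SUCCESSFUL"));
    }
}
```

The two codes map to different HTTP statuses in the enum: `IMPORT_NOT_FOUND` is a 404, while `IMPORT_ABORT_NOT_ALLOWED` is a 409, so a client can tell a missing import from one that is in a state that cannot be aborted.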