Skip to content

Commit 0741f29

Browse files
author
chaitalithombare
committed
ATLAS-4889 : Incremental export : When an entity has tag propagated and is exported , the tag is not propagated to it in the export.
1 parent d11f415 commit 0741f29

File tree

4 files changed

+162
-7
lines changed

4 files changed

+162
-7
lines changed

repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java

Lines changed: 116 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.atlas.glossary.GlossaryService;
2424
import org.apache.atlas.model.impexp.AtlasExportRequest;
2525
import org.apache.atlas.model.impexp.AtlasExportResult;
26+
import org.apache.atlas.model.instance.AtlasClassification;
2627
import org.apache.atlas.model.instance.AtlasEntity;
2728
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
2829
import org.apache.atlas.model.instance.AtlasObjectId;
@@ -34,6 +35,8 @@
3435
import org.apache.atlas.model.typedef.AtlasStructDef;
3536
import org.apache.atlas.model.typedef.AtlasTypesDef;
3637
import org.apache.atlas.repository.graph.GraphHelper;
38+
import org.apache.atlas.repository.graphdb.AtlasEdge;
39+
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
3740
import org.apache.atlas.repository.graphdb.AtlasGraph;
3841
import org.apache.atlas.repository.graphdb.AtlasVertex;
3942
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
@@ -43,24 +46,36 @@
4346
import org.apache.atlas.util.AtlasGremlinQueryProvider;
4447
import org.apache.commons.collections.CollectionUtils;
4548
import org.apache.commons.collections.MapUtils;
49+
import org.apache.commons.lang3.StringUtils;
4650
import org.slf4j.Logger;
4751
import org.slf4j.LoggerFactory;
4852
import org.springframework.stereotype.Component;
4953

5054
import javax.inject.Inject;
5155

56+
import java.util.ArrayDeque;
5257
import java.util.ArrayList;
5358
import java.util.HashMap;
5459
import java.util.HashSet;
60+
import java.util.Iterator;
5561
import java.util.List;
5662
import java.util.Map;
63+
import java.util.Queue;
5764
import java.util.Set;
5865

5966
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
6067
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
6168
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
69+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
70+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
71+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
72+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
6273
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
6374
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
75+
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
76+
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
77+
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
78+
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
6479

6580
@Component
6681
public class ExportService {
@@ -75,6 +90,8 @@ public class ExportService {
7590
private final AuditsWriter auditsWriter;
7691
private ExportTypeProcessor exportTypeProcessor;
7792
private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
93+
public static final String PROCESS_INPUTS = "__Process.inputs";
94+
public static final String PROCESS_OUTPUTS = "__Process.outputs";
7895

7996
@Inject
8097
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator, GlossaryService glossaryService) {
@@ -91,13 +108,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
91108
long startTime = System.currentTimeMillis();
92109
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker());
93110
ExportContext context = new ExportContext(result, exportSink);
111+
RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor(typeRegistry);
94112

95113
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
96114

97115
try {
98116
LOG.info("==> export(user={}, from={})", userName, requestingIP);
99117

100-
AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
118+
AtlasExportResult.OperationStatus[] statuses = processItems(request, context, relationshipAttributesExtractor);
101119

102120
processTypesDef(context);
103121

@@ -219,20 +237,20 @@ private void processTypesDef(ExportContext context) {
219237
}
220238
}
221239

222-
private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) {
223-
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
224-
List<AtlasObjectId> itemsToExport = request.getItemsToExport();
240+
private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
241+
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
242+
List<AtlasObjectId> itemsToExport = request.getItemsToExport();
225243

226244
for (int i = 0; i < itemsToExport.size(); i++) {
227245
AtlasObjectId item = itemsToExport.get(i);
228246

229-
statuses[i] = processObjectId(item, context);
247+
statuses[i] = processObjectId(item, context, relationshipAttributesExtractor);
230248
}
231249

232250
return statuses;
233251
}
234252

235-
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
253+
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
236254
LOG.debug("==> processObjectId({})", item);
237255

238256
try {
@@ -266,6 +284,11 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex
266284

267285
context.isSkipConnectedFetch = false;
268286
}
287+
if (context.fetchType != ExportFetchType.FULL && !context.skipLineage) {
288+
for (String guid : entityGuids) {
289+
addEntityGuids(guid, context, relationshipAttributesExtractor);
290+
}
291+
}
269292
} catch (AtlasBaseException excp) {
270293
LOG.error("Fetching entity failed for: {}", item, excp);
271294

@@ -413,6 +436,91 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c
413436
context.reportProgress();
414437
}
415438

439+
public void addEntityGuids(String guid, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
440+
AtlasVertex adjacentVertex;
441+
Iterator<AtlasEdge> entityEdges;
442+
Iterator<AtlasVertex> propagateClassificationVertices;
443+
Iterator<AtlasVertex> appliedClassificationVertices;
444+
String fetchedClassificationGuid;
445+
List<AtlasClassification> processedClassifications = new ArrayList<>();
446+
447+
AtlasVertex initialEntityVertex = entityGraphRetriever.getEntityVertex(guid);
448+
for (AtlasClassification currentClassification : entityGraphRetriever.getAllClassifications(initialEntityVertex)) {
449+
if (context.guidsProcessed.contains(currentClassification.getEntityGuid())) {
450+
processedClassifications.add(currentClassification);
451+
}
452+
}
453+
context.newAddedGuids.add(guid);
454+
while (!context.newAddedGuids.isEmpty()) {
455+
String currentGuid = context.newAddedGuids.poll();
456+
457+
AtlasVertex entityVertex = entityGraphRetriever.getEntityVertex(currentGuid);
458+
String entityTypeName = getTypeName(entityVertex);
459+
List<AtlasClassification> classifications = entityGraphRetriever.getAllClassifications(entityVertex);
460+
if (CollectionUtils.isNotEmpty(processedClassifications)) {
461+
classifications.removeAll(processedClassifications);
462+
}
463+
if (CollectionUtils.isNotEmpty(classifications)) {
464+
for (AtlasClassification classification : classifications) {
465+
String classificationName = classification.getTypeName();
466+
boolean isProcess = relationshipAttributesExtractor.isLineageType(entityTypeName);
467+
entityEdges = isProcess
468+
? GraphHelper.getEdgesForLabel(entityVertex, PROCESS_INPUTS, OUT)
469+
: GraphHelper.getEdgesForLabel(entityVertex, PROCESS_OUTPUTS, IN);
470+
while (entityEdges.hasNext()) {
471+
AtlasEdge propagationEdge = entityEdges.next();
472+
AtlasVertex outVertex = propagationEdge.getOutVertex();
473+
AtlasVertex inVertex = propagationEdge.getInVertex();
474+
adjacentVertex = StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) ? inVertex : outVertex;
475+
String adjacentGuid = getGuid(adjacentVertex);
476+
boolean isPropagated = false;
477+
propagateClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, true, classificationName);
478+
while (propagateClassificationVertices.hasNext()) {
479+
AtlasVertex classificationVertex = propagateClassificationVertices.next();
480+
fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
481+
if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
482+
addAdjacentVertices(context, adjacentGuid);
483+
isPropagated = true;
484+
}
485+
}
486+
if (!isPropagated) {
487+
appliedClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, false, classificationName);
488+
489+
while (appliedClassificationVertices.hasNext()) {
490+
AtlasVertex classificationVertex = appliedClassificationVertices.next();
491+
fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
492+
if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
493+
addAdjacentVertices(context, adjacentGuid);
494+
break;
495+
}
496+
}
497+
}
498+
}
499+
}
500+
}
501+
}
502+
}
503+
504+
private Iterator<AtlasVertex> getClassificationVertices(AtlasVertex inVertex, AtlasVertex outVertex,
505+
boolean isProcess, boolean isPropagated, String name) {
506+
AtlasVertex base = isProcess ? inVertex : outVertex;
507+
return base.query()
508+
.direction(AtlasEdgeDirection.OUT)
509+
.label(CLASSIFICATION_LABEL)
510+
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated)
511+
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, name)
512+
.vertices().iterator();
513+
}
514+
515+
private void addAdjacentVertices(ExportContext context, String adjacentGuid) throws AtlasBaseException {
516+
if (!context.newAddedGuids.contains(adjacentGuid)) {
517+
context.newAddedGuids.add(adjacentGuid);
518+
}
519+
if (!context.sink.guids.contains(adjacentGuid)) {
520+
context.addToSink(entityGraphRetriever.toAtlasEntityWithExtInfo(adjacentGuid));
521+
}
522+
}
523+
416524
public enum TraversalDirection {
417525
UNKNOWN,
418526
INWARD,
@@ -450,6 +558,7 @@ static class ExportContext {
450558
final UniqueList<String> entityCreationOrder = new UniqueList<>();
451559
final Set<String> guidsProcessed = new HashSet<>();
452560
final UniqueList<String> guidsToProcess = new UniqueList<>();
561+
final Queue<String> newAddedGuids = new ArrayDeque<>();
453562
final UniqueList<String> lineageToProcess = new UniqueList<>();
454563
final Set<String> lineageProcessed = new HashSet<>();
455564
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
@@ -511,6 +620,7 @@ public void clear() {
511620
guidsToProcess.clear();
512621
guidsProcessed.clear();
513622
guidDirection.clear();
623+
newAddedGuids.clear();
514624
startingEntityType = null;
515625
}
516626

repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private TraversalDirection getRelationshipEdgeDirection(AtlasRelatedObjectId rel
131131
return isOutEdge ? OUTWARD : INWARD;
132132
}
133133

134-
private boolean isLineageType(String typeName) {
134+
public boolean isLineageType(String typeName) {
135135
AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);
136136

137137
return entityDef.getSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);

repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.atlas.TestUtilsV2;
2525
import org.apache.atlas.exception.AtlasBaseException;
2626
import org.apache.atlas.model.impexp.AtlasExportRequest;
27+
import org.apache.atlas.model.instance.AtlasClassification;
2728
import org.apache.atlas.model.instance.AtlasEntity;
2829
import org.apache.atlas.model.instance.AtlasObjectId;
2930
import org.apache.atlas.repository.AtlasTestBase;
@@ -49,6 +50,7 @@
4950
import java.io.IOException;
5051
import java.io.InputStream;
5152
import java.util.ArrayList;
53+
import java.util.Arrays;
5254
import java.util.HashMap;
5355
import java.util.List;
5456
import java.util.Map;
@@ -78,6 +80,24 @@ public class ExportIncrementalTest extends AtlasTestBase {
7880
private static final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
7981
private static final String EXPORT_REQUEST_CONNECTED = "export-connected";
8082

83+
private static final String FIRSTPARENT = "589a233a-f00e-4928-8efd-e7e72e30d370";
84+
private static final String HIVEDB = "12eb7a9b-3b4d-48c9-902c-1fa2401823f7";
85+
private static final String CTASLEVEL13 = "6f1c413c-1b35-421a-aabd-d5f94873ddf0";
86+
private static final String CTASLEVEL12 = "58a68a94-67bb-488d-b111-3dfdd3a220eb";
87+
private static final String CTASLEVEL11 = "c6657df3-3bea-44cc-a356-a81c9e72f9c7";
88+
private static final String SECONDPARENT = "0ce3573b-c535-4bf9-970e-4d37f01806ef";
89+
private static final String CTLASLEVEL11_1 = "80a3ead2-6ad7-4881-bd85-5e8b4fdb01c5";
90+
private static final String HDFS_PATH = "d9c50322-b130-405e-b560-2b15bcdddb97";
91+
private static final String SECONDPARENT_PROCESS = "f611662a-4ea6-4707-b7e9-02848fb28529";
92+
private static final String CTASLEVEL13_PROCESS = "da34b191-5ab9-4934-94c6-5a97d3e59608";
93+
private static final String CTASLEVEL12_PROCESS = "33fc0f3c-3522-4aaa-83c7-258752abe824";
94+
private static final String CTASLEVEL11_1_PROCESS = "1339782e-fde7-402b-8271-2f91a65396e9";
95+
private static final String CTASLEVEL11_PROCESS = "64cde929-195a-4c90-a921-b8c4d79ddfcf";
96+
97+
// Resolved after import
98+
private static final String CTASLEVEL11_1_TABLE_QUALIFIED_NAME = "default.ctaslevel11_1@cm";
99+
private static final String CTASLEVEL13_TABLE_QUALIFIED_NAME = "default.ctaslevel13@cm";
100+
81101
@Inject
82102
AtlasTypeRegistry typeRegistry;
83103

@@ -96,6 +116,7 @@ public class ExportIncrementalTest extends AtlasTestBase {
96116
private AtlasClassificationType classificationTypeT1;
97117
private AtlasClassificationType classificationTypeT2;
98118
private AtlasClassificationType classificationTypeT3;
119+
99120
private long nextTimestamp;
100121

101122
@DataProvider(name = "hiveDb")
@@ -226,6 +247,11 @@ public void importHiveDb(InputStream stream) throws AtlasBaseException, IOExcept
226247
runImportWithNoParameters(importService, stream);
227248
}
228249

250+
@Test(dataProvider = "classificationLineage")
251+
public void classificationineageDb(InputStream stream) throws AtlasBaseException, IOException {
252+
runImportWithNoParameters(importService, stream);
253+
}
254+
229255
@Test(dependsOnMethods = "importHiveDb")
230256
public void exportTableInrementalConnected() throws AtlasBaseException, IOException {
231257
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
@@ -247,6 +273,25 @@ public void exportTableInrementalConnected() throws AtlasBaseException, IOExcept
247273
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
248274
}
249275

276+
@Test(dependsOnMethods = "classificationineageDb")
277+
public void exportTableInrementalConnectedClassificationLineage() throws AtlasBaseException, IOException {
278+
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(CTASLEVEL11_1_TABLE_QUALIFIED_NAME, EXPORT_INCREMENTAL, 0, false));
279+
ZipSource sourceCopy = getZipSourceCopy(source);
280+
if (entityStore.getClassification(FIRSTPARENT, "firstclassi") == null) {
281+
entityStore.addClassification(Arrays.asList(FIRSTPARENT), new AtlasClassification("firstclassi", null));
282+
}
283+
284+
verifyExpectedEntities(getFileNames(sourceCopy), HDFS_PATH, HIVEDB, CTLASLEVEL11_1, CTASLEVEL11_1_PROCESS, CTASLEVEL11_PROCESS, CTASLEVEL11, SECONDPARENT_PROCESS,
285+
SECONDPARENT, FIRSTPARENT);
286+
287+
nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
288+
289+
entityStore.deleteClassification(FIRSTPARENT, "firstclassi", FIRSTPARENT);
290+
291+
source = runExportWithParameters(exportService, getExportRequestForHiveTable(CTASLEVEL11_1_TABLE_QUALIFIED_NAME, EXPORT_INCREMENTAL, nextTimestamp, false));
292+
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), CTLASLEVEL11_1, CTASLEVEL11_1_PROCESS, CTASLEVEL11_PROCESS, CTASLEVEL11, SECONDPARENT);
293+
}
294+
250295
@Test(dependsOnMethods = "importHiveDb")
251296
public void exportTableIncrementalForParentEntity() throws AtlasBaseException, IOException {
252297
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false));
36.1 KB
Binary file not shown.

0 commit comments

Comments
 (0)