diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 908e8e6539c..7b56885dcff 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -203,6 +203,11 @@ public final class Constants { public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS}; + public static final String PROCESS_ENTITY_TYPE = "Process"; + + public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess"; + public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName"; + /** * The homeId field is used when saving into Atlas a copy of an object that is being imported from another * repository. The homeId will be set to a String that identifies the other repository. The specific format @@ -264,6 +269,7 @@ public final class Constants { public static final String NAME = "name"; public static final String QUALIFIED_NAME = "qualifiedName"; + public static final String CONNECTION_QUALIFIED_NAME = "connectionQualifiedName"; public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName"; public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size"; public static final String INDEX_SEARCH_TYPES_MAX_QUERY_STR_LENGTH = "atlas.graph.index.search.types.max-query-str-length"; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index fe788235fee..eec0f1d6887 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -84,6 +84,10 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + + private static final String CONNECTION_PROCESS_INPUTS_EDGE = "__ConnectionProcess.inputs"; + + private static final String CONNECTION_PROCESS_OUTPUTS_EDGE = "__ConnectionProcess.outputs"; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; @@ -177,8 +181,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); + EntityValidationResult entityValidationResult = validateEntityTypeAndCheckIfDataSet(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, entityValidationResult); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -203,20 +207,44 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { + public class EntityValidationResult { + public final boolean isProcess; + public final boolean isDataSet; + public final boolean isConnection; + public final boolean isConnectionProcess; + + public EntityValidationResult(boolean isProcess, boolean isDataSet, boolean isConnection, boolean isConnectionProcess) { + this.isProcess = isProcess; + this.isDataSet = isDataSet; + this.isConnection = isConnection; + this.isConnectionProcess = isConnectionProcess; + } + } + + + private EntityValidationResult validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + boolean isConnectionProcess = false; + boolean isDataSet = false; + boolean isConnection = false; if (!isProcess) { - boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); - if (!isDataSet) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE); + if(!isConnectionProcess){ + isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); + if (!isDataSet) { + isConnection = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_ENTITY_TYPE); + if(!isConnection){ + throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + } + } } } - return !isProcess; + return new EntityValidationResult(isProcess, isDataSet, isConnection, isConnectionProcess); } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -281,7 +309,7 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); @@ -298,12 +326,12 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - if (isDataSet) { + if (entityValidationResult.isConnection || entityValidationResult.isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -311,12 +339,12 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_INPUTS_EDGE:CONNECTION_PROCESS_INPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_OUTPUTS_EDGE:CONNECTION_PROCESS_OUTPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -329,7 +357,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -360,11 +388,12 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); + EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(inGuid); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth @@ -374,8 +403,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); + Iterator incomingEdges = null; AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + if(entityValidationResult.isDataSet){ + incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + } else if (entityValidationResult.isConnection) { + incomingEdges = datasetVertex.getEdges(IN, isInput ? CONNECTION_PROCESS_OUTPUTS_EDGE : CONNECTION_PROCESS_INPUTS_EDGE).iterator(); + } RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { @@ -403,7 +437,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + Iterator outgoingEdges = null; + if(entityValidationResult.isDataSet){ + outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + } else if (entityValidationResult.isConnection) { + outgoingEdges = processVertex.getEdges(OUT, isInput ? CONNECTION_PROCESS_INPUTS_EDGE : CONNECTION_PROCESS_OUTPUTS_EDGE).iterator(); + } + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -434,7 +474,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth + EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(getGuid(entityVertex)); + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); traversedEntity.setFinishTime(traversalOrder.get()); } @@ -465,7 +506,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); + boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid).isDataSet; // Get the neighbors for the current node enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; @@ -491,7 +532,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); + boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID).isDataSet; if (!lineageListContext.evaluateVertexFilter(currentVertex)) { enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; @@ -999,7 +1040,7 @@ private String getEdgeLabel(AtlasEdge edge) { String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE) || edge.getLabel().equalsIgnoreCase(CONNECTION_PROCESS_INPUTS_EDGE); if (isLineageOnDemandEnabled()) { return getEdgeLabelFromGuids(isInputEdge, inGuid, outGuid); @@ -1574,7 +1615,7 @@ private void processEdge(final AtlasEdge edge, final Map FETCH_ENTITY_ATTRIBUTES = Arrays.asList(CONNECTION_QUALIFIED_NAME); + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityRetriever; + private AtlasEntityStore entityStore; + protected EntityDiscoveryService discovery; + private static final String HAS_LINEAGE = "__hasLineage"; + + public LineagePreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, AtlasEntityStore entityStore) { + this.entityRetriever = entityRetriever; + this.typeRegistry = typeRegistry; + this.entityStore = entityStore; + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processAttributesForLineagePreprocessor"); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("LineageProcessPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + ArrayList connectionProcessQNs = getConnectionProcessQNsForTheGivenInputOutputs(entity); + + switch (operation) { + case CREATE: + processCreateLineageProcess(entity, connectionProcessQNs); + break; + case UPDATE: + processUpdateLineageProcess(entity, vertex, context, connectionProcessQNs); + break; + } + }catch(Exception exp){ + if (LOG.isDebugEnabled()) { + LOG.debug("Lineage preprocessor: " + exp); + } + }finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + + private void processCreateLineageProcess(AtlasEntity entity, ArrayList connectionProcessList) { + // if not exist create lineage process + // add owner connection process + if(!connectionProcessList.isEmpty()){ + entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessList); + } + } + + private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context, ArrayList connectionProcessList) { + // check if connection lineage exists + // add owner connection process + if(!connectionProcessList.isEmpty()){ + entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessList); + } + } + + private AtlasEntity createConnectionProcessEntity(Map connectionProcessInfo) throws AtlasBaseException { + AtlasEntity processEntity = new AtlasEntity(); + processEntity.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + processEntity.setAttribute(NAME, connectionProcessInfo.get("connectionProcessName")); + processEntity.setAttribute(QUALIFIED_NAME, connectionProcessInfo.get("connectionProcessQualifiedName")); + + // Set up relationship attributes for input and output connections + AtlasObjectId inputConnection = new AtlasObjectId(); + inputConnection.setTypeName(CONNECTION_ENTITY_TYPE); + inputConnection.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("input"))); + + AtlasObjectId outputConnection = new AtlasObjectId(); + outputConnection.setTypeName(CONNECTION_ENTITY_TYPE); + outputConnection.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("output"))); + + Map relationshipAttributes = new HashMap<>(); + relationshipAttributes.put("inputs", Collections.singletonList(inputConnection)); + relationshipAttributes.put("outputs", Collections.singletonList(outputConnection)); + processEntity.setRelationshipAttributes(relationshipAttributes); + + try { + RequestContext.get().setSkipAuthorizationCheck(true); + AtlasEntity.AtlasEntitiesWithExtInfo processExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + processExtInfo.addEntity(processEntity); + EntityStream entityStream = new AtlasEntityStream(processExtInfo); + entityStore.createOrUpdate(entityStream, false); + + // Update hasLineage for both connections + updateConnectionLineageFlag((String) connectionProcessInfo.get("input"), true); + updateConnectionLineageFlag((String) connectionProcessInfo.get("output"), true); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + + return processEntity; + } + + private void updateConnectionLineageFlag(String connectionQualifiedName, boolean hasLineage) throws AtlasBaseException { + AtlasObjectId connectionId = new AtlasObjectId(); + connectionId.setTypeName(CONNECTION_ENTITY_TYPE); + connectionId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionQualifiedName)); + + try { + AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId); + AtlasEntity connection = entityRetriever.toAtlasEntity(connectionVertex); + connection.setAttribute(HAS_LINEAGE, hasLineage); + + AtlasEntity.AtlasEntitiesWithExtInfo connectionExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + connectionExtInfo.addEntity(connection); + EntityStream entityStream = new AtlasEntityStream(connectionExtInfo); + + RequestContext.get().setSkipAuthorizationCheck(true); + try { + entityStore.createOrUpdate(entityStream, false); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + } + + private void checkAndUpdateConnectionLineage(String connectionQualifiedName) throws AtlasBaseException { + AtlasObjectId connectionId = new AtlasObjectId(); + connectionId.setTypeName(CONNECTION_ENTITY_TYPE); + connectionId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionQualifiedName)); + + try { + AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId); + + // Check if this connection has any active connection processes + boolean hasActiveConnectionProcess = hasActiveConnectionProcesses(connectionVertex); + + // Only update if the hasLineage status needs to change + boolean currentHasLineage = getEntityHasLineage(connectionVertex); + if (currentHasLineage != hasActiveConnectionProcess) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating hasLineage for connection {} from {} to {}", + connectionQualifiedName, currentHasLineage, hasActiveConnectionProcess); + } + + AtlasEntity connection = entityRetriever.toAtlasEntity(connectionVertex); + connection.setAttribute(HAS_LINEAGE, hasActiveConnectionProcess); + + AtlasEntity.AtlasEntitiesWithExtInfo connectionExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + connectionExtInfo.addEntity(connection); + EntityStream entityStream = new AtlasEntityStream(connectionExtInfo); + + RequestContext.get().setSkipAuthorizationCheck(true); + try { + entityStore.createOrUpdate(entityStream, false); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + } + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + } + + private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) { + // Iterate over both input and output edges connected to this connection + Iterator edges = connectionVertex.getEdges(AtlasEdgeDirection.BOTH, + new String[]{"__ConnectionProcess.inputs", "__ConnectionProcess.outputs"}).iterator(); + + while (edges.hasNext()) { + AtlasEdge edge = edges.next(); + + // Check if the edge is ACTIVE + if (getStatus(edge) == ACTIVE) { + // Get the connected process vertex (the other vertex of the edge) + AtlasVertex processVertex = edge.getOutVertex().equals(connectionVertex) ? + edge.getInVertex() : edge.getOutVertex(); + + // Check if the connected vertex is an ACTIVE ConnectionProcess + if (getStatus(processVertex) == ACTIVE && + getTypeName(processVertex).equals(CONNECTION_PROCESS_ENTITY_TYPE)) { + return true; + } + } + } + return false; + } + + private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException{ + + // check connection lineage exists or not + // check if connection lineage exists + Map entityAttrValues = processEntity.getRelationshipAttributes(); + + ArrayList inputsAssets = (ArrayList) entityAttrValues.get("inputs"); + ArrayList outputsAssets = (ArrayList) entityAttrValues.get("outputs"); + + // get connection process + Set> uniquesSetOfConnectionProcess = new HashSet<>(); + + for (AtlasObjectId input : inputsAssets){ + AtlasVertex inputVertex = entityRetriever.getEntityVertex(input); + Map inputVertexConnectionQualifiedName = fetchAttributes(inputVertex, FETCH_ENTITY_ATTRIBUTES); + for (AtlasObjectId output : outputsAssets){ + AtlasVertex outputVertex = entityRetriever.getEntityVertex(output); + Map outputVertexConnectionQualifiedName = fetchAttributes(outputVertex, FETCH_ENTITY_ATTRIBUTES); + + if(inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) == outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)){ + continue; + } + + String connectionProcessName = "(" + inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")->(" + outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")"; + String connectionProcessQualifiedName = outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + "/" + connectionProcessName; + // Create a map to store both connectionProcessName and connectionProcessQualifiedName + Map connectionProcessMap = new HashMap<>(); + connectionProcessMap.put("input", inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("output", outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("connectionProcessName", connectionProcessName); + connectionProcessMap.put("connectionProcessQualifiedName", connectionProcessQualifiedName); + + // Add the map to the set + uniquesSetOfConnectionProcess.add(connectionProcessMap); + } + } + + ArrayList connectionProcessList = new ArrayList<>(); + + // check if connection process exists + for (Map connectionProcessInfo : uniquesSetOfConnectionProcess){ + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("connectionProcessQualifiedName"))); + AtlasVertex connectionProcessVertex = null; + try { + // TODO add caching here + connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + } + catch(AtlasBaseException exp){ + if(!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)){ + throw exp; + } + } + + AtlasEntity connectionProcess; + if (connectionProcessVertex == null) { + connectionProcess = createConnectionProcessEntity(connectionProcessInfo); + } else { + // exist so retrieve and perform any update so below statement to retrieve + // TODO add caching here + connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + } + // only add in list if created + connectionProcessList.add(connectionProcess.getAttribute(QUALIFIED_NAME)); + } + + return connectionProcessList; + } + + public boolean checkIfMoreChildProcessExistForConnectionProcess(String connectionProcessQn) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("checkIfMoreChileProcessExistForConnectionProcess"); + boolean ret = false; + + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", PROCESS_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessQn))); + + Map dsl = mapOf("query", mapOf("bool", mapOf("must", mustClauseList))); + + List process = indexSearchPaginated(dsl, new HashSet<>(Arrays.asList(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME)) , this.discovery); + + if (CollectionUtils.isNotEmpty(process) && process.size()>1) { + ret = true; + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + return ret; + } + + // handle process delete logic + @Override + public void processDelete(AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess"); + + try { + // handle both soft and hard deletes + // Collect all connections involved in the process being deleted + AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex); + + Set involvedConnections = new HashSet<>(); + + // Retrieve inputs and outputs from the process + List inputs = (List) processEntity.getRelationshipAttribute("inputs"); + List outputs = (List) processEntity.getRelationshipAttribute("outputs"); + + if (inputs == null) inputs = Collections.emptyList(); + if (outputs == null) outputs = Collections.emptyList(); + + List allAssets = new ArrayList<>(); + allAssets.addAll(inputs); + allAssets.addAll(outputs); + + // For each asset, get its connection and add to involvedConnections + for (AtlasObjectId assetId : allAssets) { + try { + AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId); + Map assetConnectionAttributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); + if (assetConnectionAttributes != null) { + String connectionQN = assetConnectionAttributes.get(CONNECTION_QUALIFIED_NAME); + if (StringUtils.isNotEmpty(connectionQN)) { + involvedConnections.add(connectionQN); + } + } + } catch (AtlasBaseException e) { + LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage()); + } + } + + // Collect affected connections from connection processes to be deleted + Set connectionProcessQNs = new HashSet<>(); + + Object rawProperty = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); + + if (rawProperty instanceof List) { + // If the property is a List, cast and add all elements + List propertyList = (List) rawProperty; + connectionProcessQNs.addAll(propertyList); + } else if (rawProperty instanceof String) { + // If it's a single String, add it to the set + connectionProcessQNs.add((String) rawProperty); + } else if (rawProperty != null) { + // Handle other object types if necessary + connectionProcessQNs.add(rawProperty.toString()); + } + + if (connectionProcessQNs.isEmpty()) { + return; + } + + Set affectedConnections = new HashSet<>(); + + // Process each connection process + for (String connectionProcessQn : connectionProcessQNs) { + if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); + + try { + // Get connection process before deletion to track affected connections + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + + // Safely get connection qualified names + String inputConnQN = getConnectionQualifiedName(connectionProcess, "input"); + String outputConnQN = getConnectionQualifiedName(connectionProcess, "output"); + + // Add non-null qualified names to affected connections + if (StringUtils.isNotEmpty(inputConnQN)) { + affectedConnections.add(inputConnQN); + } + if (StringUtils.isNotEmpty(outputConnQN)) { + affectedConnections.add(outputConnQN); + } + + // Delete the connection process + entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); + } catch (AtlasBaseException exp) { + if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw exp; + } + } + } + } + + // Combine involved and affected connections + Set connectionsToCheck = new HashSet<>(); + connectionsToCheck.addAll(involvedConnections); + connectionsToCheck.addAll(affectedConnections); + + // Check and update hasLineage for all connections involved + for (String connectionQN : connectionsToCheck) { + checkAndUpdateConnectionLineage(connectionQN); + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + + // Helper method to safely get connection qualified name + private String getConnectionQualifiedName(AtlasEntity connectionProcess, String attributeName) { + try { + Object relationshipAttr = connectionProcess.getRelationshipAttribute(attributeName); + if (relationshipAttr instanceof AtlasObjectId) { + AtlasObjectId connObjectId = (AtlasObjectId) relationshipAttr; + Map uniqueAttributes = connObjectId.getUniqueAttributes(); + if (uniqueAttributes != null) { + return (String) uniqueAttributes.get(QUALIFIED_NAME); + } + } + } catch (Exception e) { + LOG.warn("Error getting {} qualified name for connection process {}: {}", + attributeName, connectionProcess.getGuid(), e.getMessage()); + } + return null; + } +}