Skip to content

LIN-160 - Summary Lineage #4641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open

LIN-160 - Summary Lineage #4641

wants to merge 8 commits into from

Conversation

akshaysw
Copy link

Change description

Connection level lineage
notion doc - https://www.notion.so/atlanhq/Summary-Lineage-eef721c5acfb48f9b5d3b84aa3da0d43

I have added new file LineagePreProcessor - >

  1. Files check bulk API payload for create process request
  2. checks whether the input/output assets belong to different connections.
  3. If they do, Atlas verifies whether a connection lineage already exists:
  • If a connection lineage exists, no further action is taken.
  • If no connection lineage exists, Atlas creates one.
  1. similarly delete process call is handled,

Type of change

  • Bug fix (fixes an issue)
  • New feature (adds functionality)

Related issues

Fix #1

Helm Config Changes for Running Tests (Staging PR)

Does this PR require Helm config changes for testing?

  • Tests are NOT required for this commit. (You can proceed with the PR.) ✅
  • No, Helm config changes are not needed. (You can proceed with the PR.) ✅
  • Yes, I have already updated the config-values on enpla9up36. (You can proceed with the PR.) ✅
  • Yes, but I have NOT updated the config-values. (Please update them before proceeding; or, tests will run with default values.)⚠️

Checklists

Development

  • Lint rules pass locally
  • Application changes have been tested thoroughly
  • Automated tests covering modified code pass

Security

  • Security impact of change has been considered
  • Code follows company security practices and guidelines

Code review

  • Pull request has a descriptive title and context useful to a reviewer. Screenshots or screencasts are attached as necessary
  • "Ready for review" label attached and reviewers assigned
  • Changes have been reviewed by at least one other contributor
  • Pull request linked to task tracker where applicable

Set<Map<String, Object>> uniqueConnectionProcesses = new HashSet<>();
for (String inputConnectionQN : inputConnectionQNs) {
for (String outputConnectionQN : outputConnectionQNs) {
if (!inputConnectionQN.equals(outputConnectionQN)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic looks not correct to me @akshaysw . We should compare all elements in inputConnection with outputConnection before we decide to create a connection!

this.isDataProduct = isDataProduct;
}

public boolean CheckIfConnectorVertex(String lineageType){

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code style. method name starts with lowercase

/**
* String[] => [Input edge Label, Output Edge Label]
*/
public static final HashMap<String, String[]> LINEAGE_MAP = new HashMap<String, String[]>(){{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left side should always be an interface. So

public static final Map<String, String[]> LINEAGE_MAP = new HashMap<String, String[]>(2){{

add 2 as initial capacity. Also see if we need a Map for this ?

boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review this code instead.

Set<String> allTypes = entityType.getTypeAndAllSuperTypes();

isConnectionProcess = allTypes.contains(CONNECTION_PROCESS_ENTITY_TYPE);
isDataSet = allTypes.contains(DATA_SET_SUPER_TYPE);
isConnection = allTypes.contains(CONNECTION_ENTITY_TYPE);

if (!isConnectionProcess && !isDataSet && !isConnection) {
    throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
}

AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

traverseEdgesOnDemand has 11 parameters now. This is not getting any easy to maintain.

Always make an attempt to make code better than current! cc: @sumandas0 @aarshi0301

}
}

private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better optimised logic. Rather than traversing in both directions, this uses IN first and short circuits earlier. Only if not found, it fallbacks to OUT lazily.

private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) {
    // First, check incoming edges (connection used as output)
    Iterable<AtlasEdge> inEdges = connectionVertex.getEdges(
        AtlasEdgeDirection.IN, "__ConnectionProcess.outputs");

    for (AtlasEdge edge : inEdges) {
        if (getStatus(edge) != ACTIVE) {
            continue;
        }

        AtlasVertex processVertex = edge.getOutVertex(); // From process to connection
        if (getStatus(processVertex) == ACTIVE &&
            CONNECTION_PROCESS_ENTITY_TYPE.equals(getTypeName(processVertex))) {
            return true;
        }
    }

    // If not found, check outgoing edges (connection used as input)
    Iterable<AtlasEdge> outEdges = connectionVertex.getEdges(
        AtlasEdgeDirection.OUT, "__ConnectionProcess.inputs");

    for (AtlasEdge edge : outEdges) {
        if (getStatus(edge) != ACTIVE) {
            continue;
        }

        AtlasVertex processVertex = edge.getInVertex(); // From connection to process
        if (getStatus(processVertex) == ACTIVE &&
            CONNECTION_PROCESS_ENTITY_TYPE.equals(getTypeName(processVertex))) {
            return true;
        }
    }

    return false;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants