Skip to content

Commit 5063cd4

Browse files
committed
[add] code structure
1 parent acfc261 commit 5063cd4

File tree

3 files changed

+123
-0
lines changed

3 files changed

+123
-0
lines changed

common/src/main/java/org/apache/atlas/repository/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ public final class Constants {
202202

203203
public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS};
204204

205+
public static final String PROCESS_ENTITY_TYPE = "Process";
206+
207+
205208
/**
206209
* The homeId field is used when saving into Atlas a copy of an object that is being imported from another
207210
* repository. The homeId will be set to a String that identifies the other repository. The specific format

repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
6363
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
6464
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
65+
import org.apache.atlas.repository.store.graph.v2.preprocessor.lineage.LineagePreProcessor;
6566
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
6667
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
6768
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
@@ -1887,6 +1888,9 @@ public PreProcessor getPreProcessor(String typeName) {
18871888
case STAKEHOLDER_TITLE_ENTITY_TYPE:
18881889
preProcessor = new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever);
18891890
break;
1891+
1892+
case PROCESS_ENTITY_TYPE:
1893+
preProcessor = new LineagePreProcessor(typeRegistry, entityRetriever, graph, this);
18901894
}
18911895

18921896
return preProcessor;
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.atlas.repository.store.graph.v2.preprocessor.lineage;
19+
20+
21+
import org.apache.atlas.RequestContext;
22+
import org.apache.atlas.exception.AtlasBaseException;
23+
import org.apache.atlas.model.instance.AtlasEntity;
24+
import org.apache.atlas.model.instance.AtlasStruct;
25+
import org.apache.atlas.model.instance.EntityMutations;
26+
import org.apache.atlas.repository.graphdb.AtlasGraph;
27+
import org.apache.atlas.repository.graphdb.AtlasVertex;
28+
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
29+
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
30+
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
31+
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
32+
import org.apache.atlas.repository.store.graph.v2.EntityStream;
33+
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
34+
import org.apache.atlas.type.AtlasTypeRegistry;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import java.util.Map;
39+
40+
import static org.apache.atlas.repository.Constants.*;
41+
import static org.apache.atlas.repository.Constants.NAME;
42+
43+
public class LineagePreProcessor implements PreProcessor {
44+
private static final Logger LOG = LoggerFactory.getLogger(LineagePreProcessor.class);
45+
46+
private final AtlasTypeRegistry typeRegistry;
47+
private final EntityGraphRetriever entityRetriever;
48+
private AtlasEntityStore entityStore;
49+
50+
51+
public LineagePreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, AtlasEntityStore entityStore) {
52+
this.entityRetriever = entityRetriever;
53+
this.typeRegistry = typeRegistry;
54+
this.entityStore = entityStore;
55+
}
56+
57+
@Override
58+
public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context,
59+
EntityMutations.EntityOperation operation) throws AtlasBaseException {
60+
if (LOG.isDebugEnabled()) {
61+
LOG.debug("LineageProcessPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation);
62+
}
63+
64+
AtlasEntity entity = (AtlasEntity) entityStruct;
65+
AtlasVertex vertex = context.getVertex(entity.getGuid());
66+
67+
switch (operation) {
68+
case CREATE:
69+
processCreateLineageProcess(entity);
70+
break;
71+
case UPDATE:
72+
processUpdateLineageProcess(entity, vertex, context);
73+
break;
74+
}
75+
}
76+
77+
private void processCreateLineageProcess(AtlasEntity entity) {
78+
// check if connection lineage exists
79+
Map<String, Object> relAttrValues = entity.getRelationshipAttributes();
80+
81+
relAttrValues.get("outputs");
82+
relAttrValues.get("inputs");
83+
84+
85+
// if not exist create lineage process
86+
}
87+
88+
private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) {
89+
// check if connection lineage exists
90+
91+
// if not exist update lineage process
92+
93+
}
94+
95+
private void createConnectionProcessEntity(AtlasEntity entity, String connectionProcessName, String connectionProcessQn, Map<String, Object> relAttrValues) throws AtlasBaseException {
96+
AtlasEntity processEntity = new AtlasEntity();
97+
processEntity.setTypeName(PROCESS_ENTITY_TYPE);
98+
processEntity.setAttribute(NAME, connectionProcessName);
99+
processEntity.setAttribute(QUALIFIED_NAME, connectionProcessQn + "/process");
100+
processEntity.setRelationshipAttributes(relAttrValues);
101+
102+
try {
103+
RequestContext.get().setSkipAuthorizationCheck(true);
104+
AtlasEntity.AtlasEntitiesWithExtInfo processExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo();
105+
processExtInfo.addEntity(processEntity);
106+
EntityStream entityStream = new AtlasEntityStream(processExtInfo);
107+
entityStore.createOrUpdate(entityStream, false); // adding new process
108+
} finally {
109+
RequestContext.get().setSkipAuthorizationCheck(false);
110+
}
111+
}
112+
113+
private void updateConnectionProcessEntity(AtlasEntity processEntity) throws AtlasBaseException{
114+
115+
}
116+
}

0 commit comments

Comments
 (0)