-
Notifications
You must be signed in to change notification settings - Fork 10
DG-1924 | Add 'assetsCountToPropagate' and 'assetsCountPropagated' in task vertex. #4032
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 17 commits
759f717
65d920d
61d58dd
dfddb79
b4ce7af
ed847c1
c257421
adc6ed1
fdca703
164ec29
4e8c6df
d046141
288062c
ae9dec1
a13f17e
d364c8e
b9b6887
317322e
7cf3072
ba5d144
14db31d
102ae40
64775b4
42540a0
97bbbaf
42b074a
c7c8c4e
6eaf1eb
5caa581
6d176e7
6451bfd
e51846d
48e07f4
547d561
616dcef
06fb81c
cab6deb
157e2db
06e406a
3936029
de81019
720cda0
1b4dbee
e66c636
0f6af6d
73c7cd8
edebe8c
6961d65
5f31f32
8063ff0
b0aadc1
5d31524
08fa364
77dd858
817418b
613db6c
18ccd0f
d7f5e47
4e6eb48
0d8bdf9
386b92c
850fffe
65cb0fb
6aede5c
22e973a
2991b78
75a8e14
bc27ce9
0b8dcd0
754ceb0
e297cae
c1de532
c1fa864
6abecbb
a4b9046
050c1f3
c35fde8
fda9378
7c89b28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,7 @@ | |
| import org.apache.commons.lang.StringUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; | ||
|
|
||
| import java.util.*; | ||
| import java.util.stream.Collectors; | ||
|
|
@@ -95,6 +96,7 @@ public abstract class DeleteHandlerV1 { | |
| private final TaskManagement taskManagement; | ||
| private final AtlasGraph graph; | ||
| private final TaskUtil taskUtil; | ||
| private static final int CHUNK_SIZE = AtlasConfiguration.TASKS_GRAPH_COMMIT_CHUNK_SIZE.getInt(); | ||
|
|
||
| public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) { | ||
| this.typeRegistry = typeRegistry; | ||
|
|
@@ -1235,16 +1237,32 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship | |
| } | ||
| } | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, addPropagationsMap.size() + removePropagationsMap.size()); | ||
|
|
||
| int propagatedCount = 0; | ||
| for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) { | ||
| List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex); | ||
|
|
||
| addTagPropagation(classificationVertex, entitiesToAddPropagation); | ||
| propagatedCount++; | ||
| if (propagatedCount == CHUNK_SIZE){ | ||
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount - 1); | ||
| propagatedCount = 0; | ||
| } | ||
| } | ||
|
|
||
| for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) { | ||
| List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex); | ||
|
|
||
| removeTagPropagation(classificationVertex, entitiesToRemovePropagation); | ||
| propagatedCount++; | ||
| if (propagatedCount == CHUNK_SIZE){ | ||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount); | ||
| propagatedCount = 0; | ||
| } | ||
| } | ||
| if (propagatedCount != 0){ | ||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount); | ||
| } | ||
| } else { | ||
| // update blocked propagated classifications only if there is no change in tag propagation (don't update both) | |
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.collect.Iterators; | ||
| import org.apache.atlas.*; | ||
| import org.apache.atlas.annotation.GraphTransaction; | ||
| import org.apache.atlas.authorize.AtlasAuthorizationUtils; | ||
|
|
@@ -78,6 +79,7 @@ | |
|
|
||
| import javax.inject.Inject; | ||
| import java.util.*; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.regex.Matcher; | ||
| import java.util.regex.Pattern; | ||
| import java.util.stream.Collectors; | ||
|
|
@@ -94,6 +96,7 @@ | |
| import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE; | ||
| import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; | ||
| import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS; | ||
| import static org.apache.atlas.model.tasks.AtlasTask.Status.PENDING; | ||
| import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET; | ||
| import static org.apache.atlas.repository.Constants.*; | ||
| import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge; | ||
|
|
@@ -3158,9 +3161,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc | |
| long classificationEdgeCount = 0; | ||
| long classificationEdgeInMemoryCount = 0; | ||
| Iterator<AtlasVertex> tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE); | ||
|
|
||
| List<AtlasVertex> tagVerticesProcessed = new ArrayList<>(0); | ||
| List<AtlasVertex> currentAssetVerticesBatch = new ArrayList<>(0); | ||
|
|
||
| int totalCount = 0; | ||
hr2904 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| while (tagVertices != null && tagVertices.hasNext()) { | ||
| if (cleanedUpCount >= CLEANUP_MAX){ | ||
| return; | ||
|
|
@@ -3179,6 +3183,8 @@ public void cleanUpClassificationPropagation(String classificationName, int batc | |
| } | ||
|
|
||
| int currentAssetsBatchSize = currentAssetVerticesBatch.size(); | ||
| totalCount += currentAssetsBatchSize; | ||
|
|
||
| if (currentAssetsBatchSize > 0) { | ||
| LOG.info("To clean up tag {} from {} entities", classificationName, currentAssetsBatchSize); | ||
| int offset = 0; | ||
|
|
@@ -3210,17 +3216,20 @@ public void cleanUpClassificationPropagation(String classificationName, int batc | |
| classificationEdgeInMemoryCount = 0; | ||
| } | ||
| } | ||
|
|
||
| try { | ||
| AtlasEntity entity = repairClassificationMappings(vertex); | ||
| entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications); | ||
| } catch (IllegalStateException | AtlasBaseException e) { | ||
| e.printStackTrace(); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| transactionInterceptHelper.intercept(); | ||
|
|
||
| offset += CHUNK_SIZE; | ||
|
|
||
|
|
||
| } finally { | ||
| LOG.info("For offset {} , classificationEdge were : {}", offset, classificationEdgeCount); | ||
| classificationEdgeCount = 0; | ||
|
|
@@ -3236,7 +3245,6 @@ public void cleanUpClassificationPropagation(String classificationName, int batc | |
| e.printStackTrace(); | ||
| } | ||
| } | ||
| transactionInterceptHelper.intercept(); | ||
|
|
||
| cleanedUpCount += currentAssetsBatchSize; | ||
| currentAssetVerticesBatch.clear(); | ||
|
|
@@ -3245,6 +3253,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc | |
| tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE); | ||
| } | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, totalCount); | ||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, totalCount); | ||
|
|
||
| transactionInterceptHelper.intercept(); | ||
| LOG.info("Completed cleaning up classification {}", classificationName); | ||
| } | ||
|
|
||
|
|
@@ -3438,6 +3450,7 @@ public void addClassifications(final EntityMutationContext context, String guid, | |
|
|
||
| public List<String> propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid, Boolean previousRestrictPropagationThroughLineage,Boolean previousRestrictPropagationThroughHierarchy) throws AtlasBaseException { | ||
| try { | ||
|
|
||
| if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) { | ||
| LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId); | ||
|
|
||
|
|
@@ -3481,9 +3494,10 @@ public List<String> propagateClassification(String entityGuid, String classifica | |
| Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true:false; | ||
| List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId, edgeLabelsToCheck,toExclude); | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, impactedVertices.size() - 1); | ||
|
|
||
| if (CollectionUtils.isEmpty(impactedVertices)) { | ||
| LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId); | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
|
|
@@ -3526,9 +3540,15 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v | |
|
|
||
| propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids); | ||
|
|
||
| transactionInterceptHelper.intercept(); | ||
|
|
||
| int propagatedAssetsCount = (offset + CHUNK_SIZE >= impactedVerticesSize && impactedVerticesSize == verticesToPropagate.size()) | ||
|
||
| ? toIndex - offset - 1 // Subtract 1 for the last chunk | ||
| : toIndex - offset; | ||
|
|
||
| offset += CHUNK_SIZE; | ||
|
|
||
| transactionInterceptHelper.intercept(); | ||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedAssetsCount); | ||
|
|
||
| } while (offset < impactedVerticesSize); | ||
| } catch (AtlasBaseException exception) { | ||
|
|
@@ -3538,7 +3558,7 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v | |
| RequestContext.get().endMetricRecord(classificationPropagationMetricRecorder); | ||
| } | ||
|
|
||
| return propagatedEntitiesGuids; | ||
| return propagatedEntitiesGuids; | ||
|
|
||
| } | ||
|
|
||
|
|
@@ -4072,6 +4092,9 @@ public void updateClassificationTextPropagation(String classificationVertexId) t | |
| AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); | ||
| LOG.info("Fetched classification : {} ", classification.toString()); | ||
| List<AtlasVertex> impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex); | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, impactedVertices.size()); | ||
|
|
||
| LOG.info("impactedVertices : {}", impactedVertices.size()); | ||
| int batchSize = 100; | ||
| for (int i = 0; i < impactedVertices.size(); i += batchSize) { | ||
|
|
@@ -4086,6 +4109,9 @@ public void updateClassificationTextPropagation(String classificationVertexId) t | |
| entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(classification)); | ||
| } | ||
| } | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, end); | ||
|
|
||
| transactionInterceptHelper.intercept(); | ||
| LOG.info("Updated classificationText from {} for {}", i, batchSize); | ||
| } | ||
|
|
@@ -4276,6 +4302,8 @@ public void classificationRefreshPropagation(String classificationId) throws Atl | |
| .filter(vertex -> vertex != null) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, verticesToRemove.size() + verticesToAddClassification.size()); | ||
|
|
||
| //Remove classifications from unreachable vertices | ||
| processPropagatedClassificationDeletionFromVertices(verticesToRemove, currentClassificationVertex, classification); | ||
|
|
||
|
|
@@ -4353,8 +4381,9 @@ private void processPropagatedClassificationDeletionFromVertices(List<AtlasVerte | |
| List<AtlasEntity> updatedEntities = updateClassificationText(classification, updatedVertices); | ||
| entityChangeNotifier.onClassificationsDeletedFromEntities(updatedEntities, Collections.singletonList(classification)); | ||
|
|
||
| int propagatedAssetsCount = toIndex - offset; | ||
| offset += CHUNK_SIZE; | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedAssetsCount); | ||
| transactionInterceptHelper.intercept(); | ||
|
|
||
| } while (offset < propagatedVerticesSize); | ||
|
|
@@ -4372,6 +4401,8 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi | |
| int toIndex; | ||
| int offset = 0; | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, propagatedEdgesSize); | ||
|
|
||
| do { | ||
| toIndex = ((offset + CHUNK_SIZE > propagatedEdgesSize) ? propagatedEdgesSize : (offset + CHUNK_SIZE)); | ||
|
|
||
|
|
@@ -4387,8 +4418,11 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi | |
| deletedPropagationsGuid.addAll(propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList())); | ||
| } | ||
|
|
||
| int propagatedAssetsCount = toIndex - offset; | ||
|
|
||
| offset += CHUNK_SIZE; | ||
|
|
||
| taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedAssetsCount); | ||
| transactionInterceptHelper.intercept(); | ||
|
|
||
| } while (offset < propagatedEdgesSize); | ||
|
|
||
jnkrmg marked this conversation as resolved.
Show resolved
Hide resolved
|

Uh oh!
There was an error while loading. Please reload this page.