Skip to content

Commit 7ff6994

Browse files
jnkrmgabhijeet-atlan
authored andcommitted
Added ‘assetsCountToPropagate’ and ‘assetsCountPropagated’ to the task vertex. The former will be updated with the total count of propagations to be done once the planning phase is complete and the task begins execution. The latter will be updated as the task progresses, reflecting the count of completed propagations at any given point.
1 parent 400f814 commit 7ff6994

File tree

14 files changed

+218
-27
lines changed

14 files changed

+218
-27
lines changed

.github/workflows/maven.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ on:
2828
- master
2929
- lineageondemand
3030
- makerlogic
31+
- taskdg1924deleteprop
32+
- tagpropv1master
3133

3234
jobs:
3335
build:

common/src/main/java/org/apache/atlas/repository/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,8 @@ public final class Constants {
359359
public static final String TASK_CLASSIFICATION_ID = encodePropertyKey(TASK_PREFIX + "classificationId");
360360
public static final String TASK_ENTITY_GUID = encodePropertyKey(TASK_PREFIX + "entityGuid");
361361
public static final String TASK_CLASSIFICATION_TYPENAME = encodePropertyKey(TASK_PREFIX + "classificationTypeName");
362+
public static final String TASK_ASSET_COUNT_TO_PROPAGATE = encodePropertyKey(TASK_PREFIX + "assetsCountToPropagate");
363+
public static final String TASK_ASSET_COUNT_PROPAGATED = encodePropertyKey(TASK_PREFIX + "assetsCountPropagated");
362364
public static final String ACTIVE_STATE_VALUE = "ACTIVE";
363365
public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent";
364366
public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id";

intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,15 @@ public class AtlasPatch implements Serializable {
4949
private long createdTime;
5050
private long updatedTime;
5151
private PatchStatus status;
52+
private long assetsCountToPropagate;
53+
private long assetsCountPropagated;
5254

5355
public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }
5456

5557
public AtlasPatch() { }
5658

5759
public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status,
58-
String updatedBy, String createdBy, long createdTime, long updatedTime) {
60+
String updatedBy, String createdBy, long createdTime, long updatedTime, long assetsCountToPropagate, long assetsCountPropagated) {
5961
this.id = id;
6062
this.description = patchName;
6163
this.type = type;
@@ -65,6 +67,8 @@ public AtlasPatch(String id, String patchName, String type, String action, Patch
6567
this.createdBy = createdBy;
6668
this.createdTime = createdTime;
6769
this.updatedTime = updatedTime;
70+
this.assetsCountToPropagate = assetsCountToPropagate;
71+
this.assetsCountPropagated = assetsCountPropagated;
6872
}
6973

7074
public String getId() {
@@ -139,6 +143,18 @@ public void setUpdatedTime(long updatedTime) {
139143
this.updatedTime = updatedTime;
140144
}
141145

146+
public void setAssetsCountToPropagate(Long assetsCount) {
147+
this.assetsCountToPropagate = assetsCount;
148+
}
149+
150+
public Long getAssetsCountToPropagate() {
151+
return assetsCountToPropagate;
152+
}
153+
154+
public Long getAssetsCountPropagated(){
155+
return assetsCountPropagated;
156+
}
157+
142158
@Override
143159
public boolean equals(Object o) {
144160
if (this == o) return true;
@@ -157,7 +173,7 @@ public boolean equals(Object o) {
157173

158174
@Override
159175
public int hashCode() {
160-
return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status);
176+
return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status, assetsCountToPropagate, assetsCountPropagated);
161177
}
162178

163179
@Override
@@ -173,6 +189,8 @@ public String toString() {
173189
sb.append(", createdTime=").append(createdTime);
174190
sb.append(", updatedTime=").append(updatedTime);
175191
sb.append(", status=").append(status);
192+
sb.append(", assetsCountToPropagate=").append(assetsCountToPropagate);
193+
sb.append(", assetsCountPropagated=").append(assetsCountPropagated);
176194
sb.append('}');
177195

178196
return sb.toString();

intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public static Status from(String s) {
9595
private String classificationId;
9696
private String entityGuid;
9797
private String classificationTypeName;
98+
private Long assetsCountToPropagate;
99+
private Long assetsCountPropagated;
98100

99101
public AtlasTask() {
100102
}
@@ -111,6 +113,8 @@ public AtlasTask(String type, String createdBy, Map<String, Object> parameters,
111113
this.attemptCount = 0;
112114
this.classificationId = classificationId;
113115
this.entityGuid = entityGuid;
116+
this.assetsCountToPropagate = 0L;
117+
this.assetsCountPropagated = 0L;
114118
}
115119

116120
public String getGuid() {
@@ -239,6 +243,22 @@ public String getEntityGuid() {
239243
return entityGuid;
240244
}
241245

246+
public void setAssetsCountToPropagate(Long assetsCount) {
247+
this.assetsCountToPropagate = assetsCount;
248+
}
249+
250+
public Long getAssetsCountToPropagate() {
251+
return assetsCountToPropagate;
252+
}
253+
254+
public void setAssetsCountPropagated(Long assetsCountPropagated) {
255+
this.assetsCountPropagated = assetsCountPropagated;
256+
}
257+
258+
public Long getAssetsCountPropagated(){
259+
return assetsCountPropagated;
260+
}
261+
242262
@JsonIgnore
243263
public void start() {
244264
this.setStatus(Status.IN_PROGRESS);
@@ -270,6 +290,8 @@ public String toString() {
270290
", attemptCount=" + attemptCount +
271291
", errorMessage='" + errorMessage + '\'' +
272292
", status=" + status +
293+
", assetsCountToPropagate=" + assetsCountToPropagate +
294+
", assetsCountPropagated=" + assetsCountPropagated +
273295
'}';
274296
}
275297
}

repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,8 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
407407
createCommonVertexIndex(management, TASK_ENTITY_GUID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
408408
createCommonVertexIndex(management, TASK_ERROR_MESSAGE, UniqueKind.NONE, String.class, SINGLE, false, false);
409409
createCommonVertexIndex(management, TASK_ATTEMPT_COUNT, UniqueKind.NONE, Integer.class, SINGLE, false, false);
410+
createCommonVertexIndex(management, TASK_ASSET_COUNT_TO_PROPAGATE, UniqueKind.NONE, Long.class, SINGLE, false, false);
411+
createCommonVertexIndex(management, TASK_ASSET_COUNT_PROPAGATED, UniqueKind.NONE, Long.class, SINGLE, false, false);
410412

411413
createCommonVertexIndex(management, TASK_UPDATED_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false);
412414
createCommonVertexIndex(management, TASK_TIME_TAKEN_IN_SECONDS, UniqueKind.NONE, Long.class, SINGLE, false, false);

repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
210210
ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
211211
ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
212212
ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
213+
ret.setAssetsCountToPropagate(getEncodedProperty(vertex, TASK_ASSET_COUNT_TO_PROPAGATE, Long.class));
213214
ret.setStatus(getPatchStatus(vertex));
214215

215216
return ret;

repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,23 +1220,52 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship
12201220
}
12211221
}
12221222

1223+
// update the 'assetsCountToPropagate' on in memory java object.
1224+
AtlasTask currentTask = RequestContext.get().getCurrentTask();
1225+
currentTask.setAssetsCountToPropagate((long) addPropagationsMap.size() + removePropagationsMap.size());
1226+
1227+
//update the 'assetsCountToPropagate' in the current task vertex.
1228+
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
1229+
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
1230+
//commiting to graph
1231+
graph.commit();
1232+
1233+
//total propagated count
1234+
int propagatedCount = 0;
12231235
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
12241236
List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);
12251237

12261238
addTagPropagation(classificationVertex, entitiesToAddPropagation);
1239+
propagatedCount++;
1240+
if (propagatedCount == 100){
1241+
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount - 1);
1242+
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
1243+
propagatedCount = 0;
1244+
}
12271245
}
12281246

12291247
for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
12301248
List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);
12311249

12321250
removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
1251+
propagatedCount++;
1252+
if (propagatedCount == 100){
1253+
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount);
1254+
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
1255+
propagatedCount = 0;
1256+
}
1257+
}
1258+
if (propagatedCount != 0){
1259+
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount);
1260+
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
12331261
}
12341262
} else {
12351263
// update blocked propagated classifications only if there is no change is tag propagation (don't update both)
12361264
handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
12371265
}
12381266
}
12391267

1268+
12401269
public void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
12411270
if (blockedClassifications != null) {
12421271
List<AtlasVertex> propagatableClassifications = getPropagatableClassifications(edge);

0 commit comments

Comments
 (0)