Skip to content

Commit bc8861c

Browse files
author
chaitalithombare
committed
ATLAS-5017: Patch to replace the long strings set in spark_process attributes
1 parent c4c4cf2 commit bc8861c

File tree

3 files changed

+118
-1
lines changed

3 files changed

+118
-1
lines changed

intg/src/main/java/org/apache/atlas/AtlasConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public enum AtlasConfiguration {
111111
ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", false),
112112
ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", false), //Only for test automation
113113
RELATIONSHIP_SEARCH_ENABLED("atlas.relationship.search.enabled", false),
114-
UI_TASKS_TAB_USE_ENABLED("atlas.tasks.ui.tab.enabled", false);
114+
UI_TASKS_TAB_USE_ENABLED("atlas.tasks.ui.tab.enabled", false),
115+
REPLACE_HUGE_SPARK_PROCESS_ATTRIBUTES_PATCH("atlas.process.spark.attributes.update.patch", false);
115116

116117
private static final Configuration APPLICATION_PROPERTIES;
117118

repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ private void init() {
111111
handlers.add(new UpdateCompositeIndexStatusPatch(context));
112112
handlers.add(new RelationshipTypeNamePatch(context));
113113
handlers.add(new ProcessImpalaNamePatch(context));
114+
handlers.add(new ReplaceHugeSparkProcessAttributesPatch(context));
114115

115116
LOG.info("<== AtlasPatchManager.init()");
116117
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.atlas.repository.patches;
20+
21+
import org.apache.atlas.AtlasConfiguration;
22+
import org.apache.atlas.exception.AtlasBaseException;
23+
import org.apache.atlas.pc.WorkItemManager;
24+
import org.apache.atlas.repository.Constants;
25+
import org.apache.atlas.repository.graphdb.AtlasGraph;
26+
import org.apache.atlas.repository.graphdb.AtlasVertex;
27+
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
28+
import org.apache.atlas.type.AtlasEntityType;
29+
import org.apache.tinkerpop.gremlin.structure.Vertex;
30+
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import java.util.Iterator;
35+
import java.util.Set;
36+
37+
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
38+
import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
39+
40+
public class ReplaceHugeSparkProcessAttributesPatch extends AtlasPatchHandler {
41+
private static final Logger LOG = LoggerFactory.getLogger(ReplaceHugeSparkProcessAttributesPatch.class);
42+
43+
private static final String PATCH_ID = "JAVA_PATCH_0000_015";
44+
private static final String PATCH_DESCRIPTION = "Replace attributes details and sparkPlanDescription to null";
45+
46+
private final PatchContext context;
47+
48+
public ReplaceHugeSparkProcessAttributesPatch(PatchContext context) {
49+
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
50+
51+
this.context = context;
52+
}
53+
54+
@Override
55+
public void apply() throws AtlasBaseException {
56+
if (AtlasConfiguration.REPLACE_HUGE_SPARK_PROCESS_ATTRIBUTES_PATCH.getBoolean() == false) {
57+
LOG.info("ReplaceHugeSparkProcessAttributesPatch: Skipped, since not enabled!");
58+
return;
59+
}
60+
ConcurrentPatchProcessor patchProcessor = new ReplaceHugeSparkProcessAttributesPatchProcessor(context);
61+
LOG.info("PATCH IS CHANGED");
62+
63+
patchProcessor.apply();
64+
65+
setStatus(APPLIED);
66+
67+
LOG.info("ReplaceHugeSparkProcessAttributesPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
68+
}
69+
70+
public static class ReplaceHugeSparkProcessAttributesPatchProcessor extends ConcurrentPatchProcessor {
71+
private static final String TYPE_NAME_SPARK_PROCESS = "spark_process";
72+
private static final String ATTR_NAME_DETAILS = "details";
73+
private static final String ATTR_NAME_SPARKPLANDESCRIPTION = "sparkPlanDescription";
74+
75+
public ReplaceHugeSparkProcessAttributesPatchProcessor(PatchContext context) {
76+
super(context);
77+
}
78+
79+
@Override
80+
protected void prepareForExecution() {
81+
}
82+
83+
@Override
84+
public void submitVerticesToUpdate(WorkItemManager manager) {
85+
AtlasGraph graph = getGraph();
86+
Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, TYPE_NAME_SPARK_PROCESS).vertexIds();
87+
int count = 0;
88+
89+
for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
90+
Object vertexId = iter.next();
91+
92+
manager.checkProduce(vertexId);
93+
94+
count++;
95+
96+
}
97+
98+
LOG.info("found {} entities of type {}", count, TYPE_NAME_SPARK_PROCESS);
99+
}
100+
101+
@Override
102+
protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
103+
LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
104+
105+
try {
106+
vertex.removeProperty(entityType.getVertexPropertyName(ATTR_NAME_DETAILS));
107+
vertex.removeProperty(entityType.getVertexPropertyName(ATTR_NAME_SPARKPLANDESCRIPTION));
108+
} catch (Exception e) {
109+
LOG.error("Error updating: {}", vertexId, e);
110+
}
111+
112+
LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)