diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index a468a0ef7b0..e7aaca25948 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -27,7 +27,7 @@ on: - development - master - lineageondemand - - optimise_classifications_fetch + - maintainance-dyn jobs: build: diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java index a1afedd49da..da2f48ada19 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java @@ -109,7 +109,7 @@ public TaskConsumer(AtlasTask task, TaskRegistry registry, Map= MAX_ATTEMPT_COUNT) { - TASK_LOG.warn("Max retry count for task exceeded! Skipping!", task); - - task.setStatus(AtlasTask.Status.FAILED); - registry.updateStatus(taskVertex, task); - - return; - } +// taskVertex = registry.getVertex(task.getGuid()); +// if (taskVertex == null) { +// TASK_LOG.warn("Task not scheduled as vertex not found", task); +// } +// +// if (task.getStatus() == AtlasTask.Status.COMPLETE) { +// TASK_LOG.warn("Task not scheduled as status was COMPLETE!", task); +// } +// +// if (perfEnabled) { +// perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, String.format("atlas.task:%s", task.getGuid(), task.getType())); +// } + +// statistics.increment(1); +// +// attemptCount = task.getAttemptCount(); +// +// if (attemptCount >= MAX_ATTEMPT_COUNT) { +// TASK_LOG.warn("Max retry count for task exceeded! Skipping!", task); +// +// task.setStatus(AtlasTask.Status.FAILED); +// registry.updateStatus(taskVertex, task); +// +// return; +// } LOG.info(String.format("Started performing task with guid: %s", task.getGuid())); - performTask(taskVertex, task); + performTask(task); LOG.info(String.format("Finished task with guid: %s", task.getGuid())); } catch (InterruptedException exception) { - registry.updateStatus(taskVertex, task); - TASK_LOG.error("{}: {}: Interrupted!", task, exception); +// registry.updateStatus(taskVertex, task); +// TASK_LOG.error("{}: {}: Interrupted!", task, exception); statistics.error(); } catch (Exception exception) { if (task != null) { - registry.updateStatus(taskVertex, task); +// registry.updateStatus(taskVertex, task); TASK_LOG.error("Error executing task. Please perform the operation again!", task, exception); } else { @@ -180,7 +180,7 @@ public void run() { } } - private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exception { + private void performTask(AtlasTask task) throws Exception { TaskFactory factory = taskTypeFactoryMap.get(task.getType()); if (factory == null) { LOG.error("taskTypeFactoryMap does not contain task of type: {}", task.getType()); @@ -189,11 +189,11 @@ private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exceptio AbstractTask runnableTask = factory.create(task); - registry.inProgress(taskVertex, task); +// registry.inProgress(taskVertex, task); runnableTask.run(); - registry.complete(taskVertex, task); +// registry.complete(taskVertex, task); statistics.successPrint(); } diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 08571564423..392afc70c9e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -29,15 +29,14 @@ import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLEANUP_CLASSIFICATION_PROPAGATION; + public class TaskQueueWatcher implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(TaskQueueWatcher.class); private static final TaskExecutor.TaskLogger TASK_LOG = TaskExecutor.TaskLogger.getLogger(); @@ -167,8 +166,21 @@ public void run() { } public List getTasks() { - run(); - return tasks; + Map parameters = Map.of( + "classificationName", "pA29bWAyFerMqOglcN7Chb" + ); + AtlasTask newTask = new AtlasTask(); + newTask.setParameters(parameters); + newTask.setType(CLEANUP_CLASSIFICATION_PROPAGATION); + newTask.setCreatedBy("admin"); + newTask.setStatusPending(); + newTask.setAttemptCount(0); + newTask.setClassificationTypeName("pA29bWAyFerMqOglcN7Chb"); + newTask.setGuid("my-custom-guid-12345"); + newTask.setCreatedTime(new Date()); + newTask.setUpdatedTime(new Date()); + + return new ArrayList<>(List.of(newTask)); } public void clearTasks() {