Skip to content

[DSIP-55][Master] Separate the waiting dispatched task into different queue by worker group #17037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e76f57b
fix #16260
Mar 3, 2025
b03e038
fix code style
Mar 3, 2025
7bf262c
fix spotless
Mar 4, 2025
cdc3749
fix unit test
Mar 4, 2025
c771b2c
fix spotless
Mar 4, 2025
e20ba6d
WorkerGroupTaskDispatchManager manages threads and queues.
Mar 6, 2025
6e232c9
WorkerGroupTaskDispatchManager manages threads and queues.
Mar 6, 2025
89f173a
rollback master logback.xml
Mar 6, 2025
c3a45a9
fix style
Mar 6, 2025
098e971
close workerGroupTaskDispatchManager
Mar 7, 2025
ffea08e
close workerGroupTaskDispatchManager
Mar 7, 2025
0686ae1
fix spotless
Mar 7, 2025
45e5e69
add ThreadCreatingAndDestroyingWorkerGroupListener
Mar 7, 2025
eb29608
add unit test
Mar 8, 2025
9ed7d43
add unit test
Mar 8, 2025
6096beb
add unit test
Mar 8, 2025
b9f8434
optimized code
Mar 8, 2025
63f51b0
fix lazy
Mar 11, 2025
02b4a2c
add todo
Mar 11, 2025
61deeb8
fix spotless:
Mar 11, 2025
9accebc
change WorkerGroupTaskDispatchManager -> WorkerGroupTaskDispatcherMan…
Mar 12, 2025
6038d23
fix sonar
Mar 12, 2025
f75fc95
optimized code
Mar 12, 2025
8a60ce4
optimized code
Mar 12, 2025
19c6128
global queue is delayQueue
Mar 12, 2025
1159a1e
PriorityDelayQueue move to WorkerGroupTaskDispatcher,Simplify the Wor…
Mar 13, 2025
bd6d9ee
Remove unnecessary resource cleanup
Mar 14, 2025
63ffb7c
less unit test time
Mar 15, 2025
fcdec92
fix jdk8 unit test
Mar 15, 2025
ee3fae8
optimized code
Mar 18, 2025
1e86477
fix sonar
Mar 18, 2025
c751272
override equals
Mar 18, 2025
85fa740
Optimize concurrency issues
Mar 28, 2025
af6be6a
\cannot find the workergroup, task will kill.
Mar 28, 2025
ca78f4f
fix unit test
Mar 29, 2025
48ebb78
fix unit test
Mar 29, 2025
9340610
spotless
Mar 29, 2025
7c9f842
retry e2e
Mar 29, 2025
057cac9
jdk 11 unit test
Mar 29, 2025
18cf5c0
Maintain the original logic: if the worker group cannot be found, pla…
Mar 31, 2025
d86b44a
optimized code
Apr 3, 2025
b4abd0b
Remove the close logic for WorkerGroupTaskDispatcher. Since this scen…
Apr 9, 2025
0fb49ae
fix unit test
Apr 9, 2025
f5f0bfe
Change the logic to dynamically create the workerGroupTaskDispatcher …
Apr 13, 2025
c2487a1
fix unit test
Apr 14, 2025
a8b6650
optimization unit test
Apr 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
import org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
import org.apache.dolphinscheduler.service.ServiceConfiguration;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
Expand Down Expand Up @@ -99,6 +100,9 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterCoordinator masterCoordinator;

@Autowired
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;

public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);

Expand Down Expand Up @@ -129,6 +133,7 @@ public void initialized() {
this.masterCoordinator.start();

this.clusterManager.start();

this.clusterStateMonitors.start();

this.workflowEngine.start();
Expand Down Expand Up @@ -183,7 +188,9 @@ public void close(String cause) {
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
SpringApplicationContext closedSpringContext = springApplicationContext) {
SpringApplicationContext closedSpringContext = springApplicationContext;
WorkerGroupTaskDispatcherManager closeWorkerGroupTaskDispatcherManager =
workerGroupTaskDispatcherManager) {

log.info("MasterServer is stopping, current cause : {}", cause);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.queue.TimeBasedTaskExecutionRunnableComparableEntry;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.PriorityBlockingQueue;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,43 +32,37 @@

/**
* The class is used to store {@link ITaskExecutionRunnable} which needs to be dispatched. The {@link ITaskExecutionRunnable}
* will be stored in {@link PriorityDelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be
* will be stored in {@link DelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be
* consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
* <p>
* The order of {@link ITaskExecutionRunnable} in the {@link PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
* The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
*/
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {

private final Set<Integer> waitingTaskInstanceIds = ConcurrentHashMap.newKeySet();
private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>> priorityDelayQueue =
new PriorityDelayQueue<>();

/**
* Submit a {@link ITaskExecutionRunnable} with delay time 0, it will be consumed immediately.
*/
public synchronized void dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITaskExecutionRunnable) {
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable, 0);
}
private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry> delayQueue =
new DelayQueue<>();

/**
* Submit a {@link ITaskExecutionRunnable} with delay time, if the delay time <= 0 then it can be consumed.
*/
public synchronized void dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable taskExecutionRunnable,
long delayTimeMills) {
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, taskExecutionRunnable));
delayQueue.add(new TimeBasedTaskExecutionRunnableComparableEntry(delayTimeMills, taskExecutionRunnable));
}

/**
* Consume {@link ITaskExecutionRunnable} from the {@link PriorityDelayQueue}, only the delay time <= 0 can be consumed.
* Consume {@link ITaskExecutionRunnable} from the {@link DelayQueue}, only the delay time <= 0 can be consumed.
*/
@SneakyThrows
public ITaskExecutionRunnable takeTaskExecuteRunnable() {
ITaskExecutionRunnable taskExecutionRunnable = priorityDelayQueue.take().getData();
ITaskExecutionRunnable taskExecutionRunnable = (ITaskExecutionRunnable) delayQueue.take().getData();
while (!markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
taskExecutionRunnable = priorityDelayQueue.take().getData();
taskExecutionRunnable = (ITaskExecutionRunnable) delayQueue.take().getData();
}
return taskExecutionRunnable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -38,7 +37,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;

@Autowired
private ITaskExecutorClient taskExecutorClient;
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;

private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

Expand Down Expand Up @@ -73,23 +72,35 @@ void doDispatch() {
log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
this.dispatchTaskToWorkerGroup(taskExecutionRunnable);
} catch (Exception e) {
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not exceed 60 seconds
long waitingTimeMills = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
waitingTimeMills);
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
this.delayRetryDispatch(taskExecutionRunnable, e);
}
}

/**
 * Puts a task whose dispatch failed back into the global waiting queue so that it is retried later.
 * <p>
 * The delay is the value returned by {@code increaseDispatchFailTimes()} multiplied by one second,
 * capped at 60 seconds.
 *
 * @param taskExecutionRunnable the task that could not be dispatched
 * @param e                     the failure that triggered the retry, attached to the error log
 */
private void delayRetryDispatch(ITaskExecutionRunnable taskExecutionRunnable, Exception e) {
    final long maxWaitingTimeMills = 60_000L;
    // Each recorded failure adds one more second of waiting.
    final long backoffMills =
            taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L;
    final long waitingTimeMills = Math.min(backoffMills, maxWaitingTimeMills);

    // Re-queue first, then report: the task becomes consumable again once the delay has elapsed.
    globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable, waitingTimeMills);

    final String taskName = taskExecutionRunnable.getTaskInstance().getName();
    log.error("Dispatch Task: {} failed will retry after: {}/ms", taskName, waitingTimeMills, e);
}

/**
 * Hands the task over to the dispatcher of the worker group named on its task instance, with a delay of 0.
 *
 * @param taskExecutionRunnable the task to forward
 */
private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable taskExecutionRunnable) {
    final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
    workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup, taskExecutionRunnable, 0);
}

@Override
public void close() throws Exception {
if (RUNNING_FLAG.compareAndSet(true, false)) {
log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
workerGroupTaskDispatcherManager.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call should be placed before log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake led to the meaningless log. I will make the changes as suggested.

log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
} else {
log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;

import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;

/**
* WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue.
* The main responsibilities include:
* 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch.
* 2. Re-queuing tasks that fail to dispatch according to retry logic.
* 3. Ensuring thread safety and correct state transitions during task processing.
*/
@Slf4j
public class WorkerGroupTaskDispatcher extends BaseDaemonThread {

private final ITaskExecutorClient taskExecutorClient;

// TODO The current queue is flawed. When a high-priority task fails,
// it will be delayed and will not return to the first or second position.
// Tasks with the same priority will preempt its position.
// If it needs to be placed at the front of the queue, the queue needs to be re-implemented.
private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> workerGroupQueue;

private final AtomicBoolean runningFlag = new AtomicBoolean(false);

public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
super("WorkerGroupTaskDispatcher-" + workerGroupName);
this.taskExecutorClient = taskExecutorClient;
this.workerGroupQueue = new PriorityDelayQueue<>();
}

/**
* Adds a task to the worker group queue.
* This method wraps the given task execution object into a priority and delay-based task entry and adds it to the worker group queue.
* The task is only added if the current dispatcher status is either STARTED or INIT. If the dispatcher is in any other state,
* the task addition will fail, and a warning message will be logged.
*
* @param taskExecutionRunnable The task execution object to add to the queue, which implements the {@link ITaskExecutionRunnable} interface.
* @param delayTimeMills The delay time in milliseconds before the task should be executed.
*/
public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable taskExecutionRunnable,
long delayTimeMills) {
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
}

@Override
public synchronized void start() {
if (runningFlag.compareAndSet(false, true)) {
log.info("The {} starting...", this.getName());
super.start();
log.info("The {} started", this.getName());
} else {
log.error("The {} status is {}, will not start again", this.getName(), runningFlag.get());
}
}

public synchronized void close() {
log.info("The {} closed called but not implemented", this.getName());
// todo WorkerGroupTaskDispatcher thread needs to be shut down after the WorkerGroup is deleted.
}

@Override
public void run() {
while (runningFlag.get()) {
dispatch();
}
}

private void dispatch() {
PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = workerGroupQueue.take();
ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
try {
final TaskExecutionStatus taskStatus = taskInstance.getState();
if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
&& taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), taskStatus);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
} catch (Exception e) {
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not exceed 60 seconds
long waitingTimeMills = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
}
}

/**
* ony use unit test
* @return size
*/
protected int queueSize() {
return this.workerGroupQueue.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Spring-managed registry of {@link WorkerGroupTaskDispatcher} instances keyed by worker group name.
 * <p>
 * A dispatcher is created the first time a task is submitted for a worker group and is started if its thread is not
 * alive. {@link #close()} calls {@code close()} on every registered dispatcher.
 */
@Component
@Slf4j
public class WorkerGroupTaskDispatcherManager implements AutoCloseable {

    @Autowired
    private ITaskExecutorClient taskExecutorClient;

    // worker group name -> dispatcher serving that group
    @Getter
    private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap;

    public WorkerGroupTaskDispatcherManager() {
        this.dispatchWorkerMap = new ConcurrentHashMap<>();
    }

    /**
     * Queues a task on the dispatcher of the given worker group, creating and starting the dispatcher when needed.
     *
     * @param workerGroup           name of the worker group; used as the key of the dispatcher map
     * @param taskExecutionRunnable the task to queue
     * @param delayTimeMills        the delay in milliseconds handed to the dispatcher's queue
     */
    public synchronized void addTaskToWorkerGroup(String workerGroup, ITaskExecutionRunnable taskExecutionRunnable,
                                                  long delayTimeMills) {
        final WorkerGroupTaskDispatcher dispatcher = dispatchWorkerMap.computeIfAbsent(
                workerGroup, groupName -> new WorkerGroupTaskDispatcher(groupName, taskExecutorClient));

        final boolean dispatcherThreadAlive = dispatcher.isAlive();
        if (!dispatcherThreadAlive) {
            dispatcher.start();
        }

        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
    }

    /**
     * Calls {@code close()} on every registered dispatcher; a failure on one dispatcher is logged and does not
     * prevent the remaining ones from being closed.
     */
    @Override
    public void close() throws Exception {
        log.info("WorkerGroupTaskDispatcherManager start close");
        for (Map.Entry<String, WorkerGroupTaskDispatcher> groupAndDispatcher : dispatchWorkerMap.entrySet()) {
            try {
                final WorkerGroupTaskDispatcher dispatcher = groupAndDispatcher.getValue();
                dispatcher.close();
            } catch (Exception e) {
                log.error("close worker group error", e);
            }
        }
        log.info("WorkerGroupTaskDispatcherManager closed");
    }
}
Loading
Loading