Skip to content

Commit f5f0bfe

Browse files
luxlruanwenjun
luxl
authored andcommitted
Change the logic to dynamically create the workerGroupTaskDispatcher when adding tasks.
1 parent 0fb49ae commit f5f0bfe

File tree

5 files changed

+72
-113
lines changed

5 files changed

+72
-113
lines changed

Diff for: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ public void initialized() {
132132

133133
this.masterCoordinator.start();
134134

135-
this.clusterManager.registerWorkerGroupListener(this.workerGroupTaskDispatcherManager);
136135
this.clusterManager.start();
137136

138137
this.clusterStateMonitors.start();

Diff for: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java

-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2323
import org.apache.dolphinscheduler.registry.api.RegistryClient;
2424
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
25-
import org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
2625

2726
import lombok.Getter;
2827
import lombok.extern.slf4j.Slf4j;
@@ -60,10 +59,6 @@ public void start() {
6059
log.info("ClusterManager started...");
6160
}
6261

63-
public void registerWorkerGroupListener(WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager) {
64-
this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerGroupTaskDispatcherManager);
65-
}
66-
6762
/**
6863
* Initialize the master clusters.
6964
* <p> 1. Register master slot listener once master clusters changed.

Diff for: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,9 @@ private void delayRetryDispatch(ITaskExecutionRunnable taskExecutionRunnable, Ex
9191
}
9292

9393
private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable taskExecutionRunnable) {
94-
boolean addTaskSuccess = workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
94+
workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
9595
taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
9696
taskExecutionRunnable, 0);
97-
if (!addTaskSuccess) {
98-
this.delayRetryDispatch(taskExecutionRunnable,
99-
new Exception(String.format("Dispatch TaskInstance: %s WorkerGrouTaskDispatcher: %s failed",
100-
taskExecutionRunnable.getTaskInstance().getName(),
101-
taskExecutionRunnable.getTaskInstance().getWorkerGroup())));
102-
}
10397
}
10498

10599
@Override

Diff for: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java

+7-74
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,12 @@
1717

1818
package org.apache.dolphinscheduler.server.master.runner;
1919

20-
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
21-
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
22-
import org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
2320
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2421
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2522

26-
import java.util.List;
2723
import java.util.Map;
2824
import java.util.concurrent.ConcurrentHashMap;
2925

30-
import javax.annotation.PostConstruct;
31-
3226
import lombok.Getter;
3327
import lombok.extern.slf4j.Slf4j;
3428

@@ -42,7 +36,7 @@
4236
*/
4337
@Component
4438
@Slf4j
45-
public class WorkerGroupTaskDispatcherManager implements AutoCloseable, WorkerGroupChangeNotifier.WorkerGroupListener {
39+
public class WorkerGroupTaskDispatcherManager implements AutoCloseable {
4640

4741
@Autowired
4842
private ITaskExecutorClient taskExecutorClient;
@@ -54,56 +48,21 @@ public WorkerGroupTaskDispatcherManager() {
5448
dispatchWorkerMap = new ConcurrentHashMap<>();
5549
}
5650

57-
@PostConstruct
58-
public void init() {
59-
this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
60-
}
61-
6251
/**
6352
* Adds a task to the specified worker group queue and starts or wakes up the corresponding processing loop.
6453
*
6554
* @param workerGroup the identifier for the worker group, used to distinguish different task queues
6655
* @param taskExecutionRunnable an instance of ITaskExecutionRunnable representing the task to be executed
6756
* @param delayTimeMills the delay time before the task is executed, in milliseconds
68-
* @return true if the task is successfully added to the queue, false workerGroupTaskDispatcher not found
69-
*/
70-
public synchronized boolean addTaskToWorkerGroup(String workerGroup, ITaskExecutionRunnable taskExecutionRunnable,
71-
long delayTimeMills) {
72-
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = dispatchWorkerMap.get(workerGroup);
73-
if (workerGroupTaskDispatcher != null) {
74-
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
75-
return true;
76-
} else {
77-
log.error("workerGroupTaskDispatcher {} not found, will set task {} fail",
78-
workerGroup, taskExecutionRunnable.getTaskInstance().getId());
79-
}
80-
return false;
81-
}
82-
83-
/**
84-
* Stops a specific worker group's task dispatch waiting queue looper.
85-
*
86-
* @param workerGroup the identifier for the worker group
87-
*/
88-
private synchronized void deleteWorkerGroup(String workerGroup) {
89-
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = dispatchWorkerMap.get(workerGroup);
90-
if (workerGroupTaskDispatcher != null) {
91-
workerGroupTaskDispatcher.close();
92-
} else {
93-
log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
94-
}
95-
}
96-
97-
/**
98-
* add workerGroup
99-
*
100-
* @param workerGroup the identifier for the worker group
10157
*/
102-
private synchronized void addWorkerGroup(String workerGroup) {
103-
log.info("add workerGroup: {}", workerGroup);
58+
public synchronized void addTaskToWorkerGroup(String workerGroup, ITaskExecutionRunnable taskExecutionRunnable,
59+
long delayTimeMills) {
10460
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = dispatchWorkerMap.computeIfAbsent(
10561
workerGroup, key -> new WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient));
106-
workerGroupTaskDispatcher.start();
62+
if (!workerGroupTaskDispatcher.isAlive()) {
63+
workerGroupTaskDispatcher.start();
64+
}
65+
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
10766
}
10867

10968
/**
@@ -121,30 +80,4 @@ public void close() throws Exception {
12180
}
12281
log.info("WorkerGroupTaskDispatcherManager closed");
12382
}
124-
125-
@Override
126-
public void onWorkerGroupAdd(List<WorkerGroup> workerGroups) {
127-
for (WorkerGroup workerGroup : workerGroups) {
128-
this.addWorkerGroup(workerGroup.getName());
129-
}
130-
}
131-
132-
@Override
133-
public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
134-
// Worker group changes will trigger add and delete events.
135-
// There is no need to handle the change events here; just log the records.
136-
log.info("on change worker groups: {}", workerGroups);
137-
}
138-
139-
@Override
140-
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
141-
for (WorkerGroup workerGroup : workerGroups) {
142-
try {
143-
this.deleteWorkerGroup(workerGroup.getName());
144-
} catch (Exception e) {
145-
log.error("Delete worker group: {} from WorkerGroupTaskDispatcherManager error", workerGroup.getName(),
146-
e);
147-
}
148-
}
149-
}
15083
}

Diff for: dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java

+64-26
Original file line numberDiff line numberDiff line change
@@ -17,51 +17,89 @@
1717

1818
package org.apache.dolphinscheduler.server.master.runner;
1919

20-
import static org.junit.jupiter.api.Assertions.assertEquals;
21-
import static org.junit.jupiter.api.Assertions.assertFalse;
20+
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.Mockito.doNothing;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
2225

23-
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
24-
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
26+
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2527
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2628

27-
import java.util.Arrays;
28-
import java.util.List;
29+
import java.util.concurrent.ConcurrentHashMap;
2930

31+
import org.junit.jupiter.api.BeforeEach;
3032
import org.junit.jupiter.api.Test;
31-
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.junit.runner.RunWith;
3234
import org.mockito.InjectMocks;
3335
import org.mockito.Mock;
34-
import org.mockito.Mockito;
35-
import org.mockito.junit.jupiter.MockitoExtension;
36+
import org.mockito.junit.MockitoJUnitRunner;
37+
import org.springframework.test.util.ReflectionTestUtils;
3638

37-
@ExtendWith(MockitoExtension.class)
39+
@RunWith(MockitoJUnitRunner.class)
3840
public class WorkerGroupTaskDispatcherManagerTest {
3941

4042
@InjectMocks
41-
private WorkerGroupTaskDispatcherManager manager;
43+
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
4244

4345
@Mock
44-
private ITaskExecutionRunnable taskExecutionRunnable;
46+
private ITaskExecutorClient taskExecutorClient;
4547

4648
@Mock
47-
private TaskInstance taskInstance;
49+
private ITaskExecutionRunnable taskExecutionRunnable;
50+
51+
@BeforeEach
52+
public void setUp() {
53+
workerGroupTaskDispatcherManager = new WorkerGroupTaskDispatcherManager();
54+
ReflectionTestUtils.setField(workerGroupTaskDispatcherManager, "taskExecutorClient", taskExecutorClient);
55+
}
56+
57+
@Test
58+
public void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
59+
String workerGroup = "newGroup";
60+
long delayTimeMills = 1000;
61+
62+
workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup, taskExecutionRunnable, delayTimeMills);
63+
64+
ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap =
65+
(ConcurrentHashMap<String, WorkerGroupTaskDispatcher>) ReflectionTestUtils
66+
.getField(workerGroupTaskDispatcherManager, "dispatchWorkerMap");
67+
68+
assert dispatchWorkerMap != null;
69+
assertTrue(dispatchWorkerMap.containsKey(workerGroup));
70+
}
4871

4972
@Test
50-
public void testAddTaskToWorkerGroupTaskToWorkerGroupQueueTaskToNonExistingWorkerGroup_ShouldReturnFalse() {
51-
Mockito.when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
52-
String workerGroupName = "nonExistingGroup";
53-
boolean result = manager.addTaskToWorkerGroup(workerGroupName, taskExecutionRunnable, 0L);
54-
assertFalse(result);
73+
public void addTaskToWorkerGroup_ExistingWorkerGroup_ShouldAddTask() {
74+
String workerGroup = "existingGroup";
75+
long delayTimeMills = 1000;
76+
77+
WorkerGroupTaskDispatcher mockDispatcher = mock(WorkerGroupTaskDispatcher.class);
78+
79+
ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap = new ConcurrentHashMap<>();
80+
dispatchWorkerMap.put(workerGroup, mockDispatcher);
81+
82+
ReflectionTestUtils.setField(workerGroupTaskDispatcherManager, "dispatchWorkerMap", dispatchWorkerMap);
83+
doNothing().when(mockDispatcher).start();
84+
workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup, taskExecutionRunnable, delayTimeMills);
85+
86+
verify(mockDispatcher, times(1)).addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
5587
}
5688

5789
@Test
58-
public void testOnWorkerGroupAdd_ShouldAddTaskToWorkerGroupTaskToWorkerGroupQueueWorkerGroups() {
59-
WorkerGroup group1 = new WorkerGroup();
60-
WorkerGroup group2 = new WorkerGroup();
61-
group1.setName("testGroup1");
62-
group2.setName("testGroup2");
63-
List<WorkerGroup> workerGroups = Arrays.asList(group1, group2);
64-
manager.onWorkerGroupAdd(workerGroups);
65-
assertEquals(2, manager.getDispatchWorkerMap().size());
90+
public void close_ShouldCloseAllWorkerGroups() throws Exception {
91+
WorkerGroupTaskDispatcher mockDispatcher1 = mock(WorkerGroupTaskDispatcher.class);
92+
WorkerGroupTaskDispatcher mockDispatcher2 = mock(WorkerGroupTaskDispatcher.class);
93+
94+
ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap = new ConcurrentHashMap<>();
95+
dispatchWorkerMap.put("group1", mockDispatcher1);
96+
dispatchWorkerMap.put("group2", mockDispatcher2);
97+
98+
ReflectionTestUtils.setField(workerGroupTaskDispatcherManager, "dispatchWorkerMap", dispatchWorkerMap);
99+
100+
workerGroupTaskDispatcherManager.close();
101+
102+
verify(mockDispatcher1, times(1)).close();
103+
verify(mockDispatcher2, times(1)).close();
66104
}
67105
}

0 commit comments

Comments
 (0)