Skip to content

Commit 67c8b79

Browse files
authored
[improve] improve TaskLocation/TaskLocationGroup info (#8862)
1 parent b922bb9 commit 67c8b79

File tree

4 files changed

+119
-72
lines changed

4 files changed

+119
-72
lines changed

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java

+17-40
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public class PhysicalPlanGenerator {
9191

9292
private final List<Pipeline> pipelines;
9393

94-
private final IdGenerator idGenerator = new IdGenerator();
94+
private final IdGenerator taskGroupIdGenerator = new IdGenerator();
9595

9696
private final JobImmutableInformation jobImmutableInformation;
9797

@@ -274,15 +274,14 @@ private List<PhysicalVertex> getCommitterTask(
274274
}
275275
// if sinkAggregatedCommitter is empty, don't create task.
276276
if (sinkAggregatedCommitter.isPresent()) {
277-
long taskGroupID = idGenerator.getNextId();
278-
long taskTypeId = idGenerator.getNextId();
277+
long taskGroupID = taskGroupIdGenerator.getNextId();
279278
TaskGroupLocation taskGroupLocation =
280279
new TaskGroupLocation(
281280
jobImmutableInformation.getJobId(),
282281
pipelineIndex,
283282
taskGroupID);
284283
TaskLocation taskLocation =
285-
new TaskLocation(taskGroupLocation, taskTypeId, 0);
284+
new TaskLocation(taskGroupLocation, 0, 0);
286285
SinkAggregatedCommitterTask<?, ?> t =
287286
new SinkAggregatedCommitterTask(
288287
jobImmutableInformation.getJobId(),
@@ -342,17 +341,15 @@ private List<PhysicalVertex> getShuffleTask(
342341
if (shuffleStrategy instanceof ShuffleMultipleRowStrategy) {
343342
ShuffleMultipleRowStrategy shuffleMultipleRowStrategy =
344343
(ShuffleMultipleRowStrategy) shuffleStrategy;
344+
AtomicInteger atomicInteger = new AtomicInteger(0);
345345
for (Flow nextFlow : flow.getNext()) {
346346
PhysicalExecutionFlow sinkFlow =
347347
(PhysicalExecutionFlow) nextFlow;
348348
SinkAction sinkAction = (SinkAction) sinkFlow.getAction();
349349
String sinkTableId =
350350
sinkAction.getConfig().getTablePath().toString();
351351

352-
long taskIDPrefix = idGenerator.getNextId();
353-
long taskGroupIDPrefix = idGenerator.getNextId();
354-
int parallelismIndex = 0;
355-
352+
int parallelismIndex = atomicInteger.getAndIncrement();
356353
ShuffleStrategy shuffleStrategyOfSinkFlow =
357354
shuffleMultipleRowStrategy
358355
.toBuilder()
@@ -363,7 +360,6 @@ private List<PhysicalVertex> getShuffleTask(
363360
.toBuilder()
364361
.shuffleStrategy(shuffleStrategyOfSinkFlow)
365362
.build();
366-
long shuffleActionId = idGenerator.getNextId();
367363
String shuffleActionName =
368364
String.format(
369365
"%s -> %s -> %s",
@@ -372,7 +368,7 @@ private List<PhysicalVertex> getShuffleTask(
372368
sinkAction.getName());
373369
ShuffleAction shuffleActionOfSinkFlow =
374370
new ShuffleAction(
375-
shuffleActionId,
371+
parallelismIndex,
376372
shuffleActionName,
377373
shuffleConfigOfSinkFlow);
378374
shuffleActionOfSinkFlow.setParallelism(1);
@@ -382,19 +378,15 @@ private List<PhysicalVertex> getShuffleTask(
382378
Collections.singletonList(sinkFlow));
383379
setFlowConfig(shuffleFlow);
384380

385-
long taskGroupID =
386-
mixIDPrefixAndIndex(
387-
taskGroupIDPrefix, parallelismIndex);
381+
long taskGroupID = taskGroupIdGenerator.getNextId();
388382
TaskGroupLocation taskGroupLocation =
389383
new TaskGroupLocation(
390384
jobImmutableInformation.getJobId(),
391385
pipelineIndex,
392386
taskGroupID);
393387
TaskLocation taskLocation =
394388
new TaskLocation(
395-
taskGroupLocation,
396-
taskIDPrefix,
397-
parallelismIndex);
389+
taskGroupLocation, 0, parallelismIndex);
398390
SeaTunnelTask seaTunnelTask =
399391
new TransformSeaTunnelTask(
400392
jobImmutableInformation.getJobId(),
@@ -428,17 +420,15 @@ private List<PhysicalVertex> getShuffleTask(
428420
runningJobStateTimestampsIMap));
429421
}
430422
} else {
431-
long taskIDPrefix = idGenerator.getNextId();
432-
long taskGroupIDPrefix = idGenerator.getNextId();
433423
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
434-
long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
424+
long taskGroupID = taskGroupIdGenerator.getNextId();
435425
TaskGroupLocation taskGroupLocation =
436426
new TaskGroupLocation(
437427
jobImmutableInformation.getJobId(),
438428
pipelineIndex,
439429
taskGroupID);
440430
TaskLocation taskLocation =
441-
new TaskLocation(taskGroupLocation, taskIDPrefix, i);
431+
new TaskLocation(taskGroupLocation, 0, i);
442432
setFlowConfig(flow);
443433
SeaTunnelTask seaTunnelTask =
444434
new TransformSeaTunnelTask(
@@ -483,15 +473,13 @@ private List<PhysicalVertex> getEnumeratorTask(
483473
return sources.stream()
484474
.map(
485475
sourceAction -> {
486-
long taskGroupID = idGenerator.getNextId();
487-
long taskTypeId = idGenerator.getNextId();
476+
long taskGroupID = taskGroupIdGenerator.getNextId();
488477
TaskGroupLocation taskGroupLocation =
489478
new TaskGroupLocation(
490479
jobImmutableInformation.getJobId(),
491480
pipelineIndex,
492481
taskGroupID);
493-
TaskLocation taskLocation =
494-
new TaskLocation(taskGroupLocation, taskTypeId, 0);
482+
TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0, 0);
495483
SourceSplitEnumeratorTask<?> t =
496484
new SourceSplitEnumeratorTask<>(
497485
jobImmutableInformation.getJobId(),
@@ -541,32 +529,25 @@ private List<PhysicalVertex> getSourceTask(
541529
if (sourceWithSink(flow)) {
542530
flows.addAll(splitSinkFromFlow(flow));
543531
}
544-
long taskGroupIDPrefix = idGenerator.getNextId();
545-
Map<Long, Long> flowTaskIDPrefixMap = new HashMap<>();
546532
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
533+
long taskGroupId = taskGroupIdGenerator.getNextId();
547534
int finalParallelismIndex = i;
548-
long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
549535
TaskGroupLocation taskGroupLocation =
550536
new TaskGroupLocation(
551537
jobImmutableInformation.getJobId(),
552538
pipelineIndex,
553-
taskGroupID);
539+
taskGroupId);
540+
AtomicInteger taskInTaskGroupIndex = new AtomicInteger(0);
554541
List<SeaTunnelTask> taskList =
555542
flows.stream()
556543
.map(
557544
f -> {
558545
setFlowConfig(f);
559-
long taskIDPrefix =
560-
flowTaskIDPrefixMap
561-
.computeIfAbsent(
562-
f.getFlowID(),
563-
id ->
564-
idGenerator
565-
.getNextId());
566546
final TaskLocation taskLocation =
567547
new TaskLocation(
568548
taskGroupLocation,
569-
taskIDPrefix,
549+
taskInTaskGroupIndex
550+
.getAndIncrement(),
570551
finalParallelismIndex);
571552
if (f
572553
instanceof
@@ -768,10 +749,6 @@ private static boolean sourceWithSink(PhysicalExecutionFlow<?, ?> flow) {
768749
.contains(true);
769750
}
770751

771-
private long mixIDPrefixAndIndex(long idPrefix, int index) {
772-
return idPrefix * 10000 + index;
773-
}
774-
775752
private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
776753
List<Action> actions =
777754
edges.stream()

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java

+10-24
Original file line numberDiff line numberDiff line change
@@ -162,30 +162,16 @@ public PhysicalVertex(
162162
this.currExecutionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
163163

164164
this.nodeEngine = nodeEngine;
165-
if (log.isDebugEnabled() || log.isTraceEnabled()) {
166-
this.taskFullName =
167-
String.format(
168-
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]",
169-
jobImmutableInformation.getJobConfig().getName(),
170-
jobImmutableInformation.getJobId(),
171-
pipelineId,
172-
totalPipelineNum,
173-
taskGroup.getTaskGroupName(),
174-
subTaskGroupIndex + 1,
175-
parallelism,
176-
taskGroupLocation);
177-
} else {
178-
this.taskFullName =
179-
String.format(
180-
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
181-
jobImmutableInformation.getJobConfig().getName(),
182-
jobImmutableInformation.getJobId(),
183-
pipelineId,
184-
totalPipelineNum,
185-
taskGroup.getTaskGroupName(),
186-
subTaskGroupIndex + 1,
187-
parallelism);
188-
}
165+
this.taskFullName =
166+
String.format(
167+
"Job (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]",
168+
jobImmutableInformation.getJobId(),
169+
pipelineId,
170+
totalPipelineNum,
171+
taskGroup.getTaskGroupName(),
172+
subTaskGroupIndex + 1,
173+
parallelism,
174+
taskGroupLocation);
189175

190176
this.taskFuture = new CompletableFuture<>();
191177

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,21 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
3737
private long taskID;
3838
private int index;
3939

40+
private static final long SUB_PIPELINE_ID_FACTORY = 10000L * 10000L * 10000L;
41+
private static final long GROUP_ID_FACTOR = 10000L * 10000L;
42+
private static final long TASK_GROUP_FACTOR = 10000L;
43+
4044
public TaskLocation() {}
4145

42-
public TaskLocation(TaskGroupLocation taskGroupLocation, long idPrefix, int index) {
46+
public TaskLocation(
47+
TaskGroupLocation taskGroupLocation, long taskInGroupIndex, int taskParallelismIndex) {
4348
this.taskGroupLocation = taskGroupLocation;
44-
this.taskID = mixIDPrefixAndIndex(idPrefix, index);
45-
this.index = index;
46-
}
47-
48-
private long mixIDPrefixAndIndex(long idPrefix, int index) {
49-
return idPrefix * 10000 + index;
49+
this.taskID =
50+
taskGroupLocation.getPipelineId() * SUB_PIPELINE_ID_FACTORY
51+
+ taskGroupLocation.getTaskGroupId() * GROUP_ID_FACTOR
52+
+ taskInGroupIndex * TASK_GROUP_FACTOR
53+
+ taskParallelismIndex;
54+
this.index = taskParallelismIndex;
5055
}
5156

5257
public TaskGroupLocation getTaskGroupLocation() {
@@ -66,7 +71,7 @@ public long getTaskID() {
6671
}
6772

6873
public long getTaskVertexId() {
69-
return taskID / 10000;
74+
return taskID;
7075
}
7176

7277
public int getTaskIndex() {

Diff for: seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java

+79
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@
5050
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
5151
import org.apache.seatunnel.engine.server.TestUtils;
5252
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
53+
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
5354
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
55+
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
56+
import org.apache.seatunnel.engine.server.execution.Task;
57+
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
5458

5559
import org.junit.jupiter.api.Assertions;
5660
import org.junit.jupiter.api.Test;
@@ -228,6 +232,81 @@ public void testLogicalToPhysical() throws MalformedURLException {
228232
Sets.newHashSet(new URL("file:///console.jar")));
229233
}
230234

235+
@Test
236+
public void testTaskGroupAndTaskLocationInfos() {
237+
Long jobId = 1L;
238+
LogicalDag testLogicalDag =
239+
TestUtils.createTestLogicalPlan(
240+
"stream_fake_to_console.conf", "test_task_group_info", jobId);
241+
JobImmutableInformation jobImmutableInformation =
242+
new JobImmutableInformation(
243+
jobId,
244+
"Test",
245+
nodeEngine.getSerializationService(),
246+
testLogicalDag,
247+
Collections.emptyList(),
248+
Collections.emptyList());
249+
IMap<Object, Object> runningJobState =
250+
nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
251+
IMap<Object, Long[]> runningJobStateTimestamp =
252+
nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
253+
PhysicalPlan physicalPlan =
254+
PlanUtils.fromLogicalDAG(
255+
testLogicalDag,
256+
nodeEngine,
257+
jobImmutableInformation,
258+
System.currentTimeMillis(),
259+
Executors.newCachedThreadPool(),
260+
server.getClassLoaderService(),
261+
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
262+
runningJobState,
263+
runningJobStateTimestamp,
264+
QueueType.BLOCKINGQUEUE,
265+
new EngineConfig())
266+
.f0();
267+
Assertions.assertEquals(2, physicalPlan.getPipelineList().size());
268+
for (int i = 0; i < physicalPlan.getPipelineList().size(); i++) {
269+
SubPlan subPlan = physicalPlan.getPipelineList().get(i);
270+
int pipelineId = subPlan.getPipelineId();
271+
272+
for (int j = 0; j < subPlan.getCoordinatorVertexList().size(); j++) {
273+
PhysicalVertex physicalVertex = subPlan.getCoordinatorVertexList().get(j);
274+
TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
275+
List<Task> physicalTasks =
276+
new ArrayList<>(physicalVertex.getTaskGroup().getTasks());
277+
for (int taskInGroupIndex = 0;
278+
taskInGroupIndex < physicalTasks.size();
279+
taskInGroupIndex++) {
280+
Task task = physicalTasks.get(taskInGroupIndex);
281+
long expectedTaskId =
282+
pipelineId * 10000L * 10000L * 10000L
283+
+ taskGroupLocation.getTaskGroupId() * 10000L * 10000L
284+
+ taskInGroupIndex * 10000L;
285+
Assertions.assertEquals(expectedTaskId, task.getTaskID());
286+
}
287+
}
288+
289+
for (int j = 0; j < subPlan.getPhysicalVertexList().size(); j++) {
290+
PhysicalVertex physicalVertex = subPlan.getPhysicalVertexList().get(j);
291+
TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
292+
List<Task> physicalTasks =
293+
new ArrayList<>(physicalVertex.getTaskGroup().getTasks());
294+
for (int taskInGroupIndex = 0;
295+
taskInGroupIndex < physicalTasks.size();
296+
taskInGroupIndex++) {
297+
Task task = physicalTasks.get(taskInGroupIndex);
298+
// can't get job parallel index, use prefix check
299+
long expectedTaskIdPrefix =
300+
pipelineId * 10000L * 10000L * 10000L
301+
+ taskGroupLocation.getTaskGroupId() * 10000L * 10000L
302+
+ taskInGroupIndex * 10000L;
303+
Assertions.assertEquals(
304+
expectedTaskIdPrefix / 10000L, task.getTaskID() / 10000L);
305+
}
306+
}
307+
}
308+
}
309+
231310
private static FakeSource createFakeSource() {
232311
Config fakeSourceConfig =
233312
ConfigFactory.parseMap(

0 commit comments

Comments
 (0)