Skip to content

Commit e42cdd1

Browse files
authored
[FLINK-38338][runtime] Introduce the abstraction to describe a rescale event. (apache#26981)
1 parent bb2b1db commit e42cdd1

24 files changed

+1888
-28
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
2424

2525
import java.util.Collections;
26+
import java.util.Objects;
2627
import java.util.Set;
2728
import java.util.TreeSet;
2829

@@ -41,6 +42,8 @@ public class SlotSharingGroup implements java.io.Serializable {
4142

4243
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
4344

45+
private String slotSharingGroupName;
46+
4447
// Represents resources of all tasks in the group. Default to be UNKNOWN.
4548
private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
4649

@@ -70,12 +73,44 @@ public ResourceProfile getResourceProfile() {
7073
return resourceProfile;
7174
}
7275

76+
public String getSlotSharingGroupName() {
77+
return slotSharingGroupName;
78+
}
79+
80+
public void setSlotSharingGroupName(String slotSharingGroupName) {
81+
this.slotSharingGroupName = slotSharingGroupName;
82+
}
83+
7384
// ------------------------------------------------------------------------
7485
// Utilities
7586
// ------------------------------------------------------------------------
7687

88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(slotSharingGroupId);
91+
}
92+
93+
@Override
94+
public boolean equals(Object o) {
95+
if (o == null || getClass() != o.getClass()) {
96+
return false;
97+
}
98+
SlotSharingGroup that = (SlotSharingGroup) o;
99+
return Objects.equals(slotSharingGroupId, that.slotSharingGroupId);
100+
}
101+
77102
@Override
78103
public String toString() {
79-
return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + resourceProfile + '}';
104+
return "SlotSharingGroup{"
105+
+ "ids="
106+
+ ids
107+
+ ", slotSharingGroupId="
108+
+ slotSharingGroupId
109+
+ ", slotSharingGroupName='"
110+
+ slotSharingGroupName
111+
+ '\''
112+
+ ", resourceProfile="
113+
+ resourceProfile
114+
+ '}';
80115
}
81116
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
2425

2526
import org.slf4j.Logger;
2627

@@ -34,13 +35,21 @@ class Finished implements State {
3435

3536
private final Logger logger;
3637

38+
private final Durable durable;
39+
3740
Finished(Context context, ArchivedExecutionGraph archivedExecutionGraph, Logger logger) {
3841
this.archivedExecutionGraph = archivedExecutionGraph;
3942
this.logger = logger;
43+
this.durable = new Durable();
4044

4145
context.onFinished(archivedExecutionGraph);
4246
}
4347

48+
@Override
49+
public Durable getDurable() {
50+
return durable;
51+
}
52+
4453
@Override
4554
public void cancel() {}
4655

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.runtime.scheduler.adaptive;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
2021
import org.apache.flink.api.common.JobID;
2122
import org.apache.flink.runtime.jobgraph.JobGraph;
2223
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,7 +42,7 @@ public class JobGraphJobInformation implements JobInformation {
4142
private final JobGraph jobGraph;
4243
private final JobID jobID;
4344
private final String name;
44-
private final VertexParallelismStore vertexParallelismStore;
45+
protected final VertexParallelismStore vertexParallelismStore;
4546

4647
public JobGraphJobInformation(
4748
JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) {
@@ -91,17 +92,19 @@ public JobGraph copyJobGraph() {
9192
return InstantiationUtil.cloneUnchecked(jobGraph);
9293
}
9394

95+
@Override
9496
public VertexParallelismStore getVertexParallelismStore() {
9597
return vertexParallelismStore;
9698
}
9799

98-
private static final class JobVertexInformation implements JobInformation.VertexInformation {
100+
@VisibleForTesting
101+
public static final class JobVertexInformation implements JobInformation.VertexInformation {
99102

100103
private final JobVertex jobVertex;
101104

102105
private final VertexParallelismInformation parallelismInfo;
103106

104-
private JobVertexInformation(
107+
public JobVertexInformation(
105108
JobVertex jobVertex, VertexParallelismInformation parallelismInfo) {
106109
this.jobVertex = jobVertex;
107110
this.parallelismInfo = parallelismInfo;
@@ -112,6 +115,11 @@ public JobVertexID getJobVertexID() {
112115
return jobVertex.getID();
113116
}
114117

118+
@Override
119+
public String getVertexName() {
120+
return jobVertex.getName();
121+
}
122+
115123
@Override
116124
public int getMinParallelism() {
117125
return parallelismInfo.getMinParallelism();
@@ -137,5 +145,10 @@ public SlotSharingGroup getSlotSharingGroup() {
137145
public CoLocationGroup getCoLocationGroup() {
138146
return jobVertex.getCoLocationGroup();
139147
}
148+
149+
@VisibleForTesting
150+
public VertexParallelismInformation getVertexParallelismInfo() {
151+
return parallelismInfo;
152+
}
140153
}
141154
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,37 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
2425
import org.apache.flink.util.function.FunctionWithException;
2526
import org.apache.flink.util.function.ThrowingConsumer;
2627

2728
import org.slf4j.Logger;
2829

30+
import java.time.Instant;
2931
import java.util.Optional;
3032
import java.util.function.Consumer;
3133

3234
/**
3335
* State abstraction of the {@link AdaptiveScheduler}. This interface contains all methods every
3436
* state implementation must support.
3537
*/
36-
interface State extends LabeledGlobalFailureHandler {
38+
public interface State extends LabeledGlobalFailureHandler {
39+
40+
/**
41+
* Get the durable time information of the current state.
42+
*
43+
* @return The durable time information of the current state.
44+
*/
45+
Durable getDurable();
3746

3847
/**
3948
* This method is called whenever one transitions out of this state.
4049
*
4150
* @param newState newState is the state into which the scheduler transitions
4251
*/
43-
default void onLeave(Class<? extends State> newState) {}
52+
default void onLeave(Class<? extends State> newState) {
53+
getDurable().setLeaveTimestamp(Instant.now().toEpochMilli());
54+
}
4455

4556
/** Cancels the job execution. */
4657
void cancel();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.flink.runtime.scheduler.KvStateHandler;
5656
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
5757
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
58+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
5859
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
5960
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
6061
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
@@ -69,6 +70,7 @@
6970

7071
import java.io.IOException;
7172
import java.net.InetSocketAddress;
73+
import java.time.Instant;
7274
import java.util.ArrayList;
7375
import java.util.List;
7476
import java.util.Map;
@@ -101,6 +103,8 @@ abstract class StateWithExecutionGraph implements State {
101103

102104
private final VertexEndOfDataListener vertexEndOfDataListener;
103105

106+
private final Durable durable;
107+
104108
StateWithExecutionGraph(
105109
Context context,
106110
ExecutionGraph executionGraph,
@@ -118,6 +122,7 @@ abstract class StateWithExecutionGraph implements State {
118122
this.userCodeClassLoader = userClassCodeLoader;
119123
this.failureCollection = new ArrayList<>(failureCollection);
120124
this.vertexEndOfDataListener = new VertexEndOfDataListener(executionGraph);
125+
this.durable = new Durable();
121126

122127
FutureUtils.assertNoException(
123128
executionGraph
@@ -137,6 +142,11 @@ abstract class StateWithExecutionGraph implements State {
137142
context.getMainThreadExecutor()));
138143
}
139144

145+
@Override
146+
public Durable getDurable() {
147+
return durable;
148+
}
149+
140150
ExecutionGraph getExecutionGraph() {
141151
return executionGraph;
142152
}
@@ -156,6 +166,7 @@ protected ExecutionGraphHandler getExecutionGraphHandler() {
156166

157167
@Override
158168
public void onLeave(Class<? extends State> newState) {
169+
getDurable().setLeaveTimestamp(Instant.now().toEpochMilli());
159170
if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
160171
// we are leaving the StateWithExecutionGraph --> we need to dispose temporary services
161172
operatorCoordinatorHandler.disposeAllOperatorCoordinators();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
2425

2526
import org.slf4j.Logger;
2627

@@ -39,9 +40,17 @@ abstract class StateWithoutExecutionGraph implements State {
3940

4041
private final Logger logger;
4142

43+
private final Durable durable;
44+
4245
StateWithoutExecutionGraph(Context context, Logger logger) {
4346
this.context = context;
4447
this.logger = logger;
48+
this.durable = new Durable();
49+
}
50+
51+
@Override
52+
public Durable getDurable() {
53+
return durable;
4554
}
4655

4756
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.runtime.scheduler.adaptive.allocator;
2020

2121
import org.apache.flink.runtime.clusterframework.types.ResourceID;
22-
import org.apache.flink.runtime.instance.SlotSharingGroupId;
22+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2323
import org.apache.flink.runtime.jobmaster.SlotInfo;
2424
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
2525

@@ -35,13 +35,13 @@ class AllocatorUtil {
3535

3636
private AllocatorUtil() {}
3737

38-
static Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
38+
static Map<SlotSharingGroup, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
3939
getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
4040
return SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
4141
}
4242

4343
static int getMinimumRequiredSlots(
44-
Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
44+
Map<SlotSharingGroup, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
4545
slotSharingGroupMetaInfos) {
4646
return slotSharingGroupMetaInfos.values().stream()
4747
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
2121
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
2222
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
23+
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
2324

2425
import javax.annotation.Nullable;
2526

@@ -51,10 +52,16 @@ public interface JobInformation {
5152

5253
Iterable<VertexInformation> getVertices();
5354

55+
default VertexParallelismStore getVertexParallelismStore() {
56+
throw new UnsupportedOperationException();
57+
}
58+
5459
/** Information about a single vertex. */
5560
interface VertexInformation {
5661
JobVertexID getJobVertexID();
5762

63+
String getVertexName();
64+
5865
int getMinParallelism();
5966

6067
int getParallelism();

0 commit comments

Comments
 (0)