Skip to content

Commit a1484d8

Browse files
ztisonXComp
authored andcommitted
[FLINK-38703][runtime] Update slot manager metrics in thread-safety manner
1 parent 9f6cff3 commit a1484d8

File tree

4 files changed

+145
-6
lines changed

4 files changed

+145
-6
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.HashSet;
5757
import java.util.List;
5858
import java.util.Map;
59+
import java.util.Objects;
5960
import java.util.Optional;
6061
import java.util.Set;
6162
import java.util.StringJoiner;
@@ -69,6 +70,8 @@
6970

7071
/** Implementation of {@link SlotManager} supporting fine-grained resource management. */
7172
public class FineGrainedSlotManager implements SlotManager {
73+
public static final Duration METRICS_UPDATE_INTERVAL = Duration.ofSeconds(1);
74+
7275
private static final Logger LOG = LoggerFactory.getLogger(FineGrainedSlotManager.class);
7376

7477
private final TaskManagerTracker taskManagerTracker;
@@ -114,6 +117,8 @@ public class FineGrainedSlotManager implements SlotManager {
114117

115118
@Nullable private ScheduledFuture<?> clusterReconciliationCheck;
116119

120+
@Nullable private ScheduledFuture<?> metricsUpdateFuture;
121+
117122
@Nullable private CompletableFuture<Void> requirementsCheckFuture;
118123

119124
@Nullable private CompletableFuture<Void> declareNeededResourceFuture;
@@ -124,6 +129,11 @@ public class FineGrainedSlotManager implements SlotManager {
124129
/** True iff the component has been started. */
125130
private boolean started;
126131

132+
/** Metrics. */
133+
private long lastNumberFreeSlots;
134+
135+
private long lastNumberRegisteredSlots;
136+
127137
public FineGrainedSlotManager(
128138
ScheduledExecutor scheduledExecutor,
129139
SlotManagerConfiguration slotManagerConfiguration,
@@ -159,6 +169,7 @@ public FineGrainedSlotManager(
159169
mainThreadExecutor = null;
160170
clusterReconciliationCheck = null;
161171
requirementsCheckFuture = null;
172+
metricsUpdateFuture = null;
162173

163174
started = false;
164175
}
@@ -227,10 +238,26 @@ public void start(
227238
}
228239

229240
private void registerSlotManagerMetrics() {
230-
slotManagerMetricGroup.gauge(
231-
MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) getNumberFreeSlots());
232-
slotManagerMetricGroup.gauge(
233-
MetricNames.TASK_SLOTS_TOTAL, () -> (long) getNumberRegisteredSlots());
241+
// Because taskManagerTracker is not thread-safe, metrics must be updated periodically on
242+
// the main thread to prevent concurrent modification issues.
243+
metricsUpdateFuture =
244+
scheduledExecutor.scheduleAtFixedRate(
245+
this::updateMetrics,
246+
0L,
247+
METRICS_UPDATE_INTERVAL.toMillis(),
248+
TimeUnit.MILLISECONDS);
249+
250+
slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_AVAILABLE, () -> lastNumberFreeSlots);
251+
slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_TOTAL, () -> lastNumberRegisteredSlots);
252+
}
253+
254+
private void updateMetrics() {
255+
Objects.requireNonNull(mainThreadExecutor)
256+
.execute(
257+
() -> {
258+
lastNumberFreeSlots = getNumberFreeSlots();
259+
lastNumberRegisteredSlots = getNumberRegisteredSlots();
260+
});
234261
}
235262

236263
/** Suspends the component. This clears the internal state of the slot manager. */
@@ -250,6 +277,12 @@ public void suspend() {
250277
clusterReconciliationCheck = null;
251278
}
252279

280+
// stop the metrics updates
281+
if (metricsUpdateFuture != null) {
282+
metricsUpdateFuture.cancel(false);
283+
metricsUpdateFuture = null;
284+
}
285+
253286
slotStatusSyncer.close();
254287
taskManagerTracker.clear();
255288
resourceTracker.clear();

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.java.tuple.Tuple2;
2222
import org.apache.flink.api.java.tuple.Tuple6;
23+
import org.apache.flink.metrics.Gauge;
2324
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2425
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2526
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2627
import org.apache.flink.runtime.clusterframework.types.SlotID;
2728
import org.apache.flink.runtime.instance.InstanceID;
2829
import org.apache.flink.runtime.messages.Acknowledge;
30+
import org.apache.flink.runtime.metrics.MetricNames;
2931
import org.apache.flink.runtime.metrics.MetricRegistry;
3032
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
3133
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -38,6 +40,7 @@
3840
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
3941
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
4042
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
43+
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
4144
import org.apache.flink.util.function.ThrowingConsumer;
4245

4346
import org.junit.jupiter.api.Test;
@@ -50,6 +53,7 @@
5053
import java.util.Optional;
5154
import java.util.concurrent.CompletableFuture;
5255
import java.util.concurrent.atomic.AtomicInteger;
56+
import java.util.concurrent.atomic.AtomicReference;
5357
import java.util.function.Consumer;
5458

5559
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -1058,4 +1062,100 @@ void testClearResourceRequirementsWithPendingTaskManager() throws Exception {
10581062
}
10591063
};
10601064
}
1065+
1066+
@Test
1067+
void testMetricsUpdate() throws Exception {
1068+
final AtomicReference<Gauge<Long>> slotsAvailableGauge = new AtomicReference<>();
1069+
final AtomicReference<Gauge<Long>> slotsTotalGauge = new AtomicReference<>();
1070+
1071+
final MetricRegistry metricRegistry =
1072+
TestingMetricRegistry.builder()
1073+
.setRegisterConsumer(
1074+
(metric, name, group) -> {
1075+
if (name.equals(MetricNames.TASK_SLOTS_AVAILABLE)) {
1076+
slotsAvailableGauge.set((Gauge<Long>) metric);
1077+
} else if (name.equals(MetricNames.TASK_SLOTS_TOTAL)) {
1078+
slotsTotalGauge.set((Gauge<Long>) metric);
1079+
}
1080+
})
1081+
.build();
1082+
1083+
final Context context = new Context();
1084+
context.setSlotManagerMetricGroup(
1085+
SlotManagerMetricGroup.create(metricRegistry, "localhost"));
1086+
final ManuallyTriggeredScheduledExecutor scheduledExecutor =
1087+
new ManuallyTriggeredScheduledExecutor();
1088+
context.setScheduledExecutor(scheduledExecutor);
1089+
final TaskExecutorConnection executorConnection1 = createTaskExecutorConnection();
1090+
final TaskExecutorConnection executorConnection2 = createTaskExecutorConnection();
1091+
1092+
context.runTest(
1093+
() -> {
1094+
assertThat(slotsAvailableGauge.get().getValue()).isEqualTo(0);
1095+
assertThat(slotsTotalGauge.get().getValue()).isEqualTo(0);
1096+
1097+
final CompletableFuture<SlotManager.RegistrationResult>
1098+
registerTaskManagerFuture1 = new CompletableFuture<>();
1099+
context.runInMainThreadAndWait(
1100+
() ->
1101+
registerTaskManagerFuture1.complete(
1102+
context.getSlotManager()
1103+
.registerTaskManager(
1104+
executorConnection1,
1105+
new SlotReport(),
1106+
DEFAULT_TOTAL_RESOURCE_PROFILE,
1107+
DEFAULT_SLOT_RESOURCE_PROFILE)));
1108+
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture1))
1109+
.isEqualTo(SlotManager.RegistrationResult.SUCCESS);
1110+
1111+
final CompletableFuture<SlotManager.RegistrationResult>
1112+
registerTaskManagerFuture2 = new CompletableFuture<>();
1113+
context.runInMainThreadAndWait(
1114+
() ->
1115+
registerTaskManagerFuture2.complete(
1116+
context.getSlotManager()
1117+
.registerTaskManager(
1118+
executorConnection2,
1119+
new SlotReport(
1120+
createAllocatedSlotStatus(
1121+
new JobID(),
1122+
new AllocationID(),
1123+
DEFAULT_SLOT_RESOURCE_PROFILE)),
1124+
DEFAULT_TOTAL_RESOURCE_PROFILE,
1125+
DEFAULT_SLOT_RESOURCE_PROFILE)));
1126+
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture2))
1127+
.isEqualTo(SlotManager.RegistrationResult.SUCCESS);
1128+
1129+
// triggers the metric update task on the main thread and waits for the main
1130+
// thread to process queued up callbacks
1131+
scheduledExecutor.triggerPeriodicScheduledTasks();
1132+
context.runInMainThreadAndWait(() -> {});
1133+
1134+
assertThat(slotsTotalGauge.get().getValue())
1135+
.isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER);
1136+
assertThat(slotsAvailableGauge.get().getValue())
1137+
.isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER - 1);
1138+
1139+
final CompletableFuture<Boolean> unRegisterTaskManagerFuture =
1140+
new CompletableFuture<>();
1141+
context.runInMainThreadAndWait(
1142+
() ->
1143+
unRegisterTaskManagerFuture.complete(
1144+
context.getSlotManager()
1145+
.unregisterTaskManager(
1146+
executorConnection2.getInstanceID(),
1147+
TEST_EXCEPTION)));
1148+
assertThat(assertFutureCompleteAndReturn(unRegisterTaskManagerFuture)).isTrue();
1149+
1150+
// triggers the metric update task on the main thread and waits for the main
1151+
// thread to process queued up callbacks
1152+
scheduledExecutor.triggerPeriodicScheduledTasks();
1153+
context.runInMainThreadAndWait(() -> {});
1154+
1155+
assertThat(slotsTotalGauge.get().getValue())
1156+
.isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
1157+
assertThat(slotsAvailableGauge.get().getValue())
1158+
.isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
1159+
});
1160+
}
10611161
}

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ protected class Context {
152152
private SlotManagerMetricGroup slotManagerMetricGroup =
153153
UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
154154
private BlockedTaskManagerChecker blockedTaskManagerChecker = resourceID -> false;
155-
private final ScheduledExecutor scheduledExecutor =
155+
private ScheduledExecutor scheduledExecutor =
156156
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
157157
private final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
158158
private FineGrainedSlotManager slotManager;
@@ -193,6 +193,10 @@ public void setBlockedTaskManagerChecker(
193193
this.blockedTaskManagerChecker = blockedTaskManagerChecker;
194194
}
195195

196+
public void setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
197+
this.scheduledExecutor = scheduledExecutor;
198+
}
199+
196200
void runInMainThread(Runnable runnable) {
197201
mainThreadExecutor.execute(runnable);
198202
}

flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.metrics.reporter.AbstractReporter;
2727
import org.apache.flink.metrics.reporter.MetricReporter;
2828
import org.apache.flink.metrics.reporter.MetricReporterFactory;
29+
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager;
2930
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3031
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3132
import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
@@ -131,7 +132,8 @@ void testJobManagerMetrics() throws Exception {
131132
expectedPattern, gaugeNames));
132133
}
133134
}
134-
135+
// wait for metrics update
136+
Thread.sleep(FineGrainedSlotManager.METRICS_UPDATE_INTERVAL.toMillis());
135137
for (Map.Entry<Gauge<?>, String> entry : reporter.getGauges().entrySet()) {
136138
if (entry.getValue().contains(MetricNames.TASK_SLOTS_AVAILABLE)) {
137139
assertEquals(0L, entry.getKey().getValue());

0 commit comments

Comments
 (0)