diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 38cafeea1b9a2..39a254cdc28a9 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -169,38 +169,6 @@
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
}
- }, {
- "url" : "/applications/:applicationid/jobmanager/config",
- "method" : "GET",
- "status-code" : "200 OK",
- "file-upload" : false,
- "path-parameters" : {
- "pathParameters" : [ {
- "key" : "applicationid"
- } ]
- },
- "query-parameters" : {
- "queryParameters" : [ ]
- },
- "request" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
- },
- "response" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
- "properties" : {
- "key" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
}, {
"url" : "/applications/:applicationid/exceptions",
"method" : "GET",
@@ -251,6 +219,38 @@
}
}
}
+ }, {
+ "url" : "/applications/:applicationid/jobmanager/config",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "applicationid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+ "properties" : {
+ "key" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
}, {
"url" : "/cluster",
"method" : "DELETE",
@@ -2741,6 +2741,95 @@
"response" : {
"type" : "any"
}
+ }, {
+ "url" : "/jobs/:jobid/metrics/top-n",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody",
+ "properties" : {
+ "topCpuConsumers" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody:CpuConsumerInfo",
+ "properties" : {
+ "subtaskId" : {
+ "type" : "integer"
+ },
+ "taskName" : {
+ "type" : "string"
+ },
+ "operatorName" : {
+ "type" : "string"
+ },
+ "cpuPercentage" : {
+ "type" : "number"
+ },
+ "taskManagerId" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "topBackpressureOperators" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody:BackpressureOperatorInfo",
+ "properties" : {
+ "operatorId" : {
+ "type" : "string"
+ },
+ "operatorName" : {
+ "type" : "string"
+ },
+ "backpressureRatio" : {
+ "type" : "number"
+ },
+ "subtaskId" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "topGcIntensiveTasks" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody:GcTaskInfo",
+ "properties" : {
+ "taskId" : {
+ "type" : "string"
+ },
+ "taskName" : {
+ "type" : "string"
+ },
+ "gcTimePercentage" : {
+ "type" : "number"
+ },
+ "taskManagerId" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
}, {
"url" : "/jobs/:jobid/plan",
"method" : "GET",
@@ -2834,6 +2923,57 @@
}
}
}
+ }, {
+ "url" : "/jobs/:jobid/rescales/config",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
+ "properties" : {
+ "rescaleHistoryMax" : {
+ "type" : "integer"
+ },
+ "schedulerExecutionMode" : {
+ "type" : "string",
+ "enum" : [ "REACTIVE" ]
+ },
+ "submissionResourceWaitTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "submissionResourceStabilizationTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "slotIdleTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "executingCooldownTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "executingResourceStabilizationTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "maximumDelayForTriggeringRescaleInMillis" : {
+ "type" : "integer"
+ },
+ "rescaleOnFailedCheckpointCount" : {
+ "type" : "integer"
+ }
+ }
+ }
}, {
"url" : "/jobs/:jobid/rescaling",
"method" : "PATCH",
@@ -4781,56 +4921,5 @@
}
}
}
- }, {
- "url" : "/jobs/:jobid/rescales/config",
- "method" : "GET",
- "status-code" : "200 OK",
- "file-upload" : false,
- "path-parameters" : {
- "pathParameters" : [ {
- "key" : "jobid"
- } ]
- },
- "query-parameters" : {
- "queryParameters" : [ ]
- },
- "request" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
- },
- "response" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
- "properties" : {
- "executingCooldownTimeoutInMillis" : {
- "type" : "integer"
- },
- "executingResourceStabilizationTimeoutInMillis" : {
- "type" : "integer"
- },
- "maximumDelayForTriggeringRescaleInMillis" : {
- "type" : "integer"
- },
- "rescaleHistoryMax" : {
- "type" : "integer"
- },
- "rescaleOnFailedCheckpointCount" : {
- "type" : "integer"
- },
- "schedulerExecutionMode" : {
- "type" : "string",
- "enum" : [ "REACTIVE" ]
- },
- "slotIdleTimeoutInMillis" : {
- "type" : "integer"
- },
- "submissionResourceStabilizationTimeoutInMillis" : {
- "type" : "integer"
- },
- "submissionResourceWaitTimeoutInMillis" : {
- "type" : "integer"
- }
- }
- }
} ]
-}
+}
\ No newline at end of file
diff --git a/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html
new file mode 100644
index 0000000000000..3a8869f7d152b
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html
@@ -0,0 +1,113 @@
+
+
+
+
+
Top N CPU Consumers
+
+
+
+ | Rank |
+ Subtask ID |
+ Operator Name |
+ Task Name |
+ CPU Usage |
+ TaskManager |
+
+
+
+
+ | {{ i + 1 }} |
+ {{ item.subtaskId }} |
+ {{ item.operatorName }} |
+ {{ item.taskName }} |
+ 80">
+ {{ formatPercentage(item.cpuPercentage) }}
+ |
+ {{ item.taskManagerId }} |
+
+
+
+
+
No CPU metrics available
+
+
+
+
+
Top N Backpressure Operators
+
+
+
+ | Rank |
+ Operator ID |
+ Operator Name |
+ Subtask ID |
+ Backpressure Ratio |
+
+
+
+
+ | {{ i + 1 }} |
+ {{ item.operatorId }} |
+ {{ item.operatorName }} |
+ {{ item.subtaskId }} |
+ 0.5">
+ {{ formatPercentage(item.backpressureRatio * 100) }}
+ |
+
+
+
+
+
No backpressure metrics available
+
+
+
+
+
Top N GC Intensive Tasks
+
+
+
+ | Rank |
+ Task ID |
+ Task Name |
+ GC Time Percentage |
+ TaskManager |
+
+
+
+
+ | {{ i + 1 }} |
+ {{ item.taskId }} |
+ {{ item.taskName }} |
+ 30">
+ {{ formatPercentage(item.gcTimePercentage) }}
+ |
+ {{ item.taskManagerId }} |
+
+
+
+
+
No GC metrics available
+
+
+
diff --git a/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less
new file mode 100644
index 0000000000000..517e63f15aec3
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+.topn-metrics-container {
+ padding: 16px;
+ background: #fff;
+ border-radius: 4px;
+ margin-bottom: 16px;
+}
+
+.topn-metrics-section {
+ margin-bottom: 24px;
+
+ &:last-child {
+ margin-bottom: 0;
+ }
+
+ h3 {
+ font-size: 16px;
+ font-weight: 600;
+ margin-bottom: 12px;
+ color: #262626;
+ }
+
+ .no-data {
+ padding: 20px;
+ text-align: center;
+ color: #8c8c8c;
+ font-style: italic;
+ }
+}
+
+.high-cpu {
+ color: #cf1322;
+ font-weight: 600;
+}
+
+.high-backpressure {
+ color: #d4380d;
+ font-weight: 600;
+}
+
+.high-gc {
+ color: #d46b08;
+ font-weight: 600;
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts
new file mode 100644
index 0000000000000..ecf9accb8a9f9
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { ChangeDetectionStrategy, Component, Input, OnChanges, SimpleChanges } from '@angular/core';
+
+import { CpuConsumerInfo, BackpressureOperatorInfo, GcTaskInfo } from '@flink-runtime-web/interfaces';
+
+@Component({
+ selector: 'flink-topn-metrics',
+ templateUrl: './topn-metrics.component.html',
+ styleUrls: ['./topn-metrics.component.less'],
+ changeDetection: ChangeDetectionStrategy.OnPush
+})
+export class TopNMetricsComponent implements OnChanges {
+ @Input() topCpuConsumers: CpuConsumerInfo[] = [];
+ @Input() topBackpressureOperators: BackpressureOperatorInfo[] = [];
+ @Input() topGcIntensiveTasks: GcTaskInfo[] = [];
+
+ ngOnChanges(changes: SimpleChanges): void {
+ console.log('TopN metrics updated:', changes);
+ }
+
+ formatPercentage(value: number): string {
+ return `${value.toFixed(2)}%`;
+ }
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts
index 38d072557e173..693c07b900e4d 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts
@@ -36,3 +36,4 @@ export * from './job-metrics';
export * from './application-overview';
export * from './application-detail';
export * from './application-exception';
+export * from './topn-metrics';
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts
new file mode 100644
index 0000000000000..1de9fd13a7d33
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export interface TopNMetrics {
+ topCpuConsumers: CpuConsumerInfo[];
+ topBackpressureOperators: BackpressureOperatorInfo[];
+ topGcIntensiveTasks: GcTaskInfo[];
+}
+
+export interface CpuConsumerInfo {
+ subtaskId: number;
+ taskName: string;
+ operatorName: string;
+ cpuPercentage: number;
+ taskManagerId: string;
+}
+
+export interface BackpressureOperatorInfo {
+ operatorId: string;
+ operatorName: string;
+ backpressureRatio: number;
+ subtaskId: number;
+}
+
+export interface GcTaskInfo {
+ taskId: string;
+ taskName: string;
+ gcTimePercentage: number;
+ taskManagerId: string;
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html
new file mode 100644
index 0000000000000..f9e80eeaad09e
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html
@@ -0,0 +1,30 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license
+agreements. See the NOTICE file * distributed with this work for additional information * regarding
+copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance * with the License. You may obtain a
+copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+applicable law or agreed to in writing, software * distributed under the License is distributed on
+an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See
+the License for the specific language governing permissions and * limitations under the License. */
+
+
+
Cluster Overview
+
+
+
Top N Metrics Dashboard Demo
+
This is a demonstration of the Top N Metrics Dashboard feature.
+
+
+
+
+
+
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less
new file mode 100644
index 0000000000000..ae3beb6758571
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+.overview-container {
+ padding: 24px;
+ background: #f0f2f5;
+ min-height: 100vh;
+
+ h2 {
+ font-size: 24px;
+ font-weight: 600;
+ margin-bottom: 24px;
+ color: #262626;
+ }
+
+ .demo-section {
+ h3 {
+ font-size: 18px;
+ font-weight: 500;
+ margin-bottom: 16px;
+ color: #262626;
+ }
+
+ p {
+ color: #595959;
+ margin-bottom: 16px;
+ }
+
+ nz-alert {
+ margin-top: 16px;
+ }
+ }
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts
new file mode 100644
index 0000000000000..17b9246ae3ab1
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Component, OnInit } from '@angular/core';
+
+import { CpuConsumerInfo, BackpressureOperatorInfo, GcTaskInfo } from '@flink-runtime-web/interfaces';
+
+@Component({
+ selector: 'flink-overview-demo',
+ templateUrl: './overview-demo.component.html',
+ styleUrls: ['./overview-demo.component.less']
+})
+export class OverviewDemoComponent implements OnInit {
+ demoCpuConsumers: CpuConsumerInfo[] = [];
+ demoBackpressureOperators: BackpressureOperatorInfo[] = [];
+ demoGcIntensiveTasks: GcTaskInfo[] = [];
+
+ ngOnInit(): void {
+ // Initialize demo data
+ this.demoCpuConsumers = [
+ {
+ subtaskId: 0,
+ taskName: 'Source: Kafka',
+ operatorName: 'Kafka Source',
+ cpuPercentage: 95.5,
+ taskManagerId: 'container_123'
+ },
+ {
+ subtaskId: 1,
+ taskName: 'Map: Process',
+ operatorName: 'ProcessFunction',
+ cpuPercentage: 88.2,
+ taskManagerId: 'container_124'
+ },
+ {
+ subtaskId: 2,
+ taskName: 'Sink: HDFS',
+ operatorName: 'HDFS Sink',
+ cpuPercentage: 82.7,
+ taskManagerId: 'container_125'
+ },
+ {
+ subtaskId: 3,
+ taskName: 'Window',
+ operatorName: 'WindowFunction',
+ cpuPercentage: 76.3,
+ taskManagerId: 'container_123'
+ },
+ {
+ subtaskId: 4,
+ taskName: 'Aggregate',
+ operatorName: 'AggregateFunction',
+ cpuPercentage: 71.9,
+ taskManagerId: 'container_126'
+ }
+ ];
+
+ this.demoBackpressureOperators = [
+ {
+ operatorId: 'op_456',
+ operatorName: 'Map',
+ backpressureRatio: 0.85,
+ subtaskId: 1
+ },
+ {
+ operatorId: 'op_789',
+ operatorName: 'Window',
+ backpressureRatio: 0.72,
+ subtaskId: 3
+ },
+ {
+ operatorId: 'op_234',
+ operatorName: 'Filter',
+ backpressureRatio: 0.58,
+ subtaskId: 2
+ },
+ {
+ operatorId: 'op_567',
+ operatorName: 'Join',
+ backpressureRatio: 0.45,
+ subtaskId: 4
+ },
+ {
+ operatorId: 'op_890',
+ operatorName: 'Aggregate',
+ backpressureRatio: 0.32,
+ subtaskId: 5
+ }
+ ];
+
+ this.demoGcIntensiveTasks = [
+ {
+ taskId: 'task_789',
+ taskName: 'ProcessFunction',
+ gcTimePercentage: 45.2,
+ taskManagerId: 'container_123'
+ },
+ {
+ taskId: 'task_456',
+ taskName: 'WindowFunction',
+ gcTimePercentage: 38.7,
+ taskManagerId: 'container_124'
+ },
+ {
+ taskId: 'task_234',
+ taskName: 'JoinFunction',
+ gcTimePercentage: 32.1,
+ taskManagerId: 'container_125'
+ },
+ {
+ taskId: 'task_567',
+ taskName: 'AggregateFunction',
+ gcTimePercentage: 28.5,
+ taskManagerId: 'container_126'
+ },
+ {
+ taskId: 'task_890',
+ taskName: 'FilterFunction',
+ gcTimePercentage: 24.3,
+ taskManagerId: 'container_127'
+ }
+ ];
+ }
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/services/public-api.ts b/flink-runtime-web/web-dashboard/src/app/services/public-api.ts
index df66d9dcaba56..820809bb4634d 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/public-api.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/public-api.ts
@@ -25,3 +25,4 @@ export * from './task-manager.service';
export * from './metrics.service';
export * from './config.service';
export * from './application.service';
+export * from './topn-metrics.service';
diff --git a/flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts b/flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts
new file mode 100644
index 0000000000000..d7d80d623bd0b
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { HttpClient } from '@angular/common/http';
+import { Injectable } from '@angular/core';
+import { EMPTY, Observable } from 'rxjs';
+import { catchError } from 'rxjs/operators';
+
+import { TopNMetrics } from '@flink-runtime-web/interfaces';
+
+import { ConfigService } from './config.service';
+
+@Injectable({
+ providedIn: 'root'
+})
+export class TopNMetricsService {
+ constructor(private readonly httpClient: HttpClient, private readonly configService: ConfigService) {}
+
+ public loadTopNMetrics(jobId: string): Observable {
+ return this.httpClient
+ .get(`${this.configService.BASE_URL}/jobs/${jobId}/metrics/top-n`)
+ .pipe(catchError(() => EMPTY));
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java
new file mode 100644
index 0000000000000..00dab66f5ad9e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Request handler that returns Top N metrics for a job, including CPU consumers, backpressured
+ * operators, and GC-intensive tasks.
+ */
+@Internal
+public class TopNMetricsHandler
+ extends AbstractRestHandler<
+ RestfulGateway,
+ EmptyRequestBody,
+ TopNMetricsResponseBody,
+ TopNMetricsMessageParameters> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopNMetricsHandler.class);
+
+ private static final int DEFAULT_TOP_N = 5;
+
+ private static final String CPU_METRIC = "taskmanager.cpu.time";
+ private static final String BACKPRESSURE_METRIC = "tasks.backpressure.ratio";
+ private static final String GC_TIME_METRIC = "tasks.GC.time";
+
+ private final MetricFetcher metricFetcher;
+
+ public TopNMetricsHandler(
+ GatewayRetriever extends RestfulGateway> leaderRetriever,
+ Duration timeout,
+ Map headers,
+ MetricFetcher metricFetcher) {
+ super(leaderRetriever, timeout, headers, TopNMetricsHeaders.getInstance());
+ this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null");
+ }
+
+ @Override
+ protected CompletableFuture handleRequest(
+ @Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway)
+ throws RestHandlerException {
+ metricFetcher.update();
+
+ final MetricStore metricStore = metricFetcher.getMetricStore();
+ final String jobId = request.getPathParameter(JobIDPathParameter.class).toString();
+
+ try {
+ // Collect Top N metrics
+ List topCpuConsumers =
+ getTopCpuConsumers(metricStore, jobId);
+
+ List topBackpressureOperators =
+ getTopBackpressureOperators(metricStore, jobId);
+
+ List topGcIntensiveTasks =
+ getTopGcIntensiveTasks(metricStore, jobId);
+
+ return CompletableFuture.completedFuture(
+ new TopNMetricsResponseBody(
+ topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks));
+
+ } catch (Exception e) {
+ LOG.info("Could not retrieve Top N metrics for job {}.", jobId, e);
+ throw new RestHandlerException(
+ "Could not retrieve Top N metrics for job " + jobId,
+ HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private List getTopCpuConsumers(
+ MetricStore metricStore, String jobId) {
+ List cpuConsumers = new ArrayList<>();
+
+ // Get all task IDs for this job from representativeAttempts
+ Map>> representativeAttempts =
+ metricStore.getRepresentativeAttempts();
+ Map> jobTasks = representativeAttempts.get(jobId);
+
+ if (jobTasks == null) {
+ return cpuConsumers;
+ }
+
+ for (String taskId : jobTasks.keySet()) {
+ MetricStore.TaskMetricStore taskMetricStore =
+ metricStore.getTaskMetricStore(jobId, taskId);
+ if (taskMetricStore == null) {
+ continue;
+ }
+
+ // Get all subtasks
+ for (Map.Entry subtaskEntry :
+ taskMetricStore.getAllSubtaskMetricStores().entrySet()) {
+ int subtaskIndex = subtaskEntry.getKey();
+ MetricStore.SubtaskMetricStore subtaskMetricStore = subtaskEntry.getValue();
+
+ String cpuValue = subtaskMetricStore.getMetric(CPU_METRIC);
+ if (cpuValue != null) {
+ try {
+ double cpuTime = Double.parseDouble(cpuValue);
+ cpuConsumers.add(
+ new TopNMetricsResponseBody.CpuConsumerInfo(
+ subtaskIndex,
+ taskId,
+ taskId, // Using taskId as operatorName for simplicity
+ cpuTime,
+ "unknown")); // TaskManager ID not directly available
+ } catch (NumberFormatException e) {
+ // Skip invalid values
+ }
+ }
+ }
+ }
+
+ // Sort by CPU usage and take top N
+ return cpuConsumers.stream()
+ .sorted(
+ Comparator.comparing(
+ TopNMetricsResponseBody.CpuConsumerInfo::getCpuPercentage)
+ .reversed())
+ .limit(DEFAULT_TOP_N)
+ .collect(Collectors.toList());
+ }
+
+ private List getTopBackpressureOperators(
+ MetricStore metricStore, String jobId) {
+ List backpressureOperators =
+ new ArrayList<>();
+
+ // Get all task IDs for this job from representativeAttempts
+ Map>> representativeAttempts =
+ metricStore.getRepresentativeAttempts();
+ Map> jobTasks = representativeAttempts.get(jobId);
+
+ if (jobTasks == null) {
+ return backpressureOperators;
+ }
+
+ for (String operatorName : jobTasks.keySet()) {
+ MetricStore.TaskMetricStore taskMetricStore =
+ metricStore.getTaskMetricStore(jobId, operatorName);
+ if (taskMetricStore == null) {
+ continue;
+ }
+
+ // Get all subtasks
+ for (Map.Entry subtaskEntry :
+ taskMetricStore.getAllSubtaskMetricStores().entrySet()) {
+ int subtaskIndex = subtaskEntry.getKey();
+ MetricStore.SubtaskMetricStore subtaskMetricStore = subtaskEntry.getValue();
+
+ String backpressureValue = subtaskMetricStore.getMetric(BACKPRESSURE_METRIC);
+ if (backpressureValue != null) {
+ try {
+ double backpressureRatio = Double.parseDouble(backpressureValue);
+ backpressureOperators.add(
+ new TopNMetricsResponseBody.BackpressureOperatorInfo(
+ operatorName,
+ operatorName,
+ backpressureRatio,
+ subtaskIndex));
+ } catch (NumberFormatException e) {
+ // Skip invalid values
+ }
+ }
+ }
+ }
+
+ // Sort by backpressure ratio and take top N
+ return backpressureOperators.stream()
+ .sorted(
+ Comparator.comparing(
+ TopNMetricsResponseBody.BackpressureOperatorInfo
+ ::getBackpressureRatio)
+ .reversed())
+ .limit(DEFAULT_TOP_N)
+ .collect(Collectors.toList());
+ }
+
+ private List getTopGcIntensiveTasks(
+ MetricStore metricStore, String jobId) {
+ List gcIntensiveTasks = new ArrayList<>();
+
+ // Get all task IDs for this job from representativeAttempts
+ Map>> representativeAttempts =
+ metricStore.getRepresentativeAttempts();
+ Map> jobTasks = representativeAttempts.get(jobId);
+
+ if (jobTasks == null) {
+ return gcIntensiveTasks;
+ }
+
+ for (String taskId : jobTasks.keySet()) {
+ MetricStore.TaskMetricStore taskMetricStore =
+ metricStore.getTaskMetricStore(jobId, taskId);
+ if (taskMetricStore == null) {
+ continue;
+ }
+
+ // Get all subtasks
+ for (Map.Entry subtaskEntry :
+ taskMetricStore.getAllSubtaskMetricStores().entrySet()) {
+ int subtaskIndex = subtaskEntry.getKey();
+ MetricStore.SubtaskMetricStore subtaskMetricStore = subtaskEntry.getValue();
+
+ String gcTimeValue = subtaskMetricStore.getMetric(GC_TIME_METRIC);
+ if (gcTimeValue != null) {
+ try {
+ double gcTime = Double.parseDouble(gcTimeValue);
+ gcIntensiveTasks.add(
+ new TopNMetricsResponseBody.GcTaskInfo(
+ String.valueOf(subtaskIndex),
+ taskId,
+ gcTime,
+ "unknown")); // TaskManager ID not directly available
+ } catch (NumberFormatException e) {
+ // Skip invalid values
+ }
+ }
+ }
+ }
+
+ // Sort by GC time and take top N
+ return gcIntensiveTasks.stream()
+ .sorted(
+ Comparator.comparing(
+ TopNMetricsResponseBody.GcTaskInfo::getGcTimePercentage)
+ .reversed())
+ .limit(DEFAULT_TOP_N)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java
new file mode 100644
index 0000000000000..0eadba3d96058
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** {@link RuntimeMessageHeaders} for {@link TopNMetricsHandler}. */
+public final class TopNMetricsHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, TopNMetricsResponseBody, TopNMetricsMessageParameters> {
+
+ private static final TopNMetricsHeaders INSTANCE = new TopNMetricsHeaders();
+
+ private TopNMetricsHeaders() {}
+
+ @Override
+ public Class getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class getResponseClass() {
+ return TopNMetricsResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public TopNMetricsMessageParameters getUnresolvedMessageParameters() {
+ return new TopNMetricsMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/jobs/:" + JobIDPathParameter.KEY + "/metrics/top-n";
+ }
+
+ public static TopNMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Returns Top N metrics for a job including CPU consumers, "
+ + "backpressured operators, and GC-intensive tasks.";
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java
new file mode 100644
index 0000000000000..55812df7c7028
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+
+/** MessageParameters for Top N metrics. */
+public class TopNMetricsMessageParameters extends JobMessageParameters {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java
new file mode 100644
index 0000000000000..e10dc368a5e29
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Response body for Top N metrics aggregation. */
+public class TopNMetricsResponseBody implements ResponseBody {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_TOP_CPU_CONSUMERS = "topCpuConsumers";
+ public static final String FIELD_NAME_TOP_BACKPRESSURE_OPERATORS = "topBackpressureOperators";
+ public static final String FIELD_NAME_TOP_GC_INTENSIVE_TASKS = "topGcIntensiveTasks";
+
+ @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS)
+ private final List topCpuConsumers;
+
+ @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS)
+ private final List topBackpressureOperators;
+
+ @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS)
+ private final List topGcIntensiveTasks;
+
+ @JsonCreator
+ public TopNMetricsResponseBody(
+ @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) List topCpuConsumers,
+ @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS)
+ List topBackpressureOperators,
+ @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) List topGcIntensiveTasks) {
+ this.topCpuConsumers = topCpuConsumers;
+ this.topBackpressureOperators = topBackpressureOperators;
+ this.topGcIntensiveTasks = topGcIntensiveTasks;
+ }
+
+ public List getTopCpuConsumers() {
+ return topCpuConsumers;
+ }
+
+ public List getTopBackpressureOperators() {
+ return topBackpressureOperators;
+ }
+
+ public List getTopGcIntensiveTasks() {
+ return topGcIntensiveTasks;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopNMetricsResponseBody that = (TopNMetricsResponseBody) o;
+ return Objects.equals(topCpuConsumers, that.topCpuConsumers)
+ && Objects.equals(topBackpressureOperators, that.topBackpressureOperators)
+ && Objects.equals(topGcIntensiveTasks, that.topGcIntensiveTasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks);
+ }
+
+ /** Information about CPU-intensive subtasks. */
+ public static class CpuConsumerInfo {
+
+ public static final String FIELD_NAME_SUBTASK_ID = "subtaskId";
+ public static final String FIELD_NAME_TASK_NAME = "taskName";
+ public static final String FIELD_NAME_OPERATOR_NAME = "operatorName";
+ public static final String FIELD_NAME_CPU_PERCENTAGE = "cpuPercentage";
+ public static final String FIELD_NAME_TASKMANAGER_ID = "taskManagerId";
+
+ @JsonProperty(FIELD_NAME_SUBTASK_ID)
+ private final int subtaskId;
+
+ @JsonProperty(FIELD_NAME_TASK_NAME)
+ private final String taskName;
+
+ @JsonProperty(FIELD_NAME_OPERATOR_NAME)
+ private final String operatorName;
+
+ @JsonProperty(FIELD_NAME_CPU_PERCENTAGE)
+ private final double cpuPercentage;
+
+ @JsonProperty(FIELD_NAME_TASKMANAGER_ID)
+ private final String taskManagerId;
+
+ @JsonCreator
+ public CpuConsumerInfo(
+ @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId,
+ @JsonProperty(FIELD_NAME_TASK_NAME) String taskName,
+ @JsonProperty(FIELD_NAME_OPERATOR_NAME) String operatorName,
+ @JsonProperty(FIELD_NAME_CPU_PERCENTAGE) double cpuPercentage,
+ @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskManagerId) {
+ this.subtaskId = subtaskId;
+ this.taskName = taskName;
+ this.operatorName = operatorName;
+ this.cpuPercentage = cpuPercentage;
+ this.taskManagerId = taskManagerId;
+ }
+
+ public int getSubtaskId() {
+ return subtaskId;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public String getOperatorName() {
+ return operatorName;
+ }
+
+ public double getCpuPercentage() {
+ return cpuPercentage;
+ }
+
+ public String getTaskManagerId() {
+ return taskManagerId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CpuConsumerInfo that = (CpuConsumerInfo) o;
+ return subtaskId == that.subtaskId
+ && Double.compare(that.cpuPercentage, cpuPercentage) == 0
+ && Objects.equals(taskName, that.taskName)
+ && Objects.equals(operatorName, that.operatorName)
+ && Objects.equals(taskManagerId, that.taskManagerId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtaskId, taskName, operatorName, cpuPercentage, taskManagerId);
+ }
+ }
+
+ /** Information about backpressured operators. */
+ public static class BackpressureOperatorInfo {
+
+ public static final String FIELD_NAME_OPERATOR_ID = "operatorId";
+ public static final String FIELD_NAME_OPERATOR_NAME = "operatorName";
+ public static final String FIELD_NAME_BACKPRESSURE_RATIO = "backpressureRatio";
+ public static final String FIELD_NAME_SUBTASK_ID = "subtaskId";
+
+ @JsonProperty(FIELD_NAME_OPERATOR_ID)
+ private final String operatorId;
+
+ @JsonProperty(FIELD_NAME_OPERATOR_NAME)
+ private final String operatorName;
+
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO)
+ private final double backpressureRatio;
+
+ @JsonProperty(FIELD_NAME_SUBTASK_ID)
+ private final int subtaskId;
+
+ @JsonCreator
+ public BackpressureOperatorInfo(
+ @JsonProperty(FIELD_NAME_OPERATOR_ID) String operatorId,
+ @JsonProperty(FIELD_NAME_OPERATOR_NAME) String operatorName,
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO) double backpressureRatio,
+ @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId) {
+ this.operatorId = operatorId;
+ this.operatorName = operatorName;
+ this.backpressureRatio = backpressureRatio;
+ this.subtaskId = subtaskId;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
+ public String getOperatorName() {
+ return operatorName;
+ }
+
+ public double getBackpressureRatio() {
+ return backpressureRatio;
+ }
+
+ public int getSubtaskId() {
+ return subtaskId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BackpressureOperatorInfo that = (BackpressureOperatorInfo) o;
+ return Double.compare(that.backpressureRatio, backpressureRatio) == 0
+ && subtaskId == that.subtaskId
+ && Objects.equals(operatorId, that.operatorId)
+ && Objects.equals(operatorName, that.operatorName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(operatorId, operatorName, backpressureRatio, subtaskId);
+ }
+ }
+
+ /** Information about GC-intensive tasks. */
+ public static class GcTaskInfo {
+
+ public static final String FIELD_NAME_TASK_ID = "taskId";
+ public static final String FIELD_NAME_TASK_NAME = "taskName";
+ public static final String FIELD_NAME_GC_TIME_PERCENTAGE = "gcTimePercentage";
+ public static final String FIELD_NAME_TASKMANAGER_ID = "taskManagerId";
+
+ @JsonProperty(FIELD_NAME_TASK_ID)
+ private final String taskId;
+
+ @JsonProperty(FIELD_NAME_TASK_NAME)
+ private final String taskName;
+
+ @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE)
+ private final double gcTimePercentage;
+
+ @JsonProperty(FIELD_NAME_TASKMANAGER_ID)
+ private final String taskManagerId;
+
+ @JsonCreator
+ public GcTaskInfo(
+ @JsonProperty(FIELD_NAME_TASK_ID) String taskId,
+ @JsonProperty(FIELD_NAME_TASK_NAME) String taskName,
+ @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE) double gcTimePercentage,
+ @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskManagerId) {
+ this.taskId = taskId;
+ this.taskName = taskName;
+ this.gcTimePercentage = gcTimePercentage;
+ this.taskManagerId = taskManagerId;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public double getGcTimePercentage() {
+ return gcTimePercentage;
+ }
+
+ public String getTaskManagerId() {
+ return taskManagerId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GcTaskInfo that = (GcTaskInfo) o;
+ return Double.compare(that.gcTimePercentage, gcTimePercentage) == 0
+ && Objects.equals(taskId, that.taskId)
+ && Objects.equals(taskName, that.taskName)
+ && Objects.equals(taskManagerId, that.taskManagerId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(taskId, taskName, gcTimePercentage, taskManagerId);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 211b8749425dc..43b4c38988127 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -96,6 +96,7 @@
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.TopNMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
@@ -561,6 +562,9 @@ protected List> initiali
final JobMetricsHandler jobMetricsHandler =
new JobMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
+ final TopNMetricsHandler topNMetricsHandler =
+ new TopNMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
+
final SubtaskMetricsHandler subtaskMetricsHandler =
new SubtaskMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
@@ -860,6 +864,7 @@ protected List> initiali
jobVertexWatermarksHandler.getMessageHeaders(),
jobVertexWatermarksHandler));
handlers.add(Tuple2.of(jobMetricsHandler.getMessageHeaders(), jobMetricsHandler));
+ handlers.add(Tuple2.of(topNMetricsHandler.getMessageHeaders(), topNMetricsHandler));
handlers.add(Tuple2.of(subtaskMetricsHandler.getMessageHeaders(), subtaskMetricsHandler));
handlers.add(
Tuple2.of(