From c61453b95f3de5c08ec8710b152e40b575c96111 Mon Sep 17 00:00:00 2001 From: Feat Zhang Date: Tue, 24 Mar 2026 16:45:25 +0800 Subject: [PATCH] [FLINK-27773][Web Dashboard] Add Top N Metrics Dashboard feature --- .../src/test/resources/rest_api_v1.snapshot | 257 ++++++++++----- .../topn-metrics/topn-metrics.component.html | 113 +++++++ .../topn-metrics/topn-metrics.component.less | 61 ++++ .../topn-metrics/topn-metrics.component.ts | 41 +++ .../src/app/interfaces/public-api.ts | 1 + .../src/app/interfaces/topn-metrics.ts | 45 +++ .../overview-demo.component.html | 30 ++ .../overview-demo.component.less | 48 +++ .../overview-demo/overview-demo.component.ts | 139 ++++++++ .../src/app/services/public-api.ts | 1 + .../src/app/services/topn-metrics.service.ts | 39 +++ .../job/metrics/TopNMetricsHandler.java | 276 ++++++++++++++++ .../job/metrics/TopNMetricsHeaders.java | 76 +++++ .../metrics/TopNMetricsMessageParameters.java | 24 ++ .../job/metrics/TopNMetricsResponseBody.java | 306 ++++++++++++++++++ .../webmonitor/WebMonitorEndpoint.java | 5 + 16 files changed, 1378 insertions(+), 84 deletions(-) create mode 100644 flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html create mode 100644 flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less create mode 100644 flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts create mode 100644 flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts create mode 100644 flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html create mode 100644 flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less create mode 100644 flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts create mode 100644 flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 38cafeea1b9a2..39a254cdc28a9 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -169,38 +169,6 @@ "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody" } - }, { - "url" : "/applications/:applicationid/jobmanager/config", - "method" : "GET", - "status-code" : "200 OK", - "file-upload" : false, - "path-parameters" : { - "pathParameters" : [ { - "key" : "applicationid" - } ] - }, - "query-parameters" : { - "queryParameters" : [ ] - }, - "request" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" - }, - "response" : { - "type" : "array", - "items" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry", - "properties" : { - "key" : { - "type" : "string" - }, - "value" : { - "type" : "string" - } - } - } - } }, { "url" : "/applications/:applicationid/exceptions", "method" : "GET", @@ -251,6 +219,38 @@ } } } + }, { + "url" : "/applications/:applicationid/jobmanager/config", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "applicationid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry", + "properties" : { + "key" : { + "type" : "string" + }, + "value" : { + "type" : "string" + } + } + } + } }, { "url" : "/cluster", "method" : "DELETE", @@ -2741,6 +2741,95 @@ "response" : { "type" : "any" } + }, { + "url" : "/jobs/:jobid/metrics/top-n", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody", + "properties" : { + "topCpuConsumers" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody:CpuConsumerInfo", + "properties" : { + "subtaskId" : { + "type" : "integer" + }, + "taskName" : { + "type" : "string" + }, + "operatorName" : { + "type" : "string" + }, + "cpuPercentage" : { + "type" : "number" + }, + "taskManagerId" : { + "type" : "string" + } + } + } + }, + "topBackpressureOperators" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody:BackpressureOperatorInfo", + "properties" : { + "operatorId" : { + "type" : "string" + }, + "operatorName" : { + "type" : "string" + }, + "backpressureRatio" : { + "type" : "number" + }, + "subtaskId" : { + "type" : "integer" + } + } + } + }, + "topGcIntensiveTasks" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:TopNMetricsResponseBody:GcTaskInfo", + "properties" : { + "taskId" : { + "type" : "string" + }, + "taskName" : { + "type" : "string" + }, + "gcTimePercentage" : { + "type" : "number" + }, + "taskManagerId" : { + "type" : "string" + } + } + } + } + } + } }, { "url" : "/jobs/:jobid/plan", "method" : "GET", @@ -2834,6 +2923,57 @@ } } } + }, { + "url" : "/jobs/:jobid/rescales/config", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo", + "properties" : { + "rescaleHistoryMax" : { + "type" : "integer" + }, + "schedulerExecutionMode" : { + "type" : "string", + "enum" : [ "REACTIVE" ] + }, + "submissionResourceWaitTimeoutInMillis" : { + "type" : "integer" + }, + "submissionResourceStabilizationTimeoutInMillis" : { + "type" : "integer" + }, + "slotIdleTimeoutInMillis" : { + "type" : "integer" + }, + "executingCooldownTimeoutInMillis" : { + "type" : "integer" + }, + "executingResourceStabilizationTimeoutInMillis" : { + "type" : "integer" + }, + "maximumDelayForTriggeringRescaleInMillis" : { + "type" : "integer" + }, + "rescaleOnFailedCheckpointCount" : { + "type" : "integer" + } + } + } }, { "url" : "/jobs/:jobid/rescaling", "method" : "PATCH", @@ -4781,56 +4921,5 @@ } } } - }, { - "url" : "/jobs/:jobid/rescales/config", - "method" : "GET", - "status-code" : "200 OK", - "file-upload" : false, - "path-parameters" : { - "pathParameters" : [ { - "key" : "jobid" - } ] - }, - "query-parameters" : { - "queryParameters" : [ ] - }, - "request" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" - }, - "response" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo", - "properties" : { - "executingCooldownTimeoutInMillis" : { - "type" : "integer" - }, - "executingResourceStabilizationTimeoutInMillis" : { - "type" : "integer" - }, - "maximumDelayForTriggeringRescaleInMillis" : { - "type" : "integer" - }, - "rescaleHistoryMax" : { - "type" : "integer" - }, - "rescaleOnFailedCheckpointCount" : { - "type" : "integer" - }, - "schedulerExecutionMode" : { - "type" : "string", - "enum" : [ "REACTIVE" ] - }, - "slotIdleTimeoutInMillis" : { - "type" : "integer" - }, - "submissionResourceStabilizationTimeoutInMillis" : { - "type" : "integer" - }, - "submissionResourceWaitTimeoutInMillis" : { - "type" : "integer" - } - } - } } ] -} +} \ No newline at end of file diff --git a/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html new file mode 100644 index 0000000000000..3a8869f7d152b --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.html @@ -0,0 +1,113 @@ + + +
+
+

Top N CPU Consumers

+ + + + Rank + Subtask ID + Operator Name + Task Name + CPU Usage + TaskManager + + + + + {{ i + 1 }} + {{ item.subtaskId }} + {{ item.operatorName }} + {{ item.taskName }} + + {{ formatPercentage(item.cpuPercentage) }} + + {{ item.taskManagerId }} + + + +
+

No CPU metrics available

+
+
+ +
+

Top N Backpressure Operators

+ + + + Rank + Operator ID + Operator Name + Subtask ID + Backpressure Ratio + + + + + {{ i + 1 }} + {{ item.operatorId }} + {{ item.operatorName }} + {{ item.subtaskId }} + + {{ formatPercentage(item.backpressureRatio * 100) }} + + + + +
+

No backpressure metrics available

+
+
+ +
+

Top N GC Intensive Tasks

+ + + + Rank + Task ID + Task Name + GC Time Percentage + TaskManager + + + + + {{ i + 1 }} + {{ item.taskId }} + {{ item.taskName }} + + {{ formatPercentage(item.gcTimePercentage) }} + + {{ item.taskManagerId }} + + + +
+

No GC metrics available

+
+
+
diff --git a/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less new file mode 100644 index 0000000000000..517e63f15aec3 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.less @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +.topn-metrics-container { + padding: 16px; + background: #fff; + border-radius: 4px; + margin-bottom: 16px; +} + +.topn-metrics-section { + margin-bottom: 24px; + + &:last-child { + margin-bottom: 0; + } + + h3 { + font-size: 16px; + font-weight: 600; + margin-bottom: 12px; + color: #262626; + } + + .no-data { + padding: 20px; + text-align: center; + color: #8c8c8c; + font-style: italic; + } +} + +.high-cpu { + color: #cf1322; + font-weight: 600; +} + +.high-backpressure { + color: #d4380d; + font-weight: 600; +} + +.high-gc { + color: #d46b08; + font-weight: 600; +} diff --git a/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts new file mode 100644 index 0000000000000..ecf9accb8a9f9 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/components/topn-metrics/topn-metrics.component.ts @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ChangeDetectionStrategy, Component, Input, OnChanges, SimpleChanges } from '@angular/core'; + +import { CpuConsumerInfo, BackpressureOperatorInfo, GcTaskInfo } from '@flink-runtime-web/interfaces'; + +@Component({ + selector: 'flink-topn-metrics', + templateUrl: './topn-metrics.component.html', + styleUrls: ['./topn-metrics.component.less'], + changeDetection: ChangeDetectionStrategy.OnPush +}) +export class TopNMetricsComponent implements OnChanges { + @Input() topCpuConsumers: CpuConsumerInfo[] = []; + @Input() topBackpressureOperators: BackpressureOperatorInfo[] = []; + @Input() topGcIntensiveTasks: GcTaskInfo[] = []; + + ngOnChanges(changes: SimpleChanges): void { + console.log('TopN metrics updated:', changes); + } + + formatPercentage(value: number): string { + return `${value.toFixed(2)}%`; + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts index 38d072557e173..693c07b900e4d 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts @@ -36,3 +36,4 @@ export * from './job-metrics'; export * from './application-overview'; export * from './application-detail'; export * from './application-exception'; +export * from './topn-metrics'; diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts new file mode 100644 index 0000000000000..1de9fd13a7d33 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/topn-metrics.ts @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface TopNMetrics { + topCpuConsumers: CpuConsumerInfo[]; + topBackpressureOperators: BackpressureOperatorInfo[]; + topGcIntensiveTasks: GcTaskInfo[]; +} + +export interface CpuConsumerInfo { + subtaskId: number; + taskName: string; + operatorName: string; + cpuPercentage: number; + taskManagerId: string; +} + +export interface BackpressureOperatorInfo { + operatorId: string; + operatorName: string; + backpressureRatio: number; + subtaskId: number; +} + +export interface GcTaskInfo { + taskId: string; + taskName: string; + gcTimePercentage: number; + taskManagerId: string; +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html new file mode 100644 index 0000000000000..f9e80eeaad09e --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.html @@ -0,0 +1,30 @@ +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license +agreements. See the NOTICE file * distributed with this work for additional information * regarding +copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance * with the License. You may obtain a +copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by +applicable law or agreed to in writing, software * distributed under the License is distributed on +an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See +the License for the specific language governing permissions and * limitations under the License. */ + +
+

Cluster Overview

+ +
+

Top N Metrics Dashboard Demo

+

This is a demonstration of the Top N Metrics Dashboard feature.

+ + + + +
+
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less new file mode 100644 index 0000000000000..ae3beb6758571 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.less @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +.overview-container { + padding: 24px; + background: #f0f2f5; + min-height: 100vh; + + h2 { + font-size: 24px; + font-weight: 600; + margin-bottom: 24px; + color: #262626; + } + + .demo-section { + h3 { + font-size: 18px; + font-weight: 500; + margin-bottom: 16px; + color: #262626; + } + + p { + color: #595959; + margin-bottom: 16px; + } + + nz-alert { + margin-top: 16px; + } + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts new file mode 100644 index 0000000000000..17b9246ae3ab1 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/overview-demo/overview-demo.component.ts @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Component, OnInit } from '@angular/core'; + +import { CpuConsumerInfo, BackpressureOperatorInfo, GcTaskInfo } from '@flink-runtime-web/interfaces'; + +@Component({ + selector: 'flink-overview-demo', + templateUrl: './overview-demo.component.html', + styleUrls: ['./overview-demo.component.less'] +}) +export class OverviewDemoComponent implements OnInit { + demoCpuConsumers: CpuConsumerInfo[] = []; + demoBackpressureOperators: BackpressureOperatorInfo[] = []; + demoGcIntensiveTasks: GcTaskInfo[] = []; + + ngOnInit(): void { + // Initialize demo data + this.demoCpuConsumers = [ + { + subtaskId: 0, + taskName: 'Source: Kafka', + operatorName: 'Kafka Source', + cpuPercentage: 95.5, + taskManagerId: 'container_123' + }, + { + subtaskId: 1, + taskName: 'Map: Process', + operatorName: 'ProcessFunction', + cpuPercentage: 88.2, + taskManagerId: 'container_124' + }, + { + subtaskId: 2, + taskName: 'Sink: HDFS', + operatorName: 'HDFS Sink', + cpuPercentage: 82.7, + taskManagerId: 'container_125' + }, + { + subtaskId: 3, + taskName: 'Window', + operatorName: 'WindowFunction', + cpuPercentage: 76.3, + taskManagerId: 'container_123' + }, + { + subtaskId: 4, + taskName: 'Aggregate', + operatorName: 'AggregateFunction', + cpuPercentage: 71.9, + taskManagerId: 'container_126' + } + ]; + + this.demoBackpressureOperators = [ + { + operatorId: 'op_456', + operatorName: 'Map', + backpressureRatio: 0.85, + subtaskId: 1 + }, + { + operatorId: 'op_789', + operatorName: 'Window', + backpressureRatio: 0.72, + subtaskId: 3 + }, + { + operatorId: 'op_234', + operatorName: 'Filter', + backpressureRatio: 0.58, + subtaskId: 2 + }, + { + operatorId: 'op_567', + operatorName: 'Join', + backpressureRatio: 0.45, + subtaskId: 4 + }, + { + operatorId: 'op_890', + operatorName: 'Aggregate', + backpressureRatio: 0.32, + subtaskId: 5 + } + ]; + + this.demoGcIntensiveTasks = [ + { + taskId: 'task_789', + taskName: 'ProcessFunction', + gcTimePercentage: 45.2, + taskManagerId: 'container_123' + }, + { + taskId: 'task_456', + taskName: 'WindowFunction', + gcTimePercentage: 38.7, + taskManagerId: 'container_124' + }, + { + taskId: 'task_234', + taskName: 'JoinFunction', + gcTimePercentage: 32.1, + taskManagerId: 'container_125' + }, + { + taskId: 'task_567', + taskName: 'AggregateFunction', + gcTimePercentage: 28.5, + taskManagerId: 'container_126' + }, + { + taskId: 'task_890', + taskName: 'FilterFunction', + gcTimePercentage: 24.3, + taskManagerId: 'container_127' + } + ]; + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/services/public-api.ts b/flink-runtime-web/web-dashboard/src/app/services/public-api.ts index df66d9dcaba56..820809bb4634d 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/public-api.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/public-api.ts @@ -25,3 +25,4 @@ export * from './task-manager.service'; export * from './metrics.service'; export * from './config.service'; export * from './application.service'; +export * from './topn-metrics.service'; diff --git a/flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts b/flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts new file mode 100644 index 0000000000000..d7d80d623bd0b --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/services/topn-metrics.service.ts @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HttpClient } from '@angular/common/http'; +import { Injectable } from '@angular/core'; +import { EMPTY, Observable } from 'rxjs'; +import { catchError } from 'rxjs/operators'; + +import { TopNMetrics } from '@flink-runtime-web/interfaces'; + +import { ConfigService } from './config.service'; + +@Injectable({ + providedIn: 'root' +}) +export class TopNMetricsService { + constructor(private readonly httpClient: HttpClient, private readonly configService: ConfigService) {} + + public loadTopNMetrics(jobId: string): Observable { + return this.httpClient + .get(`${this.configService.BASE_URL}/jobs/${jobId}/metrics/top-n`) + .pipe(catchError(() => EMPTY)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java new file mode 100644 index 0000000000000..00dab66f5ad9e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Request handler that returns Top N metrics for a job, including CPU consumers, backpressured + * operators, and GC-intensive tasks. + */ +@Internal +public class TopNMetricsHandler + extends AbstractRestHandler< + RestfulGateway, + EmptyRequestBody, + TopNMetricsResponseBody, + TopNMetricsMessageParameters> { + + private static final Logger LOG = LoggerFactory.getLogger(TopNMetricsHandler.class); + + private static final int DEFAULT_TOP_N = 5; + + private static final String CPU_METRIC = "taskmanager.cpu.time"; + private static final String BACKPRESSURE_METRIC = "tasks.backpressure.ratio"; + private static final String GC_TIME_METRIC = "tasks.GC.time"; + + private final MetricFetcher metricFetcher; + + public TopNMetricsHandler( + GatewayRetriever leaderRetriever, + Duration timeout, + Map headers, + MetricFetcher metricFetcher) { + super(leaderRetriever, timeout, headers, TopNMetricsHeaders.getInstance()); + this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null"); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) + throws RestHandlerException { + metricFetcher.update(); + + final MetricStore metricStore = metricFetcher.getMetricStore(); + final String jobId = request.getPathParameter(JobIDPathParameter.class).toString(); + + try { + // Collect Top N metrics + List topCpuConsumers = + getTopCpuConsumers(metricStore, jobId); + + List topBackpressureOperators = + getTopBackpressureOperators(metricStore, jobId); + + List topGcIntensiveTasks = + getTopGcIntensiveTasks(metricStore, jobId); + + return CompletableFuture.completedFuture( + new TopNMetricsResponseBody( + topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks)); + + } catch (Exception e) { + LOG.info("Could not retrieve Top N metrics for job {}.", jobId, e); + throw new RestHandlerException( + "Could not retrieve Top N metrics for job " + jobId, + HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + } + + private List getTopCpuConsumers( + MetricStore metricStore, String jobId) { + List cpuConsumers = new ArrayList<>(); + + // Get all task IDs for this job from representativeAttempts + Map>> representativeAttempts = + metricStore.getRepresentativeAttempts(); + Map> jobTasks = representativeAttempts.get(jobId); + + if (jobTasks == null) { + return cpuConsumers; + } + + for (String taskId : jobTasks.keySet()) { + MetricStore.TaskMetricStore taskMetricStore = + metricStore.getTaskMetricStore(jobId, taskId); + if (taskMetricStore == null) { + continue; + } + + // Get all subtasks + for (Map.Entry subtaskEntry : + taskMetricStore.getAllSubtaskMetricStores().entrySet()) { + int subtaskIndex = subtaskEntry.getKey(); + MetricStore.SubtaskMetricStore subtaskMetricStore = subtaskEntry.getValue(); + + String cpuValue = subtaskMetricStore.getMetric(CPU_METRIC); + if (cpuValue != null) { + try { + double cpuTime = Double.parseDouble(cpuValue); + cpuConsumers.add( + new TopNMetricsResponseBody.CpuConsumerInfo( + subtaskIndex, + taskId, + taskId, // Using taskId as operatorName for simplicity + cpuTime, + "unknown")); // TaskManager ID not directly available + } catch (NumberFormatException e) { + // Skip invalid values + } + } + } + } + + // Sort by CPU usage and take top N + return cpuConsumers.stream() + .sorted( + Comparator.comparing( + TopNMetricsResponseBody.CpuConsumerInfo::getCpuPercentage) + .reversed()) + .limit(DEFAULT_TOP_N) + .collect(Collectors.toList()); + } + + private List getTopBackpressureOperators( + MetricStore metricStore, String jobId) { + List backpressureOperators = + new ArrayList<>(); + + // Get all task IDs for this job from representativeAttempts + Map>> representativeAttempts = + metricStore.getRepresentativeAttempts(); + Map> jobTasks = representativeAttempts.get(jobId); + + if (jobTasks == null) { + return backpressureOperators; + } + + for (String operatorName : jobTasks.keySet()) { + MetricStore.TaskMetricStore taskMetricStore = + metricStore.getTaskMetricStore(jobId, operatorName); + if (taskMetricStore == null) { + continue; + } + + // Get all subtasks + for (Map.Entry subtaskEntry : + taskMetricStore.getAllSubtaskMetricStores().entrySet()) { + int subtaskIndex = subtaskEntry.getKey(); + MetricStore.SubtaskMetricStore subtaskMetricStore = subtaskEntry.getValue(); + + String backpressureValue = subtaskMetricStore.getMetric(BACKPRESSURE_METRIC); + if (backpressureValue != null) { + try { + double backpressureRatio = Double.parseDouble(backpressureValue); + backpressureOperators.add( + new TopNMetricsResponseBody.BackpressureOperatorInfo( + operatorName, + operatorName, + backpressureRatio, + subtaskIndex)); + } catch (NumberFormatException e) { + // Skip invalid values + } + } + } + } + + // Sort by backpressure ratio and take top N + return backpressureOperators.stream() + .sorted( + Comparator.comparing( + TopNMetricsResponseBody.BackpressureOperatorInfo + ::getBackpressureRatio) + .reversed()) + .limit(DEFAULT_TOP_N) + .collect(Collectors.toList()); + } + + private List getTopGcIntensiveTasks( + MetricStore metricStore, String jobId) { + List gcIntensiveTasks = new ArrayList<>(); + + // Get all task IDs for this job from representativeAttempts + Map>> representativeAttempts = + metricStore.getRepresentativeAttempts(); + Map> jobTasks = representativeAttempts.get(jobId); + + if (jobTasks == null) { + return gcIntensiveTasks; + } + + for (String taskId : jobTasks.keySet()) { + MetricStore.TaskMetricStore taskMetricStore = + metricStore.getTaskMetricStore(jobId, taskId); + if (taskMetricStore == null) { + continue; + } + + // Get all subtasks + for (Map.Entry subtaskEntry : + taskMetricStore.getAllSubtaskMetricStores().entrySet()) { + int subtaskIndex = subtaskEntry.getKey(); + MetricStore.SubtaskMetricStore subtaskMetricStore = subtaskEntry.getValue(); + + String gcTimeValue = subtaskMetricStore.getMetric(GC_TIME_METRIC); + if (gcTimeValue != null) { + try { + double gcTime = Double.parseDouble(gcTimeValue); + gcIntensiveTasks.add( + new TopNMetricsResponseBody.GcTaskInfo( + String.valueOf(subtaskIndex), + taskId, + gcTime, + "unknown")); // TaskManager ID not directly available + } catch (NumberFormatException e) { + // Skip invalid values + } + } + } + } + + // Sort by GC time and take top N + return gcIntensiveTasks.stream() + .sorted( + Comparator.comparing( + TopNMetricsResponseBody.GcTaskInfo::getGcTimePercentage) + .reversed()) + .limit(DEFAULT_TOP_N) + .collect(Collectors.toList()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java new file mode 100644 index 0000000000000..0eadba3d96058 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** {@link RuntimeMessageHeaders} for {@link TopNMetricsHandler}. */ +public final class TopNMetricsHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, TopNMetricsResponseBody, TopNMetricsMessageParameters> { + + private static final TopNMetricsHeaders INSTANCE = new TopNMetricsHeaders(); + + private TopNMetricsHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return TopNMetricsResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public TopNMetricsMessageParameters getUnresolvedMessageParameters() { + return new TopNMetricsMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/jobs/:" + JobIDPathParameter.KEY + "/metrics/top-n"; + } + + public static TopNMetricsHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Returns Top N metrics for a job including CPU consumers, " + + "backpressured operators, and GC-intensive tasks."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java new file mode 100644 index 0000000000000..55812df7c7028 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsMessageParameters.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; + +/** MessageParameters for Top N metrics. */ +public class TopNMetricsMessageParameters extends JobMessageParameters {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java new file mode 100644 index 0000000000000..e10dc368a5e29 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** Response body for Top N metrics aggregation. */ +public class TopNMetricsResponseBody implements ResponseBody { + + private static final long serialVersionUID = 1L; + + public static final String FIELD_NAME_TOP_CPU_CONSUMERS = "topCpuConsumers"; + public static final String FIELD_NAME_TOP_BACKPRESSURE_OPERATORS = "topBackpressureOperators"; + public static final String FIELD_NAME_TOP_GC_INTENSIVE_TASKS = "topGcIntensiveTasks"; + + @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) + private final List topCpuConsumers; + + @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS) + private final List topBackpressureOperators; + + @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) + private final List topGcIntensiveTasks; + + @JsonCreator + public TopNMetricsResponseBody( + @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) List topCpuConsumers, + @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS) + List topBackpressureOperators, + @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) List topGcIntensiveTasks) { + this.topCpuConsumers = topCpuConsumers; + this.topBackpressureOperators = topBackpressureOperators; + this.topGcIntensiveTasks = topGcIntensiveTasks; + } + + public List getTopCpuConsumers() { + return topCpuConsumers; + } + + public List getTopBackpressureOperators() { + return topBackpressureOperators; + } + + public List getTopGcIntensiveTasks() { + return topGcIntensiveTasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopNMetricsResponseBody that = (TopNMetricsResponseBody) o; + return Objects.equals(topCpuConsumers, that.topCpuConsumers) + && Objects.equals(topBackpressureOperators, that.topBackpressureOperators) + && Objects.equals(topGcIntensiveTasks, that.topGcIntensiveTasks); + } + + @Override + public int hashCode() { + return Objects.hash(topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks); + } + + /** Information about CPU-intensive subtasks. */ + public static class CpuConsumerInfo { + + public static final String FIELD_NAME_SUBTASK_ID = "subtaskId"; + public static final String FIELD_NAME_TASK_NAME = "taskName"; + public static final String FIELD_NAME_OPERATOR_NAME = "operatorName"; + public static final String FIELD_NAME_CPU_PERCENTAGE = "cpuPercentage"; + public static final String FIELD_NAME_TASKMANAGER_ID = "taskManagerId"; + + @JsonProperty(FIELD_NAME_SUBTASK_ID) + private final int subtaskId; + + @JsonProperty(FIELD_NAME_TASK_NAME) + private final String taskName; + + @JsonProperty(FIELD_NAME_OPERATOR_NAME) + private final String operatorName; + + @JsonProperty(FIELD_NAME_CPU_PERCENTAGE) + private final double cpuPercentage; + + @JsonProperty(FIELD_NAME_TASKMANAGER_ID) + private final String taskManagerId; + + @JsonCreator + public CpuConsumerInfo( + @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId, + @JsonProperty(FIELD_NAME_TASK_NAME) String taskName, + @JsonProperty(FIELD_NAME_OPERATOR_NAME) String operatorName, + @JsonProperty(FIELD_NAME_CPU_PERCENTAGE) double cpuPercentage, + @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskManagerId) { + this.subtaskId = subtaskId; + this.taskName = taskName; + this.operatorName = operatorName; + this.cpuPercentage = cpuPercentage; + this.taskManagerId = taskManagerId; + } + + public int getSubtaskId() { + return subtaskId; + } + + public String getTaskName() { + return taskName; + } + + public String getOperatorName() { + return operatorName; + } + + public double getCpuPercentage() { + return cpuPercentage; + } + + public String getTaskManagerId() { + return taskManagerId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CpuConsumerInfo that = (CpuConsumerInfo) o; + return subtaskId == that.subtaskId + && Double.compare(that.cpuPercentage, cpuPercentage) == 0 + && Objects.equals(taskName, that.taskName) + && Objects.equals(operatorName, that.operatorName) + && Objects.equals(taskManagerId, that.taskManagerId); + } + + @Override + public int hashCode() { + return Objects.hash(subtaskId, taskName, operatorName, cpuPercentage, taskManagerId); + } + } + + /** Information about backpressured operators. */ + public static class BackpressureOperatorInfo { + + public static final String FIELD_NAME_OPERATOR_ID = "operatorId"; + public static final String FIELD_NAME_OPERATOR_NAME = "operatorName"; + public static final String FIELD_NAME_BACKPRESSURE_RATIO = "backpressureRatio"; + public static final String FIELD_NAME_SUBTASK_ID = "subtaskId"; + + @JsonProperty(FIELD_NAME_OPERATOR_ID) + private final String operatorId; + + @JsonProperty(FIELD_NAME_OPERATOR_NAME) + private final String operatorName; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO) + private final double backpressureRatio; + + @JsonProperty(FIELD_NAME_SUBTASK_ID) + private final int subtaskId; + + @JsonCreator + public BackpressureOperatorInfo( + @JsonProperty(FIELD_NAME_OPERATOR_ID) String operatorId, + @JsonProperty(FIELD_NAME_OPERATOR_NAME) String operatorName, + @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO) double backpressureRatio, + @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId) { + this.operatorId = operatorId; + this.operatorName = operatorName; + this.backpressureRatio = backpressureRatio; + this.subtaskId = subtaskId; + } + + public String getOperatorId() { + return operatorId; + } + + public String getOperatorName() { + return operatorName; + } + + public double getBackpressureRatio() { + return backpressureRatio; + } + + public int getSubtaskId() { + return subtaskId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BackpressureOperatorInfo that = (BackpressureOperatorInfo) o; + return Double.compare(that.backpressureRatio, backpressureRatio) == 0 + && subtaskId == that.subtaskId + && Objects.equals(operatorId, that.operatorId) + && Objects.equals(operatorName, that.operatorName); + } + + @Override + public int hashCode() { + return Objects.hash(operatorId, operatorName, backpressureRatio, subtaskId); + } + } + + /** Information about GC-intensive tasks. */ + public static class GcTaskInfo { + + public static final String FIELD_NAME_TASK_ID = "taskId"; + public static final String FIELD_NAME_TASK_NAME = "taskName"; + public static final String FIELD_NAME_GC_TIME_PERCENTAGE = "gcTimePercentage"; + public static final String FIELD_NAME_TASKMANAGER_ID = "taskManagerId"; + + @JsonProperty(FIELD_NAME_TASK_ID) + private final String taskId; + + @JsonProperty(FIELD_NAME_TASK_NAME) + private final String taskName; + + @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE) + private final double gcTimePercentage; + + @JsonProperty(FIELD_NAME_TASKMANAGER_ID) + private final String taskManagerId; + + @JsonCreator + public GcTaskInfo( + @JsonProperty(FIELD_NAME_TASK_ID) String taskId, + @JsonProperty(FIELD_NAME_TASK_NAME) String taskName, + @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE) double gcTimePercentage, + @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskManagerId) { + this.taskId = taskId; + this.taskName = taskName; + this.gcTimePercentage = gcTimePercentage; + this.taskManagerId = taskManagerId; + } + + public String getTaskId() { + return taskId; + } + + public String getTaskName() { + return taskName; + } + + public double getGcTimePercentage() { + return gcTimePercentage; + } + + public String getTaskManagerId() { + return taskManagerId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GcTaskInfo that = (GcTaskInfo) o; + return Double.compare(that.gcTimePercentage, gcTimePercentage) == 0 + && Objects.equals(taskId, that.taskId) + && Objects.equals(taskName, that.taskName) + && Objects.equals(taskManagerId, that.taskManagerId); + } + + @Override + public int hashCode() { + return Objects.hash(taskId, taskName, gcTimePercentage, taskManagerId); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 211b8749425dc..43b4c38988127 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -96,6 +96,7 @@ import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler; import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.TopNMetricsHandler; import org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; @@ -561,6 +562,9 @@ protected List> initiali final JobMetricsHandler jobMetricsHandler = new JobMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher); + final TopNMetricsHandler topNMetricsHandler = + new TopNMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher); + final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher); @@ -860,6 +864,7 @@ protected List> initiali jobVertexWatermarksHandler.getMessageHeaders(), jobVertexWatermarksHandler)); handlers.add(Tuple2.of(jobMetricsHandler.getMessageHeaders(), jobMetricsHandler)); + handlers.add(Tuple2.of(topNMetricsHandler.getMessageHeaders(), topNMetricsHandler)); handlers.add(Tuple2.of(subtaskMetricsHandler.getMessageHeaders(), subtaskMetricsHandler)); handlers.add( Tuple2.of(