Skip to content

Commit e8de3a6

Browse files
authored
metrics: add changefeed error info info (#4499)
close #4498
1 parent d711743 commit e8de3a6

File tree

6 files changed

+432
-12
lines changed

6 files changed

+432
-12
lines changed

coordinator/controller.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ func NewController(
186186
func (c *Controller) collectMetrics(ctx context.Context) error {
187187
ticker := time.NewTicker(5 * time.Second)
188188
defer ticker.Stop()
189+
errorMetricLabels := make(map[common.ChangeFeedID]changefeedErrorMetricLabels)
189190
for {
190191
select {
191192
case <-ctx.Done():
@@ -197,6 +198,8 @@ func (c *Controller) collectMetrics(ctx context.Context) error {
197198
metrics.ChangefeedStateGauge.WithLabelValues("Absent").Set(float64(c.changefeedDB.GetAbsentSize()))
198199
metrics.ChangefeedStateGauge.WithLabelValues("Stopped").Set(float64(c.changefeedDB.GetStoppedSize()))
199200

201+
currentChangefeeds := make(map[common.ChangeFeedID]struct{})
202+
200203
c.changefeedDB.Foreach(func(cf *changefeed.Changefeed) {
201204
info := cf.GetInfo()
202205
keyspace := info.ChangefeedID.Keyspace()
@@ -213,7 +216,35 @@ func (c *Controller) collectMetrics(ctx context.Context) error {
213216
lag := float64(pdPhysicalTime-phyCkpTs) / 1e3
214217
metrics.ChangefeedCheckpointTsGauge.WithLabelValues(keyspace, name).Set(float64(phyCkpTs))
215218
metrics.ChangefeedCheckpointTsLagGauge.WithLabelValues(keyspace, name).Set(lag)
219+
220+
// sync changefeed error metrics
221+
currentChangefeeds[cf.ID] = struct{}{}
222+
oldLabels, exists := errorMetricLabels[cf.ID]
223+
newLabels, hasError := getChangefeedErrorMetricLabels(cf.GetInfo())
224+
// If the error state has not changed, do nothing.
225+
if exists && hasError && oldLabels == newLabels {
226+
return
227+
}
228+
// If there was an old metric, delete it, as the state has changed.
229+
if exists {
230+
metrics.ChangefeedErrorInfoGauge.DeleteLabelValues(oldLabels.labelValues()...)
231+
}
232+
if hasError {
233+
// An error exists (either new or changed). Set the new metric and update cache.
234+
metrics.ChangefeedErrorInfoGauge.WithLabelValues(newLabels.labelValues()...).Set(1)
235+
errorMetricLabels[cf.ID] = newLabels
236+
} else {
237+
// The error has disappeared, remove from cache.
238+
delete(errorMetricLabels, cf.ID)
239+
}
216240
})
241+
for changefeedID, labels := range errorMetricLabels {
242+
if _, ok := currentChangefeeds[changefeedID]; ok {
243+
continue
244+
}
245+
metrics.ChangefeedErrorInfoGauge.DeleteLabelValues(labels.labelValues()...)
246+
delete(errorMetricLabels, changefeedID)
247+
}
217248
}
218249
}
219250
}

coordinator/helper.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
package coordinator
1515

1616
import (
17+
"strings"
18+
19+
"github.com/pingcap/ticdc/pkg/config"
1720
"github.com/pingcap/ticdc/pkg/messaging"
1821
)
1922

@@ -35,7 +38,54 @@ const (
3538
EventPeriod
3639
)
3740

41+
const changefeedErrorMetricMsgLimit = 256
42+
3843
type Event struct {
3944
eventType int
4045
message *messaging.TargetMessage
4146
}
47+
48+
type changefeedErrorMetricLabels struct {
49+
keyspace string
50+
changefeed string
51+
state string
52+
code string
53+
message string
54+
}
55+
56+
func (l changefeedErrorMetricLabels) labelValues() []string {
57+
return []string{l.keyspace, l.changefeed, l.state, l.code, l.message}
58+
}
59+
60+
func normalizeChangefeedErrorMetricMessage(message string) string {
61+
message = strings.Join(strings.Fields(message), " ")
62+
if len(message) <= changefeedErrorMetricMsgLimit {
63+
return message
64+
}
65+
return message[:changefeedErrorMetricMsgLimit-3] + "..."
66+
}
67+
68+
func getChangefeedErrorMetricLabels(info *config.ChangeFeedInfo) (changefeedErrorMetricLabels, bool) {
69+
if info == nil {
70+
return changefeedErrorMetricLabels{}, false
71+
}
72+
if info.State != config.StateFailed && info.State != config.StateWarning {
73+
return changefeedErrorMetricLabels{}, false
74+
}
75+
76+
runningErr := info.Error
77+
if runningErr == nil {
78+
runningErr = info.Warning
79+
}
80+
if runningErr == nil {
81+
return changefeedErrorMetricLabels{}, false
82+
}
83+
84+
return changefeedErrorMetricLabels{
85+
keyspace: info.ChangefeedID.Keyspace(),
86+
changefeed: info.ChangefeedID.Name(),
87+
state: string(info.State),
88+
code: runningErr.Code,
89+
message: normalizeChangefeedErrorMetricMessage(runningErr.Message),
90+
}, true
91+
}

metrics/grafana/ticdc_new_arch.json

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,116 @@
11911191
"align": false,
11921192
"alignLevel": null
11931193
}
1194+
},
1195+
{
1196+
"datasource": "${DS_TEST-CLUSTER}",
1197+
"description": "Current warning or failed reason of each changefeed. The metric message is normalized to a single line and truncated to 256 characters.",
1198+
"fieldConfig": {
1199+
"defaults": {
1200+
"custom": {
1201+
"align": null,
1202+
"filterable": false
1203+
},
1204+
"links": []
1205+
},
1206+
"overrides": [
1207+
{
1208+
"matcher": {
1209+
"id": "byName",
1210+
"options": "namespace"
1211+
},
1212+
"properties": [
1213+
{
1214+
"id": "custom.width",
1215+
"value": 120
1216+
}
1217+
]
1218+
},
1219+
{
1220+
"matcher": {
1221+
"id": "byName",
1222+
"options": "changefeed"
1223+
},
1224+
"properties": [
1225+
{
1226+
"id": "custom.width",
1227+
"value": 180
1228+
}
1229+
]
1230+
},
1231+
{
1232+
"matcher": {
1233+
"id": "byName",
1234+
"options": "state"
1235+
},
1236+
"properties": [
1237+
{
1238+
"id": "custom.width",
1239+
"value": 100
1240+
}
1241+
]
1242+
},
1243+
{
1244+
"matcher": {
1245+
"id": "byName",
1246+
"options": "code"
1247+
},
1248+
"properties": [
1249+
{
1250+
"id": "custom.width",
1251+
"value": 180
1252+
}
1253+
]
1254+
}
1255+
]
1256+
},
1257+
"gridPos": {
1258+
"h": 8,
1259+
"w": 24,
1260+
"x": 0,
1261+
"y": 32
1262+
},
1263+
"id": 62010,
1264+
"options": {
1265+
"showHeader": true,
1266+
"sortBy": []
1267+
},
1268+
"pluginVersion": "7.5.17",
1269+
"targets": [
1270+
{
1271+
"expr": "max by (namespace, changefeed, state, code, message) (ticdc_owner_changefeed_error_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"})",
1272+
"format": "time_series",
1273+
"instant": true,
1274+
"refId": "A"
1275+
}
1276+
],
1277+
"title": "Changefeed Error Details",
1278+
"transformations": [
1279+
{
1280+
"id": "labelsToFields",
1281+
"options": {}
1282+
},
1283+
{
1284+
"id": "organize",
1285+
"options": {
1286+
"excludeByName": {
1287+
"Metric": true,
1288+
"Time": true,
1289+
"Value": true,
1290+
"__name__": true
1291+
},
1292+
"indexByName": {
1293+
"changefeed": 1,
1294+
"code": 3,
1295+
"message": 4,
1296+
"namespace": 0,
1297+
"state": 2
1298+
},
1299+
"renameByName": {}
1300+
}
1301+
}
1302+
],
1303+
"type": "table"
11941304
}
11951305
],
11961306
"title": "Summary",
@@ -18388,7 +18498,7 @@
1838818498
"y": 19
1838918499
},
1839018500
"hiddenSeries": false,
18391-
"id": 60009,
18501+
"id": 62004,
1839218502
"legend": {
1839318503
"avg": false,
1839418504
"current": false,
@@ -18484,7 +18594,7 @@
1848418594
"y": 19
1848518595
},
1848618596
"hiddenSeries": false,
18487-
"id": 62004,
18597+
"id": 62011,
1848818598
"legend": {
1848918599
"avg": false,
1849018600
"current": false,
@@ -18590,7 +18700,7 @@
1859018700
"heatmap": {},
1859118701
"hideZeroBuckets": true,
1859218702
"highlightCards": true,
18593-
"id": 62005,
18703+
"id": 62012,
1859418704
"legend": {
1859518705
"show": true
1859618706
},
@@ -19173,7 +19283,7 @@
1917319283
"y": 26
1917419284
},
1917519285
"hiddenSeries": false,
19176-
"id": 22271,
19286+
"id": 62013,
1917719287
"legend": {
1917819288
"alignAsTable": false,
1917919289
"avg": false,

0 commit comments

Comments
 (0)