@@ -2,12 +2,16 @@ package topology
22
33import (
44 "context"
5+ "encoding/json"
6+ "strconv"
7+ "strings"
58 "sync"
69 "sync/atomic"
710 "time"
811
912 "github.com/pingcap/ng-monitoring/component/domain"
1013 "github.com/pingcap/ng-monitoring/utils"
14+ clientv3 "go.etcd.io/etcd/client/v3"
1115
1216 "github.com/pingcap/log"
1317 "github.com/pingcap/tidb-dashboard/util/topo"
@@ -20,6 +24,7 @@ const (
2024 ComponentTiKV = "tikv"
2125 ComponentTiFlash = "tiflash"
2226 ComponentPD = "pd"
27+ ComponentTiCDC = "ticdc"
2328)
2429
2530var (
@@ -126,6 +131,7 @@ func (d *TopologyDiscoverer) fetchAllScrapeTargets(ctx context.Context) ([]Compo
126131 d .getTiDBComponents ,
127132 d .getPDComponents ,
128133 d .getStoreComponents ,
134+ d .getTiCDCComponents ,
129135 }
130136 components := make ([]Component , 0 , 8 )
131137 for _ , fn := range fns {
@@ -220,3 +226,58 @@ func (d *TopologyDiscoverer) getStoreComponents(ctx context.Context) ([]Componen
220226 }
221227 return components , nil
222228}
229+
230+ func (d * TopologyDiscoverer ) getTiCDCComponents (ctx context.Context ) ([]Component , error ) {
231+ etcdCli , err := d .do .GetEtcdClient ()
232+ if err != nil {
233+ return nil , err
234+ }
235+ return getTiCDCComponents (ctx , etcdCli )
236+ }
237+
238+ const ticdcTopologyKeyPrefix = "/tidb/cdc/default/__cdc_meta__/capture/"
239+
240+ type ticdcNodeItem struct {
241+ ID string `json:"id"`
242+ Address string `json:"address"`
243+ Version string `json:"version"`
244+ }
245+
246+ func getTiCDCComponents (ctx context.Context , etcdCli * clientv3.Client ) ([]Component , error ) {
247+ resp , err := etcdCli .Get (ctx , ticdcTopologyKeyPrefix , clientv3 .WithPrefix ())
248+ if err != nil {
249+ return nil , err
250+ }
251+ components := make ([]Component , 0 , 3 )
252+ for _ , kv := range resp .Kvs {
253+ key := string (kv .Key )
254+ if ! strings .HasPrefix (key , ticdcTopologyKeyPrefix ) {
255+ continue
256+ }
257+ var item ticdcNodeItem
258+ if err := json .Unmarshal (kv .Value , & item ); err != nil {
259+ log .Warn ("invalid ticdc node item in etcd" , zap .Error (err ))
260+ continue
261+ }
262+ arr := strings .Split (item .Address , ":" )
263+ if len (arr ) != 2 {
264+ log .Warn ("invalid ticdc node address in etcd" , zap .String ("address" , item .Address ))
265+ continue
266+ }
267+ ip := arr [0 ]
268+ port , err := strconv .Atoi (arr [1 ])
269+ if err != nil {
270+ log .Warn ("invalid ticdc node address in etcd" ,
271+ zap .Error (err ),
272+ zap .String ("address" , item .Address ))
273+ continue
274+ }
275+ components = append (components , Component {
276+ Name : ComponentTiCDC ,
277+ IP : ip ,
278+ Port : uint (port ),
279+ StatusPort : uint (port ),
280+ })
281+ }
282+ return components , nil
283+ }
0 commit comments