Skip to content

Commit f1b85a7

Browse files
authored
*: Add caller information to pd client (#59911)
close #59910
1 parent 2ce45f0 commit f1b85a7

21 files changed

Lines changed: 68 additions & 26 deletions

File tree

br/pkg/pdutil/pd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func NewPdController(
159159
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
160160
}
161161
pdClient, err := pd.NewClientWithContext(
162-
ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption,
162+
ctx, caller.GetComponent(1), pdAddrs, securityOption,
163163
opt.WithGRPCDialOptions(maxCallMsgSize...),
164164
// If the time too short, we may scatter a region many times, because
165165
// the interface `ScatterRegions` may time out.

br/pkg/restore/split/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ go_library(
4444
"@com_github_tikv_pd_client//clients/router",
4545
"@com_github_tikv_pd_client//http",
4646
"@com_github_tikv_pd_client//opt",
47+
"@com_github_tikv_pd_client//pkg/caller",
4748
"@org_golang_google_grpc//:grpc",
4849
"@org_golang_google_grpc//codes",
4950
"@org_golang_google_grpc//credentials",

br/pkg/restore/split/client.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
pd "github.com/tikv/pd/client"
3737
pdhttp "github.com/tikv/pd/client/http"
3838
"github.com/tikv/pd/client/opt"
39+
"github.com/tikv/pd/client/pkg/caller"
3940
"go.uber.org/multierr"
4041
"go.uber.org/zap"
4142
"golang.org/x/sync/errgroup"
@@ -159,6 +160,14 @@ func WithOnSplit(onSplit func(key [][]byte)) ClientOptionalParameter {
159160
}
160161
}
161162

163+
func withCallerComponent(client pd.Client, component caller.Component) pd.Client {
164+
if _, ok := client.(*tikvclient.CodecPDClient); ok {
165+
// Keep codec-aware clients intact so callers can retrieve the same wrapper.
166+
return client
167+
}
168+
return client.WithCallerComponent(component)
169+
}
170+
162171
// NewClient creates a SplitClient.
163172
//
164173
// splitBatchKeyCnt controls how many keys are sent to TiKV in a batch in split
@@ -172,7 +181,7 @@ func NewClient(
172181
opts ...ClientOptionalParameter,
173182
) SplitClient {
174183
cli := &pdClient{
175-
client: client,
184+
client: withCallerComponent(client, caller.GetComponent(1)),
176185
httpCli: httpCli,
177186
tlsConf: tlsConf,
178187
storeCache: make(map[uint64]*metapb.Store),

br/pkg/restore/split/mock_pd_client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/tikv/pd/client/clients/router"
2424
pdhttp "github.com/tikv/pd/client/http"
2525
"github.com/tikv/pd/client/opt"
26+
"github.com/tikv/pd/client/pkg/caller"
2627
"google.golang.org/grpc/codes"
2728
"google.golang.org/grpc/keepalive"
2829
"google.golang.org/grpc/status"
@@ -220,6 +221,10 @@ func newRegionNotFullyReplicatedErr(regionID uint64) error {
220221
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionID)
221222
}
222223

224+
func (c *MockPDClientForSplit) WithCallerComponent(_ caller.Component) pd.Client {
225+
return c
226+
}
227+
223228
func (c *MockPDClientForSplit) SetRegions(boundaries [][]byte) []*metapb.Region {
224229
c.mu.Lock()
225230
defer c.mu.Unlock()
@@ -561,6 +566,10 @@ func (fpdc *FakePDClient) SetRegions(regions []*router.Region) {
561566
fpdc.regions = regions
562567
}
563568

569+
func (fpdc *FakePDClient) WithCallerComponent(_ caller.Component) pd.Client {
570+
return fpdc
571+
}
572+
564573
func (fpdc *FakePDClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) {
565574
return slices.Clone(fpdc.stores), nil
566575
}

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
"@com_github_tikv_client_go_v2//txnkv/txnlock",
4747
"@com_github_tikv_pd_client//:client",
4848
"@com_github_tikv_pd_client//opt",
49+
"@com_github_tikv_pd_client//pkg/caller",
4950
"@io_etcd_go_etcd_client_v3//:client",
5051
"@org_golang_google_grpc//:grpc",
5152
"@org_golang_google_grpc//codes",

br/pkg/streamhelper/advancer_env.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/tikv/client-go/v2/txnkv/txnlock"
1818
pd "github.com/tikv/pd/client"
1919
"github.com/tikv/pd/client/opt"
20+
"github.com/tikv/pd/client/pkg/caller"
2021
clientv3 "go.etcd.io/etcd/client/v3"
2122
"google.golang.org/grpc"
2223
"google.golang.org/grpc/keepalive"
@@ -140,6 +141,7 @@ func (t clusterEnv) ClearCache(ctx context.Context, storeID uint64) error {
140141

141142
// CliEnv creates the Env for CLI usage.
142143
func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env {
144+
cli.ResetPDClientCallerComponent(caller.Pitr)
143145
return clusterEnv{
144146
clis: cli,
145147
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
@@ -154,13 +156,14 @@ func TiDBEnv(tikvStore tikv.Storage, pdCli pd.Client, etcdCli *clientv3.Client,
154156
if err != nil {
155157
return nil, err
156158
}
159+
pitrPDClient := pdCli.WithCallerComponent(caller.Pitr)
157160
env := clusterEnv{
158-
clis: utils.NewStoreManager(pdCli, keepalive.ClientParameters{
161+
clis: utils.NewStoreManager(pitrPDClient, keepalive.ClientParameters{
159162
Time: time.Duration(conf.TiKVClient.GrpcKeepAliveTime) * time.Second,
160163
Timeout: time.Duration(conf.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
161164
}, tconf),
162165
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
163-
PDRegionScanner: PDRegionScanner{Client: pdCli},
166+
PDRegionScanner: PDRegionScanner{Client: pitrPDClient},
164167
AdvancerLockResolver: newAdvancerLockResolver(tikvStore),
165168
}
166169

br/pkg/utils/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ go_library(
6262
"@com_github_prometheus_client_golang//prometheus/promhttp",
6363
"@com_github_tikv_client_go_v2//tikv",
6464
"@com_github_tikv_pd_client//:client",
65+
"@com_github_tikv_pd_client//pkg/caller",
6566
"@io_etcd_go_etcd_api_v3//v3rpc/rpctypes",
6667
"@io_etcd_go_etcd_client_v3//:client",
6768
"@org_golang_google_grpc//:grpc",

br/pkg/utils/store_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
berrors "github.com/pingcap/tidb/br/pkg/errors"
1717
"github.com/pingcap/tidb/br/pkg/logutil"
1818
pd "github.com/tikv/pd/client"
19+
"github.com/tikv/pd/client/pkg/caller"
1920
"go.uber.org/zap"
2021
"google.golang.org/grpc"
2122
"google.golang.org/grpc/backoff"
@@ -127,6 +128,10 @@ func (mgr *StoreManager) PDClient() pd.Client {
127128
return mgr.pdClient
128129
}
129130

131+
func (mgr *StoreManager) ResetPDClientCallerComponent(component caller.Component) {
132+
mgr.pdClient = mgr.pdClient.WithCallerComponent(component)
133+
}
134+
130135
func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
131136
failpoint.Inject("hint-get-backup-client", func(v failpoint.Value) {
132137
log.Info("failpoint hint-get-backup-client injected, "+

pkg/ddl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ go_library(
221221
"@com_github_tikv_pd_client//clients/router",
222222
"@com_github_tikv_pd_client//http",
223223
"@com_github_tikv_pd_client//opt",
224+
"@com_github_tikv_pd_client//pkg/caller",
224225
"@io_etcd_go_etcd_client_v3//:client",
225226
"@org_golang_x_sync//errgroup",
226227
"@org_uber_go_atomic//:atomic",

pkg/ddl/index.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3383,10 +3383,7 @@ func estimateRowSizeFromRegion(ctx context.Context, store kv.Storage, tbl table.
33833383
if !ok {
33843384
return 0, fmt.Errorf("not a helper.Storage")
33853385
}
3386-
h := &helper.Helper{
3387-
Store: hStore,
3388-
RegionCache: hStore.GetRegionCache(),
3389-
}
3386+
h := helper.NewHelper(hStore)
33903387
pdCli, err := h.TryGetPDHTTPClient()
33913388
if err != nil {
33923389
return 0, err

0 commit comments

Comments
 (0)