Skip to content

Commit b5d0f47

Browse files
committed
fix grpc request
Signed-off-by: Ziqian Qin <eke@fastmail.com>
1 parent 84d8269 commit b5d0f47

13 files changed

Lines changed: 57 additions & 17 deletions

File tree

br/pkg/restore/log_client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ func (rc *LogClient) InitClients(
559559
log.Fatal("failed to get stores", zap.Error(err))
560560
}
561561

562-
metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, len(stores)+1)
562+
metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, len(stores)+1, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
563563
importCli := importclient.NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
564564

565565
rc.logRestoreManager, err = NewLogRestoreManager(
@@ -1499,7 +1499,7 @@ func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
14991499
splitSize uint64,
15001500
splitKeys int64,
15011501
) (iter.TryNextor[SSTs], error) {
1502-
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3)
1502+
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
15031503
wrapper := restore.PipelineRestorerWrapper[SSTs]{
15041504
PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, splitSize, splitKeys),
15051505
}
@@ -1518,7 +1518,7 @@ func (rc *LogClient) WrapLogFilesIterWithSplitHelper(
15181518
splitSize uint64,
15191519
splitKeys int64,
15201520
) (LogIter, error) {
1521-
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3)
1521+
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
15221522
wrapper := restore.PipelineRestorerWrapper[*LogDataFileInfo]{
15231523
PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, splitSize, splitKeys),
15241524
}

br/pkg/restore/log_client/import.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/pingcap/tidb/br/pkg/utils/consts"
4242
"github.com/pingcap/tidb/pkg/kv"
4343
"github.com/pingcap/tidb/pkg/metrics"
44+
kvutil "github.com/tikv/client-go/v2/util"
4445
pd "github.com/tikv/pd/client"
4546
"go.uber.org/multierr"
4647
"go.uber.org/zap"
@@ -287,9 +288,10 @@ func (importer *LogFileImporter) downloadAndApplyKVFile(
287288
}
288289

289290
reqCtx := &kvrpcpb.Context{
290-
RegionId: regionInfo.Region.GetId(),
291-
RegionEpoch: regionInfo.Region.GetRegionEpoch(),
292-
Peer: leader,
291+
RegionId: regionInfo.Region.GetId(),
292+
RegionEpoch: regionInfo.Region.GetRegionEpoch(),
293+
Peer: leader,
294+
RequestSource: kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR),
293295
}
294296

295297
var req *import_sstpb.ApplyRequest

br/pkg/restore/snap_client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ func (rc *SnapClient) initClients(ctx context.Context, backend *backuppb.Storage
719719
var createCallBacks []func(*SnapFileImporter) error
720720
var closeCallBacks []func(*SnapFileImporter) error
721721
var splitClientOpts []split.ClientOptionalParameter
722+
splitClientOpts = append(splitClientOpts, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
722723
if isRawKvMode {
723724
splitClientOpts = append(splitClientOpts, split.WithRawKV())
724725
createCallBacks = append(createCallBacks, func(importer *SnapFileImporter) error {

br/pkg/restore/snap_client/placement_rule_manager.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ import (
3131
berrors "github.com/pingcap/tidb/br/pkg/errors"
3232
"github.com/pingcap/tidb/br/pkg/restore/split"
3333
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
34+
"github.com/pingcap/tidb/pkg/kv"
3435
"github.com/pingcap/tidb/pkg/tablecodec"
3536
"github.com/pingcap/tidb/pkg/util/codec"
37+
kvutil "github.com/tikv/client-go/v2/util"
3638
pd "github.com/tikv/pd/client"
3739
pdhttp "github.com/tikv/pd/client/http"
3840
"go.uber.org/zap"
@@ -91,7 +93,7 @@ func NewPlacementRuleManager(ctx context.Context, pdClient pd.Client, pdHTTPCli
9193
return &onlinePlacementRuleManager{
9294
// toolClient reuse the split.SplitClient to do miscellaneous things. It doesn't
9395
// call split related functions so set the arguments to arbitrary values.
94-
toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3),
96+
toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR))),
9597

9698
restoreStores: restoreStores,
9799
restoreTables: make(map[int64]struct{}),

br/pkg/restore/snap_client/tikv_sender.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ import (
3737
"github.com/pingcap/tidb/br/pkg/restore/split"
3838
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
3939
"github.com/pingcap/tidb/br/pkg/summary"
40+
"github.com/pingcap/tidb/pkg/kv"
4041
tidbutil "github.com/pingcap/tidb/pkg/util"
42+
kvutil "github.com/tikv/client-go/v2/util"
4143
"go.uber.org/zap"
4244
"golang.org/x/sync/errgroup"
4345
"google.golang.org/grpc/codes"
@@ -362,6 +364,7 @@ func (rc *SnapClient) SplitPoints(
362364
splitClientOpts = append(splitClientOpts, split.WithOnSplit(func(keys [][]byte) {
363365
onProgress(int64(len(keys)))
364366
}))
367+
splitClientOpts = append(splitClientOpts, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
365368
// TODO seems duplicate with metaClient.
366369
if isRawKv {
367370
splitClientOpts = append(splitClientOpts, split.WithRawKV())

br/pkg/restore/split/client.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ type pdClient struct {
125125
onSplit func(key [][]byte)
126126
splitConcurrency int
127127
splitBatchKeyCnt int
128+
requestSource string
128129
}
129130

130131
type ClientOptionalParameter func(*pdClient)
@@ -143,6 +144,13 @@ func WithOnSplit(onSplit func(key [][]byte)) ClientOptionalParameter {
143144
}
144145
}
145146

147+
// WithRequestSource sets the request source for the split client.
148+
func WithRequestSource(source string) ClientOptionalParameter {
149+
return func(c *pdClient) {
150+
c.requestSource = source
151+
}
152+
}
153+
146154
// NewClient creates a SplitClient.
147155
//
148156
// splitBatchKeyCnt controls how many keys are sent to TiKV in a batch in split
@@ -309,6 +317,7 @@ func splitRegionWithFailpoint(
309317
client tikvpb.TikvClient,
310318
keys [][]byte,
311319
isRawKv bool,
320+
requestSource string,
312321
) (*kvrpcpb.SplitRegionResponse, error) {
313322
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
314323
log.Debug("failpoint not-leader-error injected.")
@@ -334,9 +343,10 @@ func splitRegionWithFailpoint(
334343
})
335344
return client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
336345
Context: &kvrpcpb.Context{
337-
RegionId: regionInfo.Region.Id,
338-
RegionEpoch: regionInfo.Region.RegionEpoch,
339-
Peer: peer,
346+
RegionId: regionInfo.Region.Id,
347+
RegionEpoch: regionInfo.Region.RegionEpoch,
348+
Peer: peer,
349+
RequestSource: requestSource,
340350
},
341351
SplitKeys: keys,
342352
IsRawKv: isRawKv,
@@ -406,7 +416,7 @@ func sendSplitRegionRequest(
406416
}
407417
defer conn.Close()
408418
client := tikvpb.NewTikvClient(conn)
409-
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv)
419+
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv, c.requestSource)
410420
if err != nil {
411421
return false, nil, err
412422
}

br/tests/br_key_locked/locker.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/tikv/client-go/v2/oracle"
4545
"github.com/tikv/client-go/v2/tikv"
4646
"github.com/tikv/client-go/v2/tikvrpc"
47+
kvutil "github.com/tikv/client-go/v2/util"
4748
pd "github.com/tikv/pd/client"
4849
"github.com/tikv/pd/client/pkg/caller"
4950
"go.uber.org/zap"
@@ -311,7 +312,9 @@ func (c *Locker) lockBatch(ctx context.Context, keys [][]byte, primary []byte) (
311312
StartVersion: startTS,
312313
LockTtl: uint64(c.lockTTL.Milliseconds()),
313314
}
314-
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite)
315+
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite, kvrpcpb.Context{
316+
RequestSource: kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR),
317+
})
315318

316319
// Send the requests
317320
resp, err := c.kv.SendReq(bo, req, loc.Region, time.Second*20)

pkg/ddl/cluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/tikv/client-go/v2/tikv"
5050
"github.com/tikv/client-go/v2/tikvrpc"
5151
"github.com/tikv/client-go/v2/txnkv/rangetask"
52+
"github.com/tikv/client-go/v2/util"
5253
"go.uber.org/atomic"
5354
"go.uber.org/zap"
5455
)
@@ -492,6 +493,8 @@ func SendPrepareFlashbackToVersionRPC(
492493
EndKey: endKey,
493494
StartTs: startTS,
494495
Version: flashbackTS,
496+
}, kvrpcpb.Context{
497+
RequestSource: util.BuildRequestSource(true, kv.InternalTxnDDL, ""),
495498
})
496499

497500
resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
@@ -587,6 +590,8 @@ func SendFlashbackToVersionRPC(
587590
EndKey: endKey,
588591
StartTs: startTS,
589592
CommitTs: commitTS,
593+
}, kvrpcpb.Context{
594+
RequestSource: util.BuildRequestSource(true, kv.InternalTxnDDL, ""),
590595
})
591596

592597
resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)

pkg/lightning/backend/local/local.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import (
6666
"github.com/pingcap/tidb/pkg/util/redact"
6767
"github.com/tikv/client-go/v2/oracle"
6868
tikvclient "github.com/tikv/client-go/v2/tikv"
69+
cgutil "github.com/tikv/client-go/v2/util"
6970
pd "github.com/tikv/pd/client"
7071
pdhttp "github.com/tikv/pd/client/http"
7172
"github.com/tikv/pd/client/opt"
@@ -659,7 +660,7 @@ func NewBackend(
659660
pdCli.GetServiceDiscovery(),
660661
pdhttp.WithTLSConfig(tls.TLSConfig()),
661662
).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second))
662-
splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency)
663+
splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency, split.WithRequestSource(cgutil.BuildRequestSource(true, kv.InternalTxnLightning, cgutil.ExplicitTypeImport)))
663664
importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
664665

665666
multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCli, importClientFactory)

pkg/store/copr/batch_coprocessor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,6 +1473,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo
14731473
ResourceControlContext: &kvrpcpb.ResourceControlContext{
14741474
ResourceGroupName: rgName,
14751475
},
1476+
RequestSource: b.req.RequestSource.GetRequestSource(),
14761477
})
14771478
if b.req.ResourceGroupTagger != nil {
14781479
b.req.ResourceGroupTagger.Build(req)

0 commit comments

Comments
 (0)