Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (rc *LogClient) InitClients(
log.Fatal("failed to get stores", zap.Error(err))
}

metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, len(stores)+1)
metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, len(stores)+1, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
importCli := importclient.NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)

rc.logRestoreManager, err = NewLogRestoreManager(
Expand Down Expand Up @@ -1499,7 +1499,7 @@ func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
splitSize uint64,
splitKeys int64,
) (iter.TryNextor[SSTs], error) {
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3)
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
wrapper := restore.PipelineRestorerWrapper[SSTs]{
PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, splitSize, splitKeys),
}
Expand All @@ -1518,7 +1518,7 @@ func (rc *LogClient) WrapLogFilesIterWithSplitHelper(
splitSize uint64,
splitKeys int64,
) (LogIter, error) {
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3)
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
wrapper := restore.PipelineRestorerWrapper[*LogDataFileInfo]{
PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, splitSize, splitKeys),
}
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/restore/log_client/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils/consts"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -287,9 +288,10 @@ func (importer *LogFileImporter) downloadAndApplyKVFile(
}

reqCtx := &kvrpcpb.Context{
RegionId: regionInfo.Region.GetId(),
RegionEpoch: regionInfo.Region.GetRegionEpoch(),
Peer: leader,
RegionId: regionInfo.Region.GetId(),
RegionEpoch: regionInfo.Region.GetRegionEpoch(),
Peer: leader,
RequestSource: kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR),
}

var req *import_sstpb.ApplyRequest
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func (rc *SnapClient) initClients(ctx context.Context, backend *backuppb.Storage
var createCallBacks []func(*SnapFileImporter) error
var closeCallBacks []func(*SnapFileImporter) error
var splitClientOpts []split.ClientOptionalParameter
splitClientOpts = append(splitClientOpts, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
if isRawKvMode {
splitClientOpts = append(splitClientOpts, split.WithRawKV())
createCallBacks = append(createCallBacks, func(importer *SnapFileImporter) error {
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/restore/snap_client/placement_rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/zap"
Expand Down Expand Up @@ -91,7 +93,7 @@ func NewPlacementRuleManager(ctx context.Context, pdClient pd.Client, pdHTTPCli
return &onlinePlacementRuleManager{
// toolClient reuse the split.SplitClient to do miscellaneous things. It doesn't
// call split related functions so set the arguments to arbitrary values.
toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3),
toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR))),

restoreStores: restoreStores,
restoreTables: make(map[int64]struct{}),
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/snap_client/tikv_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/pkg/kv"
tidbutil "github.com/pingcap/tidb/pkg/util"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -362,6 +364,7 @@ func (rc *SnapClient) SplitPoints(
splitClientOpts = append(splitClientOpts, split.WithOnSplit(func(keys [][]byte) {
onProgress(int64(len(keys)))
}))
splitClientOpts = append(splitClientOpts, split.WithRequestSource(kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR)))
// TODO seems duplicate with metaClient.
if isRawKv {
splitClientOpts = append(splitClientOpts, split.WithRawKV())
Expand Down
18 changes: 14 additions & 4 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type pdClient struct {
onSplit func(key [][]byte)
splitConcurrency int
splitBatchKeyCnt int
requestSource string
}

type ClientOptionalParameter func(*pdClient)
Expand All @@ -143,6 +144,13 @@ func WithOnSplit(onSplit func(key [][]byte)) ClientOptionalParameter {
}
}

// WithRequestSource sets the request source for the split client.
func WithRequestSource(source string) ClientOptionalParameter {
return func(c *pdClient) {
c.requestSource = source
}
}

// NewClient creates a SplitClient.
//
// splitBatchKeyCnt controls how many keys are sent to TiKV in a batch in split
Expand Down Expand Up @@ -309,6 +317,7 @@ func splitRegionWithFailpoint(
client tikvpb.TikvClient,
keys [][]byte,
isRawKv bool,
requestSource string,
) (*kvrpcpb.SplitRegionResponse, error) {
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
log.Debug("failpoint not-leader-error injected.")
Expand All @@ -334,9 +343,10 @@ func splitRegionWithFailpoint(
})
return client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
Context: &kvrpcpb.Context{
RegionId: regionInfo.Region.Id,
RegionEpoch: regionInfo.Region.RegionEpoch,
Peer: peer,
RegionId: regionInfo.Region.Id,
RegionEpoch: regionInfo.Region.RegionEpoch,
Peer: peer,
RequestSource: requestSource,
},
SplitKeys: keys,
IsRawKv: isRawKv,
Expand Down Expand Up @@ -406,7 +416,7 @@ func sendSplitRegionRequest(
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv)
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv, c.requestSource)
if err != nil {
return false, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions br/tests/br_key_locked/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/router",
"@com_github_tikv_pd_client//opt",
Expand Down
5 changes: 4 additions & 1 deletion br/tests/br_key_locked/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/caller"
"go.uber.org/zap"
Expand Down Expand Up @@ -311,7 +312,9 @@ func (c *Locker) lockBatch(ctx context.Context, keys [][]byte, primary []byte) (
StartVersion: startTS,
LockTtl: uint64(c.lockTTL.Milliseconds()),
}
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite)
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite, kvrpcpb.Context{
RequestSource: kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR),
})

// Send the requests
resp, err := c.kv.SendReq(bo, req, loc.Region, time.Second*20)
Expand Down
5 changes: 5 additions & 0 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"github.com/tikv/client-go/v2/util"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -492,6 +493,8 @@ func SendPrepareFlashbackToVersionRPC(
EndKey: endKey,
StartTs: startTS,
Version: flashbackTS,
}, kvrpcpb.Context{
RequestSource: util.BuildRequestSource(true, kv.InternalTxnDDL, ""),
})

resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
Expand Down Expand Up @@ -587,6 +590,8 @@ func SendFlashbackToVersionRPC(
EndKey: endKey,
StartTs: startTS,
CommitTs: commitTS,
}, kvrpcpb.Context{
RequestSource: util.BuildRequestSource(true, kv.InternalTxnDDL, ""),
})

resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
Expand Down
3 changes: 2 additions & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
cgutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
Expand Down Expand Up @@ -659,7 +660,7 @@ func NewBackend(
pdCli.GetServiceDiscovery(),
pdhttp.WithTLSConfig(tls.TLSConfig()),
).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second))
splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency)
splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency, split.WithRequestSource(cgutil.BuildRequestSource(true, kv.InternalTxnLightning, cgutil.ExplicitTypeImport)))
importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)

multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCli, importClientFactory)
Expand Down
1 change: 1 addition & 0 deletions pkg/store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: rgName,
},
RequestSource: b.req.RequestSource.GetRequestSource(),
})
if b.req.ResourceGroupTagger != nil {
b.req.ResourceGroupTagger.Build(req)
Expand Down
4 changes: 3 additions & 1 deletion pkg/store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,9 @@ func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
//nolint: prealloc
var result []*deadlockpb.WaitForEntry
for _, store := range stores {
resp, err := s.GetTiKVClient().SendRequest(context.TODO(), store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdLockWaitInfo, &kvrpcpb.GetLockWaitInfoRequest{}), time.Second*30)
resp, err := s.GetTiKVClient().SendRequest(context.TODO(), store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdLockWaitInfo, &kvrpcpb.GetLockWaitInfoRequest{}, kvrpcpb.Context{
RequestSource: util.BuildRequestSource(true, kv.InternalTxnAdmin, ""),
}), time.Second*30)
if err != nil {
logutil.BgLogger().Warn("query lock wait info failed", zap.Error(err))
continue
Expand Down
7 changes: 6 additions & 1 deletion pkg/store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,10 @@ func (w *GCWorker) doUnsafeDestroyRangeRequest(
req := tikvrpc.NewRequest(tikvrpc.CmdUnsafeDestroyRange, &kvrpcpb.UnsafeDestroyRangeRequest{
StartKey: startKey,
EndKey: endKey,
}, kvrpcpb.Context{DiskFullOpt: kvrpcpb.DiskFullOpt_AllowedOnAlmostFull})
}, kvrpcpb.Context{
DiskFullOpt: kvrpcpb.DiskFullOpt_AllowedOnAlmostFull,
RequestSource: tikvutil.BuildRequestSource(true, kv.InternalTxnGC, ""),
})

var wg sync.WaitGroup
errChan := make(chan error, len(stores))
Expand Down Expand Up @@ -1403,6 +1406,8 @@ func (w *GCWorker) broadcastGCSafePoint(ctx context.Context, gcSafePoint uint64)
func (w *GCWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
req := tikvrpc.NewRequest(tikvrpc.CmdGC, &kvrpcpb.GCRequest{
SafePoint: safePoint,
}, kvrpcpb.Context{
RequestSource: tikvutil.BuildRequestSource(true, kv.InternalTxnGC, ""),
})

resp, err := w.tikvStore.SendReq(bo, req, region, gcTimeout)
Expand Down
1 change: 1 addition & 0 deletions pkg/store/helper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//http",
"@org_uber_go_zap//:zap",
],
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
cgutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client/http"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -132,7 +133,9 @@ const MaxBackoffTimeoutForMvccGet = 5000
// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved.
func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) {
bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil)
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey})
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}, kvrpcpb.Context{
RequestSource: cgutil.BuildRequestSource(true, kv.InternalTxnAdmin, ""),
})
for {
keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey)
if err != nil {
Expand Down Expand Up @@ -254,8 +257,10 @@ func (h *Helper) GetMvccByStartTs(startTS uint64, startKey, endKey kv.Key) (*Mvc

tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByStartTs, &kvrpcpb.MvccGetByStartTsRequest{
StartTs: startTS,
}, kvrpcpb.Context{
RequestSource: cgutil.BuildRequestSource(true, kv.InternalTxnAdmin, ""),
Priority: kvrpcpb.CommandPri_Low,
})
tikvReq.Context.Priority = kvrpcpb.CommandPri_Low
kvResp, err := h.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour)
if err != nil {
logutil.BgLogger().Error("get MVCC by startTS failed",
Expand Down
Loading