Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"sync/atomic"
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/tikv/client-go/v2/internal/resourcecontrol"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -63,6 +65,9 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
ruDetails = val.(*util.RUDetails)
ruDetails.Update(consumption, waitDuration)
}
recordResourceControlMetrics(resourceGroupName, req.GetRequestSource(), consumption)
} else if reqInfo != nil && reqInfo.Bypass() {
metrics.TiKVResourceControlBypassedCounter.WithLabelValues(resourceGroupName, req.GetRequestSource()).Inc()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid introducing WithLabelValues op on the hot path, should cache it in advance.

}

if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor == nil {
Expand All @@ -82,6 +87,7 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
if ruDetails != nil {
ruDetails.Update(consumption, waitDuration)
}
recordResourceControlMetrics(resourceGroupName, req.GetRequestSource(), consumption)
}

return resp, err
Expand Down Expand Up @@ -111,6 +117,7 @@ func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, re
ruDetails = val.(*util.RUDetails)
ruDetails.Update(consumption, waitDuration)
}
recordResourceControlMetrics(resourceGroupName, req.GetRequestSource(), consumption)

cb.Inject(func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil {
Expand All @@ -130,9 +137,12 @@ func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, re
if ruDetails != nil {
ruDetails.Update(consumption, waitDuration)
}
recordResourceControlMetrics(resourceGroupName, req.GetRequestSource(), consumption)
}
return resp, err
})
} else if reqInfo != nil && reqInfo.Bypass() {
metrics.TiKVResourceControlBypassedCounter.WithLabelValues(resourceGroupName, req.GetRequestSource()).Inc()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}

r.Client.SendRequestAsync(ctx, addr, req, cb)
Expand All @@ -145,6 +155,20 @@ var (
ResourceControlInterceptor atomic.Pointer[resourceControlClient.ResourceGroupKVInterceptor]
)

// recordResourceControlMetrics records RU consumption from a single resource
// control call (OnRequestWait or OnResponseWait).
func recordResourceControlMetrics(resourceGroupName, requestSource string, consumption *rmpb.Consumption) {
if consumption == nil {
return
}
if consumption.RRU > 0 {
metrics.TiKVResourceControlRUCounter.WithLabelValues(resourceGroupName, requestSource, "rru").Add(consumption.RRU)
}
if consumption.WRU > 0 {
metrics.TiKVResourceControlRUCounter.WithLabelValues(resourceGroupName, requestSource, "wru").Add(consumption.WRU)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}
}

func getResourceControlInfo(ctx context.Context, req *tikvrpc.Request) (
string,
resourceControlClient.ResourceGroupKVInterceptor,
Expand All @@ -169,7 +193,7 @@ func getResourceControlInfo(ctx context.Context, req *tikvrpc.Request) (
}
reqInfo := resourcecontrol.MakeRequestInfo(req)
if reqInfo.Bypass() {
return "", nil, nil
return resourceGroupName, nil, reqInfo
}
return resourceGroupName, resourceControlInterceptor, reqInfo
}
Expand Down
54 changes: 41 additions & 13 deletions internal/resourcecontrol/resource_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,54 @@ func toPDAccessLocationType(accessType kv.AccessLocationType) controller.AccessL
}

// reqTypeAnalyze is the type of analyze coprocessor request.
// ref: https://github.com/pingcap/tidb/blob/ee4eac2ccb83e1ea653b8131d9a43495019cb5ac/pkg/kv/kv.go#L340
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/kv.go#L340
const reqTypeAnalyze = 104

// bypassResourceSourceList maintains a list of resource sources that should be bypassed from the resource control.
var bypassResourceSourceList = []string{
/* DDL */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/ddl/job_worker.go#L503
util.InternalTxnAddIndex,
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/ddl/backfilling_operators.go#L1230
util.InternalTxnMergeTempIndex,
/* BR */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/option.go#L214
util.InternalTxnBR,
/* Import Into */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/option.go#L224
util.InternalImportInto,
/* Workload Learning */
// ref: https://github.com/pingcap/tidb/blob/e691904f3c60113f8031a48c3e4c5940e24b8104/pkg/kv/option.go#L201
util.InternalTxnWorkloadLearning,
}

func shouldBypass(req *tikvrpc.Request) bool {
requestSource := req.GetRequestSource()
// Check both coprocessor request type and the request source to ensure the request is an internal analyze request.
// Internal analyze request may consume a lot of resources, bypass it to avoid affecting the user experience.
// This bypass currently only works with NextGen.
if config.NextGen && strings.Contains(requestSource, util.InternalTxnStats) {
var tp int64
switch req.Type {
case tikvrpc.CmdBatchCop:
tp = req.BatchCop().GetTp()
case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
tp = req.Cop().GetTp()

// These bypasses currently only works with NextGen.
if config.NextGen {
// Check both coprocessor request type and the request source to ensure the request is an internal analyze request.
// Internal analyze request may consume a lot of resources, bypass it to avoid affecting the user experience.
if strings.Contains(requestSource, util.InternalTxnStats) {
var tp int64
switch req.Type {
case tikvrpc.CmdBatchCop:
tp = req.BatchCop().GetTp()
case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
tp = req.Cop().GetTp()
}
if tp == reqTypeAnalyze {
return true
}
}
if tp == reqTypeAnalyze {
return true
// Check other resource source types that should be bypassed.
for _, source := range bypassResourceSourceList {
if strings.Contains(requestSource, source) {
return true
}
}
}

// Some internal requests should be bypassed, which may affect the user experience.
// For example, the `alter user password` request completely bypasses resource control.
// Although it does not consume many resources, it can still impact the user experience.
Expand Down
23 changes: 23 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ var (
TiKVReadRequestBytes *prometheus.SummaryVec
TiKVTxnLagCommitTSWaitHistogram *prometheus.HistogramVec
TiKVTxnLagCommitTSAttemptHistogram *prometheus.HistogramVec
TiKVResourceControlRUCounter *prometheus.CounterVec
TiKVResourceControlBypassedCounter *prometheus.CounterVec
)

// Label constants.
Expand Down Expand Up @@ -157,6 +159,7 @@ const (
LblGeneral = "general"
LblDirection = "direction"
LblReason = "reason"
LblResourceGroup = "resource_group"
)

var storeMetricVecList atomic.Pointer[[]MetricVec]
Expand Down Expand Up @@ -979,6 +982,24 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{LblResult})

TiKVResourceControlRUCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "resource_control",
Name: "ru_total",
Help: "Counter of resource units consumed by requests going through resource control.",
ConstLabels: constLabels,
}, []string{LblResourceGroup, LblSource, LblType})

TiKVResourceControlBypassedCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "resource_control",
Name: "bypassed_request_total",
Help: "Counter of requests that bypassed resource control.",
ConstLabels: constLabels,
}, []string{LblResourceGroup, LblSource})

initShortcuts()
storeMetricVecList.Store(&storeMetrics)
}
Expand Down Expand Up @@ -1085,6 +1106,8 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVTxnLagCommitTSWaitHistogram)
prometheus.MustRegister(TiKVTxnLagCommitTSAttemptHistogram)
prometheus.MustRegister(TiKVStaleBucketFromPDCounter)
prometheus.MustRegister(TiKVResourceControlRUCounter)
prometheus.MustRegister(TiKVResourceControlBypassedCounter)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
10 changes: 10 additions & 0 deletions util/request_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ const (
InternalTxnMeta = InternalTxnOthers
// InternalTxnStats is the type of statistics txn.
InternalTxnStats = "stats"
// InternalTxnBR is the type of BR usage.
InternalTxnBR = "br"
// InternalImportInto is the type of IMPORT INTO usage
InternalImportInto = "ImportInto"
// InternalTxnAddIndex is the type of DDL add index backfill.
InternalTxnAddIndex = "add_index"
// InternalTxnMergeTempIndex is the type of DDL temp index merging.
InternalTxnMergeTempIndex = "merge_temp_index"
// InternalTxnWorkloadLearning is the type of workload-based cost learning.
InternalTxnWorkloadLearning = "WorkloadLearning"
)

// explicit source types.
Expand Down