Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions util/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,58 @@ type ReqDetailInfo struct {
ExecDetails TiKVExecDetails
}

func cloneRUV2(ru *kvrpcpb.RUV2) *kvrpcpb.RUV2 {
if ru == nil {
return nil
}
cloned := *ru
if ru.ExecutorInputs != nil {
execInputs := *ru.ExecutorInputs
cloned.ExecutorInputs = &execInputs
}
return &cloned
}

func mergeCommitDetailsRUV2(dst, src *kvrpcpb.RUV2, extraWriteRPCCount uint64) *kvrpcpb.RUV2 {
if dst == nil && src == nil && extraWriteRPCCount == 0 {
return nil
}
merged := cloneRUV2(dst)
if merged == nil {
merged = &kvrpcpb.RUV2{}
}
if src != nil {
mergeRUV2(merged, src)
}
if extraWriteRPCCount != 0 {
merged.WriteRpcCount += extraWriteRPCCount
}
return merged
}

func mergeRUV2(dst, src *kvrpcpb.RUV2) {
dst.KvEngineCacheMiss += src.KvEngineCacheMiss
dst.CoprocessorExecutorIterations += src.CoprocessorExecutorIterations
dst.CoprocessorResponseBytes += src.CoprocessorResponseBytes
dst.RaftstoreStoreWriteTriggerWbBytes += src.RaftstoreStoreWriteTriggerWbBytes
dst.StorageProcessedKeysBatchGet += src.StorageProcessedKeysBatchGet
dst.StorageProcessedKeysGet += src.StorageProcessedKeysGet
dst.ReadRpcCount += src.ReadRpcCount
dst.WriteRpcCount += src.WriteRpcCount
if src.ExecutorInputs != nil {
if dst.ExecutorInputs == nil {
dst.ExecutorInputs = &kvrpcpb.ExecutorInputs{}
}
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchTableScan += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchTableScan
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchSelection += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchSelection
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchTopN += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchTopN
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchLimit += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchLimit
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchSimpleAggr += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchSimpleAggr
dst.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchFastHashAggr += src.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchFastHashAggr
}
}

// CommitTSLagDetails contain the detail when the commit timestamp
// from PD lags the expected ts set by `SetCommitWaitUntilTSO`.
type CommitTSLagDetails struct {
Expand Down Expand Up @@ -183,6 +235,8 @@ type CommitDetails struct {
SlowestPrewrite ReqDetailInfo
// It's recorded only when the commit mode is 2pc.
CommitPrimary ReqDetailInfo
// Aggregated prewrite/commit request RUv2 details.
WriteRUV2 *kvrpcpb.RUV2
}
WriteKeys int
WriteSize int
Expand Down Expand Up @@ -217,6 +271,7 @@ func (cd *CommitDetails) Merge(other *CommitDetails) {
if cd.Mu.CommitPrimary.ReqTotalTime < other.Mu.CommitPrimary.ReqTotalTime {
cd.Mu.CommitPrimary = other.Mu.CommitPrimary
}
cd.Mu.WriteRUV2 = mergeCommitDetailsRUV2(cd.Mu.WriteRUV2, other.Mu.WriteRUV2, 0)
}

// MergePrewriteReqDetails merges prewrite related ExecDetailsV2 into the current CommitDetails.
Expand All @@ -226,6 +281,11 @@ func (cd *CommitDetails) MergePrewriteReqDetails(reqDuration time.Duration, regi
}
cd.Mu.Lock()
defer cd.Mu.Unlock()
var writeRUV2 *kvrpcpb.RUV2
if execDetails != nil {
writeRUV2 = execDetails.RuV2
}
cd.Mu.WriteRUV2 = mergeCommitDetailsRUV2(cd.Mu.WriteRUV2, writeRUV2, 1)
if reqDuration > cd.Mu.SlowestPrewrite.ReqTotalTime {
cd.Mu.SlowestPrewrite.ReqTotalTime = reqDuration
cd.Mu.SlowestPrewrite.Region = regionID
Expand All @@ -241,6 +301,11 @@ func (cd *CommitDetails) MergeCommitReqDetails(reqDuration time.Duration, region
}
cd.Mu.Lock()
defer cd.Mu.Unlock()
var writeRUV2 *kvrpcpb.RUV2
if execDetails != nil {
writeRUV2 = execDetails.RuV2
}
cd.Mu.WriteRUV2 = mergeCommitDetailsRUV2(cd.Mu.WriteRUV2, writeRUV2, 1)
if reqDuration > cd.Mu.CommitPrimary.ReqTotalTime {
cd.Mu.CommitPrimary.ReqTotalTime = reqDuration
cd.Mu.CommitPrimary.Region = regionID
Expand All @@ -253,6 +318,16 @@ func (cd *CommitDetails) MergeFlushReqDetails(reqDuration time.Duration, regionI
// leave it empty for now
}

// GetWriteRUV2 returns a copy of aggregated prewrite/commit request RUv2 details.
func (cd *CommitDetails) GetWriteRUV2() *kvrpcpb.RUV2 {
if cd == nil {
return nil
}
cd.Mu.Lock()
defer cd.Mu.Unlock()
return cloneRUV2(cd.Mu.WriteRUV2)
}

// Clone returns a deep copy of itself.
func (cd *CommitDetails) Clone() *CommitDetails {
commit := &CommitDetails{
Expand All @@ -274,6 +349,7 @@ func (cd *CommitDetails) Clone() *CommitDetails {
commit.Mu.CommitBackoffTypes = append([]string{}, cd.Mu.CommitBackoffTypes...)
commit.Mu.SlowestPrewrite = cd.Mu.SlowestPrewrite
commit.Mu.CommitPrimary = cd.Mu.CommitPrimary
commit.Mu.WriteRUV2 = cloneRUV2(cd.Mu.WriteRUV2)
return commit
}

Expand Down
59 changes: 59 additions & 0 deletions util/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@ import (
"testing"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/assert"
)

func newWriteRUV2(kvMiss, batchGet, get, readRPC, writeRPC uint64, indexScan, tableScan uint64) *kvrpcpb.RUV2 {
return &kvrpcpb.RUV2{
KvEngineCacheMiss: kvMiss,
StorageProcessedKeysBatchGet: batchGet,
StorageProcessedKeysGet: get,
ReadRpcCount: readRPC,
WriteRpcCount: writeRPC,
ExecutorInputs: &kvrpcpb.ExecutorInputs{
TikvCoprocessorExecutorWorkTotalBatchIndexScan: indexScan,
TikvCoprocessorExecutorWorkTotalBatchTableScan: tableScan,
},
}
}

func newWriteExecDetailsV2(kvMiss, batchGet, get, readRPC, writeRPC uint64, indexScan, tableScan uint64) *kvrpcpb.ExecDetailsV2 {
return &kvrpcpb.ExecDetailsV2{RuV2: newWriteRUV2(kvMiss, batchGet, get, readRPC, writeRPC, indexScan, tableScan)}
}

func TestLockKeysDetailsMerge(t *testing.T) {
a := &LockKeysDetails{
TotalTime: 10 * time.Millisecond,
Expand Down Expand Up @@ -163,6 +182,7 @@ func TestCommitDetailsMerge(t *testing.T) {
a.Mu.CommitBackoffTypes = []string{"regionMiss"}
a.Mu.SlowestPrewrite = ReqDetailInfo{ReqTotalTime: 5 * time.Millisecond, Region: 1, StoreAddr: "s1"}
a.Mu.CommitPrimary = ReqDetailInfo{ReqTotalTime: 3 * time.Millisecond, Region: 2, StoreAddr: "s2"}
a.Mu.WriteRUV2 = newWriteRUV2(1, 2, 3, 4, 5, 6, 7)

b := &CommitDetails{
GetCommitTsTime: 12 * time.Millisecond,
Expand All @@ -182,6 +202,7 @@ func TestCommitDetailsMerge(t *testing.T) {
b.Mu.CommitBackoffTypes = []string{"txnLock"}
b.Mu.SlowestPrewrite = ReqDetailInfo{ReqTotalTime: 8 * time.Millisecond, Region: 10, StoreAddr: "s10"}
b.Mu.CommitPrimary = ReqDetailInfo{ReqTotalTime: 6 * time.Millisecond, Region: 20, StoreAddr: "s20"}
b.Mu.WriteRUV2 = newWriteRUV2(10, 20, 30, 40, 50, 60, 70)

a.Merge(b)

Expand All @@ -206,6 +227,14 @@ func TestCommitDetailsMerge(t *testing.T) {
assert.Equal(t, 6*time.Millisecond, a.Mu.CommitPrimary.ReqTotalTime)
assert.Equal(t, uint64(20), a.Mu.CommitPrimary.Region)
assert.Equal(t, "s20", a.Mu.CommitPrimary.StoreAddr)
assert.NotNil(t, a.Mu.WriteRUV2)
assert.Equal(t, uint64(11), a.Mu.WriteRUV2.KvEngineCacheMiss)
assert.Equal(t, uint64(22), a.Mu.WriteRUV2.StorageProcessedKeysBatchGet)
assert.Equal(t, uint64(33), a.Mu.WriteRUV2.StorageProcessedKeysGet)
assert.Equal(t, uint64(44), a.Mu.WriteRUV2.ReadRpcCount)
assert.Equal(t, uint64(55), a.Mu.WriteRUV2.WriteRpcCount)
assert.Equal(t, uint64(66), a.Mu.WriteRUV2.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan)
assert.Equal(t, uint64(77), a.Mu.WriteRUV2.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchTableScan)
}

func TestCommitDetailsMergeSlowestNotReplaced(t *testing.T) {
Expand Down Expand Up @@ -242,6 +271,7 @@ func TestCommitDetailsClone(t *testing.T) {
orig.Mu.CommitBackoffTypes = []string{"tikvRPC"}
orig.Mu.SlowestPrewrite = ReqDetailInfo{ReqTotalTime: 5 * time.Millisecond, Region: 1, StoreAddr: "s1"}
orig.Mu.CommitPrimary = ReqDetailInfo{ReqTotalTime: 3 * time.Millisecond, Region: 2, StoreAddr: "s2"}
orig.Mu.WriteRUV2 = newWriteRUV2(1, 2, 3, 4, 5, 6, 7)

cloned := orig.Clone()

Expand All @@ -261,6 +291,7 @@ func TestCommitDetailsClone(t *testing.T) {
assert.Equal(t, orig.Mu.CommitBackoffTypes, cloned.Mu.CommitBackoffTypes)
assert.Equal(t, orig.Mu.SlowestPrewrite, cloned.Mu.SlowestPrewrite)
assert.Equal(t, orig.Mu.CommitPrimary, cloned.Mu.CommitPrimary)
assert.Equal(t, orig.Mu.WriteRUV2, cloned.Mu.WriteRUV2)

// Verify deep copy: modifying cloned slices should not affect original
cloned.Mu.PrewriteBackoffTypes = append(cloned.Mu.PrewriteBackoffTypes, "extra")
Expand All @@ -271,6 +302,34 @@ func TestCommitDetailsClone(t *testing.T) {

cloned.GetCommitTsTime = 999 * time.Millisecond
assert.Equal(t, 10*time.Millisecond, orig.GetCommitTsTime)

cloned.Mu.WriteRUV2.KvEngineCacheMiss = 999
assert.Equal(t, uint64(1), orig.Mu.WriteRUV2.KvEngineCacheMiss)
}

func TestCommitDetailsMergeWriteRUV2(t *testing.T) {
commitDetails := &CommitDetails{}
prewriteDetails := newWriteExecDetailsV2(1, 2, 0, 0, 0, 4, 5)
commitDetails.MergePrewriteReqDetails(5*time.Millisecond, 10, "store-1", prewriteDetails)

commitDetails.MergePrewriteReqDetails(3*time.Millisecond, 11, "store-2", newWriteExecDetailsV2(10, 20, 0, 0, 0, 40, 50))
commitDetails.MergeCommitReqDetails(7*time.Millisecond, 12, "store-3", newWriteExecDetailsV2(100, 0, 200, 6, 0, 400, 500))

agg := commitDetails.GetWriteRUV2()
assert.NotNil(t, agg)
assert.Equal(t, uint64(111), agg.KvEngineCacheMiss)
assert.Equal(t, uint64(22), agg.StorageProcessedKeysBatchGet)
assert.Equal(t, uint64(200), agg.StorageProcessedKeysGet)
assert.Equal(t, uint64(6), agg.ReadRpcCount)
assert.Equal(t, uint64(3), agg.WriteRpcCount)
assert.Equal(t, uint64(444), agg.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan)
assert.Equal(t, uint64(555), agg.ExecutorInputs.TikvCoprocessorExecutorWorkTotalBatchTableScan)

assert.Equal(t, uint64(10), commitDetails.Mu.SlowestPrewrite.Region)
assert.Equal(t, uint64(12), commitDetails.Mu.CommitPrimary.Region)

agg.KvEngineCacheMiss = 999
assert.Equal(t, uint64(111), commitDetails.GetWriteRUV2().KvEngineCacheMiss)
}

func TestScanDetailMerge(t *testing.T) {
Expand Down
Loading