Skip to content

Commit 110f73c

Browse files
ti-chi-bot and rleungx authored
mcs: fix the compatibility issue of batch allocating IDs (#9137) (#9146)
close #9138 Signed-off-by: Ryan Leung <rleungx@gmail.com> Co-authored-by: Ryan Leung <rleungx@gmail.com>
1 parent 159fe3b commit 110f73c

File tree

4 files changed

+323
-17
lines changed

4 files changed

+323
-17
lines changed

pkg/mcs/scheduling/server/cluster.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,12 @@ func (c *Cluster) AllocID(count uint32) (uint64, uint32, error) {
247247
}
248248
ctx, cancel := context.WithTimeout(c.ctx, requestTimeout)
249249
defer cancel()
250-
resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, Count: count})
250+
req := &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, Count: count}
251+
252+
failpoint.Inject("allocIDNonBatch", func() {
253+
req = &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}}
254+
})
255+
resp, err := client.AllocID(ctx, req)
251256
if err != nil {
252257
c.triggerMembershipCheck()
253258
return 0, 0, err

pkg/mcs/scheduling/server/grpc_service.go

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -288,28 +288,57 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc
288288
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
289289
recordRegions := make([]uint64, 0, splitCount+1)
290290

291-
id, count, err := c.AllocID(splitCount * (1 + uint32(len(request.Region.Peers))))
291+
requestIDCount := splitCount * (1 + uint32(len(request.Region.Peers)))
292+
id, count, err := c.AllocID(requestIDCount)
292293
if err != nil {
293294
return nil, err
294295
}
295-
curID := id - uint64(count)
296-
for range splitCount {
297-
newRegionID := curID
298-
curID++
299-
300-
peerIDs := make([]uint64, len(request.Region.Peers))
301-
for j := 0; j < len(peerIDs); j++ {
302-
peerIDs[j] = curID
303-
curID++
296+
297+
// If the count is not equal to the requestIDCount, it means that the
298+
// PD doesn't support allocating IDs in batch. We need to allocate IDs
299+
// for each region.
300+
if requestIDCount != count {
301+
// use non batch way to split region
302+
for range splitCount {
303+
newRegionID, _, err := c.AllocID(1)
304+
if err != nil {
305+
return nil, err
306+
}
307+
peerIDs := make([]uint64, len(request.Region.Peers))
308+
for i := 0; i < len(peerIDs); i++ {
309+
peerIDs[i], _, err = c.AllocID(1)
310+
if err != nil {
311+
return nil, err
312+
}
313+
}
314+
recordRegions = append(recordRegions, newRegionID)
315+
splitIDs = append(splitIDs, &pdpb.SplitID{
316+
NewRegionId: newRegionID,
317+
NewPeerIds: peerIDs,
318+
})
319+
log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs))
304320
}
321+
} else {
322+
// use batch way to split region
323+
curID := id - uint64(requestIDCount) + 1
324+
for range splitCount {
325+
newRegionID := curID
326+
curID++
305327

306-
recordRegions = append(recordRegions, newRegionID)
307-
splitIDs = append(splitIDs, &pdpb.SplitID{
308-
NewRegionId: newRegionID,
309-
NewPeerIds: peerIDs,
310-
})
328+
peerIDs := make([]uint64, len(request.Region.Peers))
329+
for j := 0; j < len(peerIDs); j++ {
330+
peerIDs[j] = curID
331+
curID++
332+
}
311333

312-
log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs))
334+
recordRegions = append(recordRegions, newRegionID)
335+
splitIDs = append(splitIDs, &pdpb.SplitID{
336+
NewRegionId: newRegionID,
337+
NewPeerIds: peerIDs,
338+
})
339+
340+
log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs))
341+
}
313342
}
314343

315344
recordRegions = append(recordRegions, reqRegion.GetId())

server/grpc_service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,10 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest)
674674
if request.GetCount() != 0 {
675675
reqCount = request.GetCount()
676676
}
677+
failpoint.Inject("handleAllocIDNonBatch", func() {
678+
reqCount = 1
679+
})
680+
677681
// We can use an allocator for all types ID allocation.
678682
id, count, err := s.idAllocator.Alloc(reqCount)
679683
if err != nil {

tests/integrations/mcs/scheduling/server_test.go

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"net/http"
2121
"reflect"
22+
"sync"
2223
"testing"
2324
"time"
2425

@@ -833,3 +834,270 @@ func (suite *serverTestSuite) TestBatchSplit() {
833834
suite.TearDownSuite()
834835
suite.SetupSuite()
835836
}
837+
838+
func (suite *serverTestSuite) TestBatchSplitCompatibility() {
839+
re := suite.Require()
840+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
841+
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
842+
re.NoError(err)
843+
defer tc.Destroy()
844+
tc.WaitForPrimaryServing(re)
845+
846+
rc := suite.pdLeader.GetServer().GetRaftCluster()
847+
re.NotNil(rc)
848+
s := &server.GrpcServer{Server: suite.pdLeader.GetServer()}
849+
for i := uint64(1); i <= 3; i++ {
850+
resp, err := s.PutStore(
851+
context.Background(), &pdpb.PutStoreRequest{
852+
Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
853+
Store: &metapb.Store{
854+
Id: i,
855+
Address: fmt.Sprintf("mock://%d", i),
856+
State: metapb.StoreState_Up,
857+
Version: "7.0.0",
858+
},
859+
},
860+
)
861+
re.NoError(err)
862+
re.Empty(resp.GetHeader().GetError())
863+
}
864+
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
865+
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
866+
re.NoError(err)
867+
peers := []*metapb.Peer{
868+
{Id: 11, StoreId: 1},
869+
{Id: 22, StoreId: 2},
870+
{Id: 33, StoreId: 3},
871+
}
872+
873+
interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}
874+
regionReq := &pdpb.RegionHeartbeatRequest{
875+
Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()),
876+
Region: &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")},
877+
Leader: peers[0],
878+
ApproximateSize: 30 * units.MiB,
879+
ApproximateKeys: 300,
880+
Interval: interval,
881+
Term: 1,
882+
CpuUsage: 100,
883+
}
884+
err = stream.Send(regionReq)
885+
re.NoError(err)
886+
testutil.Eventually(re, func() bool {
887+
region := tc.GetPrimaryServer().GetCluster().GetRegion(10)
888+
return region != nil && region.GetTerm() == 1 &&
889+
region.GetApproximateKeys() == 300 && region.GetApproximateSize() == 30 &&
890+
reflect.DeepEqual(region.GetLeader(), peers[0]) &&
891+
reflect.DeepEqual(region.GetInterval(), interval)
892+
})
893+
894+
req := &pdpb.AskBatchSplitRequest{
895+
Header: &pdpb.RequestHeader{
896+
ClusterId: suite.pdLeader.GetClusterID(),
897+
},
898+
Region: regionReq.GetRegion(),
899+
SplitCount: 10,
900+
}
901+
902+
// case 1: The scheduling server is upgraded first, and then the PD server is upgraded.
903+
re.NoError(failpoint.Enable("github.com/tikv/pd/server/handleAllocIDNonBatch", `return(true)`))
904+
resp, err := grpcPDClient.AskBatchSplit(suite.ctx, req)
905+
re.NoError(err)
906+
allocatedIDs := map[uint64]struct{}{}
907+
var maxID uint64
908+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
909+
re.Len(allocatedIDs, 40)
910+
// Use the batch AllocID, which means the PD has finished the upgrade.
911+
re.NoError(failpoint.Disable("github.com/tikv/pd/server/handleAllocIDNonBatch"))
912+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
913+
re.NoError(err)
914+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
915+
re.Len(allocatedIDs, 80)
916+
917+
// case 2: The PD server is downgraded first.
918+
re.NoError(failpoint.Enable("github.com/tikv/pd/server/handleAllocIDNonBatch", `return(true)`))
919+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
920+
re.NoError(err)
921+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
922+
re.Len(allocatedIDs, 120)
923+
re.NoError(failpoint.Disable("github.com/tikv/pd/server/handleAllocIDNonBatch"))
924+
// Use the batch AllocID, which means the scheduling server has finished the upgrade.
925+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
926+
re.NoError(err)
927+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
928+
re.Len(allocatedIDs, 160)
929+
930+
// case 3: The PD server is upgraded first, and then the scheduling server is upgraded.
931+
re.NoError(failpoint.Enable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch", `return(true)`))
932+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
933+
re.NoError(err)
934+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
935+
re.Len(allocatedIDs, 200)
936+
re.NoError(failpoint.Disable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch"))
937+
// Use the batch AllocID, which means the scheduling server has finished the upgrade.
938+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
939+
re.NoError(err)
940+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
941+
re.Len(allocatedIDs, 240)
942+
943+
// case 4: The scheduling server is downgraded first.
944+
re.NoError(failpoint.Enable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch", `return(true)`))
945+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
946+
re.NoError(err)
947+
maxID = checkAllocatedID(re, resp, allocatedIDs, maxID)
948+
re.Len(allocatedIDs, 280)
949+
re.NoError(failpoint.Disable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch"))
950+
// Use the batch AllocID, which means the scheduling server has finished the upgrade.
951+
resp, err = grpcPDClient.AskBatchSplit(suite.ctx, req)
952+
re.NoError(err)
953+
_ = checkAllocatedID(re, resp, allocatedIDs, maxID)
954+
re.Len(allocatedIDs, 320)
955+
956+
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
957+
958+
suite.TearDownSuite()
959+
suite.SetupSuite()
960+
}
961+
962+
func checkAllocatedID(re *require.Assertions, resp *pdpb.AskBatchSplitResponse, allocatedIDs map[uint64]struct{}, maxID uint64) uint64 {
963+
re.Empty(resp.GetHeader().GetError())
964+
for _, id := range resp.GetIds() {
965+
_, ok := allocatedIDs[id.NewRegionId]
966+
re.False(ok)
967+
re.Greater(id.NewRegionId, maxID)
968+
maxID = id.NewRegionId
969+
allocatedIDs[id.NewRegionId] = struct{}{}
970+
for _, peer := range id.NewPeerIds {
971+
_, ok := allocatedIDs[peer]
972+
re.False(ok)
973+
re.Greater(peer, maxID)
974+
maxID = peer
975+
allocatedIDs[peer] = struct{}{}
976+
}
977+
}
978+
return maxID
979+
}
980+
981+
func (suite *serverTestSuite) TestConcurrentBatchSplit() {
982+
re := suite.Require()
983+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
984+
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
985+
re.NoError(err)
986+
defer tc.Destroy()
987+
tc.WaitForPrimaryServing(re)
988+
989+
rc := suite.pdLeader.GetServer().GetRaftCluster()
990+
re.NotNil(rc)
991+
s := &server.GrpcServer{Server: suite.pdLeader.GetServer()}
992+
for i := uint64(1); i <= 3; i++ {
993+
resp, err := s.PutStore(
994+
context.Background(), &pdpb.PutStoreRequest{
995+
Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
996+
Store: &metapb.Store{
997+
Id: i,
998+
Address: fmt.Sprintf("mock://%d", i),
999+
State: metapb.StoreState_Up,
1000+
Version: "7.0.0",
1001+
},
1002+
},
1003+
)
1004+
re.NoError(err)
1005+
re.Empty(resp.GetHeader().GetError())
1006+
}
1007+
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
1008+
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
1009+
re.NoError(err)
1010+
peers := []*metapb.Peer{
1011+
{Id: 11, StoreId: 1},
1012+
{Id: 22, StoreId: 2},
1013+
{Id: 33, StoreId: 3},
1014+
}
1015+
1016+
interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}
1017+
regionReq := &pdpb.RegionHeartbeatRequest{
1018+
Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()),
1019+
Region: &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")},
1020+
Leader: peers[0],
1021+
ApproximateSize: 30 * units.MiB,
1022+
ApproximateKeys: 300,
1023+
Interval: interval,
1024+
Term: 1,
1025+
CpuUsage: 100,
1026+
}
1027+
err = stream.Send(regionReq)
1028+
re.NoError(err)
1029+
testutil.Eventually(re, func() bool {
1030+
region := tc.GetPrimaryServer().GetCluster().GetRegion(10)
1031+
return region != nil && region.GetTerm() == 1 &&
1032+
region.GetApproximateKeys() == 300 && region.GetApproximateSize() == 30 &&
1033+
reflect.DeepEqual(region.GetLeader(), peers[0]) &&
1034+
reflect.DeepEqual(region.GetInterval(), interval)
1035+
})
1036+
1037+
req := &pdpb.AskBatchSplitRequest{
1038+
Header: &pdpb.RequestHeader{
1039+
ClusterId: suite.pdLeader.GetClusterID(),
1040+
},
1041+
Region: regionReq.GetRegion(),
1042+
SplitCount: 10,
1043+
}
1044+
1045+
// case 1: The scheduling server is upgraded first, PD server has not been upgraded.
1046+
re.NoError(failpoint.Enable("github.com/tikv/pd/server/handleAllocIDNonBatch", `return(true)`))
1047+
suite.checkConcurrentAllocatedID(re, req, grpcPDClient)
1048+
1049+
// case 2: The scheduling server is upgraded first, PD server has been upgraded.
1050+
re.NoError(failpoint.Disable("github.com/tikv/pd/server/handleAllocIDNonBatch"))
1051+
suite.checkConcurrentAllocatedID(re, req, grpcPDClient)
1052+
1053+
// case 3: The PD server is upgraded first, scheduling server has not been upgraded.
1054+
re.NoError(failpoint.Enable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch", `return(true)`))
1055+
suite.checkConcurrentAllocatedID(re, req, grpcPDClient)
1056+
1057+
// case 4: The PD server is upgraded first, scheduling server has been upgraded.
1058+
re.NoError(failpoint.Disable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch"))
1059+
suite.checkConcurrentAllocatedID(re, req, grpcPDClient)
1060+
1061+
// case 5: Both the PD server and scheduling server have not been upgraded.
1062+
re.NoError(failpoint.Enable("github.com/tikv/pd/server/handleAllocIDNonBatch", `return(true)`))
1063+
re.NoError(failpoint.Enable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch", `return(true)`))
1064+
suite.checkConcurrentAllocatedID(re, req, grpcPDClient)
1065+
1066+
re.NoError(failpoint.Disable("github.com/tikv/pd/server/handleAllocIDNonBatch"))
1067+
re.NoError(failpoint.Disable("github.com/tikv/pd/mcs/scheduling/server/allocIDNonBatch"))
1068+
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
1069+
1070+
suite.TearDownSuite()
1071+
suite.SetupSuite()
1072+
}
1073+
1074+
func (suite *serverTestSuite) checkConcurrentAllocatedID(re *require.Assertions, req *pdpb.AskBatchSplitRequest, grpcPDClient pdpb.PDClient) {
1075+
var wg sync.WaitGroup
1076+
var allocatedIDs sync.Map
1077+
for range 100 {
1078+
wg.Add(1)
1079+
go func() {
1080+
defer wg.Done()
1081+
resp, err := grpcPDClient.AskBatchSplit(suite.ctx, req)
1082+
re.NoError(err)
1083+
re.Empty(resp.GetHeader().GetError())
1084+
for _, id := range resp.GetIds() {
1085+
_, ok := allocatedIDs.Load(id.NewRegionId)
1086+
re.False(ok)
1087+
allocatedIDs.Store(id.NewRegionId, struct{}{})
1088+
for _, peer := range id.NewPeerIds {
1089+
_, ok := allocatedIDs.Load(peer)
1090+
re.False(ok)
1091+
allocatedIDs.Store(peer, struct{}{})
1092+
}
1093+
}
1094+
}()
1095+
}
1096+
wg.Wait()
1097+
var len int
1098+
allocatedIDs.Range(func(_, _ any) bool {
1099+
len++
1100+
return true
1101+
})
1102+
re.Equal(4000, len)
1103+
}

0 commit comments

Comments (0)