Skip to content

Commit 9d6bbb8

Browse files
authored
fix(tso): do not exit allocatorUpdater if the updating fails (tikv#9180)
close tikv#9179 Do not exit `allocatorUpdater` if the updating fails to prevent the TSO from stuck. Signed-off-by: JmPotato <github@ipotato.me>
1 parent 7b5d47d commit 9d6bbb8

File tree

4 files changed

+27
-21
lines changed

4 files changed

+27
-21
lines changed

pkg/tso/allocator.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,17 @@ import (
4343
"github.com/tikv/pd/pkg/utils/logutil"
4444
)
4545

46-
// GlobalDCLocation is the Global TSO Allocator's DC location label.
47-
// Deprecated: This is a legacy label, it should be removed in the future.
48-
const GlobalDCLocation = "global"
46+
const (
47+
// GlobalDCLocation is the Global TSO Allocator's DC location label.
48+
// Deprecated: This is a legacy label, it should be removed in the future.
49+
GlobalDCLocation = "global"
50+
// maxUpdateTSORetryCount is the max retry count for updating TSO.
51+
// When encountering a network partition, manually retrying may help the next request succeed with the new endpoint according to https://github.com/etcd-io/etcd/issues/8711
52+
maxUpdateTSORetryCount = 3
53+
// Etcd client retry with `roundRobinQuorumBackoff` (https://github.com/etcd-io/etcd/blob/d62cdeee4863001b09e772ed013eb1342a1d0f89/client/v3/client.go#L488),
54+
// whose default interval is 25ms, so we sleep 50ms here. (https://github.com/etcd-io/etcd/blob/d62cdeee4863001b09e772ed013eb1342a1d0f89/client/v3/options.go#L53)
55+
updateTSORetryInterval = 50 * time.Millisecond
56+
)
4957

5058
// ElectionMember defines the interface for the election related logic.
5159
type ElectionMember interface {
@@ -175,7 +183,8 @@ func (a *Allocator) allocatorUpdater() {
175183
if err := a.UpdateTSO(); err != nil {
176184
log.Warn("failed to update allocator's timestamp", append(a.logFields, errs.ZapError(err))...)
177185
a.Reset(true)
178-
return
186+
// To wait for the allocator to be re-initialized next time.
187+
continue
179188
}
180189
case <-a.ctx.Done():
181190
a.Reset(false)
@@ -185,9 +194,9 @@ func (a *Allocator) allocatorUpdater() {
185194
}
186195
}
187196

188-
// close is used to shutdown the primary election loop.
197+
// Close is used to close the allocator and shutdown all the daemon loops.
189198
// tso service call this function to shutdown the loop here, but pd manages its own loop.
190-
func (a *Allocator) close() {
199+
func (a *Allocator) Close() {
191200
log.Info("closing the allocator", a.logFields...)
192201
a.cancel()
193202
a.wg.Wait()
@@ -207,18 +216,14 @@ func (a *Allocator) IsInitialize() bool {
207216

208217
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
209218
func (a *Allocator) UpdateTSO() (err error) {
210-
// When meet network partition, we need to manually retry to update the tso,
211-
// next request succeeds with the new endpoint, according to https://github.com/etcd-io/etcd/issues/8711
212-
maxRetryCount := 3
213-
for range maxRetryCount {
219+
for i := range maxUpdateTSORetryCount {
214220
err = a.timestampOracle.updateTimestamp()
215221
if err == nil {
216222
return nil
217223
}
218-
log.Warn("try to update the tso but failed", errs.ZapError(err))
219-
// Etcd client retry with roundRobinQuorumBackoff https://github.com/etcd-io/etcd/blob/d62cdeee4863001b09e772ed013eb1342a1d0f89/client/v3/client.go#L488
220-
// And its default interval is 25ms, so we sleep 50ms here. https://github.com/etcd-io/etcd/blob/d62cdeee4863001b09e772ed013eb1342a1d0f89/client/v3/options.go#L53
221-
time.Sleep(50 * time.Millisecond)
224+
log.Warn("try to update the tso but failed",
225+
zap.Int("retry-count", i), zap.Duration("retry-interval", updateTSORetryInterval), errs.ZapError(err))
226+
time.Sleep(updateTSORetryInterval)
222227
}
223228
return
224229
}

pkg/tso/keyspace_group_manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (s *state) deInitialize() {
113113
go func(allocator *Allocator) {
114114
defer logutil.LogPanic()
115115
defer wg.Done()
116-
allocator.close()
116+
allocator.Close()
117117
log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", allocator.keyspaceGroupID))
118118
}(allocator)
119119
}
@@ -938,7 +938,7 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
938938

939939
allocator := kgm.allocators[groupID]
940940
if allocator != nil {
941-
allocator.close()
941+
allocator.Close()
942942
kgm.allocators[groupID] = nil
943943
}
944944

@@ -955,7 +955,7 @@ func (kgm *KeyspaceGroupManager) exitElectionMembership(group *endpoint.Keyspace
955955

956956
allocator := kgm.allocators[group.ID]
957957
if allocator != nil {
958-
allocator.close()
958+
allocator.Close()
959959
kgm.allocators[group.ID] = nil
960960
}
961961

server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ func (s *Server) Close() {
533533
if s.IsKeyspaceGroupEnabled() {
534534
s.keyspaceGroupManager.Close()
535535
}
536+
if s.tsoAllocator != nil {
537+
s.tsoAllocator.Close()
538+
}
536539

537540
if s.client != nil {
538541
if err := s.client.Close(); err != nil {

tests/integrations/tso/client_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func TestMicroserviceTSOClientSuite(t *testing.T) {
9292

9393
func (suite *tsoClientTestSuite) SetupSuite() {
9494
re := suite.Require()
95+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
9596

9697
var err error
9798
suite.ctx, suite.cancel = context.WithCancel(context.Background())
@@ -207,6 +208,7 @@ func (suite *tsoClientTestSuite) TearDownTest() {
207208
}
208209

209210
func (suite *tsoClientTestSuite) TearDownSuite() {
211+
failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")
210212
suite.cancel()
211213
if !suite.legacy {
212214
suite.tsoCluster.Destroy()
@@ -379,10 +381,8 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
379381

380382
func (suite *tsoClientTestSuite) TestRandomResignLeader() {
381383
re := suite.Require()
382-
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
383384
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)"))
384385
defer func() {
385-
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
386386
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
387387
}()
388388

@@ -426,7 +426,6 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
426426

427427
func (suite *tsoClientTestSuite) TestRandomShutdown() {
428428
re := suite.Require()
429-
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
430429

431430
parallelAct := func() {
432431
// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
@@ -445,7 +444,6 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
445444
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
446445
suite.TearDownSuite()
447446
suite.SetupSuite()
448-
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
449447
}
450448

451449
func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {

0 commit comments

Comments
 (0)