Commit cec12ec

Merge branch 'master' into optimize_test_process_in_case_of_cancellation

2 parents: 75734c5 + c1eea39

10 files changed: 317 additions & 68 deletions

.github/workflows/docker-tests.yml

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ jobs:
           - 'GODOG_FEATURE=offline_mode.feature make test'
           - 'GODOG_FEATURE=priority.feature make test'
           - 'GODOG_FEATURE=readonly_filesystem.feature make test'
-          - 'GODOG_FEATURE=recovery.feature make test'
+          - 'GODOG_FEATURE=recovery.57.feature make test'
          - 'GODOG_FEATURE=repair.feature make test'
          - 'GODOG_FEATURE=repl_mon.feature make test'
          - 'GODOG_FEATURE=statefile.feature make test'

internal/app/app.go

Lines changed: 85 additions & 64 deletions
@@ -20,6 +20,7 @@ import (

 	nodestate "github.com/yandex/mysync/internal/app/node_state"
 	"github.com/yandex/mysync/internal/app/optimization"
+	"github.com/yandex/mysync/internal/app/resetup"
 	"github.com/yandex/mysync/internal/config"
 	"github.com/yandex/mysync/internal/dcs"
 	"github.com/yandex/mysync/internal/log"
@@ -46,6 +47,7 @@ type App struct {
 	switchHelper         mysql.ISwitchHelper
 	lostQuorumTime       time.Time
 	replicationOptimizer optimization.ReplicationOpitimizer
+	lagResetupper        *resetup.LagResetupper
 }

 // NewApp returns new App. Suddenly.
@@ -88,6 +90,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
 		logger,
 		config.OptimizationConfig,
 	)
+
 	switchHelper := mysql.NewSwitchHelper(config)
 	app := &App{
 		state: stateFirstRun,
@@ -101,6 +104,10 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
 		switchHelper:         switchHelper,
 		replicationOptimizer: replicationOptimizer,
 	}
+
+	// TODO: appDcs should be separate entity
+	app.lagResetupper = resetup.NewLagResetupper(logger, app, config.ResetupHostLag.Seconds())
+
 	logger.Info("app created")
 	return app, nil
 }
@@ -159,9 +166,9 @@ func (app *App) writeEmergeFile(msg string) {
 	}
 }

-func (app *App) writeResetupFile(msg string) {
+func (app *App) writeResetupFile() {
 	app.logger.Warn("touch resetup file")
-	err := os.WriteFile(app.config.Resetupfile, []byte(msg), 0644)
+	err := os.WriteFile(app.config.Resetupfile, []byte{}, 0644)
 	if err != nil {
 		app.logger.Errorf("failed to write resetup file: %v", err)
 	}
@@ -1581,20 +1588,20 @@ func (app *App) repairOfflineMode(clusterState map[string]*nodestate.NodeState,
 		if !state.PingOk {
 			continue
 		}
-		node := app.cluster.Get(host)
 		if host == master {
-			app.repairMasterOfflineMode(host, node, state)
+			app.repairMasterOfflineMode(host, state)
 		} else {
-			app.repairSlaveOfflineMode(host, node, state, masterNode, clusterState[master])
+			app.repairSlaveOfflineMode(host, state, masterNode, clusterState[master])
 		}
 	}
 }

-func (app *App) repairMasterOfflineMode(host string, node *mysql.Node, state *nodestate.NodeState) {
+func (app *App) repairMasterOfflineMode(host string, state *nodestate.NodeState) {
 	if state.IsOffline {
 		if app.IsRecoveryNeeded(host) {
 			return
 		}
+		node := app.cluster.Get(host)
 		err := node.SetOnline()
 		if err != nil {
 			app.logger.Errorf("repair: failed to set master %s online: %s", host, err)
@@ -1604,73 +1611,86 @@ func (app *App) repairMasterOfflineMode(host string, node *mysql.Node, state *no
 	}
 }

-func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *nodestate.NodeState, masterNode *mysql.Node, masterState *nodestate.NodeState) {
-	if state.SlaveState != nil && state.SlaveState.ReplicationLag != nil {
-		replPermBroken, _ := state.IsReplicationPermanentlyBroken()
-		if state.IsOffline && *state.SlaveState.ReplicationLag <= app.config.OfflineModeDisableLag.Seconds() {
-			if replPermBroken {
-				app.logger.Infof("repair: replica %s is permanently broken, won't set online", host)
-				return
-			}
-			resetupStatus, err := app.GetResetupStatus(host)
-			if err != nil {
-				app.logger.Errorf("repair: failed to get resetup status from host %s: %v", host, err)
-				return
-			}
-			startupTime, err := node.GetStartupTime()
-			if err != nil {
-				app.logger.Errorf("repair: failed to get mysql startup time from host %s: %v", host, err)
-				return
-			}
-			if resetupStatus.Status || resetupStatus.UpdateTime.Before(startupTime) {
-				app.logger.Errorf("repair: should not turn slave to online until get actual resetup status")
-				return
-			}
-			err = node.SetDefaultReplicationSettings(masterNode)
-			if err != nil {
-				app.logger.Errorf("repair: failed to set default replication settings on slave %s: %s", host, err)
-			}
-			err = node.SetOnline()
-			if err != nil {
-				app.logger.Errorf("repair: failed to set slave %s online: %s", host, err)
-			} else {
-				app.logger.Infof("repair: slave %s set online, because ReplicationLag (%f s) <= OfflineModeDisableLag (%v)",
-					host, *state.SlaveState.ReplicationLag, app.config.OfflineModeDisableLag)
-			}
+// nolint: gocyclo
+func (app *App) repairSlaveOfflineMode(host string, state *nodestate.NodeState, masterNode *mysql.Node, masterState *nodestate.NodeState) {
+	if state.SlaveState == nil || state.SlaveState.ReplicationLag == nil {
+		return
+	}
+
+	replPermBroken, _ := state.IsReplicationPermanentlyBroken()
+	node := app.cluster.Get(host)
+	// offline => online, if lag has decreased
+	if state.IsOffline && *state.SlaveState.ReplicationLag <= app.config.OfflineModeDisableLag.Seconds() {
+		if replPermBroken {
+			app.logger.Infof("repair: replica %s is permanently broken, won't set online", host)
+			return
 		}
-		if !state.IsOffline && !masterState.IsReadOnly && *state.SlaveState.ReplicationLag > app.config.OfflineModeEnableLag.Seconds() {
-			err := node.SetOffline()
-			if err != nil {
-				app.logger.Errorf("repair: failed to set slave %s offline: %s", host, err)
-			} else {
-				app.logger.Infof("repair: slave %s set offline, because ReplicationLag (%f s) >= OfflineModeEnableLag (%v)",
-					host, *state.SlaveState.ReplicationLag, app.config.OfflineModeEnableLag)
-				err = app.replicationOptimizer.EnableNodeOptimization(node)
-				if err != nil {
-					app.logger.Errorf("repair: failed to set optimize replication settings on slave %s: %s", host, err)
-				}
-			}
+		resetupStatus, err := app.GetResetupStatus(host)
+		if err != nil {
+			app.logger.Errorf("repair: failed to get resetup status from host %s: %v", host, err)
+			return
 		}
-		// gradual transfer of permanently broken nodes to offline
-		lastShutdownNodeTime, err := app.GetLastShutdownNodeTime()
+		startupTime, err := node.GetStartupTime()
 		if err != nil {
-			app.logger.Errorf("repair: failed to get last shutdown node time: %s", err)
+			app.logger.Errorf("repair: failed to get mysql startup time from host %s: %v", host, err)
 			return
 		}
-		setOfflineIsPossible := time.Since(lastShutdownNodeTime) > app.config.OfflineModeEnableInterval
-		if !state.IsOffline && replPermBroken && setOfflineIsPossible {
-			err = app.UpdateLastShutdownNodeTime()
-			if err != nil {
-				app.logger.Errorf("repair: failed to update last shutdown node time: %s", err)
-			}
-			err = node.SetOffline()
+		if resetupStatus.Status || resetupStatus.UpdateTime.Before(startupTime) {
+			app.logger.Errorf("repair: should not turn slave to online until get actual resetup status")
+			return
+		}
+		err = node.SetDefaultReplicationSettings(masterNode)
+		if err != nil {
+			app.logger.Errorf("repair: failed to set default replication settings on slave %s: %s", host, err)
+		}
+		err = node.SetOnline()
+		if err != nil {
+			app.logger.Errorf("repair: failed to set slave %s online: %s", host, err)
+		} else {
+			app.logger.Infof("repair: slave %s set online, because ReplicationLag (%f s) <= OfflineModeDisableLag (%v)",
+				host, *state.SlaveState.ReplicationLag, app.config.OfflineModeDisableLag)
+		}
+		return
+	}
+	// online => offline, if lag has increased
+	if !state.IsOffline && !masterState.IsReadOnly && *state.SlaveState.ReplicationLag > app.config.OfflineModeEnableLag.Seconds() {
+		err := node.SetOffline()
+		if err != nil {
+			app.logger.Errorf("repair: failed to set slave %s offline: %s", host, err)
+		} else {
+			app.logger.Infof("repair: slave %s set offline, because ReplicationLag (%f s) >= OfflineModeEnableLag (%v)",
+				host, *state.SlaveState.ReplicationLag, app.config.OfflineModeEnableLag)
+			err = app.replicationOptimizer.EnableNodeOptimization(node)
 			if err != nil {
-				app.logger.Errorf("repair: failed to set slave %s offline: %s", host, err)
-			} else {
-				app.logger.Infof("repair: slave %s set offline, because replication permanently broken", host)
+				app.logger.Errorf("repair: failed to set optimize replication settings on slave %s: %s", host, err)
 			}
 		}
 	}
+
+	// nothing to do here
+	if !replPermBroken {
+		return
+	}
+
+	// gradual transfer of permanently broken nodes to offline
+	lastShutdownNodeTime, err := app.GetLastShutdownNodeTime()
+	if err != nil {
+		app.logger.Errorf("repair: failed to get last shutdown node time: %s", err)
+		return
+	}
+	setOfflineIsPossible := time.Since(lastShutdownNodeTime) > app.config.OfflineModeEnableInterval
+	if !state.IsOffline && replPermBroken && setOfflineIsPossible {
+		err = app.UpdateLastShutdownNodeTime()
+		if err != nil {
+			app.logger.Errorf("repair: failed to update last shutdown node time: %s", err)
+		}
+		err = node.SetOffline()
+		if err != nil {
+			app.logger.Errorf("repair: failed to set slave %s offline: %s", host, err)
+		} else {
+			app.logger.Infof("repair: slave %s set offline, because replication permanently broken", host)
+		}
+	}
 }

 // nolint: gocyclo
@@ -2485,6 +2505,7 @@ func (app *App) Run() int {

 	go app.healthChecker(ctx)
 	go app.recoveryChecker(ctx)
+	go app.replicationLagChecker(ctx)
 	go app.stateFileHandler(ctx)
 	if app.config.ExternalReplicationType != util.Disabled {
 		go app.externalCAFileChecker(ctx)
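
With the msg parameter removed, the resetup file becomes a pure flag: its presence, not its content, signals that a resetup is required. The new replicationLagChecker relies on the complementary existence check; doesResetupFileExist is referenced but not defined in this diff, so the following is only a minimal sketch, assuming it reduces to a stat on the configured path:

// Hypothetical sketch; the real doesResetupFileExist is referenced
// by replicationLagChecker but its body is not part of this diff.
func (app *App) doesResetupFileExist() bool {
	_, err := os.Stat(app.config.Resetupfile)
	return err == nil
}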

internal/app/recovery.go

Lines changed: 3 additions & 3 deletions
@@ -89,7 +89,7 @@ func (app *App) checkRecovery() {
 		}

 		app.logger.Infof("recovery: new master is found %s, and current node is stuck for more than %v. Writing resetup file", master, StuckWaitTime)
-		app.writeResetupFile("")
+		app.writeResetupFile()
 		app.t.Clean(MasterStuckAt, localNode.Host())
 		return
 	}
@@ -110,7 +110,7 @@ func (app *App) checkRecovery() {
 			app.logger.Errorf("recovery: error getting replica status: %v", err)
 		}
 		app.logger.Errorf("recovery: local node %s is NOT behind the master %s, need RESETUP", localNode.Host(), masterNode)
-		app.writeResetupFile("")
+		app.writeResetupFile()
 	} else {
 		readOnly, _, err := localNode.IsReadOnly()
 		if err != nil {
@@ -164,5 +164,5 @@ func (app *App) checkCrashRecovery() {
 		return
 	}
 	app.logger.Errorf("recovery: local node %s is running after crash recovery %v, need RESETUP", localNode.Host(), ds.RecoveryTime)
-	app.writeResetupFile("")
+	app.writeResetupFile()
 }

internal/app/replication.go

Lines changed: 22 additions & 0 deletions
@@ -25,6 +25,28 @@ type ReplicationRepairState struct {
 	LastGTIDExecuted string
 }

+// separate goroutine for checking local MySQL lag
+func (app *App) replicationLagChecker(ctx context.Context) {
+	// 30s
+	// TODO: we should use 2 tickers - fast (recovery, ticker, etc) and slow - lag check
+	ticker := time.NewTicker(6 * app.config.RecoveryCheckInterval)
+	for {
+		select {
+		case <-ticker.C:
+			if app.doesResetupFileExist() {
+				app.logger.Infof("lag check: resetup file exists, waiting for resetup to complete")
+				return
+			}
+
+			if app.lagResetupper.CheckNeedResetup(app.cluster) {
+				app.writeResetupFile()
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
 func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
 	var replState *ReplicationRepairState
 	key := app.makeReplStateKey(node, channel)
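
Two notes on replicationLagChecker: the "// 30s" comment gives the effective period, six times the RecoveryCheckInterval (presumably 5s by default), and the TODO proposes splitting the work between a fast ticker (recovery-style checks) and a slow one (the lag check). A minimal sketch of that two-ticker pattern, with a hypothetical combinedChecker name and placeholder handlers, not part of this change:

// Hypothetical two-ticker loop, as the TODO suggests: fast and slow
// periodic work multiplexed over a single select.
func (app *App) combinedChecker(ctx context.Context) {
	fast := time.NewTicker(app.config.RecoveryCheckInterval)     // e.g. recovery checks
	slow := time.NewTicker(6 * app.config.RecoveryCheckInterval) // e.g. the lag check above
	defer fast.Stop()
	defer slow.Stop()
	for {
		select {
		case <-fast.C:
			// fast-path work here
		case <-slow.C:
			// slow-path work here
		case <-ctx.Done():
			return
		}
	}
}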
internal/app/resetup (new file)

Lines changed: 88 additions & 0 deletions

@@ -0,0 +1,88 @@
+package resetup
+
+import (
+	"github.com/yandex/mysync/internal/log"
+	"github.com/yandex/mysync/internal/mysql"
+)
+
+const (
+	prefix = "lag check"
+)
+
+type LagResetupperDcs interface {
+	GetMasterHostFromDcs() (string, error)
+}
+
+type LagResetupper struct {
+	logger         *log.Logger
+	dcs            LagResetupperDcs
+	resetupHostLag float64
+}
+
+func NewLagResetupper(l *log.Logger, dcs LagResetupperDcs, lag float64) *LagResetupper {
+	return &LagResetupper{
+		logger:         l,
+		dcs:            dcs,
+		resetupHostLag: lag,
+	}
+}
+
+func (r *LagResetupper) CheckNeedResetup(cluster *mysql.Cluster) bool {
+	err := cluster.UpdateHostsInfo()
+	if err != nil {
+		r.logger.Errorf("%s: failed to update hosts info: %v", prefix, err)
+		return false
+	}
+	localNode := cluster.Local()
+	sstatus, err := localNode.GetReplicaStatus()
+	if err != nil {
+		r.logger.Errorf("%s: host %s failed to get slave status %v", prefix, localNode.Host(), err)
+		return false
+	}
+
+	// definitely not a replica
+	if sstatus == nil {
+		return false
+	}
+
+	var master string
+	master, err = r.dcs.GetMasterHostFromDcs()
+	if err != nil {
+		r.logger.Errorf("%s: failed to get current master from dcs: %v", prefix, err)
+		return false
+	}
+
+	// probably not a replica, we need to wait
+	if master == localNode.Host() {
+		return false
+	}
+
+	lag := sstatus.GetReplicationLag()
+	// we have other problems, or a small lag
+	if !lag.Valid || lag.Float64 <= r.resetupHostLag {
+		return false
+	}
+
+	// so, we clearly have a replica with a large lag
+	ifOffline, err := localNode.IsOffline()
+	if err != nil {
+		r.logger.Errorf("%s: failed to check local node if offline: %v", prefix, err)
+		return false
+	}
+
+	masterNode := cluster.Get(master)
+	masterRO, _, err := masterNode.IsReadOnly()
+	if err != nil {
+		r.logger.Errorf("%s: failed to check master (%s) if RO: %v", prefix, master, err)
+		return false
+	}
+
+	if ifOffline && !masterRO {
+		r.logger.Infof("%s: local host set to resetup, because ReplicationLag (%f s) > ResetupHostLag (%v)",
+			prefix, lag.Float64, r.resetupHostLag)
+
+		return true
+	}
+
+	return false
+}
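
NewApp passes the App itself as the dcs argument to NewLagResetupper, so *App must already implement GetMasterHostFromDcs. A compile-time assertion in package app (hypothetical, not part of this change) would make that contract explicit:

// Hypothetical compile-time check that *App satisfies the narrow
// DCS interface LagResetupper depends on.
var _ resetup.LagResetupperDcs = (*App)(nil)

Keeping the interface this narrow is what the "TODO: appDcs should be separate entity" comment in NewApp points at: LagResetupper only needs the master lookup, not the whole App.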

internal/config/config.go

Lines changed: 2 additions & 0 deletions
@@ -76,6 +76,7 @@ type Config struct {
 	OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"`
 	OfflineModeEnableLag      time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"`
 	OfflineModeDisableLag     time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"`
+	ResetupHostLag            time.Duration `config:"resetup_host_lag" yaml:"resetup_host_lag"`
 	DisableSetReadonlyOnLost  bool          `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"`
 	ResetupCrashedHosts       bool          `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"`
 	StreamFromReasonableLag   time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"`
@@ -175,6 +176,7 @@ func DefaultConfig() (Config, error) {
 		OfflineModeEnableInterval: 15 * time.Minute,
 		OfflineModeEnableLag:      24 * time.Hour,
 		OfflineModeDisableLag:     30 * time.Second,
+		ResetupHostLag:            25 * time.Hour,
 		StreamFromReasonableLag:   5 * time.Minute,
 		PriorityChoiceMaxLag:      60 * time.Second,
 		TestDiskUsageFile:         "", // fake disk usage, only for docker tests
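
For operators, the new knob sits next to the existing offline-mode lags in the YAML config. Note the ordering of the defaults: ResetupHostLag (25h) exceeds OfflineModeEnableLag (24h), which matches CheckNeedResetup requiring the replica to already be offline before it becomes a resetup candidate. A hypothetical config fragment using these defaults:

# hypothetical mysync config fragment; values match DefaultConfig above
offline_mode_enable_lag: 24h
offline_mode_disable_lag: 30s
resetup_host_lag: 25h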
