Skip to content

Commit 9648b2f

Browse files
committed
Add enforce replicas in idempotent
Add enforce replication parallel mode Fixing email format
1 parent d829751 commit 9648b2f

File tree

12 files changed

+292
-8
lines changed

12 files changed

+292
-8
lines changed

cluster/cluster_chk.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func (cluster *Cluster) CheckAlert(state state.State) {
481481

482482
if strings.Contains(cluster.Conf.MonitoringAlertTrigger, state.ErrKey) {
483483
a := alert.Alert{
484-
State: state.ErrKey,
484+
State: state.ErrDesc,
485485
Cluster: cluster.Name,
486486
}
487487

cluster/cluster_tgl.go

+58-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
package cluster
88

9-
import "os"
9+
import (
10+
"os"
11+
"strings"
12+
)
1013

1114
func (cluster *Cluster) SwitchForceSlaveNoGtid() {
1215
cluster.Conf.ForceSlaveNoGtid = !cluster.Conf.ForceSlaveNoGtid
@@ -48,6 +51,60 @@ func (cluster *Cluster) SwitchForceSlaveGtidStrict() {
4851
cluster.Conf.ForceSlaveGtidStrict = !cluster.Conf.ForceSlaveGtidStrict
4952
}
5053

54+
func (cluster *Cluster) SwitchForceSlaveModeStrict() {
55+
cluster.Conf.ForceSlaveStrict = !cluster.Conf.ForceSlaveStrict
56+
if cluster.Conf.ForceSlaveStrict == true {
57+
cluster.Conf.ForceSlaveIdempotent = !cluster.Conf.ForceSlaveStrict
58+
}
59+
}
60+
61+
func (cluster *Cluster) SwitchForceSlaveModeIdempotent() {
62+
cluster.Conf.ForceSlaveIdempotent = !cluster.Conf.ForceSlaveIdempotent
63+
if cluster.Conf.ForceSlaveIdempotent == true {
64+
cluster.Conf.ForceSlaveStrict = !cluster.Conf.ForceSlaveIdempotent
65+
}
66+
}
67+
68+
func (cluster *Cluster) SwitchForceSlaveParallelModeSerialized() {
69+
if strings.ToUpper(cluster.Conf.ForceSlaveParallelMode) != "SERIALIZED" {
70+
cluster.Conf.ForceSlaveParallelMode = "SERIALIZED"
71+
} else {
72+
cluster.Conf.ForceSlaveParallelMode = ""
73+
}
74+
}
75+
76+
func (cluster *Cluster) SwitchForceSlaveParallelModeMinimal() {
77+
if strings.ToUpper(cluster.Conf.ForceSlaveParallelMode) != "MINIMAL" {
78+
cluster.Conf.ForceSlaveParallelMode = "MINIMAL"
79+
} else {
80+
cluster.Conf.ForceSlaveParallelMode = ""
81+
}
82+
}
83+
84+
func (cluster *Cluster) SwitchForceSlaveParallelModeConservative() {
85+
if strings.ToUpper(cluster.Conf.ForceSlaveParallelMode) != "CONSERVATIVE" {
86+
cluster.Conf.ForceSlaveParallelMode = "CONSERVATIVE"
87+
} else {
88+
cluster.Conf.ForceSlaveParallelMode = ""
89+
}
90+
}
91+
92+
func (cluster *Cluster) SwitchForceSlaveParallelModeOptimistic() {
93+
if strings.ToUpper(cluster.Conf.ForceSlaveParallelMode) != "OPTIMISTIC" {
94+
cluster.Conf.ForceSlaveParallelMode = "OPTIMISTIC"
95+
} else {
96+
cluster.Conf.ForceSlaveParallelMode = ""
97+
}
98+
}
99+
100+
func (cluster *Cluster) SwitchForceSlaveParallelModeAggressive() {
101+
if strings.ToUpper(cluster.Conf.ForceSlaveParallelMode) != "AGGRESSIVE" {
102+
cluster.Conf.ForceSlaveParallelMode = "AGGRESSIVE"
103+
} else {
104+
cluster.Conf.ForceSlaveParallelMode = ""
105+
}
106+
}
107+
51108
func (cluster *Cluster) SwitchForceBinlogCompress() {
52109
cluster.Conf.ForceBinlogCompress = !cluster.Conf.ForceBinlogCompress
53110
}

cluster/error.go

+2
Original file line numberDiff line numberDiff line change
@@ -159,4 +159,6 @@ var clusterError = map[string]string{
159159
"WARN0100": "No space left on device pn %s",
160160
"WARN0101": "Cluster does not have backup",
161161
"WARN0102": "The config file must be merge because an immutable parameter has been changed. Use the config-merge command to save your changes.",
162+
"WARN0103": "Enforce replication mode idempotent but strict on server %s",
163+
"WARN0104": "Enforce replication mode strict but idempotent on server %s",
162164
}

cluster/srv.go

+15
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ type ServerMonitor struct {
106106
HaveWsrep bool `json:"haveWsrep"`
107107
HaveReadOnly bool `json:"haveReadOnly"`
108108
HaveNoMasterOnStart bool `json:"haveNoMasterOnStart"`
109+
HaveSlaveIdempotent bool `json:"haveSlaveIdempotent"`
110+
HaveSlaveOptimistic bool `json:"haveSlaveOptimistic "`
111+
HaveSlaveSerialized bool `json:"haveSlaveSerialized"`
112+
HaveSlaveAggressive bool `json:"haveSlaveAggressive"`
113+
HaveSlaveMinimal bool `json:"haveSlaveMinimal"`
114+
HaveSlaveConservative bool `json:"haveSlaveConservative"`
109115
IsWsrepSync bool `json:"isWsrepSync"`
110116
IsWsrepDonor bool `json:"isWsrepDonor"`
111117
IsWsrepPrimary bool `json:"isWsrepPrimary"`
@@ -648,6 +654,12 @@ func (server *ServerMonitor) Refresh() error {
648654
server.LogOutput = server.Variables["LOG_OUTPUT"]
649655
server.SlowQueryLog = server.Variables["SLOW_QUERY_LOG"]
650656
server.HaveReadOnly = server.HasReadOnly()
657+
server.HaveSlaveIdempotent = server.HasSlaveIndempotent()
658+
server.HaveSlaveOptimistic = server.HasSlaveParallelOptimistic()
659+
server.HaveSlaveSerialized = server.HasSlaveParallelSerialized()
660+
server.HaveSlaveAggressive = server.HasSlaveParallelAggressive()
661+
server.HaveSlaveMinimal = server.HasSlaveParallelMinimal()
662+
server.HaveSlaveConservative = server.HasSlaveParallelConservative()
651663
server.HaveBinlog = server.HasBinlog()
652664
server.HaveBinlogRow = server.HasBinlogRow()
653665
server.HaveBinlogAnnotate = server.HasBinlogRowAnnotate()
@@ -1054,6 +1066,9 @@ func (server *ServerMonitor) writeState() error {
10541066
if err != nil {
10551067
return err
10561068
}
1069+
if server.GTIDBinlogPos == nil {
1070+
return errors.New("No GTID Binlog Position")
1071+
}
10571072
_, err = f.WriteString(server.GTIDBinlogPos.Sprint())
10581073
if err != nil {
10591074
return err

cluster/srv_chk.go

+35-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package cluster
1313
import (
1414
"fmt"
1515
"strconv"
16+
"strings"
1617

1718
"github.com/signal18/replication-manager/utils/dbhelper"
1819
"github.com/signal18/replication-manager/utils/misc"
@@ -182,13 +183,45 @@ func (server *ServerMonitor) CheckSlaveSettings() {
182183
} else if sl.IsIgnored() == false && sl.GetReplicationUsingGtid() == "No" && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
183184
server.ClusterGroup.StateMachine.AddState("WARN0051", state.State{ErrType: LvlWarn, ErrDesc: fmt.Sprintf(clusterError["WARN0051"], sl.URL), ErrFrom: "TOPO", ServerUrl: sl.URL})
184185
}
185-
if server.ClusterGroup.Conf.ForceSlaveGtidStrict && sl.IsReplicationUsingGtidStrict() == false && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
186+
if server.ClusterGroup.Conf.ForceSlaveGtidStrict && !sl.IsReplicationUsingGtidStrict() && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
186187
dbhelper.SetSlaveGTIDModeStrict(sl.Conn, server.DBVersion)
187188
server.ClusterGroup.LogPrintf("INFO", "Enforce GTID strict mode on slave %s", sl.URL)
188-
} else if sl.IsIgnored() == false && sl.IsReplicationUsingGtidStrict() == false && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
189+
} else if !sl.IsIgnored() && !sl.IsReplicationUsingGtidStrict() && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
189190
server.ClusterGroup.StateMachine.AddState("WARN0058", state.State{ErrType: LvlWarn, ErrDesc: fmt.Sprintf(clusterError["WARN0058"], sl.URL), ErrFrom: "TOPO", ServerUrl: sl.URL})
190191
}
191192

193+
if server.ClusterGroup.Conf.ForceSlaveIdempotent && !sl.HaveSlaveIdempotent && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
194+
dbhelper.SetSlaveExecMode(sl.Conn, "IDEMPOTENT", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
195+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication mode idempotent on slave %s", sl.URL)
196+
} /* else if !sl.IsIgnored() && server.ClusterGroup.Conf.ForceSlaveIdempotent && sl.HaveSlaveIdempotent && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
197+
server.ClusterGroup.StateMachine.AddState("WARN0103", state.State{ErrType: LvlWarn, ErrDesc: fmt.Sprintf(clusterError["WARN0103"], sl.URL), ErrFrom: "TOPO", ServerUrl: sl.URL})
198+
}*/
199+
if server.ClusterGroup.Conf.ForceSlaveStrict && sl.HaveSlaveIdempotent && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
200+
dbhelper.SetSlaveExecMode(sl.Conn, "STRICT", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
201+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication mode strict on slave %s", sl.URL)
202+
} /*else if !sl.IsIgnored() && server.ClusterGroup.Conf.ForceSlaveStrict && && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
203+
server.ClusterGroup.StateMachine.AddState("WARN0104", state.State{ErrType: LvlWarn, ErrDesc: fmt.Sprintf(clusterError["WARN0103"], sl.URL), ErrFrom: "TOPO", ServerUrl: sl.URL})
204+
} */
205+
if strings.ToUpper(server.ClusterGroup.Conf.ForceSlaveParallelMode) == "OPTIMISTIC" && !sl.HaveSlaveOptimistic && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
206+
dbhelper.SetSlaveParallelMode(sl.Conn, "OPTIMISTIC", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
207+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication parallel mode optimistic on slave %s", sl.URL)
208+
}
209+
if strings.ToUpper(server.ClusterGroup.Conf.ForceSlaveParallelMode) == "SERIALIZED" && !sl.HaveSlaveSerialized && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
210+
dbhelper.SetSlaveParallelMode(sl.Conn, "NONE", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
211+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication parallel mode serialized on slave %s", sl.URL)
212+
}
213+
if strings.ToUpper(server.ClusterGroup.Conf.ForceSlaveParallelMode) == "AGGRESSIVE" && !sl.HaveSlaveAggressive && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
214+
dbhelper.SetSlaveParallelMode(sl.Conn, "AGGRESSIVE", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
215+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication parallel mode aggressive on slave %s", sl.URL)
216+
}
217+
if strings.ToUpper(server.ClusterGroup.Conf.ForceSlaveParallelMode) == "MINIMAL" && !sl.HaveSlaveMinimal && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
218+
dbhelper.SetSlaveParallelMode(sl.Conn, "MINIMAL", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
219+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication parallel mode minimal on slave %s", sl.URL)
220+
}
221+
if strings.ToUpper(server.ClusterGroup.Conf.ForceSlaveParallelMode) == "CONSERVATIVE" && !sl.HaveSlaveConservative && server.ClusterGroup.GetTopology() != topoMultiMasterWsrep && server.IsMariaDB() {
222+
dbhelper.SetSlaveParallelMode(sl.Conn, "CONSERVATIVE", server.ClusterGroup.Conf.MasterConn, server.DBVersion)
223+
server.ClusterGroup.LogPrintf("INFO", "Enforce replication parallel mode conservative on slave %s", sl.URL)
224+
}
192225
if server.ClusterGroup.Conf.ForceSyncInnoDB && sl.HaveInnodbTrxCommit == false {
193226
dbhelper.SetSyncInnodb(sl.Conn)
194227
server.ClusterGroup.LogPrintf("INFO", "Enforce InnoDB durability on slave %s", sl.URL)

cluster/srv_has.go

+24
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,30 @@ func (server *ServerMonitor) HasBinlogRowAnnotate() bool {
176176
return server.Variables["BINLOG_ANNOTATE_ROW_EVENTS"] == "ON"
177177
}
178178

179+
func (server *ServerMonitor) HasSlaveIndempotent() bool {
180+
return server.Variables["SLAVE_EXEC_MODE"] == "IDEMPOTENT"
181+
}
182+
183+
func (server *ServerMonitor) HasSlaveParallelOptimistic() bool {
184+
return server.Variables["SLAVE_PARALLEL_MODE"] == "OPTIMISTIC"
185+
}
186+
187+
func (server *ServerMonitor) HasSlaveParallelConservative() bool {
188+
return server.Variables["SLAVE_PARALLEL_MODE"] == "CONSERVATIVE"
189+
}
190+
191+
func (server *ServerMonitor) HasSlaveParallelSerialized() bool {
192+
return server.Variables["SLAVE_PARALLEL_MODE"] == "NONE"
193+
}
194+
195+
func (server *ServerMonitor) HasSlaveParallelAggressive() bool {
196+
return server.Variables["SLAVE_PARALLEL_MODE"] == "AGGRESSIVE"
197+
}
198+
199+
func (server *ServerMonitor) HasSlaveParallelMinimal() bool {
200+
return server.Variables["SLAVE_PARALLEL_MODE"] == "MINIMAL"
201+
}
202+
179203
func (server *ServerMonitor) HasBinlogSlowSlaveQueries() bool {
180204
return server.Variables["LOG_SLOW_SLAVE_STATEMENTS"] == "ON"
181205
}

config/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ type Config struct {
202202
ForceSlaveGtid bool `mapstructure:"force-slave-gtid-mode" toml:"force-slave-gtid-mode" json:"forceSlaveGtidMode"`
203203
ForceSlaveGtidStrict bool `mapstructure:"force-slave-gtid-mode-strict" toml:"force-slave-gtid-mode-strict" json:"forceSlaveGtidModeStrict"`
204204
ForceSlaveNoGtid bool `mapstructure:"force-slave-no-gtid-mode" toml:"force-slave-no-gtid-mode" json:"forceSlaveNoGtidMode"`
205+
ForceSlaveIdempotent bool `mapstructure:"force-slave-idempotent" toml:"force-slave-idempotent" json:"forceSlaveIdempotent"`
206+
ForceSlaveStrict bool `mapstructure:"force-slave-strict" toml:"force-slave-strict" json:"forceSlaveStrict"`
207+
ForceSlaveParallelMode string `mapstructure:"force-slave-parallel-mode" toml:"force-slave-parallel-mode" json:"forceSlaveParallelMode"`
205208
ForceSlaveSemisync bool `mapstructure:"force-slave-semisync" toml:"force-slave-semisync" json:"forceSlaveSemisync"`
206209
ForceSlaveReadOnly bool `mapstructure:"force-slave-readonly" toml:"force-slave-readonly" json:"forceSlaveReadonly"`
207210
ForceBinlogRow bool `mapstructure:"force-binlog-row" toml:"force-binlog-row" json:"forceBinlogRow"`

server/api_cluster.go

+14
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,20 @@ func (repman *ReplicationManager) switchSettings(mycluster *cluster.Cluster, set
11191119
mycluster.SwitchForceSlaveGtid()
11201120
case "force-slave-gtid-mode-strict":
11211121
mycluster.SwitchForceSlaveGtidStrict()
1122+
case "force-slave-idempotent":
1123+
mycluster.SwitchForceSlaveModeIdempotent()
1124+
case "force-slave-strict":
1125+
mycluster.SwitchForceSlaveModeStrict()
1126+
case "force-slave-serialized":
1127+
mycluster.SwitchForceSlaveParallelModeSerialized()
1128+
case "force-slave-minimal":
1129+
mycluster.SwitchForceSlaveParallelModeMinimal()
1130+
case "force-slave-conservative":
1131+
mycluster.SwitchForceSlaveParallelModeConservative()
1132+
case "force-slave-optimistic":
1133+
mycluster.SwitchForceSlaveParallelModeOptimistic()
1134+
case "force-slave-aggressive":
1135+
mycluster.SwitchForceSlaveParallelModeAggressive()
11221136
case "force-binlog-compress":
11231137
mycluster.SwitchForceBinlogCompress()
11241138
case "force-binlog-annotate":

server/server_monitor.go

+3
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,9 @@ func init() {
249249
monitorCmd.Flags().BoolVar(&conf.ForceDiskRelayLogSizeLimit, "force-disk-relaylog-size-limit", false, "Automatically limit the size of relay log on disk ")
250250
monitorCmd.Flags().Uint64Var(&conf.ForceDiskRelayLogSizeLimitSize, "force-disk-relaylog-size-limit-size", 1000000000, "Automatically limit the size of relay log on disk to 1G")
251251
monitorCmd.Flags().BoolVar(&conf.ForceInmemoryBinlogCacheSize, "force-inmemory-binlog-cache-size", false, "Automatically adapt binlog cache size based on monitoring")
252+
monitorCmd.Flags().BoolVar(&conf.ForceSlaveStrict, "force-slave-strict", false, "Slave mode to error when replica diverge")
253+
monitorCmd.Flags().BoolVar(&conf.ForceSlaveIdempotent, "force-slave-idempotent", false, "Slave mode to repair when replica diverge using full master row event")
254+
monitorCmd.Flags().StringVar(&conf.ForceSlaveParallelMode, "force-slave-parallel-mode", "", "serialized|minimal|conservative|optimistic|aggressive| empty for no enforcement")
252255
monitorCmd.Flags().BoolVar(&conf.ForceSyncBinlog, "force-sync-binlog", false, "Automatically force master crash safe")
253256
monitorCmd.Flags().BoolVar(&conf.ForceSyncInnoDB, "force-sync-innodb", false, "Automatically force master innodb crash safe")
254257
monitorCmd.Flags().BoolVar(&conf.ForceNoslaveBehind, "force-noslave-behind", false, "Automatically force no slave behing")

0 commit comments

Comments
 (0)