Skip to content

Commit 75a37ef

Browse files
authored
Merge pull request #717 from signal18/deadlock
Changing from sync.Mutex to sync.Map to avoid deadlock
2 parents acb8d3b + 9846557 commit 75a37ef

25 files changed, with 1189 additions (+1189) and 471 deletions (-471).

cluster/cluster.go

+4-17
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,7 @@ type Cluster struct {
110110
UptimeFailable string `json:"uptimeFailable"`
111111
UptimeSemiSync string `json:"uptimeSemisync"`
112112
MonitorSpin string `json:"monitorSpin"`
113-
WorkLoad WorkLoad `json:"workLoad"`
113+
WorkLoad config.WorkLoad `json:"workLoad"`
114114
LogPushover *log.Logger `json:"-"`
115115
Log s18log.HttpLog `json:"log"`
116116
LogTask s18log.HttpLog `json:"logTask"`
@@ -255,16 +255,6 @@ type Agent struct {
255255
Version string `json:"version"`
256256
}
257257

258-
type WorkLoad struct {
259-
DBTableSize int64 `json:"dbTableSize"`
260-
DBIndexSize int64 `json:"dbIndexSize"`
261-
Connections int `json:"connections"`
262-
QPS int64 `json:"qps"`
263-
CpuThreadPool float64 `json:"cpuThreadPool"`
264-
CpuUserStats float64 `json:"cpuUserStats"`
265-
BusyTime string
266-
}
267-
268258
type Alerts struct {
269259
Errors []state.StateHttp `json:"errors"`
270260
Warnings []state.StateHttp `json:"warnings"`
@@ -1385,7 +1375,7 @@ func (cluster *Cluster) MonitorVariablesDiff() {
13851375
if !cluster.Conf.MonitorVariableDiff || cluster.GetMaster() == nil {
13861376
return
13871377
}
1388-
masterVariables := cluster.GetMaster().Variables
1378+
masterVariables := cluster.GetMaster().Variables.ToNewMap()
13891379
exceptVariables := map[string]bool{
13901380
"PORT": true,
13911381
"SERVER_ID": true,
@@ -1426,7 +1416,7 @@ func (cluster *Cluster) MonitorVariablesDiff() {
14261416
mastervalue.VariableValue = v
14271417
myvalues = append(myvalues, mastervalue)
14281418
for _, s := range cluster.slaves {
1429-
slaveVariables := s.Variables
1419+
slaveVariables := s.Variables.ToNewMap()
14301420
if slaveVariables[k] != v && exceptVariables[k] != true {
14311421
var slavevalue Diff
14321422
slavevalue.Server = s.URL
@@ -1470,9 +1460,6 @@ func (cluster *Cluster) MonitorSchema() {
14701460
return
14711461
}
14721462

1473-
cmaster.Lock()
1474-
defer cmaster.Unlock()
1475-
14761463
cluster.StateMachine.SetMonitorSchemaState()
14771464
cmaster.Conn.SetConnMaxLifetime(3595 * time.Second)
14781465

@@ -1541,7 +1528,7 @@ func (cluster *Cluster) MonitorSchema() {
15411528

15421529
cluster.WorkLoad.DBIndexSize = totindexsize
15431530
cluster.WorkLoad.DBTableSize = tottablesize
1544-
cmaster.DictTables = tables
1531+
cmaster.DictTables = config.FromNormalTablesMap(cmaster.DictTables, tables)
15451532
cluster.StateMachine.RemoveMonitorSchemaState()
15461533
}
15471534

cluster/cluster_chk.go

+12-12
Original file line number | Diff line number | Diff line change
@@ -442,7 +442,7 @@ func (cluster *Cluster) IsEqualBinlogFilters(m *ServerMonitor, s *ServerMonitor)
442442

443443
func (cluster *Cluster) IsEqualReplicationFilters(m *ServerMonitor, s *ServerMonitor) bool {
444444

445-
if m.Variables["REPLICATE_DO_TABLE"] == s.Variables["REPLICATE_DO_TABLE"] && m.Variables["REPLICATE_IGNORE_TABLE"] == s.Variables["REPLICATE_IGNORE_TABLE"] && m.Variables["REPLICATE_WILD_DO_TABLE"] == s.Variables["REPLICATE_WILD_DO_TABLE"] && m.Variables["REPLICATE_WILD_IGNORE_TABLE"] == s.Variables["REPLICATE_WILD_IGNORE_TABLE"] && m.Variables["REPLICATE_DO_DB"] == s.Variables["REPLICATE_DO_DB"] && m.Variables["REPLICATE_IGNORE_DB"] == s.Variables["REPLICATE_IGNORE_DB"] {
445+
if m.Variables.Get("REPLICATE_DO_TABLE") == s.Variables.Get("REPLICATE_DO_TABLE") && m.Variables.Get("REPLICATE_IGNORE_TABLE") == s.Variables.Get("REPLICATE_IGNORE_TABLE") && m.Variables.Get("REPLICATE_WILD_DO_TABLE") == s.Variables.Get("REPLICATE_WILD_DO_TABLE") && m.Variables.Get("REPLICATE_WILD_IGNORE_TABLE") == s.Variables.Get("REPLICATE_WILD_IGNORE_TABLE") && m.Variables.Get("REPLICATE_DO_DB") == s.Variables.Get("REPLICATE_DO_DB") && m.Variables.Get("REPLICATE_IGNORE_DB") == s.Variables.Get("REPLICATE_IGNORE_DB") {
446446
return true
447447
} else {
448448
return false
@@ -451,8 +451,8 @@ func (cluster *Cluster) IsEqualReplicationFilters(m *ServerMonitor, s *ServerMon
451451

452452
func (cluster *Cluster) IsCurrentGTIDSync(m *ServerMonitor, s *ServerMonitor) bool {
453453

454-
sGtid := s.Variables["GTID_CURRENT_POS"]
455-
mGtid := m.Variables["GTID_CURRENT_POS"]
454+
sGtid := s.Variables.Get("GTID_CURRENT_POS")
455+
mGtid := m.Variables.Get("GTID_CURRENT_POS")
456456
if sGtid == mGtid {
457457
return true
458458
} else {
@@ -547,9 +547,9 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
547547
pk, _ := cluster.master.GetTablePK(schema, table)
548548
if pk == "" {
549549
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Checksum, no primary key for table %s.%s", schema, table)
550-
t := cluster.master.DictTables[schema+"."+table]
550+
t := cluster.master.DictTables.Get(schema + "." + table)
551551
t.TableSync = "NA"
552-
cluster.master.DictTables[schema+"."+table] = t
552+
cluster.master.DictTables.Set(schema+"."+table, t)
553553
return
554554
}
555555
if strings.Contains(pk, ",") {
@@ -655,17 +655,17 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
655655
if chunk.ChunkCheckSum != slaveChecksums[chunk.ChunkId].ChunkCheckSum {
656656
checkok = false
657657
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Checksum table failed chunk(%s,%s) %s.%s %s", chunk.ChunkMinKey, chunk.ChunkMaxKey, schema, table, s.URL)
658-
t := cluster.master.DictTables[schema+"."+table]
658+
t := cluster.master.DictTables.Get(schema + "." + table)
659659
t.TableSync = "ER"
660-
cluster.master.DictTables[schema+"."+table] = t
660+
cluster.master.DictTables.Set(schema+"."+table, t)
661661
}
662662

663663
}
664664
if checkok {
665665
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Checksum table succeed %s.%s %s", schema, table, s.URL)
666-
t := cluster.master.DictTables[schema+"."+table]
666+
t := cluster.master.DictTables.Get(schema + "." + table)
667667
t.TableSync = "OK"
668-
cluster.master.DictTables[schema+"."+table] = t
668+
cluster.master.DictTables.Set(schema+"."+table, t)
669669
}
670670
}
671671
}
@@ -700,8 +700,8 @@ func (cluster *Cluster) IsSameWsrepUUID() bool {
700700
if sothers.IsFailed() || s.URL == sothers.URL {
701701
continue
702702
}
703-
if s.Status["WSREP_CLUSTER_STATE_UUID"] != sothers.Status["WSREP_CLUSTER_STATE_UUID"] {
704-
cluster.SetState("ERR00083", state.State{ErrType: config.LvlWarn, ErrDesc: fmt.Sprintf(clusterError["ERR00083"], s.URL, s.Status["WSREP_CLUSTER_STATE_UUID"], sothers.URL, sothers.Status["WSREP_CLUSTER_STATE_UUID"]), ErrFrom: "MON", ServerUrl: s.URL})
703+
if s.Status.Get("WSREP_CLUSTER_STATE_UUID") != sothers.Status.Get("WSREP_CLUSTER_STATE_UUID") {
704+
cluster.SetState("ERR00083", state.State{ErrType: config.LvlWarn, ErrDesc: fmt.Sprintf(clusterError["ERR00083"], s.URL, s.Status.Get("WSREP_CLUSTER_STATE_UUID"), sothers.URL, sothers.Status.Get("WSREP_CLUSTER_STATE_UUID")), ErrFrom: "MON", ServerUrl: s.URL})
705705
return false
706706
}
707707
}
@@ -725,7 +725,7 @@ func (cluster *Cluster) IsNotHavingMySQLErrantTransaction() bool {
725725
continue
726726
}
727727

728-
hasErrantTrx, _, _ := dbhelper.HaveErrantTransactions(s.Conn, cluster.master.Variables["GTID_EXECUTED"], s.Variables["GTID_EXECUTED"])
728+
hasErrantTrx, _, _ := dbhelper.HaveErrantTransactions(s.Conn, cluster.master.Variables.Get("GTID_EXECUTED"), s.Variables.Get("GTID_EXECUTED"))
729729
if hasErrantTrx {
730730
cluster.SetState("WARN0091", state.State{ErrType: config.LvlWarn, ErrDesc: fmt.Sprintf(clusterError["WARN0091"], s.URL), ErrFrom: "MON", ServerUrl: s.URL})
731731
return false

cluster/cluster_get.go

+6-6
Original file line number | Diff line number | Diff line change
@@ -268,8 +268,8 @@ func (cluster *Cluster) GetConnections() int {
268268
allconns := 0
269269
for _, server := range cluster.Servers {
270270
if server != nil {
271-
if conns, ok := server.Status["THREADS_RUNNING"]; ok {
272-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlDbg, "Reading connections on server: %s ,%s", server.URL, server.Status["THREADS_RUNNING"])
271+
if conns, ok := server.Status.CheckAndGet("THREADS_RUNNING"); ok {
272+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlDbg, "Reading connections on server: %s ,%s", server.URL, server.Status.Get("THREADS_RUNNING"))
273273
numconns, _ := strconv.Atoi(conns)
274274
allconns += numconns
275275
}
@@ -283,9 +283,9 @@ func (cluster *Cluster) GetCpuTime() float64 {
283283

284284
for _, s := range cluster.Servers {
285285
if s != nil {
286-
v, ok := s.WorkLoad["current"]
286+
v, ok := s.WorkLoad.CheckAndGet("current")
287287
if ok && v.CpuThreadPool > max_cpu_usage {
288-
max_cpu_usage = s.WorkLoad["current"].CpuThreadPool
288+
max_cpu_usage = s.WorkLoad.Get("current").CpuThreadPool
289289
}
290290
}
291291
}
@@ -296,8 +296,8 @@ func (cluster *Cluster) GetCpuTimeFromStat() float64 {
296296
max_cpu_usage := 0.0
297297
for _, s := range cluster.Servers {
298298
if s != nil {
299-
if s.WorkLoad["current"].CpuUserStats > max_cpu_usage {
300-
max_cpu_usage = s.WorkLoad["current"].CpuUserStats
299+
if s.WorkLoad.Get("current").CpuUserStats > max_cpu_usage {
300+
max_cpu_usage = s.WorkLoad.Get("current").CpuUserStats
301301
}
302302
}
303303
}

cluster/cluster_sec.go

+4-4
Original file line number | Diff line number | Diff line change
@@ -142,7 +142,7 @@ func (cluster *Cluster) RotatePasswords() error {
142142
srv.SetCredential(srv.URL, cluster.GetDbUser(), cluster.GetDbPass())
143143
}
144144

145-
for _, u := range cluster.master.Users {
145+
for _, u := range cluster.master.Users.ToNewMap() {
146146
if u.User == cluster.GetDbUser() {
147147
dbhelper.SetUserPassword(cluster.master.Conn, cluster.master.DBVersion, u.Host, u.User, new_password_db)
148148
}
@@ -177,7 +177,7 @@ func (cluster *Cluster) RotatePasswords() error {
177177
prx.RotateProxyPasswords(new_password_shard)
178178
prx.SetCredential(prx.User + ":" + new_password_shard)
179179
prx.ShardProxy.SetCredential(prx.ShardProxy.URL, prx.User, new_password_shard)
180-
for _, u := range prx.ShardProxy.Users {
180+
for _, u := range prx.ShardProxy.Users.ToNewMap() {
181181
if u.User == prx.User {
182182
dbhelper.SetUserPassword(prx.ShardProxy.Conn, prx.ShardProxy.DBVersion, u.Host, u.User, new_password_shard)
183183
}
@@ -255,7 +255,7 @@ func (cluster *Cluster) RotatePasswords() error {
255255
srv.SetCredential(srv.URL, cluster.GetDbUser(), cluster.GetDbPass())
256256
}
257257

258-
for _, u := range cluster.master.Users {
258+
for _, u := range cluster.master.Users.ToNewMap() {
259259
if u.User == cluster.GetDbUser() {
260260
dbhelper.SetUserPassword(cluster.master.Conn, cluster.master.DBVersion, u.Host, u.User, new_password_db)
261261
}
@@ -290,7 +290,7 @@ func (cluster *Cluster) RotatePasswords() error {
290290
prx.RotateProxyPasswords(new_password_shard)
291291
prx.SetCredential(prx.User + ":" + new_password_shard)
292292
prx.ShardProxy.SetCredential(prx.ShardProxy.URL, prx.User, new_password_shard)
293-
for _, u := range prx.ShardProxy.Users {
293+
for _, u := range prx.ShardProxy.Users.ToNewMap() {
294294
if u.User == prx.User {
295295
dbhelper.SetUserPassword(prx.ShardProxy.Conn, prx.ShardProxy.DBVersion, u.Host, u.User, new_password_shard)
296296
}

cluster/cluster_set.go

+3-3
Original file line number | Diff line number | Diff line change
@@ -843,7 +843,7 @@ func (cluster *Cluster) SetDbServersMonitoringCredential(credential string) {
843843
cluster.SetDBRestartCookie()
844844
if cluster.Conf.VaultMode == VaultConfigStoreV2 && !cluster.isMasterFailed() {
845845
found_user := false
846-
for _, u := range cluster.master.Users {
846+
for _, u := range cluster.master.Users.ToNewMap() {
847847
if u.User == cluster.GetDbUser() {
848848
found_user = true
849849
logs, err := dbhelper.SetUserPassword(cluster.master.Conn, cluster.master.DBVersion, u.Host, u.User, cluster.GetDbPass())
@@ -857,7 +857,7 @@ func (cluster *Cluster) SetDbServersMonitoringCredential(credential string) {
857857
oldDbUser, _ := misc.SplitPair(cluster.Conf.Secrets["db-servers-credential"].OldValue)
858858
if oldDbUser != "root" {
859859

860-
for _, u := range cluster.master.Users {
860+
for _, u := range cluster.master.Users.ToNewMap() {
861861
if u.User == oldDbUser {
862862
logs, err := dbhelper.RenameUserPassword(cluster.master.Conn, cluster.master.DBVersion, u.Host, u.User, cluster.GetDbPass(), cluster.GetDbUser())
863863
cluster.LogSQL(logs, err, cluster.master.URL, "Security", config.LvlErr, "Alter user : %s", err)
@@ -902,7 +902,7 @@ func (cluster *Cluster) SetProxyServersCredential(credential string, proxytype s
902902
prx.RotateProxyPasswords(pass)
903903
prx.SetCredential(credential)
904904
prx.ShardProxy.SetCredential(prx.ShardProxy.URL, prx.User, pass)
905-
for _, u := range prx.ShardProxy.Users {
905+
for _, u := range prx.ShardProxy.Users.ToNewMap() {
906906
if u.User == prx.User {
907907
dbhelper.SetUserPassword(prx.ShardProxy.Conn, prx.ShardProxy.DBVersion, u.Host, u.User, pass)
908908
}

cluster/cluster_topo.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -208,7 +208,7 @@ func (cluster *Cluster) TopologyDiscover(wcg *sync.WaitGroup) error {
208208
continue
209209
}
210210
// Check for log slave updates
211-
if lsu, ok := sv.Variables["LOG_SLAVE_UPDATES"]; ok && lsu == "ON" {
211+
if lsu, ok := sv.Variables.CheckAndGet("LOG_SLAVE_UPDATES"); ok && lsu == "ON" {
212212
add := true
213213
for _, lsv := range cluster.LogSlaveServers {
214214
if lsv == sv.URL {

0 commit comments

Comments
 (0)