Skip to content

Commit 2aa9a3e

Browse files
committed
Add more mutex code
1 parent 3e8fe83 commit 2aa9a3e

File tree

13 files changed

+44
-11
lines changed

13 files changed

+44
-11
lines changed

cluster/cluster.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,8 @@ func (cluster *Cluster) Run() {
546546
go cluster.createKeys()
547547
}
548548

549+
cluster.Topology = cluster.GetTopologyFromConf()
550+
549551
for cluster.exit == false {
550552
if !cluster.Conf.MonitorPause {
551553
cluster.ServerIdList = cluster.GetDBServerIdList()
@@ -653,7 +655,6 @@ func (cluster *Cluster) Run() {
653655
// CheckFailed trigger failover code if passing all false positiv and constraints
654656
cluster.CheckFailed()
655657

656-
cluster.Topology = cluster.GetTopology()
657658
cluster.SetStatus()
658659
cluster.StateProcessing()
659660
}
@@ -813,9 +814,8 @@ func (cluster *Cluster) Save() error {
813814
if err != nil {
814815
return err
815816
}
816-
cluster.Lock()
817+
817818
saveAgents, _ := json.MarshalIndent(cluster.Agents, "", "\t")
818-
cluster.Unlock()
819819

820820
err = ioutil.WriteFile(cluster.Conf.WorkingDir+"/"+cluster.Name+"/agents.json", saveAgents, 0644)
821821
if err != nil {
@@ -1428,6 +1428,7 @@ func (cluster *Cluster) MonitorSchema() {
14281428
t.TableSync = oldtable.TableSync
14291429
}
14301430
// lookup other clusters
1431+
14311432
for _, cl := range cluster.clusterList {
14321433
if cl.GetName() != cluster.GetName() {
14331434

cluster/cluster_get.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,8 @@ func (cluster *Cluster) GetProxyServerIdList() []string {
593593
return ret
594594
}
595595

596-
func (cluster *Cluster) GetTopology() string {
596+
func (cluster *Cluster) GetTopologyFromConf() string {
597+
597598
cluster.Conf.Topology = topoUnknown
598599
if cluster.Conf.MultiMaster {
599600
cluster.Conf.Topology = topoMultiMaster
@@ -626,6 +627,10 @@ func (cluster *Cluster) GetTopology() string {
626627
return cluster.Conf.Topology
627628
}
628629

630+
func (cluster *Cluster) GetTopology() string {
631+
return cluster.Topology
632+
}
633+
629634
/*
630635
func (cluster *Cluster) GetDatabaseTags() []string {
631636
return strings.Split(cluster.Conf.ProvTags, ",")

cluster/cluster_topo.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ func (cluster *Cluster) newServerList() error {
7575
func (cluster *Cluster) AddChildServers() error {
7676

7777
mychilds := cluster.GetChildClusters()
78-
78+
cluster.Lock()
79+
defer cluster.Unlock()
7980
for _, c := range mychilds {
8081
for _, sv := range c.Servers {
8182
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTopology, LvlDbg, "AddChildServers checking %s of %s ", sv.URL, c.Name)
@@ -126,6 +127,9 @@ func (cluster *Cluster) AddChildServers() error {
126127
// Start of topology detection
127128
// Create a connection to each host and build list of slaves.
128129
func (cluster *Cluster) TopologyDiscover(wcg *sync.WaitGroup) error {
130+
cluster.Lock()
131+
defer cluster.Unlock()
132+
129133
defer wcg.Done()
130134
//monitor ignored server fist so that their replication position get oldest
131135
wg := new(sync.WaitGroup)

cluster/prx.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type Proxy struct {
6565
ServiceName string `json:"serviceName"`
6666
Agent string `json:"agent"`
6767
Weight string `json:"weight"`
68+
Lock sync.Mutex
6869
}
6970

7071
type DatabaseProxy interface {
@@ -83,7 +84,8 @@ type DatabaseProxy interface {
8384

8485
GetFailCount() int
8586
SetFailCount(c int)
86-
87+
DelLock()
88+
SetLock()
8789
GetAgent() string
8890
GetName() string
8991
GetHost() string
@@ -319,7 +321,9 @@ func (cluster *Cluster) SetProxyServerMaintenance(serverid uint64) {
319321
// called by server monitor if state change
320322
func (cluster *Cluster) backendStateChangeProxies() {
321323
for _, pr := range cluster.Proxies {
324+
pr.SetLock()
322325
pr.BackendsStateChange()
326+
pr.DelLock()
323327
}
324328
}
325329

@@ -332,6 +336,8 @@ func (cluster *Cluster) refreshProxies(wcg *sync.WaitGroup) {
332336
for _, pr := range cluster.Proxies {
333337
if pr != nil {
334338
var err error
339+
pr.SetLock()
340+
335341
err = pr.Refresh()
336342
if err == nil {
337343
pr.SetFailCount(0)
@@ -363,6 +369,7 @@ func (cluster *Cluster) refreshProxies(wcg *sync.WaitGroup) {
363369
if cluster.Conf.GraphiteMetrics {
364370
pr.SendStats()
365371
}
372+
pr.DelLock()
366373
}
367374
}
368375
// if cluster.Conf.LogLevel > 2 {

cluster/prx_del.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ import (
1616
"github.com/signal18/replication-manager/config"
1717
)
1818

19+
func (proxy *Proxy) DelLock() {
20+
proxy.Lock.Unlock()
21+
}
22+
1923
func (proxy *Proxy) delCookie(key string) error {
2024
err := os.Remove(proxy.Datadir + "/@" + key)
2125
cluster := proxy.ClusterGroup

cluster/prx_proxysql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,6 @@ func (proxy *ProxySQLProxy) Failover() {
253253
}
254254

255255
func (proxy *ProxySQLProxy) Refresh() error {
256-
257256
cluster := proxy.ClusterGroup
258257
// if cluster.Conf.LogLevel > 9 {
259258
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxySQL, LvlDbg, "ProxySQL port : %s, user %s, pass %s\n", proxy.Port, proxy.User, proxy.Pass)
@@ -298,6 +297,7 @@ func (proxy *ProxySQLProxy) Refresh() error {
298297
if err != nil {
299298
isFoundBackendWrite = false
300299
} else {
300+
301301
proxy.BackendsWrite = append(proxy.BackendsWrite, bke)
302302
}
303303
isFoundBackendRead := true

cluster/prx_set.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ func (p *Proxy) SetID() {
2727
10)
2828
}
2929

30+
func (p *Proxy) SetLock() {
31+
p.Lock.Lock()
32+
}
33+
3034
// TODO: clarify where this is used, can maybe be replaced with a Getter
3135
func (proxy *Proxy) SetServiceName(namespace string) {
3236
proxy.ServiceName = namespace + "/svc/" + proxy.Name

cluster/srv.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ type ServerMonitor struct {
184184
DelayStat *ServerDelayStat `json:"delayStat"`
185185
IsInSlowQueryCapture bool
186186
IsInPFSQueryCapture bool
187+
sync.Mutex
187188
}
188189

189190
type serverList []*ServerMonitor
@@ -595,6 +596,8 @@ var start_time time.Time
595596

596597
// Refresh a server object
597598
func (server *ServerMonitor) Refresh() error {
599+
server.Lock()
600+
defer server.Unlock()
598601
cluster := server.ClusterGroup
599602
var err error
600603

etc/opensvc/cluster-api/cluster-demo/stephane.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,5 @@ backup-streaming-endpoint= "https://s3.signal18.io/"
6767
backup-streaming-region= "fr-1"
6868
backup-streaming-bucket= "repman"
6969
backup-mysqldump-path = "/usr/local/bin/mysqldump"
70-
graphite-embedded = true
70+
graphite-embedded = false
7171
graphite-metrics = true

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ require (
119119
github.com/go-git/go-git v4.7.0+incompatible // indirect
120120
github.com/go-git/go-git/v5 v5.6.1 // indirect
121121
github.com/go-ole/go-ole v1.2.5 // indirect
122+
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
122123
github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf // indirect
123124
github.com/gonum/floats v0.0.0-20180125090339-7de1f4ea7ab5 // indirect
124125
github.com/gonum/internal v0.0.0-20180125090855-fda53f8d2571 // indirect

0 commit comments

Comments
 (0)