Skip to content

Commit 1a5b1e8

Browse files
committed
wrapper for binlogs
1 parent 59628c2 commit 1a5b1e8

File tree

5 files changed

+115
-18
lines changed

5 files changed

+115
-18
lines changed

cluster/srv.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ type ServerMonitor struct {
178178
TLSConfigUsed string `json:"tlsConfigUsed"` //used to track TLS config during key rotation
179179
SSTPort string `json:"sstPort"` //used to send data to dbjobs
180180
Agent string `json:"agent"` //used to provision service in orchestrator
181-
BinaryLogFiles map[string]uint `json:"binaryLogFiles"`
181+
BinaryLogFiles *config.UIntsMap `json:"binaryLogFiles"`
182182
BinaryLogFileOldest string `json:"binaryLogFileOldest"`
183183
BinaryLogOldestTimestamp int64 `json:"binaryLogOldestTimestamp"`
184184
BinaryLogPurgeBefore int64 `json:"binaryLogPurgeBefore"`
@@ -284,6 +284,7 @@ func (cluster *Cluster) newServerMonitor(url string, user string, pass string, c
284284
server.DictTables = config.NewTablesMap()
285285
server.Plugins = config.NewPluginsMap()
286286
server.Users = config.NewGrantsMap()
287+
server.BinaryLogFiles = config.NewUIntsMap()
287288

288289
server.HaveSemiSync = true
289290
server.HaveInnodbTrxCommit = true
@@ -335,7 +336,7 @@ func (cluster *Cluster) newServerMonitor(url string, user string, pass string, c
335336
server.DelayStat.ResetDelayStat()
336337

337338
server.WorkLoad = make(map[string]WorkLoad)
338-
server.BinaryLogFiles = make(map[string]uint)
339+
339340
server.CurrentWorkLoad()
340341
server.WorkLoad["max"] = server.WorkLoad["current"]
341342
server.WorkLoad["average"] = server.WorkLoad["current"]

cluster/srv_binlog.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (server *ServerMonitor) RefreshBinaryLogs() error {
4545
server.SetBinaryLogFiles(binlogs)
4646
}
4747

48-
if len(server.BinaryLogFiles) > 0 {
48+
if len(server.BinaryLogFiles.ToNewMap()) > 0 {
4949
server.SetBinaryLogFileOldest()
5050
}
5151

@@ -62,7 +62,7 @@ func (server *ServerMonitor) CheckBinaryLogs() error {
6262
return err
6363
}
6464

65-
if len(server.BinaryLogFiles) == 0 {
65+
if len(server.BinaryLogFiles.ToNewMap()) == 0 {
6666
server.RefreshBinaryLogs()
6767
}
6868

@@ -115,7 +115,7 @@ func (server *ServerMonitor) ForcePurgeBinlogs() {
115115
if err != nil {
116116
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModPurge, config.LvlWarn, err.Error())
117117
}
118-
} else if len(server.BinaryLogFiles) > 2 {
118+
} else if len(server.BinaryLogFiles.ToNewMap()) > 2 {
119119
if isMaster {
120120
go server.JobBinlogPurgeMaster()
121121
}
@@ -295,7 +295,7 @@ func (server *ServerMonitor) SetMaxBinlogTotalSize() error {
295295

296296
func (server *ServerMonitor) SetBinaryLogFileOldest() {
297297
cluster := server.ClusterGroup
298-
files := len(server.BinaryLogFiles)
298+
files := len(server.BinaryLogFiles.ToNewMap())
299299

300300
if server.IsRefreshingBinlog {
301301
return
@@ -324,10 +324,10 @@ func (server *ServerMonitor) SetBinaryLogFileOldest() {
324324
last := len(parts) - 1
325325
prefix := strings.Join(parts[:last], ".")
326326
latestbinlog, _ := strconv.Atoi(parts[last])
327-
oldestbinlog := latestbinlog - len(server.BinaryLogFiles) + 1
327+
oldestbinlog := latestbinlog - files + 1
328328
oldest := prefix + "." + fmt.Sprintf("%06d", oldestbinlog)
329329

330-
if _, ok := server.BinaryLogFiles[oldest]; ok && server.BinaryLogFileOldest != server.BinaryLogFile {
330+
if _, ok := server.BinaryLogFiles.CheckAndGet(oldest); ok && server.BinaryLogFileOldest != server.BinaryLogFile {
331331
if server.BinaryLogFileOldest != oldest {
332332
server.BinaryLogFileOldest = oldest
333333
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModPurge, config.LvlDbg, "Refreshed binary logs on %s. oldest: %s", server.Host+":"+server.Port, server.BinaryLogFileOldest)
@@ -356,7 +356,7 @@ func (server *ServerMonitor) JobBinlogPurgeMaster() {
356356
return
357357
}
358358

359-
if len(server.BinaryLogFiles) == 0 {
359+
if len(server.BinaryLogFiles.ToNewMap()) == 0 {
360360
server.RefreshBinaryLogs()
361361
}
362362

@@ -388,7 +388,7 @@ func (server *ServerMonitor) JobBinlogPurgeMaster() {
388388
prefix := strings.Join(parts[:last], ".")
389389

390390
suffix, _ := strconv.Atoi(parts[last])
391-
oldestbinlog := suffix + 1 - len(server.BinaryLogFiles)
391+
oldestbinlog := suffix + 1 - len(server.BinaryLogFiles.ToNewMap())
392392

393393
if cluster.SlavesOldestMasterFile.Prefix != prefix {
394394
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModPurge, config.LvlDbg, "Purge cancelled, master binlog file has different prefix")
@@ -429,7 +429,7 @@ func (server *ServerMonitor) JobBinlogPurgeMaster() {
429429
//Accumulating newest binlog size and shifting to oldest
430430
for suffix > 0 && totalSize < uint(cluster.Conf.ForceBinlogPurgeTotalSize*(1024*1024*1024)) {
431431
filename := prefix + "." + fmt.Sprintf("%06d", suffix)
432-
if size, ok := server.BinaryLogFiles[filename]; ok {
432+
if size, ok := server.BinaryLogFiles.CheckAndGet(filename); ok {
433433
//accumulating size
434434
totalSize += size
435435
lastfile = suffix //last file based on total size
@@ -454,7 +454,7 @@ func (server *ServerMonitor) JobBinlogPurgeMaster() {
454454

455455
if oldestbinlog > 0 && oldestbinlog < cluster.SlavesOldestMasterFile.Suffix-1 {
456456
filename := prefix + "." + fmt.Sprintf("%06d", oldestbinlog)
457-
if _, ok := server.BinaryLogFiles[filename]; ok {
457+
if _, ok := server.BinaryLogFiles.CheckAndGet(filename); ok {
458458
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModPurge, config.LvlInfo, "Purging binlog of %s: %s. ", server.URL, filename)
459459
_, err := dbhelper.PurgeBinlogTo(server.Conn, filename)
460460
if err != nil {
@@ -476,7 +476,7 @@ func (server *ServerMonitor) JobBinlogPurgeMaster() {
476476
func (server *ServerMonitor) PurgeBinlogTo(filename string) {
477477
cluster := server.ClusterGroup
478478
//Check if file exists
479-
if _, ok := server.BinaryLogFiles[filename]; ok {
479+
if _, ok := server.BinaryLogFiles.CheckAndGet(filename); ok {
480480
_, err := dbhelper.PurgeBinlogTo(server.Conn, filename)
481481
if err != nil {
482482
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModPurge, config.LvlWarn, "Error purging binlog of %s,%s : %s", server.URL, filename, err.Error())
@@ -509,7 +509,7 @@ func (server *ServerMonitor) JobBinlogPurgeSlave() {
509509
}
510510

511511
//Purge slaves to oldest master binlog timestamp and skip if slave only has 2 binary logs file left (Current Binlog and Prev Binlog)
512-
if server.BinaryLogOldestTimestamp > 0 && master.BinaryLogOldestTimestamp > server.BinaryLogPurgeBefore && len(server.BinaryLogFiles) > 2 {
512+
if server.BinaryLogOldestTimestamp > 0 && master.BinaryLogOldestTimestamp > server.BinaryLogPurgeBefore && len(server.BinaryLogFiles.ToNewMap()) > 2 {
513513
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModPurge, config.LvlInfo, "Purging slave binlog of %s from %s until oldest timestamp on master: %s", server.URL, time.Unix(server.BinaryLogOldestTimestamp, 0).String(), time.Unix(master.BinaryLogOldestTimestamp, 0).String())
514514
q, err := dbhelper.PurgeBinlogBefore(server.Conn, master.BinaryLogOldestTimestamp)
515515
if err != nil {

cluster/srv_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ func (server *ServerMonitor) JobBackupBinlogPurge(binlogfile string) error {
13211321
if binlogfilestop > 0 {
13221322
filename := prefix + "." + fmt.Sprintf("%06d", binlogfilestop)
13231323
if _, err := os.Stat(server.GetMyBackupDirectory() + "/" + filename); os.IsNotExist(err) {
1324-
if _, ok := server.BinaryLogFiles[filename]; ok {
1324+
if _, ok := server.BinaryLogFiles.CheckAndGet(filename); ok {
13251325
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Backup master missing binlog of %s,%s", server.URL, filename)
13261326
//Set true to skip sending to resting multiple times
13271327
server.InitiateJobBackupBinlog(filename, true)

cluster/srv_set.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,7 @@ func (server *ServerMonitor) SetMaintenance() {
191191
}
192192

193193
func (server *ServerMonitor) SetBinaryLogFiles(files map[string]uint) {
194-
server.Lock()
195-
server.BinaryLogFiles = files
196-
server.Unlock()
194+
server.BinaryLogFiles = config.FromNormalUIntsMap(server.BinaryLogFiles, files)
197195
}
198196

199197
func (server *ServerMonitor) SetDSN() {

config/maps.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,104 @@ func NewStringsMap() *StringsMap {
105105
return m
106106
}
107107

108+
type UIntsMap struct {
109+
*sync.Map
110+
}
111+
112+
func (m *UIntsMap) Get(key string) uint {
113+
if v, ok := m.Load(key); ok {
114+
return v.(uint)
115+
}
116+
return 0
117+
}
118+
119+
func (m *UIntsMap) CheckAndGet(key string) (uint, bool) {
120+
v, ok := m.Load(key)
121+
if ok {
122+
return v.(uint), true
123+
}
124+
return 0, false
125+
}
126+
127+
func (m *UIntsMap) ToNormalMap(c map[string]uint) {
128+
// clear old value
129+
c = make(map[string]uint)
130+
131+
//Insert all values to new map
132+
m.Range(func(k any, v any) bool {
133+
c[k.(string)] = v.(uint)
134+
return true
135+
})
136+
}
137+
138+
func (m *UIntsMap) ToNewMap() map[string]uint {
139+
// clear old value
140+
c := make(map[string]uint)
141+
142+
//Insert all values to new map
143+
m.Range(func(k any, v any) bool {
144+
c[k.(string)] = v.(uint)
145+
return true
146+
})
147+
148+
return c
149+
}
150+
151+
func (m *UIntsMap) Set(k string, v uint) {
152+
m.Store(k, v)
153+
}
154+
155+
func FromNormalUIntsMap(m *UIntsMap, c map[string]uint) *UIntsMap {
156+
if m == nil {
157+
m = NewUIntsMap()
158+
} else {
159+
m.Clear()
160+
}
161+
162+
for k, v := range c {
163+
m.Store(k, v)
164+
}
165+
166+
return m
167+
}
168+
169+
func FromUIntSyncMap(m *UIntsMap, c *UIntsMap) *UIntsMap {
170+
if m == nil {
171+
m = NewUIntsMap()
172+
} else {
173+
m.Clear()
174+
}
175+
176+
if c != nil {
177+
c.Range(func(k any, v any) bool {
178+
m.Store(k.(string), v.(uint))
179+
return true
180+
})
181+
}
182+
183+
return m
184+
}
185+
186+
func (m *UIntsMap) Callback(f func(key, value any) bool) {
187+
//Insert all values to new map
188+
m.Range(f)
189+
}
190+
191+
func (m *UIntsMap) Clear() {
192+
//Insert all values to new map
193+
m.Range(func(key any, value any) bool {
194+
k := key.(string)
195+
m.Delete(k)
196+
return true
197+
})
198+
}
199+
200+
func NewUIntsMap() *UIntsMap {
201+
s := new(sync.Map)
202+
m := &UIntsMap{Map: s}
203+
return m
204+
}
205+
108206
type PFSQueriesMap struct {
109207
*sync.Map
110208
}

0 commit comments

Comments
 (0)