Skip to content

Commit 498de31

Browse files
committed
wrapper for workload
1 parent 1a5b1e8 commit 498de31

File tree

6 files changed

+154
-53
lines changed

6 files changed

+154
-53
lines changed

cluster/cluster.go

+1-11
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ type Cluster struct {
110110
UptimeFailable string `json:"uptimeFailable"`
111111
UptimeSemiSync string `json:"uptimeSemisync"`
112112
MonitorSpin string `json:"monitorSpin"`
113-
WorkLoad WorkLoad `json:"workLoad"`
113+
WorkLoad config.WorkLoad `json:"workLoad"`
114114
LogPushover *log.Logger `json:"-"`
115115
Log s18log.HttpLog `json:"log"`
116116
LogTask s18log.HttpLog `json:"logTask"`
@@ -255,16 +255,6 @@ type Agent struct {
255255
Version string `json:"version"`
256256
}
257257

258-
type WorkLoad struct {
259-
DBTableSize int64 `json:"dbTableSize"`
260-
DBIndexSize int64 `json:"dbIndexSize"`
261-
Connections int `json:"connections"`
262-
QPS int64 `json:"qps"`
263-
CpuThreadPool float64 `json:"cpuThreadPool"`
264-
CpuUserStats float64 `json:"cpuUserStats"`
265-
BusyTime string
266-
}
267-
268258
type Alerts struct {
269259
Errors []state.StateHttp `json:"errors"`
270260
Warnings []state.StateHttp `json:"warnings"`

cluster/cluster_get.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,9 @@ func (cluster *Cluster) GetCpuTime() float64 {
283283

284284
for _, s := range cluster.Servers {
285285
if s != nil {
286-
v, ok := s.WorkLoad["current"]
286+
v, ok := s.WorkLoad.CheckAndGet("current")
287287
if ok && v.CpuThreadPool > max_cpu_usage {
288-
max_cpu_usage = s.WorkLoad["current"].CpuThreadPool
288+
max_cpu_usage = s.WorkLoad.Get("current").CpuThreadPool
289289
}
290290
}
291291
}
@@ -296,8 +296,8 @@ func (cluster *Cluster) GetCpuTimeFromStat() float64 {
296296
max_cpu_usage := 0.0
297297
for _, s := range cluster.Servers {
298298
if s != nil {
299-
if s.WorkLoad["current"].CpuUserStats > max_cpu_usage {
300-
max_cpu_usage = s.WorkLoad["current"].CpuUserStats
299+
if s.WorkLoad.Get("current").CpuUserStats > max_cpu_usage {
300+
max_cpu_usage = s.WorkLoad.Get("current").CpuUserStats
301301
}
302302
}
303303
}

cluster/srv.go

+31-32
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ type ServerMonitor struct {
183183
BinaryLogOldestTimestamp int64 `json:"binaryLogOldestTimestamp"`
184184
BinaryLogPurgeBefore int64 `json:"binaryLogPurgeBefore"`
185185
MaxSlowQueryTimestamp int64 `json:"maxSlowQueryTimestamp"`
186-
WorkLoad map[string]WorkLoad `json:"workLoad"`
186+
WorkLoad *config.WorkLoadsMap `json:"workLoad"`
187187
DelayStat *ServerDelayStat `json:"delayStat"`
188188
SlaveVariables SlaveVariables `json:"slaveVariables"`
189189
IsInSlowQueryCapture bool
@@ -285,6 +285,7 @@ func (cluster *Cluster) newServerMonitor(url string, user string, pass string, c
285285
server.Plugins = config.NewPluginsMap()
286286
server.Users = config.NewGrantsMap()
287287
server.BinaryLogFiles = config.NewUIntsMap()
288+
server.WorkLoad = config.NewWorkLoadsMap()
288289

289290
server.HaveSemiSync = true
290291
server.HaveInnodbTrxCommit = true
@@ -335,11 +336,9 @@ func (cluster *Cluster) newServerMonitor(url string, user string, pass string, c
335336
server.DelayStat = new(ServerDelayStat)
336337
server.DelayStat.ResetDelayStat()
337338

338-
server.WorkLoad = make(map[string]WorkLoad)
339-
340339
server.CurrentWorkLoad()
341-
server.WorkLoad["max"] = server.WorkLoad["current"]
342-
server.WorkLoad["average"] = server.WorkLoad["current"]
340+
server.WorkLoad.Set("max", server.WorkLoad.Get("current"))
341+
server.WorkLoad.Set("average", server.WorkLoad.Get("current"))
343342

344343
/*if cluster.Conf.MasterSlavePgStream || cluster.Conf.MasterSlavePgLogical {
345344
server.Conn, err = sqlx.Open("postgres", server.DSN)
@@ -1679,81 +1678,81 @@ func (server *ServerMonitor) StartGroupReplication() error {
16791678
}
16801679

16811680
func (server *ServerMonitor) CurrentWorkLoad() {
1682-
new_current_WorkLoad := server.WorkLoad["current"]
1681+
new_current_WorkLoad := server.WorkLoad.GetOrNew("current")
16831682
new_current_WorkLoad.Connections = server.GetServerConnections()
16841683
new_current_WorkLoad.CpuThreadPool = server.GetCPUUsageFromThreadsPool()
16851684
new_current_WorkLoad.QPS = server.QPS
1686-
server.WorkLoad["current"] = new_current_WorkLoad
1685+
server.WorkLoad.Set("current", new_current_WorkLoad)
16871686

16881687
}
16891688

16901689
func (server *ServerMonitor) AvgWorkLoad() {
1691-
new_avg_WorkLoad := server.WorkLoad["average"]
1692-
if server.WorkLoad["average"].Connections > 0 {
1693-
new_avg_WorkLoad.Connections = (server.GetServerConnections() + server.WorkLoad["average"].Connections) / 2
1690+
new_avg_WorkLoad := server.WorkLoad.Get("average")
1691+
if server.WorkLoad.Get("average").Connections > 0 {
1692+
new_avg_WorkLoad.Connections = (server.GetServerConnections() + server.WorkLoad.Get("average").Connections) / 2
16941693
} else {
16951694
new_avg_WorkLoad.Connections = server.GetServerConnections()
16961695
}
16971696

1698-
if server.WorkLoad["average"].CpuThreadPool > 0 {
1699-
new_avg_WorkLoad.CpuThreadPool = (server.GetCPUUsageFromThreadsPool() + server.WorkLoad["average"].CpuThreadPool) / 2
1697+
if server.WorkLoad.Get("average").CpuThreadPool > 0 {
1698+
new_avg_WorkLoad.CpuThreadPool = (server.GetCPUUsageFromThreadsPool() + server.WorkLoad.Get("average").CpuThreadPool) / 2
17001699
} else {
17011700
new_avg_WorkLoad.CpuThreadPool = server.GetCPUUsageFromThreadsPool()
17021701
}
17031702

1704-
if server.WorkLoad["average"].QPS > 0 {
1705-
new_avg_WorkLoad.QPS = (server.QPS + server.WorkLoad["average"].QPS) / 2
1703+
if server.WorkLoad.Get("average").QPS > 0 {
1704+
new_avg_WorkLoad.QPS = (server.QPS + server.WorkLoad.Get("average").QPS) / 2
17061705
} else {
1707-
new_avg_WorkLoad.QPS = server.WorkLoad["average"].QPS
1706+
new_avg_WorkLoad.QPS = server.WorkLoad.Get("average").QPS
17081707
}
17091708

1710-
server.WorkLoad["average"] = new_avg_WorkLoad
1709+
server.WorkLoad.Set("average", new_avg_WorkLoad)
17111710
}
17121711

17131712
func (server *ServerMonitor) MaxWorkLoad() {
1714-
max_workLoad := server.WorkLoad["max"]
1715-
if server.GetServerConnections() > server.WorkLoad["max"].Connections {
1713+
max_workLoad := server.WorkLoad.Get("max")
1714+
if server.GetServerConnections() > server.WorkLoad.Get("max").Connections {
17161715
max_workLoad.Connections = server.GetServerConnections()
17171716
}
17181717

1719-
if server.QPS > server.WorkLoad["max"].QPS {
1718+
if server.QPS > server.WorkLoad.Get("max").QPS {
17201719
max_workLoad.QPS = server.QPS
17211720
}
17221721

1723-
if server.GetCPUUsageFromThreadsPool() > server.WorkLoad["max"].CpuThreadPool {
1722+
if server.GetCPUUsageFromThreadsPool() > server.WorkLoad.Get("max").CpuThreadPool {
17241723
max_workLoad.CpuThreadPool = server.GetCPUUsageFromThreadsPool()
17251724
}
17261725

1727-
server.WorkLoad["max"] = max_workLoad
1726+
server.WorkLoad.Set("max", max_workLoad)
17281727
}
17291728

17301729
func (server *ServerMonitor) CpuFromStatWorkLoad(start_time time.Time) time.Time {
1731-
if server.WorkLoad["current"].BusyTime != "" {
1730+
if server.WorkLoad.Get("current").BusyTime != "" {
17321731

1733-
old_cpu_time := server.WorkLoad["current"].CpuUserStats
1734-
current_workLoad := server.WorkLoad["current"]
1732+
old_cpu_time := server.WorkLoad.Get("current").CpuUserStats
1733+
current_workLoad := server.WorkLoad.Get("current")
17351734
new_cpu_usage, _ := server.GetCPUUsageFromStats(start_time)
17361735
current_workLoad.BusyTime, _ = server.GetBusyTimeFromStats()
17371736
current_workLoad.CpuUserStats = new_cpu_usage
1738-
server.WorkLoad["current"] = current_workLoad
1737+
server.WorkLoad.Set("current", current_workLoad)
17391738

17401739
if old_cpu_time != 0 {
1741-
avg_workLoad := server.WorkLoad["average"]
1740+
avg_workLoad := server.WorkLoad.Get("average")
17421741
avg_workLoad.CpuUserStats = (current_workLoad.CpuUserStats + old_cpu_time) / 2
1743-
server.WorkLoad["average"] = avg_workLoad
1742+
server.WorkLoad.Set("average", avg_workLoad)
17441743
}
1745-
if current_workLoad.CpuUserStats > server.WorkLoad["max"].CpuUserStats {
1746-
max_workLoad := server.WorkLoad["max"]
1744+
if current_workLoad.CpuUserStats > server.WorkLoad.Get("max").CpuUserStats {
1745+
max_workLoad := server.WorkLoad.Get("max")
17471746
max_workLoad.CpuUserStats = current_workLoad.CpuUserStats
1748-
server.WorkLoad["max"] = max_workLoad
1747+
server.WorkLoad.Set("max", max_workLoad)
17491748

17501749
}
17511750
return time.Now()
17521751

17531752
} else {
1754-
current_workLoad := server.WorkLoad["current"]
1753+
current_workLoad := server.WorkLoad.Get("current")
17551754
current_workLoad.BusyTime, _ = server.GetBusyTimeFromStats()
1756-
server.WorkLoad["current"] = current_workLoad
1755+
server.WorkLoad.Set("current", current_workLoad)
17571756
return time.Now()
17581757
}
17591758
}

cluster/srv_get.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ func (server *ServerMonitor) GetWsrepNodeAddress() string {
718718
}
719719

720720
func (server *ServerMonitor) GetCPUUsageFromStats(t time.Time) (float64, error) {
721-
last_busy_time, _ := strconv.ParseFloat(server.WorkLoad["current"].BusyTime, 8)
721+
last_busy_time, _ := strconv.ParseFloat(server.WorkLoad.Get("current").BusyTime, 64)
722722
t_now := time.Now()
723723
elapsed := t_now.Sub(t).Seconds()
724724
if server.DBVersion.IsMariaDB() && last_busy_time != 0 {
@@ -727,8 +727,8 @@ func (server *ServerMonitor) GetCPUUsageFromStats(t time.Time) (float64, error)
727727
if server.HasUserStats() {
728728
res, _, err := dbhelper.GetCPUUsageFromUserStats(server.Conn)
729729
if err == nil {
730-
busy_time, _ := strconv.ParseFloat(res, 8)
731-
core, _ := strconv.ParseFloat(server.GetCluster().Conf.ProvCores, 8)
730+
busy_time, _ := strconv.ParseFloat(res, 64)
731+
core, _ := strconv.ParseFloat(server.GetCluster().Conf.ProvCores, 64)
732732
return ((busy_time - last_busy_time) / (core * elapsed)) * 100, nil
733733
}
734734
}
@@ -756,9 +756,9 @@ func (server *ServerMonitor) GetBusyTimeFromStats() (string, error) {
756756
func (server *ServerMonitor) GetCPUUsageFromThreadsPool() float64 {
757757
if server.DBVersion.IsMariaDB() {
758758
//we compute it from status
759-
thread_idle, _ := strconv.ParseFloat(server.Status.Get("THREADPOOL_IDLE_THREADS"), 8)
760-
thread, _ := strconv.ParseFloat(server.Status.Get("THREADPOOL_THREADS"), 8)
761-
core, _ := strconv.ParseFloat(server.GetCluster().Conf.ProvCores, 8)
759+
thread_idle, _ := strconv.ParseFloat(server.Status.Get("THREADPOOL_IDLE_THREADS"), 64)
760+
thread, _ := strconv.ParseFloat(server.Status.Get("THREADPOOL_THREADS"), 64)
761+
core, _ := strconv.ParseFloat(server.GetCluster().Conf.ProvCores, 64)
762762
return ((thread - thread_idle) / core) * 100
763763
}
764764
return -1

config/config.go

+10
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,16 @@ type Config struct {
676676

677677
}
678678

679+
type WorkLoad struct {
680+
DBTableSize int64 `json:"dbTableSize"`
681+
DBIndexSize int64 `json:"dbIndexSize"`
682+
Connections int `json:"connections"`
683+
QPS int64 `json:"qps"`
684+
CpuThreadPool float64 `json:"cpuThreadPool"`
685+
CpuUserStats float64 `json:"cpuUserStats"`
686+
BusyTime string
687+
}
688+
679689
type ConfigVariableType struct {
680690
Id int `json:"id"`
681691
Name string `json:"name"`

config/maps.go

+102
Original file line numberDiff line numberDiff line change
@@ -583,3 +583,105 @@ func NewTablesMap() *TablesMap {
583583
m := &TablesMap{Map: s}
584584
return m
585585
}
586+
587+
type WorkLoadsMap struct {
588+
*sync.Map
589+
}
590+
591+
func NewWorkLoadsMap() *WorkLoadsMap {
592+
s := new(sync.Map)
593+
m := &WorkLoadsMap{Map: s}
594+
return m
595+
}
596+
597+
func (m *WorkLoadsMap) Get(key string) *WorkLoad {
598+
if v, ok := m.Load(key); ok {
599+
return v.(*WorkLoad)
600+
}
601+
return nil
602+
}
603+
604+
func (m *WorkLoadsMap) GetOrNew(key string) *WorkLoad {
605+
if v, ok := m.Load(key); ok {
606+
return v.(*WorkLoad)
607+
}
608+
return new(WorkLoad)
609+
}
610+
611+
func (m *WorkLoadsMap) CheckAndGet(key string) (*WorkLoad, bool) {
612+
v, ok := m.Load(key)
613+
if ok {
614+
return v.(*WorkLoad), true
615+
}
616+
return nil, false
617+
}
618+
619+
func (m *WorkLoadsMap) Set(key string, value *WorkLoad) {
620+
m.Store(key, value)
621+
}
622+
623+
func (m *WorkLoadsMap) ToNormalMap(c map[string]*WorkLoad) {
624+
// Clear the old values in the output map
625+
for k := range c {
626+
delete(c, k)
627+
}
628+
629+
// Insert all values from the WorkLoadsMap to the output map
630+
m.Callback(func(key string, value *WorkLoad) bool {
631+
c[key] = value
632+
return true
633+
})
634+
}
635+
636+
func (m *WorkLoadsMap) ToNewMap() map[string]*WorkLoad {
637+
result := make(map[string]*WorkLoad)
638+
m.Range(func(k, v any) bool {
639+
result[k.(string)] = v.(*WorkLoad)
640+
return true
641+
})
642+
return result
643+
}
644+
645+
func (m *WorkLoadsMap) Callback(f func(key string, value *WorkLoad) bool) {
646+
m.Range(func(k, v any) bool {
647+
return f(k.(string), v.(*WorkLoad))
648+
})
649+
}
650+
651+
func (m *WorkLoadsMap) Clear() {
652+
m.Range(func(key, value any) bool {
653+
m.Delete(key.(string))
654+
return true
655+
})
656+
}
657+
658+
func FromNormalWorkLoadsMap(m *WorkLoadsMap, c map[string]*WorkLoad) *WorkLoadsMap {
659+
if m == nil {
660+
m = NewWorkLoadsMap()
661+
} else {
662+
m.Clear()
663+
}
664+
665+
for k, v := range c {
666+
m.Set(k, v)
667+
}
668+
669+
return m
670+
}
671+
672+
func FromWorkLoadsMap(m *WorkLoadsMap, c *WorkLoadsMap) *WorkLoadsMap {
673+
if m == nil {
674+
m = NewWorkLoadsMap()
675+
} else {
676+
m.Clear()
677+
}
678+
679+
if c != nil {
680+
c.Callback(func(key string, value *WorkLoad) bool {
681+
m.Set(key, value)
682+
return true
683+
})
684+
}
685+
686+
return m
687+
}

0 commit comments

Comments
 (0)