Skip to content

Commit d829751

Browse files
authored
Merge pull request #510 from signal18/gzip
Gzip
2 parents 9dbc0a6 + 6d4bc95 commit d829751

File tree

15 files changed

+234
-66
lines changed

15 files changed

+234
-66
lines changed

cluster/cluster.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,8 +1264,11 @@ func (cluster *Cluster) BackupLogs() {
12641264
return
12651265
}
12661266
for _, s := range cluster.Servers {
1267-
s.JobBackupErrorLog()
1268-
s.JobBackupSlowQueryLog()
1267+
if s != nil {
1268+
s.JobBackupErrorLog()
1269+
s.JobBackupSlowQueryLog()
1270+
}
1271+
12691272
}
12701273
}
12711274
func (cluster *Cluster) RotateLogs() {
@@ -1541,8 +1544,8 @@ func (cluster *Cluster) ResetStates() {
15411544
cluster.master = nil
15421545
cluster.oldMaster = nil
15431546
cluster.vmaster = nil
1544-
cluster.Servers = nil
1545-
cluster.Proxies = nil
1547+
//cluster.Servers = nil
1548+
//cluster.Proxies = nil
15461549
//
15471550
cluster.ServerIdList = nil
15481551
//cluster.hostList = nil

cluster/cluster_has.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (cluster *Cluster) HasAllDbUp() bool {
150150
}
151151
for _, s := range cluster.Servers {
152152
if s != nil {
153-
if s.State == stateFailed || s.State == stateErrorAuth /*&& misc.Contains(cluster.ignoreList, s.URL) == false*/ {
153+
if s.State == stateFailed /*|| s.State == stateErrorAuth /*&& misc.Contains(cluster.ignoreList, s.URL) == false*/ {
154154
return false
155155
}
156156
if s.State == stateSuspect && cluster.GetTopology() != topoUnknown {

cluster/cluster_set.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,19 +250,19 @@ func (cluster *Cluster) SetSchedulerDbJobsSsh() {
250250
cluster.LogPrintf(LvlInfo, "Schedule SshDbJob rotate at: %s", cluster.Conf.SchedulerJobsSSHCron)
251251
cluster.idSchedulerDbsjobsSsh, err = cluster.scheduler.AddFunc(cluster.Conf.SchedulerJobsSSHCron, func() {
252252
for _, s := range cluster.Servers {
253-
s.JobRunViaSSH()
253+
if s != nil {
254+
s.JobRunViaSSH()
255+
}
256+
254257
}
255258
})
256259
if err == nil {
257260
cluster.Schedule["dbjobsssh"] = cluster.scheduler.Entry(cluster.idSchedulerDbsjobsSsh)
258261
}
259-
if cluster.Conf.CompressBackups {
260-
cluster.CompressBackups()
261-
}
262-
263262
}
264263
}
265264

265+
266266
func (cluster *Cluster) SetSchedulerAlertDisable() {
267267
if cluster.HasSchedulerEntry("alertdisable") {
268268
cluster.LogPrintf(LvlInfo, "Stopping scheduler to disable alert")

cluster/cluster_sst.go

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,23 @@ import (
1414
"os"
1515
"os/exec"
1616
"strconv"
17+
"strings"
1718
"sync"
1819
"time"
20+
21+
gzip "github.com/klauspost/pgzip"
1922
)
2023

2124
type SST struct {
22-
in io.Reader
23-
file *os.File
24-
listener net.Listener
25-
tcplistener *net.TCPListener
26-
outfilewriter io.Writer
27-
outresticreader io.WriteCloser
28-
cluster *Cluster
29-
port int
25+
in io.Reader
26+
file *os.File
27+
listener net.Listener
28+
tcplistener *net.TCPListener
29+
outfilewriter io.Writer
30+
outresticreader io.WriteCloser
31+
outfilegzipwriter *gzip.Writer
32+
cluster *Cluster
33+
port int
3034
}
3135

3236
type ProtectedSSTconnections struct {
@@ -120,6 +124,7 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (
120124
} else {
121125
sst.file, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
122126
}
127+
123128
if err != nil {
124129
cluster.LogPrintf(LvlErr, "Open file failed for job %s %s", filename, err)
125130
return "", err
@@ -147,6 +152,84 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (
147152
return strconv.Itoa(destinationPort), nil
148153
}
149154

155+
func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (string, error) {
156+
sst := new(SST)
157+
sst.cluster = cluster
158+
159+
cluster.LogPrintf(LvlInfo, "Compressing mariadb backup")
160+
161+
var err error
162+
if openfile == ConstJobCreateFile {
163+
sst.file, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
164+
} else {
165+
sst.file, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
166+
}
167+
168+
gw := gzip.NewWriter(sst.file)
169+
170+
sst.outfilegzipwriter = gw
171+
172+
if err != nil {
173+
cluster.LogPrintf(LvlErr, "Open file failed for job %s %s", filename, err)
174+
return "", err
175+
}
176+
177+
sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
178+
if err != nil {
179+
cluster.LogPrintf(LvlErr, "Exiting SST on socket listen %s", err)
180+
return "", err
181+
}
182+
sst.tcplistener = sst.listener.(*net.TCPListener)
183+
sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 3600))
184+
destinationPort := sst.listener.Addr().(*net.TCPAddr).Port
185+
if sst.cluster.Conf.LogSST {
186+
cluster.LogPrintf(LvlInfo, "Listening for SST on port to file %d", destinationPort)
187+
}
188+
SSTs.Lock()
189+
SSTs.SSTconnections[destinationPort] = sst
190+
SSTs.Unlock()
191+
go sst.tcp_con_handle_to_gzip()
192+
193+
return strconv.Itoa(destinationPort), nil
194+
}
195+
196+
func (sst *SST) tcp_con_handle_to_gzip() {
197+
198+
var err error
199+
200+
defer func() {
201+
if sst.cluster.Conf.LogSST {
202+
sst.cluster.LogPrintf(LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
203+
}
204+
port := sst.listener.Addr().(*net.TCPAddr).Port
205+
sst.tcplistener.Close()
206+
sst.outfilegzipwriter.Close()
207+
sst.file.Close()
208+
sst.listener.Close()
209+
SSTs.Lock()
210+
delete(SSTs.SSTconnections, port)
211+
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
212+
SSTs.Unlock()
213+
}()
214+
215+
sst.in, err = sst.listener.Accept()
216+
217+
if err != nil {
218+
219+
return
220+
}
221+
222+
chan_to_stdout := sst.stream_copy_to_gzip()
223+
224+
select {
225+
226+
case <-chan_to_stdout:
227+
if sst.cluster.Conf.LogSST {
228+
sst.cluster.LogPrintf(LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
229+
}
230+
}
231+
}
232+
150233
func (sst *SST) tcp_con_handle_to_file() {
151234

152235
var err error
@@ -221,7 +304,10 @@ func (sst *SST) tcp_con_handle_to_restic() {
221304

222305
// Performs copy operation between streams: os and tcp streams
223306
func (sst *SST) stream_copy_to_file() <-chan int {
224-
buf := make([]byte, 1024)
307+
//coucou
308+
//buf := make([]byte, 1024)
309+
buf := make([]byte, 8192)
310+
225311
sync_channel := make(chan int)
226312
go func() {
227313
defer func() {
@@ -255,6 +341,48 @@ func (sst *SST) stream_copy_to_file() <-chan int {
255341
return sync_channel
256342
}
257343

344+
func (sst *SST) stream_copy_to_gzip() <-chan int {
345+
//coucou
346+
//buf := make([]byte, 1024)
347+
buf := make([]byte, 8192)
348+
349+
sync_channel := make(chan int)
350+
go func() {
351+
defer func() {
352+
if con, ok := sst.in.(net.Conn); ok {
353+
354+
if sst.cluster.Conf.LogSST {
355+
sst.cluster.LogPrintf(LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
356+
}
357+
sst.in.(net.Conn).Close()
358+
}
359+
sync_channel <- 0 // Notify that processing is finished
360+
361+
}()
362+
for {
363+
var nBytes int
364+
var err error
365+
366+
nBytes, err = sst.in.Read(buf)
367+
368+
if err != nil {
369+
if err != io.EOF {
370+
sst.cluster.LogPrintf(LvlErr, "Read error: %s", err)
371+
}
372+
break
373+
}
374+
375+
_, err = sst.outfilegzipwriter.Write(buf[0:nBytes])
376+
if err != nil {
377+
sst.cluster.LogPrintf(LvlErr, "Write error: %s", err)
378+
}
379+
}
380+
381+
}()
382+
383+
return sync_channel
384+
}
385+
258386
func (sst *SST) stream_copy_to_restic() <-chan int {
259387
buf := make([]byte, 1024)
260388
sync_channel := make(chan int)
@@ -306,19 +434,33 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
306434
defer client.Close()
307435
file, err := os.Open(backupfile)
308436
cluster.LogPrintf(LvlInfo, "SST sending file: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
437+
if os.IsNotExist(err) && cluster.Conf.CompressBackups {
438+
backupfile = strings.Replace(backupfile, "xbtream", "gz", 1)
439+
file, err = os.Open(backupfile)
440+
}
309441
if err != nil {
310442
cluster.LogPrintf(LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
311443
return
312444
}
313445

314-
sendBuffer := make([]byte, 16384)
446+
sendBuffer := make([]byte, cluster.Conf.SSTSendBuffer)
315447
//fmt.Println("Start sending file!")
316448
var total uint64
317449
for {
318-
_, err = file.Read(sendBuffer)
319-
if err == io.EOF {
320-
break
450+
if strings.Contains(backupfile, "gz") {
451+
fz, err := gzip.NewReader(file)
452+
if err != nil {
453+
return
454+
}
455+
defer fz.Close()
456+
fz.Read(sendBuffer)
457+
} else {
458+
_, err = file.Read(sendBuffer)
459+
if err == io.EOF {
460+
break
461+
}
321462
}
463+
322464
bts, err := client.Write(sendBuffer)
323465
if err != nil {
324466
cluster.LogPrintf(LvlErr, "SST failed to write chunk %s at position %d", err, total)
@@ -327,6 +469,8 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
327469
}
328470
cluster.LogPrintf(LvlInfo, "Backup has been sent, closing connection!")
329471

472+
defer file.Close()
473+
330474
}
331475

332476
func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {

cluster/cluster_topo.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,12 @@ func (cluster *Cluster) TopologyDiscover(wcg *sync.WaitGroup) error {
290290
if s.IsReadWrite() {
291291
srw++
292292
}
293+
294+
_, err := s.GetSlaveStatus(s.ReplicationSourceName)
295+
296+
if err != nil {
297+
cluster.Conf.MultiMaster = false
298+
}
293299
}
294300
if srw > 1 {
295301
cluster.SetState("WARN0003", state.State{ErrType: "WARNING", ErrDesc: "RW server count > 1 in multi-master mode. set read_only=1 in cnf is a must have, choosing prefered master", ErrFrom: "TOPO"})

cluster/srv_job.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ package cluster
1212
import (
1313
"bufio"
1414
"bytes"
15-
"compress/gzip"
15+
1616
"errors"
1717
"fmt"
1818
"io"
@@ -27,6 +27,7 @@ import (
2727
"sync"
2828
"time"
2929

30+
gzip "github.com/klauspost/pgzip"
3031
dumplingext "github.com/pingcap/dumpling/v4/export"
3132
"github.com/signal18/replication-manager/config"
3233
"github.com/signal18/replication-manager/utils/dbhelper"
@@ -108,10 +109,18 @@ func (server *ServerMonitor) JobBackupPhysical() (int64, error) {
108109
return jobid, err
109110
} else {
110111
*/
111-
112-
port, err := server.ClusterGroup.SSTRunReceiverToFile(server.GetMyBackupDirectory()+server.ClusterGroup.Conf.BackupPhysicalType+".xbtream", ConstJobCreateFile)
113-
if err != nil {
114-
return 0, nil
112+
var port string
113+
var err error
114+
if server.ClusterGroup.Conf.CompressBackups {
115+
port, err = server.ClusterGroup.SSTRunReceiverToGZip(server.GetMyBackupDirectory()+server.ClusterGroup.Conf.BackupPhysicalType+".xbtream.gz", ConstJobCreateFile)
116+
if err != nil {
117+
return 0, nil
118+
}
119+
} else {
120+
port, err = server.ClusterGroup.SSTRunReceiverToFile(server.GetMyBackupDirectory()+server.ClusterGroup.Conf.BackupPhysicalType+".xbtream", ConstJobCreateFile)
121+
if err != nil {
122+
return 0, nil
123+
}
115124
}
116125

117126
jobid, err := server.JobInsertTaks(server.ClusterGroup.Conf.BackupPhysicalType, port, server.ClusterGroup.Conf.MonitorAddress)

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type Config struct {
9797
LogRotateMaxBackup int `mapstructure:"log-rotate-max-backup" toml:"log-rotate-max-backup" json:"logRotateMaxBackup"`
9898
LogRotateMaxAge int `mapstructure:"log-rotate-max-age" toml:"log-rotate-max-age" json:"logRotateMaxAge"`
9999
LogSST bool `mapstructure:"log-sst" toml:"log-sst" json:"logSst"` // internal replication-manager sst
100+
SSTSendBuffer int `mapstructure:"sst-send-buffer" toml:"sst-send-buffer" json:"sstSendBuffer"`
100101
LogHeartbeat bool `mapstructure:"log-heartbeat" toml:"log-heartbeat" json:"logHeartbeat"`
101102
LogSQLInMonitoring bool `mapstructure:"log-sql-in-monitoring" toml:"log-sql-in-monitoring" json:"logSqlInMonitoring"`
102103
LogFailedElection bool `mapstructure:"log-failed-election" toml:"log-failed-election" json:"logFailedElection"`

etc/config.toml

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
[Default]
22

33
include = "./etc/local/masterslave/proxysql"
4-
prov-orchestrator = "onpremise"
4+
prov-orchestrator = "local"
55
onpremise-ssh = true
66
onpremise-ssh-credential = "emma:"
77
scheduler-jobs-ssh = true
88
monitoring-address = "127.0.0.1"
99

10-
#monitoring-scheduler = true
11-
#scheduler-db-servers-logs = true
12-
#scheduler-db-servers-logs-cron = "0 * * * * *"
13-
#scheduler-db-servers-physical-backup = false
14-
#backup-physical-type = "mariabackup"
15-
#scheduler-db-servers-optimize = false
16-
#scheduler-db-servers-logical-backup = false
17-
#scheduler-db-servers-logs-table-rotate = false
18-
#scheduler-db-servers-sender-ports="4445,4446,4447,4448,4449"
10+
monitoring-scheduler = true
11+
scheduler-db-servers-logs = false
12+
scheduler-db-servers-physical-backup = true
13+
scheduler-db-servers-physical-backup-cron = "0 * * * * *"
14+
backup-physical-type = "mariabackup"
15+
scheduler-db-servers-optimize = false
16+
scheduler-db-servers-logical-backup = false
17+
scheduler-db-servers-logs-table-rotate = false
18+
compress-backups = true
19+
1920

2021
#mariadb
2122
prov-db-binary-basedir= "/usr/sbin"
@@ -63,6 +64,7 @@ http-refresh-interval = 4000
6364
#########
6465

6566
api-credentials = "admin:repman"
67+
api-credentials-acl-allow = "admin:cluster db prov"
6668
api-port = "10005"
6769

6870

0 commit comments

Comments
 (0)