Skip to content

Commit d83ae97

Browse files
committed
add message object pool
1 parent 2117734 commit d83ae97

File tree

12 files changed

+157
-67
lines changed

12 files changed

+157
-67
lines changed

app/pipeline/input/message.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/go-mysql-org/go-mysql/canal"
77
message2 "github.com/jin06/binlogo/app/pipeline/message"
8-
"github.com/jin06/binlogo/pkg/store/model/pipeline"
98
)
109

1110
func emptyMessage() (msgs []*message2.Message) {
@@ -122,14 +121,19 @@ func delete(e *canal.RowsEvent) (msgs []*message2.Message) {
122121
}
123122

124123
func toMessage(e *canal.RowsEvent) (msg *message2.Message) {
125-
msg = message2.New()
126-
msg.Content.Head = &message2.Head{
127-
Type: mapType(e.Action),
128-
Database: e.Table.Schema,
129-
Table: e.Table.Name,
130-
Time: e.Header.Timestamp,
131-
Position: &pipeline.Position{},
132-
}
124+
//msg = message2.New()
125+
msg = message2.Get()
126+
//msg.Content.Head = &message2.Head{
127+
// Type: mapType(e.Action),
128+
// Database: e.Table.Schema,
129+
// Table: e.Table.Name,
130+
// Time: e.Header.Timestamp,
131+
// Position: &pipeline.Position{},
132+
//}
133+
msg.Content.Head.Type = mapType(e.Action)
134+
msg.Content.Head.Database = e.Table.Schema
135+
msg.Content.Head.Table = e.Table.Name
136+
msg.Content.Head.Time = e.Header.Timestamp
133137
return
134138
}
135139

app/pipeline/message/message.go

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package message
33
import (
44
"encoding/json"
55
"fmt"
6+
"sync"
67

78
"github.com/jin06/binlogo/pkg/store/model/pipeline"
89
)
@@ -23,16 +24,16 @@ type STATUS int16
2324
type Message struct {
2425
Status int16
2526
Filter bool
26-
Content *Content
27+
Content Content
2728
}
2829

2930
// New return a new message
3031
func New() *Message {
3132
msg := &Message{
3233
Status: STATUS_NEW,
3334
Filter: false,
34-
Content: &Content{
35-
Head: &Head{},
35+
Content: Content{
36+
Head: Head{},
3637
Data: nil,
3738
},
3839
}
@@ -41,17 +42,30 @@ func New() *Message {
4142

4243
// Content of Message
4344
type Content struct {
44-
Head *Head `json:"head"`
45+
Head Head `json:"head"`
4546
Data interface{} `json:"data"`
4647
}
4748

49+
func (c *Content) reset() {
50+
c.Data = nil
51+
c.Head.reset()
52+
}
53+
4854
// Head of Message
4955
type Head struct {
50-
Type string `json:"type"`
51-
Time uint32 `json:"time"`
52-
Database string `json:"database"`
53-
Table string `json:"table"`
54-
Position *pipeline.Position `json:"position"`
56+
Type string `json:"type"`
57+
Time uint32 `json:"time"`
58+
Database string `json:"database"`
59+
Table string `json:"table"`
60+
Position pipeline.Position `json:"position"`
61+
}
62+
63+
func (h *Head) reset() {
64+
h.Type = ""
65+
h.Time = 0
66+
h.Database = ""
67+
h.Table = ""
68+
h.Position.Reset()
5569
}
5670

5771
// Json marshal message to json data
@@ -65,9 +79,9 @@ func (msg *Message) Json() (string, error) {
6579

6680
// JsonContent only marshal message's content to josn data
6781
func (msg *Message) JsonContent() (string, error) {
68-
if msg.Content == nil {
69-
return "", nil
70-
}
82+
//if msg.Content == nil {
83+
// return "", nil
84+
//}
7185
b, err := json.Marshal(msg.Content)
7286
if err != nil {
7387
return "", err
@@ -79,16 +93,44 @@ func (msg *Message) JsonContent() (string, error) {
7993
func (msg *Message) ToString() string {
8094
res := "Status: " + fmt.Sprintf("%v\n", msg.Status)
8195
res += "Filter: " + fmt.Sprintf("%v\n", msg.Filter)
82-
if msg.Content != nil {
83-
res += "BinlogPosition.File: " + fmt.Sprintf("%v\n", msg.Content.Head.Position.BinlogFile)
84-
res += "BinlogPosition.Pos: " + fmt.Sprintf("%v\n", msg.Content.Head.Position.BinlogPosition)
85-
res += "BinlogPosition.GTID: " + fmt.Sprintf("%v\n", msg.Content.Head.Position.GTIDSet)
86-
res += "Content.Head: " + fmt.Sprintf("%v\n", msg.Content.Head)
87-
res += "Content.Data: " + fmt.Sprintf("%v\n", msg.Content.Data)
88-
}
96+
//if msg.Content != nil {
97+
res += "BinlogPosition.File: " + fmt.Sprintf("%v\n", msg.Content.Head.Position.BinlogFile)
98+
res += "BinlogPosition.Pos: " + fmt.Sprintf("%v\n", msg.Content.Head.Position.BinlogPosition)
99+
res += "BinlogPosition.GTID: " + fmt.Sprintf("%v\n", msg.Content.Head.Position.GTIDSet)
100+
res += "Content.Head: " + fmt.Sprintf("%v\n", msg.Content.Head)
101+
res += "Content.Data: " + fmt.Sprintf("%v\n", msg.Content.Data)
102+
//}
89103
return res
90104
}
91105

106+
// Table returns table with database
92107
func (msg *Message) Table() string {
93108
return fmt.Sprintf("%s.%s", msg.Content.Head.Database, msg.Content.Head.Table)
94109
}
110+
111+
// reset
112+
func (msg *Message) reset() {
113+
msg.Status = STATUS_NEW
114+
msg.Filter = false
115+
msg.Content.reset()
116+
}
117+
118+
// Pool reuse message object
119+
var Pool = sync.Pool{
120+
New: func() interface{} {
121+
return &Message{
122+
}
123+
},
124+
}
125+
126+
// Get get a message object from Pool
127+
func Get() *Message {
128+
msg := Pool.Get().(*Message)
129+
msg.reset()
130+
return msg
131+
}
132+
133+
// Put put a message to Pool
134+
func Put(msg *Message) {
135+
Pool.Put(msg)
136+
}

app/pipeline/output/output.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,21 @@ func (o *Output) Run(ctx context.Context) (err error) {
152152
{
153153
check, errPrepare := o.prepareRecord(msg)
154154
if errPrepare != nil {
155+
message2.Put(msg)
155156
return
156157
}
157158
if !check {
159+
message2.Put(msg)
158160
continue
159161
}
160162
if err1 := o.loopHandle(ctx, msg); err1 != nil {
163+
message2.Put(msg)
161164
return
162165
}
163166
promeths.MessageSendCounter.With(prometheus.Labels{"pipeline": o.Options.PipelineName, "node": configs.NodeName}).Inc()
164167
pass := uint32(time.Now().Unix()) - msg.Content.Head.Time
165168
promeths.MessageSendHistogram.With(prometheus.Labels{"pipeline": o.Options.PipelineName, "node": configs.NodeName}).Observe(float64(pass))
169+
message2.Put(msg)
166170
}
167171
}
168172
}
@@ -183,48 +187,61 @@ func (o *Output) prepareRecord(msg *message2.Message) (pass bool, err error) {
183187
return
184188
}
185189
if o.record == nil {
186-
o.record = &pipeline.RecordPosition{
187-
PipelineName: o.Options.PipelineName,
188-
Pre: msg.Content.Head.Position,
189-
Now: msg.Content.Head.Position,
190-
}
190+
o.record = pipeline.NewRecordPosition(pipeline.WithPipelineName(o.Options.PipelineName))
191+
//o.record = &pipeline.RecordPosition{
192+
// PipelineName: o.Options.PipelineName,
193+
// Pre: &pipeline.Position{},
194+
// Now: &pipeline.Position{},
195+
//}
196+
*o.record.Pre = msg.Content.Head.Position
197+
*o.record.Now = msg.Content.Head.Position
191198
return
192199
}
193200
}
194201
if msg.Content.Head.Position.TotalRows == msg.Content.Head.Position.ConsumeRows {
195-
o.record = &pipeline.RecordPosition{
196-
PipelineName: o.Options.PipelineName,
197-
Pre: msg.Content.Head.Position,
198-
Now: msg.Content.Head.Position,
199-
}
202+
//o.record = &pipeline.RecordPosition{
203+
// PipelineName: o.Options.PipelineName,
204+
//Pre: &msg.Content.Head.Position,
205+
//Now: &msg.Content.Head.Position,
206+
//}
207+
o.record = pipeline.NewRecordPosition(pipeline.WithPipelineName(o.Options.PipelineName))
208+
*o.record.Pre = msg.Content.Head.Position
209+
*o.record.Now = msg.Content.Head.Position
200210
return
201211
}
202212
if o.record.Pre == nil {
203-
o.record.Pre = msg.Content.Head.Position
213+
//o.record.Pre = &msg.Content.Head.Position
214+
o.record.Pre = &pipeline.Position{}
215+
*o.record.Pre = msg.Content.Head.Position
204216
}
205217
if o.record.Now == nil {
206-
o.record.Now = msg.Content.Head.Position
218+
//o.record.Now = &msg.Content.Head.Position
219+
o.record.Now = &pipeline.Position{}
220+
*o.record.Now = msg.Content.Head.Position
207221
return
208222
}
209223
switch o.Options.MysqlMode {
210224
case pipeline.MODE_GTID:
211225
{
212226
if o.record.Now.GTIDSet != msg.Content.Head.Position.GTIDSet {
213-
o.record.Now = msg.Content.Head.Position
227+
*o.record.Now = msg.Content.Head.Position
228+
//o.record.Now = msg.Content.Head.Position
214229
return
215230
}
216231
}
217232
case pipeline.MODE_POSITION:
218233
{
219234
if o.record.Now.BinlogFile != msg.Content.Head.Position.BinlogFile ||
220235
o.record.Now.BinlogPosition != msg.Content.Head.Position.BinlogPosition {
221-
o.record.Now = msg.Content.Head.Position
236+
//o.record.Now = msg.Content.Head.Position
237+
*o.record.Now = msg.Content.Head.Position
222238
return
223239
}
224240
}
225241
}
226242
if o.record.Now.ConsumeRows < msg.Content.Head.Position.ConsumeRows {
227-
o.record.Now = msg.Content.Head.Position
243+
//o.record.Now = msg.Content.Head.Position
244+
*o.record.Now = msg.Content.Head.Position
228245
} else {
229246
pass = false
230247
}

app/server/node/election/manager.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,9 @@ func (m *Manager) Run(ctx context.Context) {
3535
m.ctx = myCtx
3636
go func() {
3737
defer func() {
38-
defer func() {
39-
r := recover()
40-
if r != nil {
41-
logrus.Errorln("election manager panic, ", r)
42-
}
43-
}()
38+
if r := recover(); r != nil {
39+
logrus.Errorln("election manager panic, ", r)
40+
}
4441
cancel()
4542
}()
4643
for {
@@ -50,13 +47,14 @@ func (m *Manager) Run(ctx context.Context) {
5047
)
5148

5249
m.election = en
53-
cCtx, _ := context.WithCancel(myCtx)
50+
cCtx, cCancel := context.WithCancel(myCtx)
5451
en.Run(cCtx)
5552
go func() {
5653
defer func() {
5754
if r := recover(); r != nil {
5855
logrus.Errorln("election manager panic, ", r)
5956
}
57+
cCancel()
6058
}()
6159
for {
6260
select {

app/server/node/manager/manager_status/manager.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,17 @@ func NewManager(n *node.Node) *Manager {
3131
}
3232

3333
// Run start working
34-
func (m *Manager) Run(ctx context.Context) (err error) {
34+
func (m *Manager) Run(ctx context.Context) {
3535
myCtx, cancel := context.WithCancel(ctx)
3636
m.ctx = myCtx
37-
if err = m.syncStatus(); err != nil {
38-
logrus.Error("Sync status failed: ", err)
39-
cancel()
40-
return
41-
}
4237
go func() {
4338
defer func() {
4439
if r := recover(); r != nil {
4540
logrus.Errorln("manager status panic, ", r)
4641
}
4742
cancel()
4843
}()
44+
var err error
4945
for {
5046
select {
5147
case <-ctx.Done():

app/server/node/manager/manager_status/manager_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@ func TestRun(t *testing.T) {
1818
defer dao_node.DeleteNode(nodeName)
1919
m := NewManager(&node.Node{Name: nodeName})
2020
ctx, cancel := context.WithCancel(context.Background())
21-
err := m.Run(ctx)
22-
if err != nil {
23-
t.Error(err)
24-
}
21+
m.Run(ctx)
2522
ns := NewNodeStatus(m.Node.Name)
2623
if ns == nil {
2724
t.Fail()
2825
}
26+
var err error
2927
err = ns.syncNodeStatus()
3028
if err != nil {
3129
t.Error(err)

app/server/node/manager/manager_status/node_stats.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func NewNodeStatus(nodeName string) *NodeStatus {
2727
}
2828

2929
func (ns *NodeStatus) syncNodeStatus() (err error) {
30-
logrus.Debug("Sync node status ")
30+
logrus.Infoln("Sync node status ")
3131
err = ns.setStatus()
3232
if err != nil {
3333
return

app/server/node/node.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package node
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"sync"
78
"time"
89

@@ -95,7 +96,12 @@ func (n *Node) refreshNode() (err error) {
9596
// Run start working
9697
func (n *Node) Run(ctx context.Context) (err error) {
9798
myCtx, cancel := context.WithCancel(ctx)
98-
defer cancel()
99+
defer func() {
100+
if re := recover(); re != nil {
101+
err = errors.New(fmt.Sprintf("panic %v", re))
102+
}
103+
cancel()
104+
}()
99105
err = n.refreshNode()
100106
if err != nil {
101107
return

configs/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
package configs
22

33
// VERSITON version of binlogo
4-
const VERSITON = "1.0.38"
4+
const VERSITON = "1.0.41"

pkg/register/register.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (r *Register) Run(ctx context.Context) {
5252
r.ctx = myCtx
5353
go func() {
5454
defer func() {
55-
if r := recover(); r != nil {
55+
if re := recover(); re != nil {
5656
logrus.Errorln("register panic, ", r)
5757
}
5858
cancel()

0 commit comments

Comments
 (0)