@@ -33,28 +33,28 @@ import (
33
33
)
34
34
35
35
// Ensure GossipMemberSet implements interfaces.
36
- var _ memberlist.Delegate = & gossipMemberSet {}
36
+ var _ memberlist.Delegate = & memberSet {}
37
37
38
- // gossipMemberSet represents a gossip implementation of MemberSet using memberlist.
39
- type gossipMemberSet struct {
38
+ // memberSet represents a gossip implementation of MemberSet using memberlist.
39
+ type memberSet struct {
40
40
mu sync.RWMutex
41
41
memberlist * memberlist.Memberlist
42
42
43
43
broadcasts * memberlist.TransmitLimitedQueue
44
44
45
45
papi * pilosa.API
46
- config * gossipConfig
46
+ config * config
47
47
48
48
Logger pilosa.Logger
49
49
50
50
logger * log.Logger
51
51
transport * Transport
52
52
53
- gossipEventReceiver * gossipEventReceiver
53
+ eventReceiver * eventReceiver
54
54
}
55
55
56
56
// Open implements the MemberSet interface to start network activity.
57
- func (g * gossipMemberSet ) Open () (err error ) {
57
+ func (g * memberSet ) Open () (err error ) {
58
58
g .mu .Lock ()
59
59
g .memberlist , err = memberlist .Create (g .config .memberlistConfig )
60
60
g .mu .Unlock ()
@@ -94,7 +94,7 @@ func (g *gossipMemberSet) Open() (err error) {
94
94
}
95
95
96
96
// joinWithRetry wraps the standard memberlist Join function in a retry.
97
- func (g * gossipMemberSet ) joinWithRetry (hosts []string ) error {
97
+ func (g * memberSet ) joinWithRetry (hosts []string ) error {
98
98
err := retry (60 , 2 * time .Second , func () error {
99
99
_ , err := g .memberlist .Join (hosts )
100
100
return err
@@ -120,34 +120,34 @@ func retry(attempts int, sleep time.Duration, fn func() error) (err error) {
120
120
121
121
////////////////////////////////////////////////////////////////
122
122
123
- type gossipConfig struct {
123
+ type config struct {
124
124
gossipSeeds []string
125
125
memberlistConfig * memberlist.Config
126
126
}
127
127
128
- // gossipMemberSetOption describes a functional option for GossipMemberSet.
129
- type gossipMemberSetOption func (* gossipMemberSet ) error
128
+ // memberSetOption describes a functional option for GossipMemberSet.
129
+ type memberSetOption func (* memberSet ) error
130
130
131
- // WithTransport is a functional option for providing a transport to NewGossipMemberSet .
132
- func WithTransport (transport * Transport ) gossipMemberSetOption {
133
- return func (g * gossipMemberSet ) error {
131
+ // WithTransport is a functional option for providing a transport to NewMemberSet .
132
+ func WithTransport (transport * Transport ) memberSetOption {
133
+ return func (g * memberSet ) error {
134
134
g .transport = transport
135
135
return nil
136
136
}
137
137
}
138
138
139
- // WithLogger is a functional option for providing a logger to NewGossipMemberSet .
140
- func WithLogger (logger * log.Logger ) gossipMemberSetOption {
141
- return func (g * gossipMemberSet ) error {
139
+ // WithLogger is a functional option for providing a logger to NewMemberSet .
140
+ func WithLogger (logger * log.Logger ) memberSetOption {
141
+ return func (g * memberSet ) error {
142
142
g .logger = logger
143
143
return nil
144
144
}
145
145
}
146
146
147
- // NewGossipMemberSet returns a new instance of GossipMemberSet based on options.
148
- func NewGossipMemberSet (cfg Config , api * pilosa.API , options ... gossipMemberSetOption ) (* gossipMemberSet , error ) {
147
+ // NewMemberSet returns a new instance of GossipMemberSet based on options.
148
+ func NewMemberSet (cfg Config , api * pilosa.API , options ... memberSetOption ) (* memberSet , error ) {
149
149
host := api .Node ().URI .Host
150
- g := & gossipMemberSet {
150
+ g := & memberSet {
151
151
papi : api ,
152
152
Logger : pilosa .NopLogger ,
153
153
}
@@ -158,8 +158,8 @@ func NewGossipMemberSet(cfg Config, api *pilosa.API, options ...gossipMemberSetO
158
158
return nil , errors .Wrap (err , "executing option" )
159
159
}
160
160
}
161
- ger := newGossipEventReceiver (g .logger , api )
162
- g .gossipEventReceiver = ger
161
+ ger := newEventReceiver (g .logger , api )
162
+ g .eventReceiver = ger
163
163
164
164
if g .transport == nil {
165
165
port , err := strconv .Atoi (cfg .Port )
@@ -210,7 +210,7 @@ func NewGossipMemberSet(cfg Config, api *pilosa.API, options ...gossipMemberSetO
210
210
conf .Events = ger
211
211
conf .Logger = g .logger
212
212
213
- g .config = & gossipConfig {
213
+ g .config = & config {
214
214
memberlistConfig : conf ,
215
215
gossipSeeds : cfg .Seeds ,
216
216
}
@@ -219,7 +219,7 @@ func NewGossipMemberSet(cfg Config, api *pilosa.API, options ...gossipMemberSetO
219
219
}
220
220
221
221
// NodeMeta implementation of the memberlist.Delegate interface.
222
- func (g * gossipMemberSet ) NodeMeta (limit int ) []byte {
222
+ func (g * memberSet ) NodeMeta (limit int ) []byte {
223
223
buf , err := g .papi .Serializer .Marshal (g .papi .Node ())
224
224
if err != nil {
225
225
g .Logger .Printf ("marshal message error: %s" , err )
@@ -230,7 +230,7 @@ func (g *gossipMemberSet) NodeMeta(limit int) []byte {
230
230
231
231
// NotifyMsg implementation of the memberlist.Delegate interface
232
232
// called when a user-data message is received.
233
- func (g * gossipMemberSet ) NotifyMsg (b []byte ) {
233
+ func (g * memberSet ) NotifyMsg (b []byte ) {
234
234
err := g .papi .ClusterMessage (context .Background (), bytes .NewBuffer (b ))
235
235
if err != nil {
236
236
g .Logger .Printf ("cluster message error: %s" , err )
@@ -239,13 +239,13 @@ func (g *gossipMemberSet) NotifyMsg(b []byte) {
239
239
240
240
// GetBroadcasts implementation of the memberlist.Delegate interface
241
241
// called when user data messages can be broadcast.
242
- func (g * gossipMemberSet ) GetBroadcasts (overhead , limit int ) [][]byte {
242
+ func (g * memberSet ) GetBroadcasts (overhead , limit int ) [][]byte {
243
243
return g .broadcasts .GetBroadcasts (overhead , limit )
244
244
}
245
245
246
246
// LocalState implementation of the memberlist.Delegate interface
247
247
// sends this Node's state data.
248
- func (g * gossipMemberSet ) LocalState (join bool ) []byte {
248
+ func (g * memberSet ) LocalState (join bool ) []byte {
249
249
m := & pilosa.NodeStatus {
250
250
Node : g .papi .Node (),
251
251
MaxShards : g .papi .MaxShards (context .Background ()),
@@ -263,28 +263,28 @@ func (g *gossipMemberSet) LocalState(join bool) []byte {
263
263
264
264
// MergeRemoteState implementation of the memberlist.Delegate interface
265
265
// receive and process the remote side's LocalState.
266
- func (g * gossipMemberSet ) MergeRemoteState (buf []byte , join bool ) {
266
+ func (g * memberSet ) MergeRemoteState (buf []byte , join bool ) {
267
267
err := g .papi .ClusterMessage (context .Background (), bytes .NewBuffer (buf ))
268
268
if err != nil {
269
269
g .Logger .Printf ("merge state error: %s" , err )
270
270
}
271
271
}
272
272
273
- // gossipEventReceiver is used to enable an application to receive
273
+ // eventReceiver is used to enable an application to receive
274
274
// events about joins and leaves over a channel.
275
275
//
276
276
// Care must be taken that events are processed in a timely manner from
277
277
// the channel, since this delegate will block until an event can be sent.
278
- type gossipEventReceiver struct {
278
+ type eventReceiver struct {
279
279
ch chan memberlist.NodeEvent
280
280
papi * pilosa.API
281
281
282
282
logger * log.Logger
283
283
}
284
284
285
- // newGossipEventReceiver returns a new instance of GossipEventReceiver.
286
- func newGossipEventReceiver (logger * log.Logger , papi * pilosa.API ) * gossipEventReceiver {
287
- ger := & gossipEventReceiver {
285
+ // newEventReceiver returns a new instance of GossipEventReceiver.
286
+ func newEventReceiver (logger * log.Logger , papi * pilosa.API ) * eventReceiver {
287
+ ger := & eventReceiver {
288
288
ch : make (chan memberlist.NodeEvent , 1 ),
289
289
logger : logger ,
290
290
papi : papi ,
@@ -293,19 +293,19 @@ func newGossipEventReceiver(logger *log.Logger, papi *pilosa.API) *gossipEventRe
293
293
return ger
294
294
}
295
295
296
- func (g * gossipEventReceiver ) NotifyJoin (n * memberlist.Node ) {
296
+ func (g * eventReceiver ) NotifyJoin (n * memberlist.Node ) {
297
297
g .ch <- memberlist.NodeEvent {memberlist .NodeJoin , n }
298
298
}
299
299
300
- func (g * gossipEventReceiver ) NotifyLeave (n * memberlist.Node ) {
300
+ func (g * eventReceiver ) NotifyLeave (n * memberlist.Node ) {
301
301
g .ch <- memberlist.NodeEvent {memberlist .NodeLeave , n }
302
302
}
303
303
304
- func (g * gossipEventReceiver ) NotifyUpdate (n * memberlist.Node ) {
304
+ func (g * eventReceiver ) NotifyUpdate (n * memberlist.Node ) {
305
305
g .ch <- memberlist.NodeEvent {memberlist .NodeUpdate , n }
306
306
}
307
307
308
- func (g * gossipEventReceiver ) listen () {
308
+ func (g * eventReceiver ) listen () {
309
309
var nodeEventType pilosa.NodeEventType
310
310
for {
311
311
e := <- g .ch
0 commit comments