Skip to content

Commit 08c40f9

Browse files
hanshasselberg authored and mkeeler committed
handle TCP transports properly (#201)
* handle TCP transports properly
* switch to checking the error
* simplify logic
* better label name
* add tests
1 parent e1138a6 commit 08c40f9

File tree

3 files changed

+67
-8
lines changed

3 files changed

+67
-8
lines changed

net.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
522522
// Send the ping.
523523
addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
524524
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
525-
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
525+
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s %s", err, LogAddress(from))
526526
}
527527

528528
// Setup a timer to fire off a nack if no ack is seen in time.

state.go

+35-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"math"
77
"math/rand"
88
"net"
9+
"strings"
910
"sync/atomic"
1011
"time"
1112

@@ -242,6 +243,21 @@ func (m *Memberlist) probeNodeByAddr(addr string) {
242243
m.probeNode(n)
243244
}
244245

246+
// failedRemote checks the error and decides if it indicates a failure on the
// other end.
//
// It returns true only when err is a non-nil *net.OpError whose Net field
// starts with "tcp" (which covers "tcp", "tcp4" and "tcp6") and whose Op field
// is one of "dial", "read" or "write". Every other error, including a nil
// error and a nil *net.OpError wrapped in the error interface, yields false.
func failedRemote(err error) bool {
	opErr, ok := err.(*net.OpError)
	// A typed nil *net.OpError stored in an error interface still satisfies the
	// type assertion, so check the pointer before reading its fields.
	if !ok || opErr == nil {
		return false
	}

	if !strings.HasPrefix(opErr.Net, "tcp") {
		return false
	}

	switch opErr.Op {
	case "dial", "read", "write":
		return true
	}
	return false
}
260+
245261
// probeNode handles a single round of failure checking on a node.
246262
func (m *Memberlist) probeNode(node *nodeState) {
247263
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
@@ -272,10 +288,20 @@ func (m *Memberlist) probeNode(node *nodeState) {
272288
// soon as possible.
273289
deadline := sent.Add(probeInterval)
274290
addr := node.Address()
291+
292+
// Arrange for our self-awareness to get updated.
293+
var awarenessDelta int
294+
defer func() {
295+
m.awareness.ApplyDelta(awarenessDelta)
296+
}()
275297
if node.State == stateAlive {
276298
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
277299
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
278-
return
300+
if failedRemote(err) {
301+
goto HANDLE_REMOTE_FAILURE
302+
} else {
303+
return
304+
}
279305
}
280306
} else {
281307
var msgs [][]byte
@@ -296,7 +322,11 @@ func (m *Memberlist) probeNode(node *nodeState) {
296322
compound := makeCompoundMessage(msgs)
297323
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
298324
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
299-
return
325+
if failedRemote(err) {
326+
goto HANDLE_REMOTE_FAILURE
327+
} else {
328+
return
329+
}
300330
}
301331
}
302332

@@ -305,10 +335,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
305335
// which will improve our health until we get to the failure scenarios
306336
// at the end of this function, which will alter this delta variable
307337
// accordingly.
308-
awarenessDelta := -1
309-
defer func() {
310-
m.awareness.ApplyDelta(awarenessDelta)
311-
}()
338+
awarenessDelta = -1
312339

313340
// Wait for response or round-trip-time.
314341
select {
@@ -333,9 +360,10 @@ func (m *Memberlist) probeNode(node *nodeState) {
333360
// probe interval it will give the TCP fallback more time, which
334361
// is more active in dealing with lost packets, and it gives more
335362
// time to wait for indirect acks/nacks.
336-
m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
363+
m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name)
337364
}
338365

366+
HANDLE_REMOTE_FAILURE:
339367
// Get some random live nodes.
340368
m.nodeLock.RLock()
341369
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {

state_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -2118,6 +2118,37 @@ func TestMemberlist_GossipToDead(t *testing.T) {
21182118
})
21192119
}
21202120

2121+
func TestMemberlist_FailedRemote(t *testing.T) {
2122+
type test struct {
2123+
name string
2124+
err error
2125+
expected bool
2126+
}
2127+
tests := []test{
2128+
{"nil error", nil, false},
2129+
{"normal error", fmt.Errorf(""), false},
2130+
{"net.OpError for file", &net.OpError{Net: "file"}, false},
2131+
{"net.OpError for udp", &net.OpError{Net: "udp"}, false},
2132+
{"net.OpError for udp4", &net.OpError{Net: "udp4"}, false},
2133+
{"net.OpError for udp6", &net.OpError{Net: "udp6"}, false},
2134+
{"net.OpError for tcp", &net.OpError{Net: "tcp"}, false},
2135+
{"net.OpError for tcp4", &net.OpError{Net: "tcp4"}, false},
2136+
{"net.OpError for tcp6", &net.OpError{Net: "tcp6"}, false},
2137+
{"net.OpError for tcp with dial", &net.OpError{Net: "tcp", Op: "dial"}, true},
2138+
{"net.OpError for tcp with write", &net.OpError{Net: "tcp", Op: "write"}, true},
2139+
{"net.OpError for tcp with read", &net.OpError{Net: "tcp", Op: "read"}, true},
2140+
}
2141+
2142+
for _, test := range tests {
2143+
t.Run(test.name, func(t *testing.T) {
2144+
actual := failedRemote(test.err)
2145+
if actual != test.expected {
2146+
t.Fatalf("expected %t, got %t", test.expected, actual)
2147+
}
2148+
})
2149+
}
2150+
}
2151+
21212152
func TestMemberlist_PushPull(t *testing.T) {
21222153
addr1 := getBindAddr()
21232154
addr2 := getBindAddr()

0 commit comments

Comments
 (0)