Skip to content

Commit 0b7f160

Browse files
authored
Merge pull request #612 from uber/dev
Release version 1.5.0
2 parents 2feb388 + 2a1bef0 commit 0b7f160

File tree

11 files changed

+111
-32
lines changed

11 files changed

+111
-32
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
Changelog
22
=========
33

4+
# v1.5.0
5+
6+
* Add `PeerList.Len` to expose the number of peers in the peer list.
7+
* Add `PeerList.GetNew` to only return previously unselected peers.
8+
49
# v1.4.0
510

611
* Add version information to the channel's LocalPeerInfo.

benchmark/real_relay.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type fixedHosts struct {
3636
pickI atomic.Int32
3737
}
3838

39-
func (fh *fixedHosts) Get(cf relay.CallFrame, _ relay.Conn) (string, error) {
39+
func (fh *fixedHosts) Get(cf relay.CallFrame, _ *tchannel.Connection) (string, error) {
4040
peers := fh.hosts[string(cf.Service())]
4141
if len(peers) == 0 {
4242
return "", errors.New("no peers")

peer.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ var (
4545
// ErrPeerNotFound indicates that the specified peer was not found.
4646
ErrPeerNotFound = errors.New("peer not found")
4747

48+
// ErrNoNewPeers indicates that no previously unselected peer is available.
49+
ErrNoNewPeers = errors.New("no new peer available")
50+
4851
peerRng = trand.NewSeeded()
4952
)
5053

@@ -116,11 +119,12 @@ func (l *PeerList) Add(hostPort string) *Peer {
116119
return p
117120
}
118121

119-
// Get returns a peer from the peer list, or nil if none can be found.
120-
func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
122+
// GetNew returns a new, previously unselected peer from the peer list, or nil,
123+
// if no new unselected peer can be found.
124+
func (l *PeerList) GetNew(prevSelected map[string]struct{}) (*Peer, error) {
121125
l.Lock()
126+
defer l.Unlock()
122127
if l.peerHeap.Len() == 0 {
123-
l.Unlock()
124128
return nil, ErrNoPeers
125129
}
126130

@@ -131,9 +135,25 @@ func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
131135
peer = l.choosePeer(prevSelected, false /* avoidHost */)
132136
}
133137
if peer == nil {
138+
return nil, ErrNoNewPeers
139+
}
140+
return peer, nil
141+
}
142+
143+
// Get returns a peer from the peer list, or nil if none can be found,
144+
// will avoid previously selected peers if possible.
145+
func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
146+
peer, err := l.GetNew(prevSelected)
147+
if err == ErrNoNewPeers {
148+
l.Lock()
134149
peer = l.choosePeer(nil, false /* avoidHost */)
150+
l.Unlock()
151+
} else if err != nil {
152+
return nil, err
153+
}
154+
if peer == nil {
155+
return nil, ErrNoPeers
135156
}
136-
l.Unlock()
137157
return peer, nil
138158
}
139159

@@ -214,6 +234,13 @@ func (l *PeerList) Copy() map[string]*Peer {
214234
return listCopy
215235
}
216236

237+
// Len returns the length of the PeerList.
238+
func (l *PeerList) Len() int {
239+
l.RLock()
240+
defer l.RUnlock()
241+
return l.peerHeap.Len()
242+
}
243+
217244
// exists checks if a hostport exists in the peer list.
218245
func (l *PeerList) exists(hostPort string) (*peerScore, bool) {
219246
l.RLock()

peer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ func TestGetPeerSinglePeer(t *testing.T) {
7676
assert.Equal(t, "1.1.1.1:1234", peer.HostPort(), "returned peer mismatch")
7777
}
7878

79+
func TestPeerUpdatesLen(t *testing.T) {
80+
ch := testutils.NewClient(t, nil)
81+
defer ch.Close()
82+
assert.Zero(t, ch.Peers().Len())
83+
for i := 1; i < 5; i++ {
84+
ch.Peers().Add(fmt.Sprintf("1.1.1.1:%d", i))
85+
assert.Equal(t, ch.Peers().Len(), i)
86+
}
87+
for i := 4; i > 0; i-- {
88+
assert.Equal(t, ch.Peers().Len(), i)
89+
ch.Peers().Remove(fmt.Sprintf("1.1.1.1:%d", i))
90+
}
91+
assert.Zero(t, ch.Peers().Len())
92+
}
93+
7994
func TestGetPeerAvoidPrevSelected(t *testing.T) {
8095
const (
8196
peer1 = "1.1.1.1:1"
@@ -153,6 +168,18 @@ func TestGetPeerAvoidPrevSelected(t *testing.T) {
153168
continue
154169
}
155170

171+
newPeer, err := peers.GetNew(rs.PrevSelectedPeers())
172+
if len(tt.peers) == len(tt.prevSelected) {
173+
if newPeer != nil || err != ErrNoNewPeers {
174+
t.Errorf("%s: newPeer should not have been found %v: %v\n", tt.msg, newPeer, err)
175+
}
176+
} else {
177+
if gotPeer != newPeer || err != nil {
178+
t.Errorf("%s: expected equal peers, got %v new %v: %v\n",
179+
tt.msg, gotPeer, newPeer, err)
180+
}
181+
}
182+
156183
got := gotPeer.HostPort()
157184
if _, ok := tt.expected[got]; !ok {
158185
t.Errorf("%s: got unexpected peer, expected one of %v got %v\n Peers = %v PrevSelected = %v",

relay.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ var (
4949
errUnknownID = errors.New("non-callReq for inactive ID")
5050
)
5151

52-
// relayConn implements the relay.Connection interface.
53-
type relayConn Connection
54-
5552
type relayItem struct {
5653
*time.Timer
5754

@@ -334,7 +331,7 @@ func (r *Relayer) handleCallReq(f lazyCallReq) error {
334331
return nil
335332
}
336333

337-
call, err := r.relayHost.Start(f, (*relayConn)(r.conn))
334+
call, err := r.relayHost.Start(f, r.conn)
338335
if err != nil {
339336
// If we have a RateLimitDropError we record the statistic, but
340337
// we *don't* send an error frame back to the client.
@@ -353,6 +350,11 @@ func (r *Relayer) handleCallReq(f lazyCallReq) error {
353350
call.End()
354351
}
355352
r.conn.SendSystemError(f.Header.ID, f.Span(), err)
353+
354+
// If the RelayHost returns a protocol error, close the connection.
355+
if GetSystemErrorCode(err) == ErrCodeProtocol {
356+
return r.conn.close(LogField{"reason", "RelayHost returned protocol error"})
357+
}
356358
return nil
357359
}
358360

@@ -552,10 +554,6 @@ func (r *Relayer) handleLocalCallReq(cr lazyCallReq) bool {
552554
return true
553555
}
554556

555-
func (r *relayConn) RemoteHostPort() string {
556-
return (*Connection)(r).RemotePeerInfo().HostPort
557-
}
558-
559557
func frameTypeFor(f *Frame) frameType {
560558
switch t := f.Header.messageType; t {
561559
case messageTypeCallRes, messageTypeCallResContinue, messageTypeError, messageTypePingRes:

relay/relay.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,6 @@
2424
// backwards-compatibility guarantee.
2525
package relay
2626

27-
// Conn is an interface that exposes a bit of information about the underlying
28-
// connection.
29-
type Conn interface {
30-
// RemoteHostPort returns the host:port of the remote peer.
31-
RemoteHostPort() string
32-
}
33-
3427
// CallFrame is an interface that abstracts access to the call req frame.
3528
type CallFrame interface {
3629
// Caller is the name of the originating service.

relay/relaytest/func_host.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package relaytest
22

33
import (
4-
tchannel "github.com/uber/tchannel-go"
4+
"github.com/uber/tchannel-go"
55
"github.com/uber/tchannel-go/relay"
66
)
77

88
type hostFunc struct {
99
ch *tchannel.Channel
1010
stats *MockStats
11-
fn func(cf relay.CallFrame, conn relay.Conn) (string, error)
11+
fn func(relay.CallFrame, *tchannel.Connection) (string, error)
1212
}
1313

1414
type hostFuncPeer struct {
@@ -18,15 +18,15 @@ type hostFuncPeer struct {
1818
}
1919

2020
// HostFunc wraps a given function to implement tchannel.RelayHost.
21-
func HostFunc(fn func(cf relay.CallFrame, conn relay.Conn) (string, error)) tchannel.RelayHost {
21+
func HostFunc(fn func(relay.CallFrame, *tchannel.Connection) (string, error)) tchannel.RelayHost {
2222
return &hostFunc{nil, NewMockStats(), fn}
2323
}
2424

2525
func (hf *hostFunc) SetChannel(ch *tchannel.Channel) {
2626
hf.ch = ch
2727
}
2828

29-
func (hf *hostFunc) Start(cf relay.CallFrame, conn relay.Conn) (tchannel.RelayCall, error) {
29+
func (hf *hostFunc) Start(cf relay.CallFrame, conn *tchannel.Connection) (tchannel.RelayCall, error) {
3030
var peer *tchannel.Peer
3131

3232
peerHP, err := hf.fn(cf, conn)

relay/relaytest/stub_host.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (rh *StubRelayHost) SetChannel(ch *tchannel.Channel) {
5757
}
5858

5959
// Start starts a new RelayCall for the given call on a specific connection.
60-
func (rh *StubRelayHost) Start(cf relay.CallFrame, conn relay.Conn) (tchannel.RelayCall, error) {
60+
func (rh *StubRelayHost) Start(cf relay.CallFrame, _ *tchannel.Connection) (tchannel.RelayCall, error) {
6161
// Get a peer from the subchannel.
6262
peer, err := rh.ch.GetSubChannel(string(cf.Service())).Peers().Get(nil)
6363
return &stubCall{rh.stats.Begin(cf), peer}, err

relay_api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type RelayHost interface {
3232
// Start starts a new RelayCall given the call frame and connection.
3333
// It may return a call and an error, in which case the caller will
3434
// call Failed/End on the RelayCall.
35-
Start(relay.CallFrame, relay.Conn) (RelayCall, error)
35+
Start(relay.CallFrame, *Connection) (RelayCall, error)
3636
}
3737

3838
// RelayCall abstracts away peer selection, stats, and any other business

relay_test.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func TestRelayErrorsOnGetPeer(t *testing.T) {
199199
}
200200

201201
for _, tt := range tests {
202-
f := func(relay.CallFrame, relay.Conn) (string, error) {
202+
f := func(relay.CallFrame, *Connection) (string, error) {
203203
return tt.returnPeer, tt.returnErr
204204
}
205205

@@ -502,8 +502,8 @@ func TestRelayMakeOutgoingCall(t *testing.T) {
502502
func TestRelayConnection(t *testing.T) {
503503
var errTest = errors.New("test")
504504
var wantHostPort string
505-
getHost := func(call relay.CallFrame, conn relay.Conn) (string, error) {
506-
assert.Equal(t, wantHostPort, conn.RemoteHostPort(), "Unexpected RemoteHostPort")
505+
getHost := func(call relay.CallFrame, conn *Connection) (string, error) {
506+
assert.Equal(t, wantHostPort, conn.RemotePeerInfo().HostPort, "Unexpected RemoteHostPort")
507507
return "", errTest
508508
}
509509

@@ -519,6 +519,35 @@ func TestRelayConnection(t *testing.T) {
519519
err := testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)
520520
require.Error(t, err, "Expected CallEcho to fail")
521521
assert.Contains(t, err.Error(), errTest.Error(), "Unexpected error")
522+
523+
// Verify that the relay has not closed any connections.
524+
assert.Equal(t, 1, ts.Relay().IntrospectNumConnections(), "Relay should maintain client connection")
525+
})
526+
}
527+
528+
func TestRelayConnectionClosed(t *testing.T) {
529+
protocolErr := NewSystemError(ErrCodeProtocol, "invalid service name")
530+
getHost := func(call relay.CallFrame, conn *Connection) (string, error) {
531+
return "", protocolErr
532+
}
533+
534+
opts := testutils.NewOpts().
535+
SetRelayOnly().
536+
SetRelayHost(relaytest.HostFunc(getHost))
537+
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
538+
// The client receives a protocol error which causes the following logs.
539+
opts := testutils.NewOpts().
540+
AddLogFilter("Peer reported protocol error", 1).
541+
AddLogFilter("Connection error", 1)
542+
client := ts.NewClient(opts)
543+
544+
err := testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)
545+
assert.Equal(t, protocolErr, err, "Unexpected error on call")
546+
547+
closedAll := testutils.WaitFor(time.Second, func() bool {
548+
return ts.Relay().IntrospectNumConnections() == 0
549+
})
550+
assert.True(t, closedAll, "Relay should close client connection")
522551
})
523552
}
524553

@@ -577,7 +606,7 @@ func TestRelayRejectsDuringClose(t *testing.T) {
577606
}
578607

579608
func TestRelayRateLimitDrop(t *testing.T) {
580-
getHost := func(call relay.CallFrame, conn relay.Conn) (string, error) {
609+
getHost := func(call relay.CallFrame, _ *Connection) (string, error) {
581610
return "", relay.RateLimitDropError{}
582611
}
583612

@@ -691,7 +720,7 @@ func TestRelayThroughSeparateRelay(t *testing.T) {
691720
SetRelayOnly()
692721
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
693722
serverHP := ts.Server().PeerInfo().HostPort
694-
dummyFactory := func(relay.CallFrame, relay.Conn) (string, error) {
723+
dummyFactory := func(relay.CallFrame, *Connection) (string, error) {
695724
panic("should not get invoked")
696725
}
697726
relay2Opts := testutils.NewOpts().SetRelayHost(relaytest.HostFunc(dummyFactory))

0 commit comments

Comments
 (0)