Skip to content

Commit 2feb388

Browse files
authored
Merge pull request #585 from uber/dev
Release version 1.4.0
2 parents 7938782 + 85cab57 commit 2feb388

25 files changed

+932
-438
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go:
99
- 1.5
1010
- 1.6
1111
- 1.7
12-
- tip
12+
- 1.8
1313

1414
matrix:
1515
include:

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
Changelog
22
=========
33

4+
# v1.4.0
5+
6+
* Add version information to the channel's LocalPeerInfo.
7+
* Add peers package for peer management utilities such as
8+
consistent peer selection.
9+
* Fix SetScoreStrategy not rescoring existing peers. (#583).
10+
411
# v1.3.0
512

613
* Exposes the channel's RootPeerList with `channel.RootPeers()`.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ endif
1313
PATH := $(GOPATH)/bin:$(PATH)
1414
EXAMPLES=./examples/bench/server ./examples/bench/client ./examples/ping ./examples/thrift ./examples/hyperbahn/echo-server
1515
ALL_PKGS := $(shell glide nv)
16-
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
16+
PROD_PKGS := . ./http ./hyperbahn ./json ./peers ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
1717
TEST_ARG ?= -race -v -timeout 5m
1818
BUILD := ./build
1919
THRIFT_GEN_RELEASE := ./thrift-gen-release

channel.go

Lines changed: 78 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"net"
2727
"os"
2828
"path/filepath"
29+
"runtime"
30+
"strings"
2931
"sync"
3032
"time"
3133

@@ -83,6 +85,10 @@ type ChannelOptions struct {
8385
// Tracer is an OpenTracing Tracer used to manage distributed tracing spans.
8486
// If not set, opentracing.GlobalTracer() is used.
8587
Tracer opentracing.Tracer
88+
89+
// Handler is an alternate handler for all inbound requests, overriding the
90+
// default handler that delegates to a subchannel.
91+
Handler Handler
8692
}
8793

8894
// ChannelState is the state of a channel.
@@ -124,6 +130,7 @@ type Channel struct {
124130
peers *PeerList
125131
relayHost RelayHost
126132
relayMaxTimeout time.Duration
133+
handler Handler
127134

128135
// mutable contains all the members of Channel which are mutable.
129136
mutable struct {
@@ -209,25 +216,40 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
209216
tracer: opts.Tracer,
210217
},
211218
chID: chID,
212-
connectionOptions: opts.DefaultConnectionOptions,
219+
connectionOptions: opts.DefaultConnectionOptions.withDefaults(),
213220
relayHost: opts.RelayHost,
214221
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
215222
}
216223
ch.peers = newRootPeerList(ch).newChild()
217224

225+
if opts.Handler != nil {
226+
ch.handler = opts.Handler
227+
} else {
228+
ch.handler = channelHandler{ch}
229+
}
230+
218231
ch.mutable.peerInfo = LocalPeerInfo{
219232
PeerInfo: PeerInfo{
220233
ProcessName: processName,
221234
HostPort: ephemeralHostPort,
222235
IsEphemeral: true,
236+
Version: PeerVersion{
237+
Language: "go",
238+
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
239+
TChannelVersion: VersionInfo,
240+
},
223241
},
224242
ServiceName: serviceName,
225243
}
226244
ch.mutable.state = ChannelClient
227245
ch.mutable.conns = make(map[uint32]*Connection)
228246
ch.createCommonStats()
229247

230-
ch.registerInternal()
248+
// Register internal unless the root handler has been overridden, since
249+
// Register will panic.
250+
if opts.Handler == nil {
251+
ch.registerInternal()
252+
}
231253

232254
registerNewChannel(ch)
233255

@@ -322,7 +344,13 @@ type Registrar interface {
322344
// under that. You may also use SetHandler on a SubChannel to set up a
323345
// catch-all Handler for that service. See the docs for SetHandler for more
324346
// information.
347+
//
348+
// Register panics if the channel was constructed with an alternate root
349+
// handler.
325350
func (ch *Channel) Register(h Handler, methodName string) {
351+
if _, ok := ch.handler.(channelHandler); !ok {
352+
panic("can't register handler when channel configured with alternate root handler")
353+
}
326354
ch.GetSubChannel(ch.PeerInfo().ServiceName).Register(h, methodName)
327355
}
328356

@@ -417,18 +445,18 @@ func (ch *Channel) serve() {
417445

418446
acceptBackoff = 0
419447

420-
// Register the connection in the peer once the channel is set up.
421-
events := connectionEvents{
422-
OnActive: ch.inboundConnectionActive,
423-
OnCloseStateChange: ch.connectionCloseStateChange,
424-
OnExchangeUpdated: ch.exchangeUpdated,
425-
}
426-
if _, err := ch.newInboundConnection(netConn, events); err != nil {
427-
// Server is getting overloaded - begin rejecting new connections
428-
ch.log.WithFields(ErrField(err)).Error("Couldn't create new TChannelConnection for incoming conn.")
429-
netConn.Close()
430-
continue
431-
}
448+
// Perform the connection handshake in a background goroutine.
449+
go func() {
450+
// Register the connection in the peer once the channel is set up.
451+
events := connectionEvents{
452+
OnActive: ch.inboundConnectionActive,
453+
OnCloseStateChange: ch.connectionCloseStateChange,
454+
OnExchangeUpdated: ch.exchangeUpdated,
455+
}
456+
if _, err := ch.inboundHandshake(context.Background(), netConn, events); err != nil {
457+
netConn.Close()
458+
}
459+
}()
432460
}
433461
}
434462

@@ -496,28 +524,45 @@ func (ch *Channel) Connect(ctx context.Context, hostPort string) (*Connection, e
496524
return nil, GetContextError(err)
497525
}
498526

499-
c, err := ch.newOutboundConnection(ctx, hostPort, events)
527+
timeout := getTimeout(ctx)
528+
tcpConn, err := dialContext(ctx, hostPort)
500529
if err != nil {
530+
if ne, ok := err.(net.Error); ok && ne.Timeout() {
531+
ch.log.WithFields(
532+
LogField{"remoteHostPort", hostPort},
533+
LogField{"timeout", timeout},
534+
).Info("Outbound net.Dial timed out.")
535+
err = ErrTimeout
536+
} else if ctx.Err() == context.Canceled {
537+
ch.log.WithFields(
538+
LogField{"remoteHostPort", hostPort},
539+
).Info("Outbound net.Dial was cancelled.")
540+
err = GetContextError(ErrRequestCancelled)
541+
} else {
542+
ch.log.WithFields(
543+
ErrField(err),
544+
LogField{"remoteHostPort", hostPort},
545+
).Info("Outbound net.Dial failed.")
546+
}
501547
return nil, err
502548
}
503549

504-
if err := c.sendInit(ctx); err != nil {
505-
return nil, err
506-
}
507-
508-
// It's possible that the connection we just created responds with a host:port
509-
// that is not what we tried to connect to. E.g., we may have connected to
510-
// 127.0.0.1:1234, but the returned host:port may be 10.0.0.1:1234.
511-
// In this case, the connection won't be added to 127.0.0.1:1234 peer
512-
// and so future calls to that peer may end up creating new connections. To
513-
// avoid this issue, and to avoid clients being aware of any TCP relays, we
514-
// add the connection to the intended peer.
515-
if hostPort != c.remotePeerInfo.HostPort {
516-
c.log.Debugf("Outbound connection host:port mismatch, adding to peer %v", c.remotePeerInfo.HostPort)
517-
ch.addConnectionToPeer(hostPort, c, outbound)
550+
conn, err := ch.outboundHandshake(ctx, tcpConn, hostPort, events)
551+
if conn != nil {
552+
// It's possible that the connection we just created responds with a host:port
553+
// that is not what we tried to connect to. E.g., we may have connected to
554+
// 127.0.0.1:1234, but the returned host:port may be 10.0.0.1:1234.
555+
// In this case, the connection won't be added to 127.0.0.1:1234 peer
556+
// and so future calls to that peer may end up creating new connections. To
557+
// avoid this issue, and to avoid clients being aware of any TCP relays, we
558+
// add the connection to the intended peer.
559+
if hostPort != conn.remotePeerInfo.HostPort {
560+
conn.log.Debugf("Outbound connection host:port mismatch, adding to peer %v", conn.remotePeerInfo.HostPort)
561+
ch.addConnectionToPeer(hostPort, conn, outbound)
562+
}
518563
}
519564

520-
return c, nil
565+
return conn, err
521566
}
522567

523568
// exchangeUpdated updates the peer heap.
@@ -537,7 +582,7 @@ func (ch *Channel) exchangeUpdated(c *Connection) {
537582

538583
// updatePeer updates the score of the peer and update it's position in heap as well.
539584
func (ch *Channel) updatePeer(p *Peer) {
540-
ch.peers.updatePeer(p)
585+
ch.peers.onPeerChange(p)
541586
ch.subChannels.updatePeer(p)
542587
p.callOnUpdateComplete()
543588
}
@@ -569,8 +614,7 @@ func (ch *Channel) connectionActive(c *Connection, direction connectionDirection
569614

570615
if added := ch.addConnection(c, direction); !added {
571616
// The channel isn't in a valid state to accept this connection, close the connection.
572-
c.log.Debugf("Closing connection due to closing channel state")
573-
c.Close()
617+
c.close(LogField{"reason", "new active connection on closing channel"})
574618
return
575619
}
576620

@@ -716,7 +760,7 @@ func (ch *Channel) Close() {
716760
ch.mutable.Unlock()
717761

718762
for _, c := range connections {
719-
c.Close()
763+
c.close(LogField{"reason", "channel closing"})
720764
}
721765

722766
if channelClosed {

channel_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"io/ioutil"
2525
"math"
2626
"os"
27+
"runtime"
28+
"strings"
2729
"testing"
2830
"time"
2931

@@ -41,6 +43,27 @@ func toMap(fields LogFields) map[string]interface{} {
4143
return m
4244
}
4345

46+
func TestNewChannel(t *testing.T) {
47+
ch, err := NewChannel("svc", &ChannelOptions{
48+
ProcessName: "pname",
49+
})
50+
require.NoError(t, err, "NewChannel failed")
51+
52+
assert.Equal(t, LocalPeerInfo{
53+
ServiceName: "svc",
54+
PeerInfo: PeerInfo{
55+
ProcessName: "pname",
56+
HostPort: ephemeralHostPort,
57+
IsEphemeral: true,
58+
Version: PeerVersion{
59+
Language: "go",
60+
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
61+
TChannelVersion: VersionInfo,
62+
},
63+
},
64+
}, ch.PeerInfo(), "Wrong local peer info")
65+
}
66+
4467
func TestLoggers(t *testing.T) {
4568
ch, err := NewChannel("svc", &ChannelOptions{
4669
Logger: NewLogger(ioutil.Discard),

0 commit comments

Comments
 (0)