Skip to content

Commit aeda483

Browse files
authored
Merge pull request #570 from uber/dev
Release version 1.2.3
2 parents 2caa315 + dd39c35 commit aeda483

39 files changed

+1279
-175
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
Changelog
22
=========
33

4+
# v1.2.3
5+
6+
* Improve error messages when an argument reader is closed without
7+
reading the EOF. (#567)
8+
* thrift: Fix an issue where we return `nil` if we expected a Thrift exception
9+
but none was found (e.g., exception is from the future). (#566)
10+
* Fix ListenIP selecting docker interfaces over physical networks. (#565)
11+
* Fix for error when a Thrift payload has completed decoding and attempts
12+
to close the argument reader without waiting till EOF. (#564)
13+
* thrift-gen: Fix "namespace go" being ignored even though the Apache thrift
14+
generated code was respecting it. (#559)
15+
416
# v1.2.2
517

618
* Add a unique channel ID for introspection (#548)

Makefile

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ ARCH := $(shell uname -m)
2626
THRIFT_REL := ./scripts/travis/thrift-release/$(PLATFORM)-$(ARCH)
2727

2828
OLD_GOPATH := $(GOPATH)
29-
VENDOR_PATH := $(PWD)/.tmp/vendor
3029

3130
export PATH := $(realpath $(THRIFT_REL)):$(PATH)
32-
export GOPATH := $(VENDOR_PATH):$(GOPATH)
3331

3432
# Cross language test args
3533
TEST_HOST=127.0.0.1
@@ -58,9 +56,6 @@ get_thrift:
5856
# Note that glide itself is still executed against the original GOPATH.
5957
install:
6058
GOPATH=$(OLD_GOPATH) glide --debug install --cache --cache-gopath
61-
rm -rf $(VENDOR_PATH)
62-
mkdir -p $(VENDOR_PATH)
63-
mv vendor $(VENDOR_PATH)/src
6459

6560
install_lint:
6661
ifdef SHOULD_LINT
@@ -176,7 +171,6 @@ thrift_gen:
176171
$(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
177172
$(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
178173
$(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go
179-
rm -rf trace/thrift/gen-go/tcollector && $(BUILD)/thrift-gen --generateThrift --inputFile trace/tcollector.thrift --outputDir trace/thrift/gen-go/
180174

181175
release_thrift_gen: clean setup
182176
GOOS=linux GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_LINUX)/thrift-gen ./thrift/thrift-gen

arguments.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"encoding/json"
2626
"io"
2727
"io/ioutil"
28+
29+
"github.com/uber/tchannel-go/internal/argreader"
2830
)
2931

3032
// ArgReader is the interface for the arg2 and arg3 streams on an
@@ -74,6 +76,9 @@ func (r ArgReadHelper) read(f func() error) error {
7476
if err := f(); err != nil {
7577
return err
7678
}
79+
if err := argreader.EnsureEmpty(r.reader, "read arg"); err != nil {
80+
return err
81+
}
7782
return r.reader.Close()
7883
}
7984

arguments_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ package tchannel
2323
import (
2424
"bytes"
2525
"io"
26+
"io/ioutil"
27+
"strings"
2628
"testing"
2729

2830
"github.com/stretchr/testify/assert"
@@ -72,6 +74,15 @@ func TestJSONInputOutput(t *testing.T) {
7274
assert.Equal(t, 20756, outObj.Value)
7375
}
7476

77+
func TestReadNotEmpty(t *testing.T) {
78+
// Note: The contents need to be larger than the default buffer size of bufio.NewReader.
79+
r := bytes.NewReader([]byte("{}" + strings.Repeat("{}\n", 10000)))
80+
81+
var data map[string]interface{}
82+
reader := NewArgReader(ioutil.NopCloser(r), nil)
83+
require.Error(t, reader.ReadJSON(&data), "Read should fail due to extra bytes")
84+
}
85+
7586
func BenchmarkArgReaderWriter(b *testing.B) {
7687
obj := testObject{Name: "Foo", Value: 20756}
7788
outObj := testObject{}

connection.go

Lines changed: 75 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,28 @@ import (
3939
"golang.org/x/net/context"
4040
)
4141

42+
const (
43+
// CurrentProtocolVersion is the current version of the TChannel protocol
44+
// supported by this stack
45+
CurrentProtocolVersion = 0x02
46+
47+
// DefaultConnectTimeout is the default timeout used by net.Dial, if no timeout
48+
// is specified in the context.
49+
DefaultConnectTimeout = 5 * time.Second
50+
51+
// defaultConnectionBufferSize is the default size for the connection's
52+
// read and write channels.
53+
defaultConnectionBufferSize = 512
54+
)
55+
56+
// PeerVersion contains version related information for a specific peer.
57+
// These values are extracted from the init headers.
58+
type PeerVersion struct {
59+
Language string `json:"language"`
60+
LanguageVersion string `json:"languageVersion"`
61+
TChannelVersion string `json:"tchannelVersion"`
62+
}
63+
4264
// PeerInfo contains information about a TChannel peer
4365
type PeerInfo struct {
4466
// The host and port that can be used to contact the peer, as encoded by net.JoinHostPort
@@ -49,6 +71,9 @@ type PeerInfo struct {
4971

5072
// IsEphemeral returns whether the remote host:port is ephemeral (e.g. not listening).
5173
IsEphemeral bool `json:"isEphemeral"`
74+
75+
// Version returns the version information for the remote peer.
76+
Version PeerVersion `json:"version"`
5277
}
5378

5479
func (p PeerInfo) String() string {
@@ -72,14 +97,6 @@ func (p LocalPeerInfo) String() string {
7297
return fmt.Sprintf("%v: %v", p.ServiceName, p.PeerInfo)
7398
}
7499

75-
// CurrentProtocolVersion is the current version of the TChannel protocol
76-
// supported by this stack
77-
const CurrentProtocolVersion = 0x02
78-
79-
// DefaultConnectTimeout is the default timeout used by net.Dial, if no timeout
80-
// is specified in the context.
81-
const DefaultConnectTimeout = 5 * time.Second
82-
83100
var (
84101
// ErrConnectionClosed is returned when a caller performs an method
85102
// on a closed connection
@@ -115,7 +132,7 @@ type ConnectionOptions struct {
115132
// The frame pool, allowing better management of frame buffers. Defaults to using raw heap.
116133
FramePool FramePool
117134

118-
// The size of receive channel buffers. Defaults to 512.
135+
// NOTE: This is deprecated and not used for anything.
119136
RecvBufferSize int
120137

121138
// The size of send channel buffers. Defaults to 512.
@@ -142,8 +159,7 @@ type Connection struct {
142159
channelConnectionCommon
143160

144161
connID uint32
145-
checksumType ChecksumType
146-
framePool FramePool
162+
opts ConnectionOptions
147163
conn net.Conn
148164
localPeerInfo LocalPeerInfo
149165
remotePeerInfo PeerInfo
@@ -227,6 +243,19 @@ func getTimeout(ctx context.Context) time.Duration {
227243
return deadline.Sub(time.Now())
228244
}
229245

246+
func (co ConnectionOptions) withDefaults() ConnectionOptions {
247+
if co.ChecksumType == ChecksumTypeNone {
248+
co.ChecksumType = ChecksumTypeCrc32
249+
}
250+
if co.FramePool == nil {
251+
co.FramePool = DefaultFramePool
252+
}
253+
if co.SendBufferSize <= 0 {
254+
co.SendBufferSize = defaultConnectionBufferSize
255+
}
256+
return co
257+
}
258+
230259
// Creates a new Connection around an outbound connection initiated to a peer
231260
func (ch *Channel) newOutboundConnection(ctx context.Context, hostPort string, events connectionEvents) (*Connection, error) {
232261
timeout := getTimeout(ctx)
@@ -262,27 +291,7 @@ func (ch *Channel) newInboundConnection(conn net.Conn, events connectionEvents)
262291

263292
// Creates a new connection in a given initial state
264293
func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState connectionState, events connectionEvents) *Connection {
265-
opts := &ch.connectionOptions
266-
267-
checksumType := opts.ChecksumType
268-
if checksumType == ChecksumTypeNone {
269-
checksumType = ChecksumTypeCrc32C
270-
}
271-
272-
sendBufferSize := opts.SendBufferSize
273-
if sendBufferSize <= 0 {
274-
sendBufferSize = 512
275-
}
276-
277-
recvBufferSize := opts.RecvBufferSize
278-
if recvBufferSize <= 0 {
279-
recvBufferSize = 512
280-
}
281-
282-
framePool := opts.FramePool
283-
if framePool == nil {
284-
framePool = DefaultFramePool
285-
}
294+
opts := ch.connectionOptions.withDefaults()
286295

287296
connID := _nextConnID.Inc()
288297
log := ch.log.WithFields(LogFields{
@@ -298,13 +307,12 @@ func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState
298307

299308
connID: connID,
300309
conn: conn,
301-
framePool: framePool,
310+
opts: opts,
302311
state: initialState,
303-
sendCh: make(chan *Frame, sendBufferSize),
312+
sendCh: make(chan *Frame, opts.SendBufferSize),
304313
stopCh: make(chan struct{}),
305314
localPeerInfo: peerInfo,
306315
outboundHP: outboundHP,
307-
checksumType: checksumType,
308316
inbound: newMessageExchangeSet(log, messageExchangeSetInbound),
309317
outbound: newMessageExchangeSet(log, messageExchangeSetOutbound),
310318
handler: channelHandler{ch},
@@ -404,7 +412,7 @@ func (c *Connection) sendInit(ctx context.Context) error {
404412
}
405413
defer c.pendingExchangeMethodDone()
406414

407-
mex, err := c.outbound.newExchange(ctx, c.framePool, req.messageType(), req.ID(), 1)
415+
mex, err := c.outbound.newExchange(ctx, c.opts.FramePool, req.messageType(), req.ID(), 1)
408416
if err != nil {
409417
return c.connectionError("create init req", err)
410418
}
@@ -441,18 +449,11 @@ func (c *Connection) handleInitReq(frame *Frame) {
441449
return
442450
}
443451

444-
var ok bool
445-
if c.remotePeerInfo.HostPort, ok = req.initParams[InitParamHostPort]; !ok {
446-
c.protocolError(id, fmt.Errorf("header %v is required", InitParamHostPort))
447-
return
448-
}
449-
if c.remotePeerInfo.ProcessName, ok = req.initParams[InitParamProcessName]; !ok {
450-
c.protocolError(id, fmt.Errorf("header %v is required", InitParamProcessName))
452+
if err := c.parseRemotePeer(req.initParams); err != nil {
453+
c.protocolError(id, err)
451454
return
452455
}
453456

454-
c.parseRemotePeerAddress()
455-
456457
res := initRes{initMessage{id: frame.Header.ID}}
457458
res.initParams = c.getInitParams()
458459
res.Version = CurrentProtocolVersion
@@ -469,7 +470,6 @@ func (c *Connection) handleInitReq(frame *Frame) {
469470

470471
return nil
471472
})
472-
473473
c.callOnActive()
474474
}
475475

@@ -482,7 +482,7 @@ func (c *Connection) ping(ctx context.Context) error {
482482
defer c.pendingExchangeMethodDone()
483483

484484
req := &pingReq{id: c.NextMessageID()}
485-
mex, err := c.outbound.newExchange(ctx, c.framePool, req.messageType(), req.ID(), 1)
485+
mex, err := c.outbound.newExchange(ctx, c.opts.FramePool, req.messageType(), req.ID(), 1)
486486
if err != nil {
487487
return c.connectionError("create ping exchange", err)
488488
}
@@ -563,12 +563,9 @@ func (c *Connection) handleInitRes(frame *Frame) bool {
563563
c.protocolError(frame.Header.ID, fmt.Errorf("unsupported protocol version %d from peer", res.Version))
564564
return true
565565
}
566-
if _, ok := res.initParams[InitParamHostPort]; !ok {
567-
c.protocolError(frame.Header.ID, fmt.Errorf("header %v is required", InitParamHostPort))
568-
return true
569-
}
570-
if _, ok := res.initParams[InitParamProcessName]; !ok {
571-
c.protocolError(frame.Header.ID, fmt.Errorf("header %v is required", InitParamProcessName))
566+
567+
if err := c.parseRemotePeer(res.initParams); err != nil {
568+
c.protocolError(frame.Header.ID, err)
572569
return true
573570
}
574571

@@ -607,9 +604,9 @@ func (c *Connection) handleInitRes(frame *Frame) bool {
607604

608605
// sendMessage sends a standalone message (typically a control message)
609606
func (c *Connection) sendMessage(msg message) error {
610-
frame := c.framePool.Get()
607+
frame := c.opts.FramePool.Get()
611608
if err := frame.write(msg); err != nil {
612-
c.framePool.Release(frame)
609+
c.opts.FramePool.Release(frame)
613610
return err
614611
}
615612

@@ -633,7 +630,7 @@ func (c *Connection) recvMessage(ctx context.Context, msg message, mex *messageE
633630
}
634631

635632
err = frame.read(msg)
636-
c.framePool.Release(frame)
633+
c.opts.FramePool.Release(frame)
637634
return err
638635
}
639636

@@ -649,7 +646,7 @@ func (c *Connection) NextMessageID() uint32 {
649646

650647
// SendSystemError sends an error frame for the given system error.
651648
func (c *Connection) SendSystemError(id uint32, span Span, err error) error {
652-
frame := c.framePool.Get()
649+
frame := c.opts.FramePool.Get()
653650

654651
if err := frame.write(&errorMessage{
655652
id: id,
@@ -706,7 +703,7 @@ func (c *Connection) logConnectionError(site string, err error) error {
706703
errCode = se.Code()
707704
logger.Error("Connection error.")
708705
} else {
709-
logger.Warn("Connection error.")
706+
logger.Info("Connection error.")
710707
}
711708
}
712709
return NewWrappedSystemError(errCode, err)
@@ -778,14 +775,14 @@ func (c *Connection) readState() connectionState {
778775
// since we cannot process new frames until the initialization is complete.
779776
func (c *Connection) readFrames(_ uint32) {
780777
for {
781-
frame := c.framePool.Get()
778+
frame := c.opts.FramePool.Get()
782779
if err := frame.ReadIn(c.conn); err != nil {
783780
if c.closeNetworkCalled.Load() == 0 {
784781
c.connectionError("read frames", err)
785782
} else {
786783
c.log.Debugf("Ignoring error after connection was closed: %v", err)
787784
}
788-
c.framePool.Release(frame)
785+
c.opts.FramePool.Release(frame)
789786
return
790787
}
791788

@@ -796,7 +793,7 @@ func (c *Connection) readFrames(_ uint32) {
796793
releaseFrame = c.handleFrameRelay(frame)
797794
}
798795
if releaseFrame {
799-
c.framePool.Release(frame)
796+
c.opts.FramePool.Release(frame)
800797
}
801798
}
802799
}
@@ -862,7 +859,7 @@ func (c *Connection) writeFrames(_ uint32) {
862859
}
863860

864861
err := f.WriteOut(c.conn)
865-
c.framePool.Release(f)
862+
c.opts.FramePool.Release(f)
866863
if err != nil {
867864
c.connectionError("write frames", err)
868865
return
@@ -1010,6 +1007,22 @@ func (c *Connection) closeNetwork() {
10101007
}
10111008
}
10121009

1010+
func (c *Connection) parseRemotePeer(p initParams) error {
1011+
var ok bool
1012+
if c.remotePeerInfo.HostPort, ok = p[InitParamHostPort]; !ok {
1013+
return fmt.Errorf("header %v is required", InitParamHostPort)
1014+
}
1015+
if c.remotePeerInfo.ProcessName, ok = p[InitParamProcessName]; !ok {
1016+
return fmt.Errorf("header %v is required", InitParamProcessName)
1017+
}
1018+
1019+
c.parseRemotePeerAddress()
1020+
c.remotePeerInfo.Version.Language = p[InitParamTChannelLanguage]
1021+
c.remotePeerInfo.Version.LanguageVersion = p[InitParamTChannelLanguageVersion]
1022+
c.remotePeerInfo.Version.TChannelVersion = p[InitParamTChannelVersion]
1023+
return nil
1024+
}
1025+
10131026
// parseRemotePeerAddress parses remote peer info into individual components and
10141027
// caches them on the Connection to be used to set peer tags on OpenTracing Span.
10151028
func (c *Connection) parseRemotePeerAddress() {

0 commit comments

Comments
 (0)