Skip to content

Commit b99c1d7

Browse files
authored
Merge pull request #627 from uber/dev
Version 1.6.0
2 parents 0b7f160 + eab25af commit b99c1d7

18 files changed

+604
-68
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ lint.log
1616
.idea
1717
tchannel-go.iml
1818
.vscode
19+
.bin/

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
Changelog
22
=========
33

4+
# v1.6.0
5+
6+
* Locks Apache Thrift to version 0.9.3 to maintain backward-compatibility.
7+
* Add `OnPeerStatusChanged` channel option to receive a notification each time
8+
the number of available connections changes for any given peer.
9+
* Set DiffServ (QoS) bit on outbound connections.
10+
* Improve resilience of the frame parser.
11+
412
# v1.5.0
513

614
* Add `PeerList.Len` to expose the number of peers in the peer list.

Makefile

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@ SRCS := $(foreach pkg,$(PKGS),$(wildcard $(pkg)/*.go))
2323

2424
PLATFORM := $(shell uname -s | tr '[:upper:]' '[:lower:]')
2525
ARCH := $(shell uname -m)
26-
THRIFT_REL := ./scripts/travis/thrift-release/$(PLATFORM)-$(ARCH)
2726

2827
OLD_GOPATH := $(GOPATH)
2928

30-
export PATH := $(realpath $(THRIFT_REL)):$(PATH)
29+
BIN := $(shell pwd)/.bin
3130

3231
# Cross language test args
3332
TEST_HOST=127.0.0.1
@@ -37,6 +36,10 @@ TEST_PORT=0
3736

3837
all: test examples
3938

39+
$(BIN)/thrift:
40+
mkdir -p $(BIN)
41+
scripts/install-thrift.sh $(BIN)
42+
4043
packages_test:
4144
go list -json ./... | jq -r '. | select ((.TestGoFiles | length) > 0) | .ImportPath'
4245

@@ -46,9 +49,6 @@ setup:
4649
mkdir -p $(THRIFT_GEN_RELEASE_LINUX)
4750
mkdir -p $(THRIFT_GEN_RELEASE_DARWIN)
4851

49-
get_thrift:
50-
scripts/travis/get-thrift.sh
51-
5252
# We want to remove `vendor` dir because thrift-gen tests don't work with it.
5353
# However, glide install even with --cache-gopath option leaves GOPATH at HEAD,
5454
# not at the desired versions from glide.lock, which are only applied to `vendor`
@@ -70,7 +70,7 @@ install_glide:
7070
# but have to pin to 0.12.3 due to https://github.com/Masterminds/glide/issues/745
7171
GOPATH=$(OLD_GOPATH) go get -u github.com/Masterminds/glide && cd $(OLD_GOPATH)/src/github.com/Masterminds/glide && git checkout v0.12.3 && go install
7272

73-
install_ci: install_glide install_lint get_thrift install
73+
install_ci: $(BIN)/thrift install_glide install_lint install
7474
GOPATH=$(OLD_GOPATH) go get -u github.com/mattn/goveralls
7575
ifdef CROSSDOCK
7676
$(MAKE) install_docker_ci
@@ -100,23 +100,23 @@ else
100100
$(MAKE) test
101101
endif
102102

103-
test: clean setup install_test check_no_test_deps
103+
test: clean setup install_test check_no_test_deps $(BIN)/thrift
104104
@echo Testing packages:
105-
go test -parallel=4 $(TEST_ARG) $(ALL_PKGS)
105+
PATH=$(BIN):$$PATH go test -parallel=4 $(TEST_ARG) $(ALL_PKGS)
106106
@echo Running frame pool tests
107-
go test -run TestFramesReleased -stressTest $(TEST_ARG)
107+
PATH=$(BIN):$$PATH go test -run TestFramesReleased -stressTest $(TEST_ARG)
108108

109109
check_no_test_deps:
110110
! go list -json $(PROD_PKGS) | jq -r .Deps[] | grep -e test -e mock
111111

112-
benchmark: clean setup
112+
benchmark: clean setup $(BIN)/thrift
113113
echo Running benchmarks:
114-
go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE
114+
PATH=$(BIN)::$$PATH go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE
115115

116-
cover_profile: clean setup
116+
cover_profile: clean setup $(BIN)/thrift
117117
@echo Testing packages:
118118
mkdir -p $(BUILD)
119-
go test ./ $(TEST_ARG) -coverprofile=$(BUILD)/coverage.out
119+
PATH=$(BIN)::$$PATH go test ./ $(TEST_ARG) -coverprofile=$(BUILD)/coverage.out
120120

121121
cover: cover_profile
122122
go tool cover -html=$(BUILD)/coverage.out
@@ -167,18 +167,18 @@ examples: clean setup thrift_example
167167
go build -o $(BUILD)/examples/bench/runner ./examples/bench/runner.go
168168
go build -o $(BUILD)/examples/test_server ./examples/test_server
169169

170-
thrift_gen:
170+
thrift_gen: $(BIN)/thrift
171171
go build -o $(BUILD)/thrift-gen ./thrift/thrift-gen
172-
$(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/
173-
$(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
174-
$(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
175-
$(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go
172+
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/
173+
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
174+
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
175+
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go
176176

177177
release_thrift_gen: clean setup
178178
GOOS=linux GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_LINUX)/thrift-gen ./thrift/thrift-gen
179179
GOOS=darwin GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_DARWIN)/thrift-gen ./thrift/thrift-gen
180180
tar -czf thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)
181181
mv thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)/
182182

183-
.PHONY: all help clean fmt format get_thrift install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint
183+
.PHONY: all help clean fmt format install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint
184184
.SILENT: all help clean fmt format test lint

all_channels.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
package tchannel
2222

23-
import "sync"
23+
import (
24+
"fmt"
25+
"sync"
26+
)
2427

2528
// channelMap is used to ensure that applications don't create multiple channels with
2629
// the same service name in a single process.
@@ -34,7 +37,10 @@ var channelMap = struct {
3437
func registerNewChannel(ch *Channel) {
3538
serviceName := ch.ServiceName()
3639
ch.createdStack = string(getStacks(false /* all */))
37-
ch.log.Debugf("NewChannel created at %s", ch.createdStack)
40+
ch.log.WithFields(
41+
LogField{"channelPtr", fmt.Sprintf("%p", ch)},
42+
LogField{"createdStack", ch.createdStack},
43+
).Info("Created new channel.")
3844

3945
channelMap.Lock()
4046
defer channelMap.Unlock()

channel.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ type ChannelOptions struct {
5757
// The name of the process, for logging and reporting to peers
5858
ProcessName string
5959

60-
// OnPeerStatusChanged
60+
// OnPeerStatusChanged is an optional callback that receives a notification
61+
// whenever the channel establishes a usable connection to a peer, or loses
62+
// a connection to a peer.
6163
OnPeerStatusChanged func(*Peer)
6264

6365
// The logger to use for this channel
@@ -123,14 +125,15 @@ const (
123125
type Channel struct {
124126
channelConnectionCommon
125127

126-
chID uint32
127-
createdStack string
128-
commonStatsTags map[string]string
129-
connectionOptions ConnectionOptions
130-
peers *PeerList
131-
relayHost RelayHost
132-
relayMaxTimeout time.Duration
133-
handler Handler
128+
chID uint32
129+
createdStack string
130+
commonStatsTags map[string]string
131+
connectionOptions ConnectionOptions
132+
peers *PeerList
133+
relayHost RelayHost
134+
relayMaxTimeout time.Duration
135+
handler Handler
136+
onPeerStatusChanged func(*Peer)
134137

135138
// mutable contains all the members of Channel which are mutable.
136139
mutable struct {
@@ -220,7 +223,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
220223
relayHost: opts.RelayHost,
221224
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
222225
}
223-
ch.peers = newRootPeerList(ch).newChild()
226+
ch.peers = newRootPeerList(ch, opts.OnPeerStatusChanged).newChild()
224227

225228
if opts.Handler != nil {
226229
ch.handler = opts.Handler
@@ -287,7 +290,9 @@ func (ch *Channel) Serve(l net.Listener) error {
287290
ch.log = ch.log.WithFields(LogField{"hostPort", mutable.peerInfo.HostPort})
288291

289292
peerInfo := mutable.peerInfo
290-
ch.log.Debugf("%v (%v) listening on %v", peerInfo.ProcessName, peerInfo.ServiceName, peerInfo.HostPort)
293+
ch.log.WithFields(
294+
LogField{"hostPort", peerInfo.HostPort},
295+
).Info("Channel is listening.")
291296
go ch.serve()
292297
return nil
293298
}
@@ -738,7 +743,7 @@ func (ch *Channel) State() ChannelState {
738743
// Close starts a graceful Close for the channel. This does not happen immediately:
739744
// 1. This call closes the Listener and starts closing connections.
740745
// 2. When all incoming connections are drained, the connection blocks new outgoing calls.
741-
// 3. When all connections are drainged, the channel's state is updated to Closed.
746+
// 3. When all connections are drained, the channel's state is updated to Closed.
742747
func (ch *Channel) Close() {
743748
ch.Logger().Info("Channel.Close called.")
744749
var connections []*Connection

connection.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/uber/tchannel-go/tos"
33+
3234
"github.com/uber-go/atomic"
3335
"golang.org/x/net/context"
36+
"golang.org/x/net/ipv4"
37+
"golang.org/x/net/ipv6"
3438
)
3539

3640
const (
@@ -129,6 +133,9 @@ type ConnectionOptions struct {
129133

130134
// The type of checksum to use when sending messages.
131135
ChecksumType ChecksumType
136+
137+
// ToS class name marked on outbound packets.
138+
TosPriority tos.ToS
132139
}
133140

134141
// connectionEvents are the events that can be triggered by a connection.
@@ -176,10 +183,6 @@ type Connection struct {
176183
stoppedExchanges atomic.Uint32
177184
// pendingMethods is the number of methods running that may block closing of sendCh.
178185
pendingMethods atomic.Int64
179-
// ignoreRemotePeer is used to avoid a data race between setting the RemotePeerInfo
180-
// and the connection failing, causing a read of the RemotePeerInfo at the same time.
181-
ignoreRemotePeer bool
182-
183186
// remotePeerAddress is used as a cache for remote peer address parsed into individual
184187
// components that can be used to set peer tags on OpenTracing Span.
185188
remotePeerAddress peerAddressComponents
@@ -237,6 +240,23 @@ func (co ConnectionOptions) withDefaults() ConnectionOptions {
237240
return co
238241
}
239242

243+
func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) error {
244+
tcpAddr, isTCP := c.RemoteAddr().(*net.TCPAddr)
245+
if !isTCP {
246+
return nil
247+
}
248+
249+
// Handle dual stack listeners and set Traffic Class.
250+
var err error
251+
switch ip := tcpAddr.IP; {
252+
case ip.To16() != nil && ip.To4() == nil:
253+
err = ipv6.NewConn(c).SetTrafficClass(int(tosPriority))
254+
case ip.To4() != nil:
255+
err = ipv4.NewConn(c).SetTOS(int(tosPriority))
256+
}
257+
return err
258+
}
259+
240260
func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection {
241261
opts := ch.connectionOptions.withDefaults()
242262

@@ -279,6 +299,12 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
279299
commonStatsTags: ch.commonStatsTags,
280300
}
281301

302+
if tosPriority := opts.TosPriority; tosPriority > 0 {
303+
if err := ch.setConnectionTosPriority(tosPriority, conn); err != nil {
304+
log.WithFields(ErrField(err)).Error("Failed to set ToS priority.")
305+
}
306+
}
307+
282308
c.nextMessageID.Store(initialID)
283309
c.log = log
284310
c.inbound.onRemoved = c.checkExchanges
@@ -501,12 +527,6 @@ func (c *Connection) logConnectionError(site string, err error) error {
501527

502528
// connectionError handles a connection level error
503529
func (c *Connection) connectionError(site string, err error) error {
504-
// Avoid racing with setting the peer info.
505-
c.withStateLock(func() error {
506-
c.ignoreRemotePeer = true
507-
return nil
508-
})
509-
510530
var closeLogFields LogFields
511531
if err == io.EOF {
512532
closeLogFields = LogFields{{"reason", "network connection EOF"}}

0 commit comments

Comments
 (0)