Skip to content

Commit e81fb7e

Browse files
authored
Add tcpinfo watcher (#31)
* Listen for connections from the tcp-info daemon * "Watcher" is now named "Poller"
1 parent d3791a7 commit e81fb7e

File tree

11 files changed

+461
-36
lines changed

11 files changed

+461
-36
lines changed

.travis.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
language: go
22

3-
sudo: true
4-
53
before_install:
64
- go get github.com/mattn/goveralls
75
# Install dependencies, including test dependencies.

caller.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@ import (
77
"fmt"
88
"time"
99

10+
"github.com/m-lab/traceroute-caller/connection"
11+
1012
"github.com/m-lab/go/flagx"
1113
"github.com/m-lab/go/rtx"
14+
"github.com/m-lab/tcp-info/eventsocket"
1215

1316
"github.com/m-lab/go/prometheusx"
14-
"github.com/m-lab/traceroute-caller/connectionwatcher"
17+
"github.com/m-lab/traceroute-caller/connectionlistener"
18+
"github.com/m-lab/traceroute-caller/connectionpoller"
19+
"github.com/m-lab/traceroute-caller/ipcache"
1520
"github.com/m-lab/traceroute-caller/scamper"
1621
)
1722

@@ -22,6 +27,7 @@ var (
2227
scamperCtrlSocket = flag.String("scamper.unixsocket", "/tmp/scamperctrl", "The name of the UNIX-domain socket that the scamper daemon should listen on")
2328
outputPath = flag.String("outputPath", "/var/spool/scamper", "path of output")
2429
waitTime = flag.Duration("waitTime", 5*time.Second, "how long to wait between subsequent listings of open connections")
30+
tcpinfoSocket = flag.String("tcpinfo.socket", "", "The filename of the unix domain socket served by tcpinfo. If this argument is set, then tcpinfo will be used instead of the `ss` command.")
2531

2632
ctx, cancel = context.WithCancel(context.Background())
2733
)
@@ -47,15 +53,23 @@ func main() {
4753
}
4854
go daemon.MustStart(ctx)
4955

50-
connWatcher := connectionwatcher.New()
51-
for ctx.Err() == nil {
52-
closedConnections := connWatcher.GetClosedConnections()
53-
fmt.Printf("length of closed connections: %d\n", len(closedConnections))
54-
daemon.TraceAll(closedConnections)
56+
cache := ipcache.New(ctx)
57+
if *tcpinfoSocket == "" {
58+
connPoller := connectionpoller.New(cache)
59+
for ctx.Err() == nil {
60+
closedConnections := connPoller.GetClosedConnections()
61+
fmt.Printf("length of closed connections: %d\n", len(closedConnections))
62+
daemon.TraceAll(closedConnections)
5563

56-
select {
57-
case <-time.After(*waitTime):
58-
case <-ctx.Done():
64+
select {
65+
case <-time.After(*waitTime):
66+
case <-ctx.Done():
67+
}
5968
}
69+
} else {
70+
connCreator, err := connection.NewCreator()
71+
rtx.Must(err, "Could not discover local IPs")
72+
connListener := connectionlistener.New(&daemon, connCreator, cache)
73+
eventsocket.MustRun(ctx, *tcpinfoSocket, connListener)
6074
}
6175
}

caller_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ package main
22

33
import (
44
"context"
5+
"io/ioutil"
56
"testing"
67
"time"
78

9+
"github.com/m-lab/tcp-info/eventsocket"
10+
811
"github.com/m-lab/go/prometheusx"
12+
"github.com/m-lab/go/rtx"
913

1014
"github.com/m-lab/go/prometheusx/promtest"
1115
)
@@ -26,3 +30,20 @@ func TestMain(t *testing.T) {
2630
}()
2731
main()
2832
}
33+
34+
func TestMainWithConnectionListener(t *testing.T) {
35+
dir, err := ioutil.TempDir("", "TestTracerouteCaller")
36+
rtx.Must(err, "Could not create temp dir")
37+
srv := eventsocket.New(dir + "/events.sock")
38+
rtx.Must(srv.Listen(), "Could not start the empty server")
39+
40+
*prometheusx.ListenAddress = ":0"
41+
*tcpinfoSocket = dir + "/events.sock"
42+
ctx, cancel = context.WithCancel(context.Background())
43+
go srv.Serve(ctx)
44+
go func() {
45+
time.Sleep(1 * time.Second)
46+
cancel()
47+
}()
48+
main()
49+
}

connection/connection.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@
22
package connection
33

44
import (
5+
"fmt"
6+
"net"
57
"strconv"
68

9+
"github.com/m-lab/tcp-info/inetdiag"
10+
711
"github.com/m-lab/uuid"
812
)
913

14+
var (
15+
// Inject this function to allow whitebox testing of error handling.
16+
netInterfaceAddrs = net.InterfaceAddrs
17+
)
18+
1019
// Connection models a single connection. This type is checked for equality
1120
// elsewhere in traceroute-caller, so be very careful adding more fields as you
1221
// might accidentally change program semantics elsewhere.
@@ -24,3 +33,76 @@ func (c *Connection) UUID() (string, error) {
2433
result, err := strconv.ParseUint(c.Cookie, 16, 64)
2534
return uuid.FromCookie(result), err
2635
}
36+
37+
type creator struct {
38+
localIPs []*net.IP
39+
}
40+
41+
// FromSockID converts a SockID into a Connection.
42+
func (c *creator) FromSockID(sockid inetdiag.SockID) (Connection, error) {
43+
srcIP := net.ParseIP(sockid.SrcIP)
44+
dstIP := net.ParseIP(sockid.DstIP)
45+
for _, local := range c.localIPs {
46+
if local.Equal(srcIP) {
47+
return Connection{
48+
RemoteIP: sockid.DstIP,
49+
RemotePort: int(sockid.DPort),
50+
LocalIP: sockid.SrcIP,
51+
LocalPort: int(sockid.SPort),
52+
Cookie: strconv.FormatUint(sockid.CookieUint64(), 16),
53+
}, nil
54+
}
55+
if local.Equal(dstIP) {
56+
return Connection{
57+
RemoteIP: sockid.SrcIP,
58+
RemotePort: int(sockid.SPort),
59+
LocalIP: sockid.DstIP,
60+
LocalPort: int(sockid.DPort),
61+
Cookie: strconv.FormatUint(sockid.CookieUint64(), 16),
62+
}, nil
63+
64+
}
65+
}
66+
return Connection{}, fmt.Errorf("Could not find a local IP in %+v", sockid)
67+
}
68+
69+
// Creator allows you to create a connection object from a SockID. It properly
70+
// assigns SrcIP and DestIP to RemoteIP and LocalIP.
71+
type Creator interface {
72+
FromSockID(sockid inetdiag.SockID) (Connection, error)
73+
}
74+
75+
// NewCreator makes an object that can convert src and dst into local and remote
76+
// IPs.
77+
func NewCreator() (Creator, error) {
78+
c := &creator{
79+
localIPs: make([]*net.IP, 0),
80+
}
81+
82+
addrs, err := netInterfaceAddrs()
83+
if err != nil {
84+
return c, err
85+
}
86+
for _, addr := range addrs {
87+
var ip net.IP
88+
switch a := addr.(type) {
89+
case *net.IPNet:
90+
ip = a.IP
91+
case *net.IPAddr:
92+
ip = a.IP
93+
default:
94+
return c, fmt.Errorf("Unknown type of address %q", addr.String())
95+
}
96+
c.localIPs = append(c.localIPs, &ip)
97+
}
98+
99+
return c, err
100+
}
101+
102+
// NewFakeCreator makes a fake creator with hardcoded local IPs to enable
103+
// testing in diverse network environments.
104+
func NewFakeCreator(localIPs []*net.IP) Creator {
105+
return &creator{
106+
localIPs: localIPs,
107+
}
108+
}

connection/connection_test.go

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,138 @@
1-
package connection_test
1+
package connection
22

33
import (
4+
"errors"
5+
"net"
6+
"reflect"
47
"strings"
58
"testing"
69

7-
"github.com/m-lab/traceroute-caller/connection"
10+
"github.com/m-lab/go/rtx"
11+
"github.com/m-lab/tcp-info/inetdiag"
812
)
913

1014
func TestUUID(t *testing.T) {
11-
conn := connection.Connection{Cookie: "1be3"}
15+
conn := Connection{Cookie: "1be3"}
1216
tmp, err := conn.UUID()
1317
s := strings.Split(tmp, "_")
1418
if err != nil || s[len(s)-1] != "0000000000001BE3" {
1519
t.Error("Make uuid from cookie incorrect")
1620
}
1721
}
22+
23+
func TestSrcDestSwap(t *testing.T) {
24+
oldFunc := netInterfaceAddrs
25+
defer func() { netInterfaceAddrs = oldFunc }()
26+
27+
netInterfaceAddrs = func() ([]net.Addr, error) {
28+
_, nw, err := net.ParseCIDR("127.0.0.1/8")
29+
rtx.Must(err, "could not parse test nw")
30+
ip1, err := net.ResolveIPAddr("ip6", "::1")
31+
rtx.Must(err, "Could not resolve ::1")
32+
ip2, err := net.ResolveIPAddr("ip4", "1.2.3.4")
33+
rtx.Must(err, "Could not resolve 1.2.3.4")
34+
return []net.Addr{
35+
nw,
36+
ip1,
37+
ip2,
38+
}, nil
39+
}
40+
41+
c, err := NewCreator()
42+
rtx.Must(err, "Could not use fake netInterfaceAddrs")
43+
44+
tests := []struct {
45+
name string
46+
sockid inetdiag.SockID
47+
want Connection
48+
wantErr bool
49+
}{
50+
{
51+
name: "From local to remote",
52+
sockid: inetdiag.SockID{
53+
SrcIP: "1.2.3.4",
54+
SPort: 5,
55+
DstIP: "7.8.9.10",
56+
DPort: 11,
57+
Cookie: 0xc,
58+
},
59+
want: Connection{
60+
LocalIP: "1.2.3.4",
61+
LocalPort: 5,
62+
RemoteIP: "7.8.9.10",
63+
RemotePort: 11,
64+
Cookie: "c",
65+
},
66+
},
67+
{
68+
name: "From remote to local",
69+
sockid: inetdiag.SockID{
70+
DstIP: "1.2.3.4",
71+
DPort: 5,
72+
SrcIP: "7.8.9.10",
73+
SPort: 11,
74+
Cookie: 0xc,
75+
},
76+
want: Connection{
77+
LocalIP: "1.2.3.4",
78+
LocalPort: 5,
79+
RemoteIP: "7.8.9.10",
80+
RemotePort: 11,
81+
Cookie: "c",
82+
},
83+
},
84+
{
85+
name: "Nonlocal connection",
86+
sockid: inetdiag.SockID{
87+
DstIP: "13.14.15.16",
88+
DPort: 17,
89+
SrcIP: "7.8.9.10",
90+
SPort: 11,
91+
Cookie: 0xc,
92+
},
93+
wantErr: true,
94+
},
95+
}
96+
for _, tt := range tests {
97+
t.Run(tt.name, func(t *testing.T) {
98+
got, err := c.FromSockID(tt.sockid)
99+
if (err != nil) != tt.wantErr {
100+
t.Errorf("creator.FromSockID() error = %v, wantErr %v", err, tt.wantErr)
101+
return
102+
}
103+
if !reflect.DeepEqual(got, tt.want) {
104+
t.Errorf("creator.FromSockID() = %v, want %v", got, tt.want)
105+
}
106+
})
107+
}
108+
109+
c = NewFakeCreator([]*net.IP{}) // Just call it to make sure it doesn't crash.
110+
}
111+
112+
type fakeIP struct{}
113+
114+
func (fakeIP) String() string { return "" }
115+
func (fakeIP) Network() string { return "" }
116+
117+
func TestNewCreatorWithBadInterfaceAddrs(t *testing.T) {
118+
oldFunc := netInterfaceAddrs
119+
defer func() { netInterfaceAddrs = oldFunc }()
120+
121+
netInterfaceAddrs = func() ([]net.Addr, error) {
122+
return nil, errors.New("error for testing")
123+
}
124+
125+
_, err := NewCreator()
126+
if err == nil {
127+
t.Error("Should have had an error but was nil")
128+
}
129+
130+
netInterfaceAddrs = func() ([]net.Addr, error) {
131+
return []net.Addr{fakeIP{}}, nil
132+
}
133+
134+
_, err = NewCreator()
135+
if err == nil {
136+
t.Error("Should have had an error but was nil")
137+
}
138+
}

0 commit comments

Comments
 (0)