Skip to content

Commit 50cc96e

Browse files
authored
Fix connection listener (#40)
Standardize flags, parse IPs in a more cautious manner, don't run local-local traceroutes, add a dryRun flag. Use icache.Trace and not scamper.Trace. Actually, the pollers and listeners should not know about scamper at all. Only the ipcache. Deleted old code. Updated tests to preserve test coverage. Don't risk a null pointer error in an error message
1 parent 46838de commit 50cc96e

File tree

9 files changed

+58
-81
lines changed

9 files changed

+58
-81
lines changed

caller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package main
44
import (
55
"context"
66
"flag"
7+
"log"
78
"sync"
89
"time"
910

@@ -37,6 +38,8 @@ var (
3738
// go build
3839
// ./traceroute-caller --outputPath scamper_output
3940
func main() {
41+
log.SetFlags(log.LstdFlags | log.Lshortfile)
42+
4043
flag.Parse()
4144
rtx.Must(flagx.ArgsFromEnv(flag.CommandLine), "Could not get args from environment")
4245

@@ -58,13 +61,13 @@ func main() {
5861
}()
5962

6063
wg := sync.WaitGroup{}
61-
cache := ipcache.New(ctx)
64+
cache := ipcache.New(ctx, &daemon, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
6265
if *poll {
6366
wg.Add(1)
6467
go func() {
6568
connPoller := connectionpoller.New(cache)
6669
for ctx.Err() == nil {
67-
connPoller.TraceClosedConnections(&daemon)
70+
connPoller.TraceClosedConnections()
6871

6972
select {
7073
case <-time.After(*waitTime):
@@ -82,9 +85,9 @@ func main() {
8285
esdaemon := daemon
8386
esdaemon.DryRun = *eventsocketDryRun
8487
if *eventsocketDryRun {
85-
cache = ipcache.New(ctx)
88+
cache = ipcache.New(ctx, &esdaemon, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
8689
}
87-
connListener := connectionlistener.New(&esdaemon, connCreator, cache)
90+
connListener := connectionlistener.New(connCreator, cache)
8891
eventsocket.MustRun(ctx, *eventsocket.Filename, connListener)
8992
wg.Done()
9093
}()

caller_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ func TestMetrics(t *testing.T) {
1919
}
2020

2121
func TestMain(t *testing.T) {
22+
dir, err := ioutil.TempDir("", "TestMain")
23+
rtx.Must(err, "Could not create temp dir")
24+
2225
// Verify that main doesn't crash, and that it does exit when the context is canceled.
2326
// TODO: verify more in this test.
2427
*prometheusx.ListenAddress = ":0"
28+
*scamperCtrlSocket = dir + "/scamper.sock"
2529
*waitTime = time.Nanosecond // Run through the loop a few times.
2630
ctx, cancel = context.WithCancel(context.Background())
2731
go func() {
@@ -32,12 +36,13 @@ func TestMain(t *testing.T) {
3236
}
3337

3438
func TestMainWithConnectionListener(t *testing.T) {
35-
dir, err := ioutil.TempDir("", "TestTracerouteCaller")
39+
dir, err := ioutil.TempDir("", "TestMainWithConnectionListener")
3640
rtx.Must(err, "Could not create temp dir")
3741
srv := eventsocket.New(dir + "/events.sock")
3842
rtx.Must(srv.Listen(), "Could not start the empty server")
3943

4044
*prometheusx.ListenAddress = ":0"
45+
*scamperCtrlSocket = dir + "/scamper.sock"
4146
*eventsocket.Filename = dir + "/events.sock"
4247
*eventsocketDryRun = true
4348

connectionlistener/connectionlistener.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/m-lab/tcp-info/inetdiag"
1111
"github.com/m-lab/traceroute-caller/connection"
1212
"github.com/m-lab/traceroute-caller/ipcache"
13-
"github.com/m-lab/traceroute-caller/scamper"
1413
)
1514

1615
// connectionListener implements the eventsocket.Handler interface, allowing us
@@ -19,7 +18,6 @@ import (
1918
type connectionListener struct {
2019
mutex sync.Mutex
2120
conns map[string]connection.Connection
22-
tracer scamper.Tracer
2321
cache *ipcache.RecentIPCache
2422
creator connection.Creator
2523
}
@@ -46,19 +44,15 @@ func (cl *connectionListener) Close(ctx context.Context, timestamp time.Time, uu
4644
cl.mutex.Unlock()
4745

4846
if ok {
49-
if !cl.cache.Has(conn.RemoteIP) {
50-
cl.cache.Add(conn.RemoteIP)
51-
go cl.tracer.Trace(conn, timestamp)
52-
}
47+
go cl.cache.Trace(conn)
5348
}
5449
}
5550

5651
// New returns an eventsocket.Handler that will call the passed-in scamper
5752
// daemon on every closed connection.
58-
func New(tracer scamper.Tracer, creator connection.Creator, cache *ipcache.RecentIPCache) eventsocket.Handler {
53+
func New(creator connection.Creator, cache *ipcache.RecentIPCache) eventsocket.Handler {
5954
return &connectionListener{
6055
conns: make(map[string]connection.Connection),
61-
tracer: tracer,
6256
cache: cache,
6357
creator: creator,
6458
}

connectionlistener/connectionlistener_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ func TestListener(t *testing.T) {
5555
// Create a new connectionlistener with a fake tracer.
5656
ctx, cancel := context.WithCancel(context.Background())
5757
defer cancel()
58-
cache := ipcache.New(ctx)
5958
ft := &fakeTracer{}
59+
cache := ipcache.New(ctx, ft, 100*time.Second, time.Second)
60+
6061
ft.wg.Add(2)
6162

6263
localIP := net.ParseIP("10.0.0.1")
6364
creator := connection.NewFakeCreator([]*net.IP{&localIP})
64-
cl := connectionlistener.New(ft, creator, cache)
65+
cl := connectionlistener.New(creator, cache)
6566

6667
// Connect the connectionlistener to the server
6768
go eventsocket.MustRun(ctx, dir+"/tcpevents.sock", cl)

connectionpoller/connectionpoller.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"github.com/m-lab/traceroute-caller/connection"
1717
"github.com/m-lab/traceroute-caller/ipcache"
18-
"github.com/m-lab/traceroute-caller/scamper"
1918
)
2019

2120
var (
@@ -137,18 +136,18 @@ type connectionPoller struct {
137136
// connections which it previously measured to be open, but it can no longer
138137
// measure to be open.
139138
type ConnectionPoller interface {
140-
TraceClosedConnections(tracer scamper.Tracer)
139+
TraceClosedConnections()
141140
}
142141

143142
// TraceClosedConnections send trace for all closed connections.
144-
func (c *connectionPoller) TraceClosedConnections(tracer scamper.Tracer) {
143+
func (c *connectionPoller) TraceClosedConnections() {
145144
oldConn := c.connectionPool
146145
fmt.Printf("old connection size %d\n", len(oldConn))
147146
c.connectionPool = c.GetConnections()
148147
fmt.Printf("new connection size %d\n", len(c.connectionPool))
149148
for conn := range oldConn {
150149
if _, hasConn := c.connectionPool[conn]; !hasConn {
151-
go c.recentIPCache.Trace(conn, tracer)
150+
go c.recentIPCache.Trace(conn)
152151
}
153152
}
154153
}

connectionpoller/connectionpoller_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ func TestConnectionPollerConstruction(t *testing.T) {
147147
// Which is not nothing, but it's not a lot.
148148
ctx, cancel := context.WithCancel(context.Background())
149149
defer cancel()
150-
cache := ipcache.New(ctx)
150+
var tt testTracer
151+
cache := ipcache.New(ctx, &tt, time.Second, time.Second)
151152
connPoller := New(cache).(*connectionPoller)
152153
connPoller.finder = &testFinder{}
153154
connPoller.connectionPool = make(map[connection.Connection]struct{})
@@ -158,8 +159,7 @@ func TestConnectionPollerConstruction(t *testing.T) {
158159
LocalPort: 58790,
159160
Cookie: "10f3d"}
160161
connPoller.connectionPool[conn1] = struct{}{}
161-
var tt testTracer
162-
connPoller.TraceClosedConnections(&tt)
162+
connPoller.TraceClosedConnections()
163163

164164
time.Sleep(200 * time.Millisecond)
165165

ipcache/ipcache.go

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ type CacheTest struct {
3232
type RecentIPCache struct {
3333
cache map[string]*CacheTest
3434
mu sync.RWMutex
35+
36+
tracer scamper.Tracer
3537
}
3638

37-
// GetTrace returns test content in []byte given a connection and tracer
38-
func (rc *RecentIPCache) Trace(conn connection.Connection, sc scamper.Tracer) {
39+
// Trace performs a trace and adds it to the cache.
40+
func (rc *RecentIPCache) Trace(conn connection.Connection) {
3941
ip := conn.RemoteIP
4042
rc.mu.Lock()
4143
_, ok := rc.cache[ip]
@@ -47,12 +49,12 @@ func (rc *RecentIPCache) Trace(conn connection.Connection, sc scamper.Tracer) {
4749
rc.cache[ip] = nc
4850
rc.mu.Unlock()
4951

50-
nc.data = sc.Trace(conn, nc.timeStamp)
52+
nc.data = rc.tracer.Trace(conn, nc.timeStamp)
5153
close(nc.done)
5254
return
5355
}
5456
rc.mu.Unlock()
55-
sc.CreateCacheTest(conn, time.Now(), rc.GetData(ip))
57+
rc.tracer.CreateCacheTest(conn, time.Now(), rc.GetData(ip))
5658
}
5759

5860
func (rc *RecentIPCache) GetCacheLength() int {
@@ -74,18 +76,20 @@ func (rc *RecentIPCache) GetData(ip string) string {
7476

7577
// New creates and returns a RecentIPCache. It also starts up a background
7678
// goroutine that scrubs the cache.
77-
func New(ctx context.Context) *RecentIPCache {
78-
m := &RecentIPCache{}
79-
m.cache = make(map[string]*CacheTest)
79+
func New(ctx context.Context, tracer scamper.Tracer, ipCacheTimeout, ipCacheUpdatePeriod time.Duration) *RecentIPCache {
80+
m := &RecentIPCache{
81+
cache: make(map[string]*CacheTest),
82+
tracer: tracer,
83+
}
8084
go func() {
81-
ticker := time.NewTicker(*IPCacheUpdatePeriod)
85+
ticker := time.NewTicker(ipCacheUpdatePeriod)
8286
defer ticker.Stop()
8387
for now := range ticker.C {
8488
if ctx.Err() != nil {
8589
return
8690
}
8791
for k, v := range m.cache {
88-
if now.Sub(v.timeStamp) > *IPCacheTimeout {
92+
if now.Sub(v.timeStamp) > ipCacheTimeout {
8993
fmt.Println("try to delete " + k)
9094
m.mu.Lock()
9195
delete(m.cache, k)
@@ -98,36 +102,3 @@ func New(ctx context.Context) *RecentIPCache {
98102
}()
99103
return m
100104
}
101-
102-
func (m *RecentIPCache) len() int {
103-
m.mu.RLock()
104-
defer m.mu.RUnlock()
105-
return len(m.cache)
106-
}
107-
108-
// Add an IP to the cache.
109-
func (m *RecentIPCache) Add(ip string) {
110-
m.mu.Lock()
111-
defer m.mu.Unlock()
112-
fmt.Printf("func Add: Now is %d\n", time.Now().Unix())
113-
_, present := m.cache[ip]
114-
if !present {
115-
m.cache[ip] = &CacheTest{
116-
timeStamp: time.Now(),
117-
done: make(chan struct{}),
118-
}
119-
fmt.Printf("just add %s %d\n", ip, m.cache[ip].timeStamp.Unix())
120-
}
121-
}
122-
123-
// Has tests whether an IP is in the cache.
124-
func (m *RecentIPCache) Has(ip string) bool {
125-
m.mu.RLock()
126-
defer m.mu.RUnlock()
127-
//fmt.Printf("func Has: Now is %d, length of cache: %d \n", time.Now().Unix(), m.Len())
128-
if m.len() == 0 {
129-
return false
130-
}
131-
_, ok := m.cache[ip]
132-
return ok
133-
}

ipcache/ipcache_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ func (tf *testTracer) CreateCacheTest(conn connection.Connection, t time.Time, c
2828
func TestGetData(t *testing.T) {
2929
ctx, cancel := context.WithCancel(context.Background())
3030
defer cancel()
31-
testCache := ipcache.New(ctx)
31+
var tt testTracer
32+
testCache := ipcache.New(ctx, &tt, 100*time.Second, time.Second)
3233

3334
conn1 := connection.Connection{
3435
RemoteIP: "1.1.1.2",
3536
RemotePort: 5034,
3637
LocalIP: "1.1.1.3",
3738
LocalPort: 58790,
3839
Cookie: "10f3d"}
39-
var tt testTracer
4040

41-
testCache.Trace(conn1, &tt)
41+
testCache.Trace(conn1)
4242
time.Sleep(200 * time.Millisecond)
4343
tmp := testCache.GetData("1.1.1.1")
4444
if tmp != "" {
@@ -54,17 +54,17 @@ func TestGetData(t *testing.T) {
5454
func TestTrace(t *testing.T) {
5555
ctx, cancel := context.WithCancel(context.Background())
5656
defer cancel()
57-
testCache := ipcache.New(ctx)
57+
var tt testTracer
58+
testCache := ipcache.New(ctx, &tt, 100*time.Second, time.Second)
5859

5960
conn1 := connection.Connection{
6061
RemoteIP: "1.1.1.2",
6162
RemotePort: 5034,
6263
LocalIP: "1.1.1.3",
6364
LocalPort: 58790,
6465
Cookie: "10f3d"}
65-
var tt testTracer
6666

67-
testCache.Trace(conn1, &tt)
67+
testCache.Trace(conn1)
6868
time.Sleep(200 * time.Millisecond)
6969
tmp := testCache.GetData("1.1.1.2")
7070
if tmp != "Fake Trace test 1.1.1.2" {
@@ -78,8 +78,8 @@ func TestTrace(t *testing.T) {
7878
LocalPort: 58790,
7979
Cookie: "aaaa"}
8080

81-
testCache.Trace(conn2, &tt)
82-
testCache.Trace(conn1, &tt)
81+
testCache.Trace(conn2)
82+
testCache.Trace(conn1)
8383

8484
time.Sleep(200 * time.Millisecond)
8585

@@ -89,20 +89,24 @@ func TestTrace(t *testing.T) {
8989
}
9090

9191
func TestRecentIPCache(t *testing.T) {
92-
*ipcache.IPCacheTimeout = 100 * time.Millisecond
93-
*ipcache.IPCacheUpdatePeriod = 10 * time.Millisecond
94-
9592
ctx, cancel := context.WithCancel(context.Background())
9693
defer cancel()
97-
tmp := ipcache.New(ctx)
98-
tmp.Add("1.2.3.4")
99-
if !tmp.Has("1.2.3.4") {
94+
var tt testTracer
95+
tmp := ipcache.New(ctx, &tt, 100*time.Millisecond, 10*time.Millisecond)
96+
tmp.Trace(connection.Connection{
97+
RemoteIP: "1.2.3.4",
98+
RemotePort: 5,
99+
LocalIP: "6.7.8.9",
100+
LocalPort: 10,
101+
Cookie: "11",
102+
})
103+
if tmp.GetCacheLength() != 1 {
100104
t.Error("cache not working correctly")
101105
}
102106

103107
time.Sleep(300 * time.Millisecond)
104-
if tmp.Has("1.2.3.4") {
105-
t.Error("cache not expire correctly")
108+
if tmp.GetCacheLength() != 0 {
109+
t.Error("cache not working correctly")
106110
}
107111
cancel()
108112
time.Sleep(200 * time.Millisecond)

scamper/scamper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (d *Daemon) MustStart(ctx context.Context) {
7070
derivedCtx, derivedCancel := context.WithCancel(ctx)
7171
defer derivedCancel()
7272
if _, err := os.Stat(d.ControlSocket); !os.IsNotExist(err) {
73-
logFatal("The control socket file must not already exist")
73+
logFatal("The control socket file must not already exist: ", err)
7474
}
7575
defer os.Remove(d.ControlSocket)
7676
command := exec.Command(d.Binary, "-U", d.ControlSocket)

0 commit comments

Comments
 (0)