Skip to content

Commit 2bd2b30

Browse files
authored
Merge pull request #33 from m-lab/sandbox-addcache
implement cached test
2 parents e81fb7e + 5744cfe commit 2bd2b30

File tree

8 files changed

+300
-70
lines changed

8 files changed

+300
-70
lines changed

caller.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package main
44
import (
55
"context"
66
"flag"
7-
"fmt"
87
"time"
98

109
"github.com/m-lab/traceroute-caller/connection"
@@ -57,9 +56,7 @@ func main() {
5756
if *tcpinfoSocket == "" {
5857
connPoller := connectionpoller.New(cache)
5958
for ctx.Err() == nil {
60-
closedConnections := connPoller.GetClosedConnections()
61-
fmt.Printf("length of closed connections: %d\n", len(closedConnections))
62-
daemon.TraceAll(closedConnections)
59+
connPoller.TraceClosedConnections(&daemon)
6360

6461
select {
6562
case <-time.After(*waitTime):

connectionlistener/connectionlistener_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@ type fakeTracer struct {
2828
wg sync.WaitGroup
2929
}
3030

31-
func (ft *fakeTracer) Trace(conn connection.Connection, t time.Time) {
31+
func (ft *fakeTracer) Trace(conn connection.Connection, t time.Time) string {
3232
log.Println("Tracing", conn)
3333
ft.ips = append(ft.ips, conn.RemoteIP)
3434
ft.wg.Done()
35+
return "Fake test Result"
36+
}
37+
38+
func (ft *fakeTracer) CreateCacheTest(conn connection.Connection, t time.Time, cachedTest string) {
39+
log.Println("Create cached test for: ", conn)
40+
return
3541
}
3642

3743
func TestListener(t *testing.T) {

connectionpoller/connectionpoller.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/m-lab/traceroute-caller/connection"
1717
"github.com/m-lab/traceroute-caller/ipcache"
18+
"github.com/m-lab/traceroute-caller/scamper"
1819
)
1920

2021
var (
@@ -136,25 +137,20 @@ type connectionPoller struct {
136137
// connections which it previously measured to be open, but it can no longer
137138
// measure to be open.
138139
type ConnectionPoller interface {
139-
GetClosedConnections() []connection.Connection
140+
TraceClosedConnections(tracer scamper.Tracer)
140141
}
141142

142-
// GetClosedConnections returns the list of connections which were previously
143-
// measured to be open but were not measured to be open this time..
144-
func (c *connectionPoller) GetClosedConnections() []connection.Connection {
143+
// TraceClosedConnections send trace for all closed connections.
144+
func (c *connectionPoller) TraceClosedConnections(tracer scamper.Tracer) {
145145
oldConn := c.connectionPool
146146
fmt.Printf("old connection size %d\n", len(oldConn))
147147
c.connectionPool = c.GetConnections()
148148
fmt.Printf("new connection size %d\n", len(c.connectionPool))
149-
var closed []connection.Connection
150149
for conn := range oldConn {
151-
if _, hasConn := c.connectionPool[conn]; !hasConn && !c.recentIPCache.Has(conn.RemoteIP) {
152-
closed = append(closed, conn)
153-
log.Printf("Try to add " + conn.RemoteIP)
154-
c.recentIPCache.Add(conn.RemoteIP)
150+
if _, hasConn := c.connectionPool[conn]; !hasConn {
151+
go c.recentIPCache.Trace(conn, tracer)
155152
}
156153
}
157-
return closed
158154
}
159155

160156
// New creates and returns a new ConnectionPoller.

connectionpoller/connectionpoller_test.go

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"reflect"
1010
"testing"
11+
"time"
1112

1213
"github.com/m-lab/traceroute-caller/ipcache"
1314

@@ -112,49 +113,51 @@ func TestGetConnectionsWithFakeSS(t *testing.T) {
112113
}
113114
}
114115

115-
func TestConnectionPollerConstruction(t *testing.T) {
116-
// The only thing we can verify by default is that the code does not crash.
117-
// Which is not nothing, but it's not a lot.
118-
ctx, cancel := context.WithCancel(context.Background())
119-
defer cancel()
120-
cache := ipcache.New(ctx)
121-
connPoller := New(cache)
122-
connPoller.GetClosedConnections()
116+
type testTracer struct {
117+
calls int
118+
answers []map[connection.Connection]struct{}
119+
}
120+
121+
func (tt *testTracer) Trace(conn connection.Connection, t time.Time) string {
122+
return "Fake Trace test"
123+
}
124+
125+
func (tt *testTracer) CreateCacheTest(conn connection.Connection, t time.Time, cachedTest string) {
126+
return
123127
}
124128

125129
type testFinder struct {
126-
calls int
127-
answers []map[connection.Connection]struct{}
128130
}
129131

130132
func (tf *testFinder) GetConnections() map[connection.Connection]struct{} {
131-
calls := tf.calls
132-
tf.calls++
133-
return tf.answers[calls]
133+
conns := make(map[connection.Connection]struct{})
134+
return conns
134135
}
135136

136-
func TestGetClosedCollection(t *testing.T) {
137-
// This setup causes both conn3 and conn2 to disappear, but because conn3 is in
138-
// the ipcache, only conn2 should be returned.
137+
func TestConnectionPollerConstruction(t *testing.T) {
138+
// The only thing we can verify by default is that the code does not crash.
139+
// Which is not nothing, but it's not a lot.
139140
ctx, cancel := context.WithCancel(context.Background())
140141
defer cancel()
141142
cache := ipcache.New(ctx)
142-
connPoller := New(cache).(*connectionPoller)
143-
conn1 := connection.Connection{RemoteIP: "1.1.1.1"}
144-
conn2 := connection.Connection{RemoteIP: "1.1.1.2"}
145-
conn3 := connection.Connection{RemoteIP: "1.1.1.3"}
146-
connPoller.recentIPCache.Add(conn3.RemoteIP)
147-
connPoller.finder = &testFinder{
148-
answers: []map[connection.Connection]struct{}{
149-
{conn1: struct{}{}, conn2: struct{}{}, conn3: struct{}{}},
150-
{conn1: struct{}{}},
151-
},
143+
connPoller := &connectionPoller{
144+
finder: &testFinder{},
145+
recentIPCache: cache,
146+
connectionPool: make(map[connection.Connection]struct{}),
152147
}
153-
connPoller.connectionPool = connPoller.GetConnections()
148+
conn1 := connection.Connection{
149+
RemoteIP: "1.1.1.2",
150+
RemotePort: 5034,
151+
LocalIP: "1.1.1.3",
152+
LocalPort: 58790,
153+
Cookie: "10f3d"}
154+
connPoller.connectionPool[conn1] = struct{}{}
155+
var tt testTracer
156+
connPoller.TraceClosedConnections(&tt)
154157

155-
c := connPoller.GetClosedConnections()
158+
time.Sleep(200 * time.Millisecond)
156159

157-
if len(c) != 1 || c[0] != conn2 {
158-
t.Errorf("Wanted %v but got %v", []connection.Connection{conn2}, c)
160+
if connPoller.recentIPCache.GetCacheLength() != 1 {
161+
t.Errorf("ConnectionPoller not working properly")
159162
}
160163
}

ipcache/ipcache.go

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"fmt"
88
"sync"
99
"time"
10+
11+
"github.com/m-lab/traceroute-caller/connection"
12+
"github.com/m-lab/traceroute-caller/scamper"
1013
)
1114

1215
var (
@@ -17,23 +20,63 @@ var (
1720
IPCacheUpdatePeriod = flag.Duration("IPCacheUpdatePeriod", 1*time.Second, "We run the cache eviction loop with this frequency")
1821
)
1922

23+
type CacheTest struct {
24+
timeStamp time.Time
25+
data string
26+
done chan struct{}
27+
}
28+
2029
// RecentIPCache contains a list of all the IP addresses that we have traced to
2130
// recently. We keep this list to ensure that we don't traceroute to the same
2231
// location repeatedly at a high frequency.
2332
type RecentIPCache struct {
24-
cache map[string]time.Time
33+
cache map[string]*CacheTest
2534
mu sync.RWMutex
2635
}
2736

37+
// GetTrace returns test content in []byte given a connection and tracer
38+
func (rc *RecentIPCache) Trace(conn connection.Connection, sc scamper.Tracer) {
39+
ip := conn.RemoteIP
40+
rc.mu.Lock()
41+
c, ok := rc.cache[ip]
42+
if !ok {
43+
nc := &CacheTest{
44+
timeStamp: time.Now(),
45+
done: make(chan struct{}),
46+
}
47+
rc.cache[ip] = nc
48+
rc.mu.Unlock()
49+
50+
nc.data = sc.Trace(conn, nc.timeStamp)
51+
close(nc.done)
52+
return
53+
}
54+
rc.mu.Unlock()
55+
56+
<-c.done
57+
cachedData := c.data
58+
sc.CreateCacheTest(conn, time.Now(), cachedData)
59+
}
60+
61+
func (rc *RecentIPCache) GetCacheLength() int {
62+
return len(rc.cache)
63+
}
64+
65+
func (rc *RecentIPCache) GetTestContent(ip string) string {
66+
rc.mu.RLock()
67+
defer rc.mu.RUnlock()
68+
c, ok := rc.cache[ip]
69+
if ok {
70+
return c.data
71+
}
72+
return ""
73+
}
74+
2875
// New creates and returns a RecentIPCache. It also starts up a background
2976
// goroutine that scrubs the cache.
30-
//
31-
// TODO(https://github.com/m-lab/traceroute-caller/issues/30) Make this truly threadsafe
3277
func New(ctx context.Context) *RecentIPCache {
3378
m := &RecentIPCache{}
34-
m.mu.Lock()
35-
m.cache = make(map[string]time.Time)
36-
m.mu.Unlock()
79+
m.cache = make(map[string]*CacheTest)
3780
go func() {
3881
ticker := time.NewTicker(*IPCacheUpdatePeriod)
3982
defer ticker.Stop()
@@ -42,7 +85,7 @@ func New(ctx context.Context) *RecentIPCache {
4285
return
4386
}
4487
for k, v := range m.cache {
45-
if now.Sub(v) > *IPCacheTimeout {
88+
if now.Sub(v.timeStamp) > *IPCacheTimeout {
4689
fmt.Println("try to delete " + k)
4790
m.mu.Lock()
4891
delete(m.cache, k)
@@ -69,8 +112,11 @@ func (m *RecentIPCache) Add(ip string) {
69112
fmt.Printf("func Add: Now is %d\n", time.Now().Unix())
70113
_, present := m.cache[ip]
71114
if !present {
72-
m.cache[ip] = time.Now()
73-
fmt.Printf("just add %s %d\n", ip, m.cache[ip].Unix())
115+
m.cache[ip] = &CacheTest{
116+
timeStamp: time.Now(),
117+
done: make(chan struct{}),
118+
}
119+
fmt.Printf("just add %s %d\n", ip, m.cache[ip].timeStamp.Unix())
74120
}
75121
}
76122

ipcache/ipcache_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,67 @@ package ipcache_test
22

33
import (
44
"context"
5+
"log"
56
"testing"
67
"time"
78

9+
"github.com/m-lab/traceroute-caller/connection"
810
"github.com/m-lab/traceroute-caller/ipcache"
911
)
1012

13+
type testTracer struct {
14+
calls int
15+
answers []map[connection.Connection]struct{}
16+
}
17+
18+
func (tf *testTracer) Trace(conn connection.Connection, t time.Time) string {
19+
log.Println("Create Trace Test")
20+
return "Fake Trace test " + conn.RemoteIP
21+
}
22+
23+
func (tf *testTracer) CreateCacheTest(conn connection.Connection, t time.Time, cachedTest string) {
24+
log.Println("Create cached Test " + conn.RemoteIP)
25+
return
26+
}
27+
28+
func TestTrace(t *testing.T) {
29+
ctx, cancel := context.WithCancel(context.Background())
30+
defer cancel()
31+
testCache := ipcache.New(ctx)
32+
33+
conn1 := connection.Connection{
34+
RemoteIP: "1.1.1.2",
35+
RemotePort: 5034,
36+
LocalIP: "1.1.1.3",
37+
LocalPort: 58790,
38+
Cookie: "10f3d"}
39+
var tt testTracer
40+
41+
testCache.Trace(conn1, &tt)
42+
time.Sleep(200 * time.Millisecond)
43+
tmp := testCache.GetTestContent("1.1.1.2")
44+
log.Println("!!!" + tmp + "!!!")
45+
if tmp != "Fake Trace test 1.1.1.2" {
46+
t.Error("cache not trace correctly ")
47+
}
48+
49+
conn2 := connection.Connection{
50+
RemoteIP: "1.1.1.5",
51+
RemotePort: 5034,
52+
LocalIP: "1.1.1.7",
53+
LocalPort: 58790,
54+
Cookie: "aaaa"}
55+
56+
testCache.Trace(conn2, &tt)
57+
testCache.Trace(conn1, &tt)
58+
59+
time.Sleep(200 * time.Millisecond)
60+
61+
if testCache.GetCacheLength() != 2 {
62+
t.Error("cache not working correctly ")
63+
}
64+
}
65+
1166
func TestRecentIPCache(t *testing.T) {
1267
*ipcache.IPCacheTimeout = 100 * time.Millisecond
1368
*ipcache.IPCacheUpdatePeriod = 10 * time.Millisecond

0 commit comments

Comments
 (0)