Skip to content

Commit d84ea3d

Browse files
author
Saied Kazemi
authored
Add hop annotation support (#120)
* Add hop annotation support This commit adds hop annotation support (hopannotation1 datatype) to traceroute-caller. For details, run the following command: go doc -all hopannotation The contents of a typical hop annotation file looks like the following: { "ID": "20210824_2d638011386a_91.189.91.39", "Timestamp": "2021-08-24T15:27:59Z", "Annotations": { "Geo": { "ContinentCode": "NA", "CountryCode": "US", "CountryName": "United States", "Subdivision1ISOCode": "MA", "Subdivision1Name": "Massachusetts", "MetroCode": 506, "City": "Boston", "PostalCode": "02112", "Latitude": 42.3562, "Longitude": -71.0631, "AccuracyRadiusKm": 200 }, "Network": { "CIDR": "91.189.88.0/21", "ASNumber": 41231, "ASName": "Canonical Group Limited", "Systems": [ { "ASNs": [ 41231 ] } ] } } } The changes were successfully tested with: go test ./... go test -race ./... docker-compose up * Remove debug code from hopannotation * Make changes suggested in code review * Make the hop cache aware of date * Make additional changes suggested in code review
1 parent 95ad07c commit d84ea3d

File tree

5 files changed

+445
-17
lines changed

5 files changed

+445
-17
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
local

connectionlistener/connectionlistener.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,25 @@ func (cl *connectionListener) traceAnnotateAndArchive(ctx context.Context, conn
8787
log.Printf("failed to run a trace for connection %v (error: %v)\n", conn, err)
8888
return
8989
}
90-
if _, err := parser.ParseTraceroute(data); err != nil {
90+
output, err := parser.ParseTraceroute(data)
91+
if err != nil {
9192
log.Printf("failed to parse traceroute output (error: %v)\n", err)
9293
return
9394
}
94-
// TODO(SaiedKazemi): Remove this line when done debugging.
95-
log.Printf("successfully parsed traceroute output\n")
95+
hops := parser.ExtractHops(&output.Tracelb)
96+
if len(hops) == 0 {
97+
log.Printf("failed to extract hops from tracelb %+v\n", output.Tracelb)
98+
return
99+
}
100+
101+
traceStartTime := time.Unix(int64(output.CycleStart.StartTime), 0)
102+
annotations, allErrs := cl.hopAnnotator.Annotate(ctx, hops, traceStartTime)
103+
if allErrs != nil {
104+
log.Printf("failed to annotate some or all hops (errors: %+v)\n", allErrs)
105+
}
106+
if annotations != nil && len(annotations) > 0 {
107+
cl.hopAnnotator.WriteAnnotations(annotations, traceStartTime)
108+
}
96109
}
97110

98111
// New returns an eventsocket.Handler that will call the passed-in scamper

docker-compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ services:
6363
- -traceroute-output=/local/traceroute
6464
- -hopannotation-output=/local/hopannotation1
6565
- -tcpinfo.eventsocket=/local/tcpevents.sock
66+
- -ipservice.sock=/local/uuid-annotator.sock
6667
- -tracetool=scamper
6768
- -IPCacheTimeout=10m
6869
- -IPCacheUpdatePeriod=1m

hopannotation/hopannotation.go

Lines changed: 240 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,259 @@
1-
// Package hopannotation handles hop annotation and archiving by
2-
// maintaining a daily cache of annotated and archived hop IP addresses.
1+
// Package hopannotation handles hop annotation and archiving.
32
//
4-
// This is currently a stub package so the rest of the code can compile.
3+
// In the context of this package, a hop is synonymous with an IP address.
4+
// This package uses the uuid-annotator (github.com/m-lab/uuid-annotator)
5+
// to annotate hops. Each hop annotation consists of geolocation and
6+
// Autonomous System Number (ASN) data according to MaxMind, IPinfo.io,
7+
// and RouteViews databases.
8+
//
9+
// Hop annotations are cached for a maximum of one day because the
10+
// annotations can change. Each hop cache has a cache resetter
11+
// goroutine that resets the cache every day at midnight.
12+
//
13+
// A hop cache entry is an IP address plus the date in yyyymmdd format.
14+
// (e.g., 100.116.79.252-2021-08-26). The purpose of the date suffix is
15+
// to make sure that hop annotations of a traceroute that ran right before
16+
// midnight do not prevent us from annotating the same hops today.
17+
//
18+
// This package has the following exported functions:
19+
// New()
20+
// (*HopCache) Reset()
21+
// (*HopCache) Annotate()
22+
// (*HopCache) WriteAnnotations()
523
package hopannotation
624

725
import (
826
"context"
27+
"encoding/json"
28+
"errors"
29+
"fmt"
30+
"io/ioutil"
31+
"log"
32+
"net"
33+
"os"
34+
"sync"
35+
"sync/atomic"
936
"time"
1037

38+
// TODO: These should both be in a common location containing API definitions.
39+
"github.com/m-lab/uuid-annotator/annotator"
1140
"github.com/m-lab/uuid-annotator/ipservice"
41+
"github.com/prometheus/client_golang/prometheus"
42+
"github.com/prometheus/client_golang/prometheus/promauto"
43+
)
44+
45+
var (
46+
// ErrParseHopIP means a hop IP address could not be parsed.
47+
ErrParseHopIP = errors.New("failed to parse hop IP address")
48+
// ErrCreatePath means a directory path for hop annotations could not be created.
49+
ErrCreatePath = errors.New("failed to create directory path")
50+
// ErrMarshalAnnotation means a hop annotation could not be marshaled.
51+
ErrMarshalAnnotation = errors.New("failed to marshal annotation to json")
52+
// ErrWriteMarshal means a hop annotation could not be written to file.
53+
ErrWriteMarshal = errors.New("failed to write marshaled annotation")
54+
55+
hopAnnotationOps = promauto.NewCounterVec(
56+
prometheus.CounterOpts{
57+
Name: "hop_cache_operations_total",
58+
Help: "The number of hop cache operations",
59+
},
60+
[]string{"type", "operation"},
61+
)
62+
hopAnnotationErrors = promauto.NewCounterVec(
63+
prometheus.CounterOpts{
64+
Name: "hop_annotation_errors_total",
65+
Help: "The number of errors hop annotations errors",
66+
},
67+
[]string{"type", "error"},
68+
)
69+
70+
hostname string
71+
72+
// Package testing aid.
73+
tickerDuration = int64(60 * 1000 * time.Millisecond) // ticker duration for cache resetter
74+
writeFile = ioutil.WriteFile
1275
)
1376

14-
// HopAnnotation1 is a stub.
15-
type HopAnnotation1 struct{}
77+
// HopAnnotation1 is the datatype that is written to the hop annotation file.
78+
type HopAnnotation1 struct {
79+
ID string
80+
Timestamp time.Time
81+
Annotations *annotator.ClientAnnotations
82+
}
83+
84+
// HopCache is the cache of hop annotations.
85+
type HopCache struct {
86+
hops map[string]bool // hop addresses being handled or already handled
87+
hopsLock sync.Mutex // hop cache lock
88+
annotator ipservice.Client // function for getting hop annotations
89+
outputPath string // path to directory for writing hop annotations
90+
hour int32 // the hour (between 0 and 23) when cache resetter last checked time
91+
}
1692

17-
// HopCache is a stub.
18-
type HopCache struct{}
93+
// init saves (caches) the host name for all future references because
94+
// the host name doesn't change.
95+
func init() {
96+
var err error
97+
hostname, err = os.Hostname()
98+
if err != nil {
99+
log.Fatalf("failed to get hostname (error: %v)\n", err)
100+
}
101+
}
19102

20-
// New is a stub.
21-
// to obtain annotations. The HopCache will be cleared every day at midnight.
103+
// New returns a new HopCache that will use the provided ipservice.Client
104+
// to obtain annotations. It also starts a goroutine that checks for the
105+
// passage of the midnight every minute to reset the cache. The goroutine
106+
// will terminate when the ctx is cancelled.
22107
func New(ctx context.Context, annotator ipservice.Client, outputPath string) *HopCache {
23-
return &HopCache{}
108+
hc := &HopCache{
109+
hops: make(map[string]bool, 10000), // based on observation
110+
annotator: annotator,
111+
outputPath: outputPath,
112+
}
113+
// Start a cache resetter goroutine to reset the cache every day
114+
// at midnight. For now, we use atomic read/write operations for
115+
// hour because package testing code modifies it to fake midnight.
116+
// Otherwise "go test -race" complains about a race condition.
117+
// TODO(SaiedKazemi): Use moneky patching to control the progression
118+
// of time and get rid of the atomic read.
119+
go func(duration time.Duration) {
120+
ticker := time.NewTicker(duration)
121+
defer ticker.Stop()
122+
for now := range ticker.C {
123+
if ctx.Err() != nil {
124+
return
125+
}
126+
hour := now.Hour()
127+
// Each day, hour increases from 0 to 23. So if
128+
// the current hour is less than the previous hour,
129+
// we must have passed midnight and its' time to
130+
// reset the hop cache.
131+
if hour < int(atomic.LoadInt32(&hc.hour)) {
132+
hc.Reset()
133+
}
134+
atomic.StoreInt32(&hc.hour, int32(hour))
135+
}
136+
}(time.Duration(tickerDuration))
137+
return hc
24138
}
25139

26-
// Clear is a stub.
27-
func (hc *HopCache) Clear() {
140+
// Reset creates a new empty hop cache that is a little bigger (25%)
141+
// than the current cache. The current cache is retained as old cache
142+
// to allow for active annotations to finish.
143+
func (hc *HopCache) Reset() {
144+
hc.hopsLock.Lock()
145+
defer hc.hopsLock.Unlock()
146+
hc.hops = make(map[string]bool, len(hc.hops)+len(hc.hops)/4)
28147
}
29148

30-
// AnnotateArchive is a stub.
31-
func (hc *HopCache) AnnotateArchive(ctx context.Context, hops []string, traceStartTime time.Time) (allErrs []error) {
149+
// Annotate annotates new hops found in the hops argument. It aggregates
150+
// the errors and returns all of them instead of returning after encountering
151+
// the first error.
152+
func (hc *HopCache) Annotate(ctx context.Context, hops []string, traceStartTime time.Time) (map[string]*annotator.ClientAnnotations, []error) {
153+
if err := ctx.Err(); err != nil {
154+
return nil, []error{err}
155+
}
156+
157+
// Validate all hop IP addresses.
158+
allErrs := []error{}
159+
for _, hop := range hops {
160+
if net.ParseIP(hop).String() == "<nil>" {
161+
allErrs = append(allErrs, fmt.Errorf("%w: %v", ErrParseHopIP, hop))
162+
}
163+
}
164+
if len(allErrs) != 0 {
165+
return nil, allErrs
166+
}
167+
168+
// Insert all of the new hops in the hop cache.
169+
// If the cache is reset while iterating this loop, it means that
170+
// midnight has passed and we have a new empty cache. Therefore,
171+
// the remaining hops in the hops slice will be inserted in the new
172+
// cache and added to newHops which is the behavior we want.
173+
var newHops []string
174+
yyyymmdd := traceStartTime.Format("-20060102")
175+
hc.hopsLock.Lock()
176+
for _, hop := range hops {
177+
if !hc.hops[hop+yyyymmdd] {
178+
hopAnnotationOps.WithLabelValues("hopcache", "inserted").Inc()
179+
hc.hops[hop+yyyymmdd] = true
180+
newHops = append(newHops, hop)
181+
}
182+
}
183+
hc.hopsLock.Unlock()
184+
// Are there any new hops?
185+
if len(newHops) == 0 {
186+
return nil, nil
187+
}
188+
189+
// Annotate the new hops.
190+
newAnnotations, err := hc.annotator.Annotate(ctx, newHops)
191+
if err != nil {
192+
return nil, []error{err}
193+
}
194+
hopAnnotationOps.WithLabelValues("hopcache", "annotated").Add(float64(len(newAnnotations)))
195+
return newAnnotations, nil
196+
}
197+
198+
// WriteAnnotations writes out the annotations passed in. It writes out the
199+
// annotations in parallel for speed. It aggregates the errors and returns
200+
// all of them instead of returning after encountering the first error.
201+
func (hc *HopCache) WriteAnnotations(annotations map[string]*annotator.ClientAnnotations, traceStartTime time.Time) []error {
202+
// Write the annotations in parallel.
203+
var wg sync.WaitGroup
204+
errChan := make(chan error, len(annotations))
205+
for hop, annotation := range annotations {
206+
wg.Add(1)
207+
go hc.writeAnnotation(&wg, hop, annotation, traceStartTime, errChan)
208+
}
209+
wg.Wait()
210+
close(errChan)
211+
var allErrs []error
212+
for err := range errChan {
213+
allErrs = append(allErrs, err)
214+
}
32215
return allErrs
33216
}
217+
218+
// writeAnnotation writes the given hop annotations to a file.
219+
func (hc *HopCache) writeAnnotation(wg *sync.WaitGroup, hop string, annotation *annotator.ClientAnnotations, traceStartTime time.Time, errChan chan<- error) {
220+
defer wg.Done()
221+
222+
// Get a file path.
223+
filepath, err := hc.generateAnnotationFilepath(hop, traceStartTime)
224+
if err != nil {
225+
errChan <- err
226+
return
227+
}
228+
229+
// Write to the file.
230+
yyyymmdd := traceStartTime.Format("20060102")
231+
b, err := json.Marshal(HopAnnotation1{
232+
ID: fmt.Sprintf("%s_%s_%s", yyyymmdd, hostname, hop),
233+
Timestamp: traceStartTime,
234+
Annotations: annotation,
235+
})
236+
if err != nil {
237+
hopAnnotationErrors.WithLabelValues("hopannotation", "marshal").Inc()
238+
errChan <- fmt.Errorf("%w (error: %v)", ErrMarshalAnnotation, err)
239+
return
240+
}
241+
if err := writeFile(filepath, b, 0444); err != nil {
242+
hopAnnotationErrors.WithLabelValues("hopannotation", "writefile").Inc()
243+
errChan <- fmt.Errorf("%w (error: %v)", ErrWriteMarshal, err)
244+
return
245+
}
246+
hopAnnotationOps.WithLabelValues("hopannotation", "written").Inc()
247+
}
248+
249+
// generateAnnotationFilepath returns the full pathname of a hop
250+
// annotation file in the format "<timestamp>_<hostname>_<ip>.json"
251+
func (hc *HopCache) generateAnnotationFilepath(hop string, timestamp time.Time) (string, error) {
252+
dirPath := hc.outputPath + "/" + timestamp.Format("2006/01/02")
253+
if err := os.MkdirAll(dirPath, 0777); err != nil {
254+
hopAnnotationErrors.WithLabelValues("hopannotation", "mkdirall").Inc()
255+
return "", fmt.Errorf("%w (error: %v)", ErrCreatePath, err)
256+
}
257+
datetime := timestamp.Format("20060102T150405Z")
258+
return fmt.Sprintf("%s/%s_%s_%s.json", dirPath, datetime, hostname, hop), nil
259+
}

0 commit comments

Comments
 (0)