Skip to content

Commit 64bdc6e

Browse files
Add latency package (#14)
* Add latency package and its tests. * Rename Measurements -> Packets in result structs. * Add docstrings. * Move timeout ctx into the sendLoop func and call cancel(). * Improve error handling in latency.go Stop memoryless.Run() by canceling the context. * Move time.Now() before conn.WriteTo() and add comments. * Only record send time on successful write and send kickoff packet * Lock/unlock mutexes before accessing map items. * Keep all the RTTs * Rename RTTs to Results * Update unit tests * Change results format and allow filenames without subtest * Remove session from cache and save to disk after a call to /result * Fix seqN check and unit tests * Remove debug binary * Update comments. * Use a slice instead of a map for send times. * Add comments. * Add logging and comments * Move recvTime snapshot one instruction earlier. * Add src field to debug message * Set Archivalata.Version to TODO. * Fix received packets count. * Restore original persistence/ package and use "network" as subtest. * Update go.mod/sum. * Remove msak-latency client added by mistake. * s/network/transport/ and s/src/addr * Rename RTT to Packet * Update go.mod/sum * Make the session's mutexes values instead of references * s/Packet/RoundTrip/ * Revert changes to msak-server's main * s/transport/application/ * Rename latency -> latency1 globally * Remove unnecessary rw.WriteHeader after failed Write
1 parent 0f9eb63 commit 64bdc6e

File tree

6 files changed

+748
-3
lines changed

6 files changed

+748
-3
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/charmbracelet/log v0.2.1
88
github.com/google/uuid v1.3.0
99
github.com/gorilla/websocket v1.5.0
10+
github.com/jellydator/ttlcache/v3 v3.0.1
1011
github.com/m-lab/access v0.0.11
1112
github.com/m-lab/go v0.1.58
1213
github.com/m-lab/ndt-server v0.20.17

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,15 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
190190
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
191191
github.com/googleapis/gax-go/v2 v2.8.0 h1:UBtEZqx1bjXtOQ5BVTkuYghXrr3N4V123VKJK67vJZc=
192192
github.com/googleapis/gax-go/v2 v2.8.0/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI=
193+
github.com/googleapis/google-cloud-go-testing v0.0.0-20191008195207-8e1d251e947d h1:YBqybTXA//1pltKcwyntNQdgDw6AnA5oHZCXFOiZhoo=
193194
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
194195
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
195196
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
196197
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
197198
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
198199
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
200+
github.com/jellydator/ttlcache/v3 v3.0.1 h1:cHgCSMS7TdQcoprXnWUptJZzyFsqs18Lt8VVhRuZYVU=
201+
github.com/jellydator/ttlcache/v3 v3.0.1/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4=
199202
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
200203
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
201204
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -219,8 +222,10 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
219222
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
220223
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
221224
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
225+
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
222226
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
223227
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
228+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
224229
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
225230
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
226231
github.com/m-lab/access v0.0.11 h1:i2aoal7zgdzXAA7pGL5mXpM8yybURDJGZLwBMmA4Le8=
@@ -292,6 +297,7 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
292297
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
293298
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
294299
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
300+
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
295301
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
296302
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
297303
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -322,6 +328,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
322328
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
323329
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
324330
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
331+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
325332
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
326333
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
327334
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -621,6 +628,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
621628
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
622629
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
623630
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
631+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
624632
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
625633
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
626634
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=

internal/handler/handler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (h *Handler) Upload(rw http.ResponseWriter, req *http.Request) {
6868

6969
func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.ResponseWriter,
7070
req *http.Request) {
71-
mid, err := getMIDFromRequest(req)
71+
mid, err := GetMIDFromRequest(req)
7272
if err != nil {
7373
ClientConnections.WithLabelValues(string(kind), "missing-mid").Inc()
7474
log.Info("Received request without mid", "source", req.RemoteAddr,
@@ -253,13 +253,13 @@ func (h *Handler) writeResult(uuid string, kind model.TestDirection, result *mod
253253
}
254254
}
255255

256-
// getMIDFromRequest extracts the measurement id ("mid") from a given HTTP
256+
// GetMIDFromRequest extracts the measurement id ("mid") from a given HTTP
257257
// request, if present.
258258
//
259259
// A measurement ID can be specified in two ways: via a "mid" querystring
260260
// parameter (when access tokens are not required) or via the ID field
261261
// in the JWT access token.
262-
func getMIDFromRequest(req *http.Request) (string, error) {
262+
func GetMIDFromRequest(req *http.Request) (string, error) {
263263
// If the request includes a valid JWT token, the claim and the ID are in
264264
// the request's context already.
265265
claims := controller.GetClaim(req.Context())

internal/latency1/latency1.go

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
package latency1
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"net"
8+
"net/http"
9+
"sync"
10+
"time"
11+
12+
"github.com/charmbracelet/log"
13+
"github.com/jellydator/ttlcache/v3"
14+
"github.com/m-lab/go/memoryless"
15+
"github.com/m-lab/go/rtx"
16+
"github.com/m-lab/msak/internal/handler"
17+
"github.com/m-lab/msak/internal/persistence"
18+
"github.com/m-lab/msak/pkg/latency1/model"
19+
)
20+
21+
const sendDuration = 5 * time.Second
22+
23+
var (
24+
errorUnauthorized = errors.New("unauthorized")
25+
errorInvalidSeqN = errors.New("invalid sequence number")
26+
)
27+
28+
// Handler is the handler for latency tests.
29+
type Handler struct {
30+
dataDir string
31+
sessions *ttlcache.Cache[string, *model.Session]
32+
sessionsMu sync.Mutex
33+
}
34+
35+
// NewHandler returns a new handler for the UDP latency test.
36+
// It sets up a cache for sessions that writes the results to disk on item
37+
// eviction.
38+
func NewHandler(dir string, cacheTTL time.Duration) *Handler {
39+
40+
cache := ttlcache.New(
41+
ttlcache.WithTTL[string, *model.Session](cacheTTL),
42+
ttlcache.WithDisableTouchOnHit[string, *model.Session](),
43+
)
44+
cache.OnEviction(func(ctx context.Context,
45+
er ttlcache.EvictionReason,
46+
i *ttlcache.Item[string, *model.Session]) {
47+
log.Debug("Session expired", "id", i.Value().ID, "reason", er)
48+
49+
// Save data to disk when the session expires.
50+
archive := i.Value().Archive()
51+
archive.EndTime = time.Now()
52+
_, err := persistence.WriteDataFile(dir, "latency1", "application", archive.ID, archive)
53+
if err != nil {
54+
log.Error("failed to write latency result", "mid", archive.ID, "error", err)
55+
return
56+
}
57+
})
58+
59+
go cache.Start()
60+
return &Handler{
61+
dataDir: dir,
62+
sessions: cache,
63+
}
64+
}
65+
66+
// Authorize verifies that the request includes a valid JWT, extracts its jti
67+
// and adds a new empty session to the sessions cache.
68+
// It returns a valid kickoff LatencyPacket for this new session in the
69+
// response body.
70+
func (h *Handler) Authorize(rw http.ResponseWriter, req *http.Request) {
71+
mid, err := handler.GetMIDFromRequest(req)
72+
if err != nil {
73+
log.Info("Received request without mid", "source", req.RemoteAddr,
74+
"error", err)
75+
rw.WriteHeader(http.StatusUnauthorized)
76+
rw.Header().Set("Connection", "Close")
77+
return
78+
}
79+
80+
// Create a new session for this mid.
81+
session := model.NewSession(mid)
82+
h.sessionsMu.Lock()
83+
h.sessions.Set(mid, session, ttlcache.DefaultTTL)
84+
h.sessionsMu.Unlock()
85+
86+
log.Debug("session created", "id", mid)
87+
88+
// Create a valid kickoff packet for this session and send it in the
89+
// response body.
90+
kickoff := &model.LatencyPacket{
91+
Type: "c2s",
92+
ID: mid,
93+
Seq: 0,
94+
}
95+
96+
b, err := json.Marshal(kickoff)
97+
// This should never happen.
98+
rtx.Must(err, "cannot marshal LatencyPacket")
99+
100+
_, err = rw.Write(b)
101+
if err != nil {
102+
// TODO: add Prometheus metric for write errors.
103+
return
104+
}
105+
106+
}
107+
108+
// Result returns a result for a given measurement id. Possible status codes
109+
// are:
110+
// - 400 if the request does not contain a mid
111+
// - 404 if the mid is not found in the sessions cache
112+
// - 500 if the session JSON cannot be marshalled
113+
func (h *Handler) Result(rw http.ResponseWriter, req *http.Request) {
114+
mid, err := handler.GetMIDFromRequest(req)
115+
if err != nil {
116+
log.Info("Received request without mid", "source", req.RemoteAddr,
117+
"error", err)
118+
rw.WriteHeader(http.StatusBadRequest)
119+
rw.Header().Set("Connection", "Close")
120+
return
121+
}
122+
123+
h.sessionsMu.Lock()
124+
cachedResult := h.sessions.Get(mid)
125+
h.sessionsMu.Unlock()
126+
if cachedResult == nil {
127+
rw.WriteHeader(http.StatusNotFound)
128+
return
129+
}
130+
131+
session := cachedResult.Value()
132+
b, err := json.Marshal(session.Summarize())
133+
if err != nil {
134+
rw.WriteHeader(http.StatusInternalServerError)
135+
return
136+
}
137+
138+
_, err = rw.Write(b)
139+
if err != nil {
140+
// TODO: add Prometheus metric for write errors.
141+
return
142+
}
143+
144+
// Remove this session from the cache.
145+
h.sessions.Delete(mid)
146+
}
147+
148+
// sendLoop sends UDP pings with progressive sequence numbers until the context
149+
// expires or is canceled.
150+
func (h *Handler) sendLoop(ctx context.Context, conn net.PacketConn,
151+
remoteAddr net.Addr, session *model.Session, duration time.Duration) error {
152+
seq := 0
153+
var err error
154+
155+
timeout, cancel := context.WithTimeout(ctx, duration)
156+
defer cancel()
157+
158+
memoryless.Run(timeout, func() {
159+
b, marshalErr := json.Marshal(&model.LatencyPacket{
160+
ID: session.ID,
161+
Type: "s2c",
162+
Seq: seq,
163+
LastRTT: int(session.LastRTT.Load()),
164+
})
165+
166+
// This should never happen, since we should always be able to marshal
167+
// a LatencyPacket struct.
168+
rtx.Must(marshalErr, "cannot marshal LatencyPacket")
169+
170+
// Call time.Now() just before writing to the socket. The RTT will
171+
// include the ping packet's write time. This is intentional.
172+
sendTime := time.Now()
173+
// As the kernel's socket buffers are usually much larger than the
174+
// packets we send here, calling conn.WriteTo is expected to take a
175+
// negligible time.
176+
n, writeErr := conn.WriteTo(b, remoteAddr)
177+
if writeErr != nil {
178+
err = writeErr
179+
cancel()
180+
return
181+
}
182+
if n != len(b) {
183+
err = errors.New("partial write")
184+
cancel()
185+
return
186+
}
187+
188+
// Update the SendTimes map after a successful write.
189+
session.SendTimesMu.Lock()
190+
session.SendTimes = append(session.SendTimes, sendTime)
191+
session.SendTimesMu.Unlock()
192+
193+
// Add this packet to the Results slice. Results are "lost" until a
194+
// reply is received from the server.
195+
session.RoundTrips = append(session.RoundTrips, model.RoundTrip{
196+
Lost: true,
197+
})
198+
199+
seq++
200+
201+
log.Debug("packet sent", "len", n, "id", session.ID, "seq", seq)
202+
203+
}, memoryless.Config{
204+
// Using randomized intervals allows to detect cyclic network
205+
// behaviors where a fixed interval could align to the cycle.
206+
Expected: 25 * time.Millisecond,
207+
Min: 10 * time.Millisecond,
208+
Max: 40 * time.Millisecond,
209+
})
210+
return err
211+
}
212+
213+
// processPacket processes a single UDP latency packet.
214+
func (h *Handler) processPacket(conn net.PacketConn, remoteAddr net.Addr,
215+
packet []byte, recvTime time.Time) error {
216+
// Attempt to unmarshal the packet.
217+
var m model.LatencyPacket
218+
err := json.Unmarshal(packet, &m)
219+
if err != nil {
220+
return err
221+
}
222+
223+
// Check if this is a known session.
224+
h.sessionsMu.Lock()
225+
cachedResult := h.sessions.Get(m.ID)
226+
h.sessionsMu.Unlock()
227+
if cachedResult == nil {
228+
return errorUnauthorized
229+
}
230+
231+
session := cachedResult.Value()
232+
233+
// If this message's type is s2c, it was a server ping echoed back by the
234+
// client. Store it in the session's result and compute the RTT.
235+
if m.Type == "s2c" {
236+
session.SendTimesMu.Lock()
237+
defer session.SendTimesMu.Unlock()
238+
if m.Seq >= len(session.SendTimes) {
239+
// TODO: Add Prometheus metric.
240+
log.Info("received packet with valid mid and invalid seq number",
241+
"mid", m.ID,
242+
"seq", m.Seq,
243+
"addr", remoteAddr.String())
244+
return errorInvalidSeqN
245+
}
246+
247+
rtt := recvTime.Sub(session.SendTimes[m.Seq]).Microseconds()
248+
session.LastRTT.Store(rtt)
249+
session.RoundTrips[m.Seq].RTT = int(rtt)
250+
session.RoundTrips[m.Seq].Lost = false
251+
252+
log.Debug("received pong, updating result", "mid", session.ID,
253+
"result", session.RoundTrips[m.Seq])
254+
// TODO: prometheus metric
255+
return nil
256+
}
257+
258+
// If this message's type is c2s, it's a kickoff packet. Record
259+
// local/remote addresses and trigger the send loop.
260+
if m.Type == "c2s" {
261+
session.StartedMu.Lock()
262+
defer session.StartedMu.Unlock()
263+
if !session.Started {
264+
session.Started = true
265+
session.Client = remoteAddr.String()
266+
session.Server = conn.LocalAddr().String()
267+
go h.sendLoop(context.Background(), conn, remoteAddr, session,
268+
sendDuration)
269+
}
270+
}
271+
272+
return nil
273+
}
274+
275+
// ProcessPacketLoop is the main packet processing loop. For each incoming
276+
// packet, it records its timestamp and acts depending on the packet type.
277+
func (h *Handler) ProcessPacketLoop(conn net.PacketConn) {
278+
log.Info("Accepting UDP packets...")
279+
buf := make([]byte, 1024)
280+
for {
281+
n, addr, err := conn.ReadFrom(buf)
282+
if err != nil {
283+
log.Error("error while reading UDP packet", "err", err)
284+
continue
285+
}
286+
// The receive time should be recorded as soon as possible after
287+
// reading the packet, to improve accuracy.
288+
recvTime := time.Now()
289+
log.Debug("received UDP packet", "addr", addr, "n", n, "data", string(buf[:n]))
290+
err = h.processPacket(conn, addr, buf[:n], recvTime)
291+
if err != nil {
292+
log.Debug("failed to process packet",
293+
"err", err,
294+
"addr", addr.String())
295+
}
296+
}
297+
}

0 commit comments

Comments
 (0)