Skip to content

Commit 4f722ff

Browse files
Use new measurer interface
1 parent 1f63587 commit 4f722ff

File tree

1 file changed

+42
-55
lines changed

1 file changed

+42
-55
lines changed

pkg/throughput1/protocol.go

Lines changed: 42 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,10 @@ type senderFunc func(ctx context.Context,
2424
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
2525
errCh chan<- error)
2626

27+
// Measurer is an interface for collecting connection metrics.
2728
type Measurer interface {
2829
Start(context.Context, net.Conn) <-chan model.Measurement
29-
}
30-
31-
// DefaultMeasurer is the default throughput1 measurer that wraps the measurer
32-
// package's Start function.
33-
type DefaultMeasurer struct{}
34-
35-
func (*DefaultMeasurer) Start(ctx context.Context,
36-
c net.Conn) <-chan model.Measurement {
37-
return measurer.Start(ctx, c)
30+
Measure(ctx context.Context) model.Measurement
3831
}
3932

4033
// Protocol is the implementation of the throughput1 protocol.
@@ -59,7 +52,7 @@ func New(conn *websocket.Conn) *Protocol {
5952
connInfo: netx.ToConnInfo(conn.UnderlyingConn()),
6053
// Seed randomness source with the current time.
6154
rnd: rand.New(rand.NewSource(time.Now().UnixMilli())),
62-
measurer: &DefaultMeasurer{},
55+
measurer: measurer.New(),
6356
}
6457
}
6558

@@ -185,51 +178,58 @@ func (p *Protocol) receiver(ctx context.Context,
185178
}
186179
}
187180

181+
func (p *Protocol) sendWireMeasurement(ctx context.Context, m model.Measurement) (*model.WireMeasurement, error) {
182+
wm := model.WireMeasurement{}
183+
p.once.Do(func() {
184+
wm = p.createWireMeasurement(ctx)
185+
})
186+
wm.Measurement = m
187+
wm.Application = model.ByteCounters{
188+
BytesSent: p.applicationBytesSent.Load(),
189+
BytesReceived: p.applicationBytesReceived.Load(),
190+
}
191+
// Encode as JSON separately so we can read the message size before
192+
// sending.
193+
jsonwm, err := json.Marshal(wm)
194+
if err != nil {
195+
log.Printf("failed to encode measurement (ctx: %p, err: %v)", ctx, err)
196+
return nil, err
197+
}
198+
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
199+
if err != nil {
200+
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
201+
return nil, err
202+
}
203+
p.applicationBytesSent.Add(int64(len(jsonwm)))
204+
return &wm, nil
205+
}
206+
188207
func (p *Protocol) sendCounterflow(ctx context.Context,
189208
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
190209
errCh chan<- error) {
191210
byteLimit := int64(p.byteLimit)
192211
for {
193212
select {
194213
case <-ctx.Done():
214+
// TODO: do we need to send a final wiremessage here?
195215
p.close(ctx)
196216
return
197217
case m := <-measurerCh:
198-
wm := model.WireMeasurement{}
199-
p.once.Do(func() {
200-
wm = p.createWireMeasurement(ctx)
201-
})
202-
wm.Measurement = m
203-
wm.Application = model.ByteCounters{
204-
BytesSent: p.applicationBytesSent.Load(),
205-
BytesReceived: p.applicationBytesReceived.Load(),
206-
}
207-
// Encode as JSON separately so we can read the message size before
208-
// sending.
209-
jsonwm, err := json.Marshal(wm)
218+
wm, err := p.sendWireMeasurement(ctx, m)
210219
if err != nil {
211-
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
212-
ctx, err)
213220
errCh <- err
214221
return
215222
}
216-
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
217-
if err != nil {
218-
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
219-
errCh <- err
220-
return
221-
}
222-
p.applicationBytesSent.Add(int64(len(jsonwm)))
223-
224223
// This send is non-blocking in case there is no one to read the
225224
// Measurement message and the channel's buffer is full.
226225
select {
227-
case results <- wm:
226+
case results <- *wm:
228227
default:
229228
}
230229

231230
// End the test once enough bytes have been received.
232231
if byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= byteLimit {
232+
// WireMessage was just sent above, so we do not need to send another.
233233
p.close(ctx)
234234
return
235235
}
@@ -254,39 +254,21 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
254254
for {
255255
select {
256256
case <-ctx.Done():
257+
// Attempt to send final write message before close. Ignore errors.
258+
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
257259
p.close(ctx)
258260
return
259261
case m := <-measurerCh:
260-
wm := model.WireMeasurement{}
261-
p.once.Do(func() {
262-
wm = p.createWireMeasurement(ctx)
263-
})
264-
wm.Measurement = m
265-
wm.Application = model.ByteCounters{
266-
BytesReceived: p.applicationBytesReceived.Load(),
267-
BytesSent: p.applicationBytesSent.Load(),
268-
}
269-
// Encode as JSON separately so we can read the message size before
270-
// sending.
271-
jsonwm, err := json.Marshal(wm)
272-
if err != nil {
273-
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
274-
ctx, err)
275-
errCh <- err
276-
return
277-
}
278-
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
262+
wm, err := p.sendWireMeasurement(ctx, m)
279263
if err != nil {
280-
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
281264
errCh <- err
282265
return
283266
}
284-
p.applicationBytesSent.Add(int64(len(jsonwm)))
285267

286268
// This send is non-blocking in case there is no one to read the
287269
// Measurement message and the channel's buffer is full.
288270
select {
289-
case results <- wm:
271+
case results <- *wm:
290272
default:
291273
}
292274
default:
@@ -300,6 +282,11 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
300282

301283
bytesSent := int(p.applicationBytesSent.Load())
302284
if p.byteLimit > 0 && bytesSent >= p.byteLimit {
285+
_, err := p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
286+
if err != nil {
287+
errCh <- err
288+
return
289+
}
303290
p.close(ctx)
304291
return
305292
}

0 commit comments

Comments
 (0)