Skip to content

Commit b1ce96b

Browse files
fix(archive): archive final messages sent to client (#34)
* fix(archive): archive final messages sent to client
1 parent bc4cce9 commit b1ce96b

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

pkg/throughput1/protocol.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -212,21 +212,15 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
212212
select {
213213
case <-ctx.Done():
214214
// Attempt to send final write message before close. Ignore errors.
215-
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
215+
p.sendAndPublishWireMeasurement(ctx, p.measurer.Measure(ctx), results)
216216
p.close(ctx)
217217
return
218218
case m := <-measurerCh:
219-
wm, err := p.sendWireMeasurement(ctx, m)
219+
err := p.sendAndPublishWireMeasurement(ctx, m, results)
220220
if err != nil {
221221
errCh <- err
222222
return
223223
}
224-
// This send is non-blocking in case there is no one to read the
225-
// Measurement message and the channel's buffer is full.
226-
select {
227-
case results <- *wm:
228-
default:
229-
}
230224

231225
// End the test once enough bytes have been received.
232226
if byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= byteLimit {
@@ -238,6 +232,21 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
238232
}
239233
}
240234

235+
func (p *Protocol) sendAndPublishWireMeasurement(ctx context.Context, m model.Measurement, results chan<- model.WireMeasurement) error {
236+
wm, err := p.sendWireMeasurement(ctx, m)
237+
if err != nil {
238+
return err
239+
}
240+
241+
// This send is non-blocking in case there is no one to read the
242+
// Measurement message and the channel's buffer is full.
243+
select {
244+
case results <- *wm:
245+
default:
246+
}
247+
return nil
248+
}
249+
241250
func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measurement,
242251
results chan<- model.WireMeasurement, errCh chan<- error) {
243252
size := p.ScaleMessage(spec.MinMessageSize, 0)
@@ -256,22 +265,15 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
256265
select {
257266
case <-ctx.Done():
258267
// Attempt to send final write message before close. Ignore errors.
259-
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
268+
p.sendAndPublishWireMeasurement(ctx, p.measurer.Measure(ctx), results)
260269
p.close(ctx)
261270
return
262271
case m := <-measurerCh:
263-
wm, err := p.sendWireMeasurement(ctx, m)
272+
err := p.sendAndPublishWireMeasurement(ctx, m, results)
264273
if err != nil {
265274
errCh <- err
266275
return
267276
}
268-
269-
// This send is non-blocking in case there is no one to read the
270-
// Measurement message and the channel's buffer is full.
271-
select {
272-
case results <- *wm:
273-
default:
274-
}
275277
default:
276278
err = p.conn.WritePreparedMessage(message)
277279
if err != nil {
@@ -283,7 +285,7 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
283285

284286
bytesSent := int(p.applicationBytesSent.Load())
285287
if p.byteLimit > 0 && bytesSent >= p.byteLimit {
286-
_, err := p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
288+
err := p.sendAndPublishWireMeasurement(ctx, p.measurer.Measure(ctx), results)
287289
if err != nil {
288290
errCh <- err
289291
return

0 commit comments

Comments
 (0)