Skip to content

Commit 9767488

Browse files
Separate network-level and application-level measurements (#23)
* Separate network-level and application-level measurements. This commit adds application-level measurements and namespaces to better separate the two kind of measurements in the Measurement object. It is a breaking change and will need a schema update. * Make application-level byte counters atomic. * sync.Once and atomic.Int64 do not need to be pointers.
1 parent 24a1228 commit 9767488

File tree

5 files changed

+78
-22
lines changed

5 files changed

+78
-22
lines changed

internal/measurer/measurer.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,12 @@ func (m *throughput1Measurer) measure(ctx context.Context) {
9696
case <-ctx.Done():
9797
// NOTHING
9898
case m.dstChan <- model.Measurement{
99-
ElapsedTime: time.Since(m.startTime).Microseconds(),
100-
BytesSent: int64(totalWritten) - m.bytesWrittenAtStart,
101-
BytesReceived: int64(totalRead) - m.bytesReadAtStart,
102-
BBRInfo: &bbrInfo,
99+
ElapsedTime: time.Since(m.startTime).Microseconds(),
100+
Network: model.ByteCounters{
101+
BytesSent: int64(totalWritten) - m.bytesWrittenAtStart,
102+
BytesReceived: int64(totalRead) - m.bytesReadAtStart,
103+
},
104+
BBRInfo: &bbrInfo,
103105
TCPInfo: &model.TCPInfo{
104106
LinuxTCPInfo: tcpInfo,
105107
ElapsedTime: time.Since(m.connInfo.AcceptTime()).Microseconds(),

internal/measurer/measurer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestNdt8Measurer_Start(t *testing.T) {
3535
select {
3636
case m := <-mchan:
3737
fmt.Println("received measurement")
38-
if m.BytesSent != 4 {
38+
if m.Network.BytesSent != 4 {
3939
t.Errorf("invalid byte counter value")
4040
}
4141
case <-time.After(1 * time.Second):

pkg/throughput1/model/measurement.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,10 @@ type WireMeasurement struct {
2424
// The Measurement struct contains measurement results. This structure is
2525
// meant to be serialised as JSON and sent as a textual message.
2626
type Measurement struct {
27-
// BytesSent is the number of bytes sent at the application level by the
28-
// party sending this Measurement.
29-
BytesSent int64 `json:",omitempty"`
30-
31-
// BytesReceived is the number of bytes received at the application level
32-
// by the party sending this Measurement.
33-
BytesReceived int64 `json:",omitempty"`
27+
// Application contains the application-level BytesSent/Received pair.
28+
Application ByteCounters
29+
// Network contains the network-level BytesSent/Received pair.
30+
Network ByteCounters
3431

3532
// ElapsedTime is the time elapsed since the start of the measurement
3633
// according to the party sending this Measurement.
@@ -47,6 +44,14 @@ type Measurement struct {
4744
TCPInfo *TCPInfo `json:",omitempty"`
4845
}
4946

47+
type ByteCounters struct {
48+
// BytesSent is the number of bytes sent.
49+
BytesSent int64 `json:",omitempty"`
50+
51+
// BytesReceived is the number of bytes received.
52+
BytesReceived int64 `json:",omitempty"`
53+
}
54+
5055
// TCPInfo is an extension to Linux's TCPInfo struct that includes the time
5156
// elapsed since the connection was accepted.
5257
type TCPInfo struct {

pkg/throughput1/protocol.go

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net"
1111
"net/http"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/gorilla/websocket"
@@ -42,7 +43,10 @@ type Protocol struct {
4243
connInfo netx.ConnInfo
4344
rnd *rand.Rand
4445
measurer Measurer
45-
once *sync.Once
46+
once sync.Once
47+
48+
applicationBytesReceived atomic.Int64
49+
applicationBytesSent atomic.Int64
4650
}
4751

4852
// New returns a new Protocol with the specified connection and every other
@@ -54,7 +58,6 @@ func New(conn *websocket.Conn) *Protocol {
5458
// Seed randomness source with the current time.
5559
rnd: rand.New(rand.NewSource(time.Now().UnixMilli())),
5660
measurer: &DefaultMeasurer{},
57-
once: &sync.Once{},
5861
}
5962
}
6063

@@ -138,7 +141,8 @@ func (p *Protocol) senderReceiverLoop(ctx context.Context,
138141
}
139142

140143
// receiver reads from the connection until NextReader fails. It returns
141-
// the measurements received over the provided channel.
144+
// the measurements received over the provided channel and updates the sent and
145+
// received byte counters as needed.
142146
func (p *Protocol) receiver(ctx context.Context,
143147
results chan<- model.WireMeasurement, errCh chan<- error) {
144148
for {
@@ -147,12 +151,22 @@ func (p *Protocol) receiver(ctx context.Context,
147151
errCh <- err
148152
return
149153
}
154+
if kind == websocket.BinaryMessage {
155+
// Binary messages are discarded after reading their size.
156+
size, err := io.Copy(io.Discard, reader)
157+
if err != nil {
158+
errCh <- err
159+
return
160+
}
161+
p.applicationBytesReceived.Add(size)
162+
}
150163
if kind == websocket.TextMessage {
151164
data, err := io.ReadAll(reader)
152165
if err != nil {
153166
errCh <- err
154167
return
155168
}
169+
p.applicationBytesReceived.Add(int64(len(data)))
156170
var m model.WireMeasurement
157171
if err := json.Unmarshal(data, &m); err != nil {
158172
errCh <- err
@@ -177,12 +191,27 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
177191
wm = p.createWireMeasurement(ctx)
178192
})
179193
wm.Measurement = m
180-
err := p.conn.WriteJSON(wm)
194+
wm.Application = model.ByteCounters{
195+
BytesSent: p.applicationBytesSent.Load(),
196+
BytesReceived: p.applicationBytesReceived.Load(),
197+
}
198+
// Encode as JSON separately so we can read the message size before
199+
// sending.
200+
jsonwm, err := json.Marshal(wm)
201+
if err != nil {
202+
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
203+
ctx, err)
204+
errCh <- err
205+
return
206+
}
207+
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
181208
if err != nil {
182209
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
183210
errCh <- err
184211
return
185212
}
213+
p.applicationBytesSent.Add(int64(len(jsonwm)))
214+
186215
// This send is non-blocking in case there is no one to read the
187216
// Measurement message and the channel's buffer is full.
188217
select {
@@ -195,7 +224,6 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
195224

196225
func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measurement,
197226
results chan<- model.WireMeasurement, errCh chan<- error) {
198-
ci := netx.ToConnInfo(p.conn.UnderlyingConn())
199227
size := spec.MinMessageSize
200228
message, err := p.makePreparedMessage(size)
201229
if err != nil {
@@ -219,12 +247,27 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
219247
wm = p.createWireMeasurement(ctx)
220248
})
221249
wm.Measurement = m
222-
err = p.conn.WriteJSON(wm)
250+
wm.Application = model.ByteCounters{
251+
BytesReceived: p.applicationBytesReceived.Load(),
252+
BytesSent: p.applicationBytesSent.Load(),
253+
}
254+
// Encode as JSON separately so we can read the message size before
255+
// sending.
256+
jsonwm, err := json.Marshal(wm)
257+
if err != nil {
258+
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
259+
ctx, err)
260+
errCh <- err
261+
return
262+
}
263+
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
223264
if err != nil {
224265
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
225266
errCh <- err
226267
return
227268
}
269+
p.applicationBytesSent.Add(int64(len(jsonwm)))
270+
228271
// This send is non-blocking in case there is no one to read the
229272
// Measurement message and the channel's buffer is full.
230273
select {
@@ -238,14 +281,14 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
238281
errCh <- err
239282
return
240283
}
284+
p.applicationBytesSent.Add(int64(size))
241285

242286
// Determine whether it's time to scale the message size.
243287
if size >= spec.MaxScaledMessageSize {
244288
continue
245289
}
246290

247-
_, w := ci.ByteCounters()
248-
if uint64(size) > w/spec.ScalingFraction {
291+
if size > int(p.applicationBytesSent.Load())/spec.ScalingFraction {
249292
continue
250293
}
251294

@@ -264,11 +307,15 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
264307
func (p *Protocol) close(ctx context.Context) {
265308
msg := websocket.FormatCloseMessage(
266309
websocket.CloseNormalClosure, "Done sending")
310+
267311
err := p.conn.WriteControl(websocket.CloseMessage, msg, time.Now().Add(time.Second))
268312
if err != nil {
269313
log.Printf("WriteControl failed (ctx: %p, err: %v)", ctx, err)
270314
return
271315
}
316+
// The closing message is part of the measurement and added to bytesSent.
317+
p.applicationBytesSent.Add(int64(len(msg)))
318+
272319
log.Printf("Close message sent (ctx: %p)", ctx)
273320
}
274321

pkg/throughput1/protocol_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,10 @@ func TestProtocol_Download(t *testing.T) {
121121
case <-context.Background().Done():
122122
return
123123
case m := <-senderCh:
124-
fmt.Printf("senderCh BytesReceived: %d, BytesSent: %d\n", m.BytesReceived, m.BytesSent)
125-
fmt.Printf("senderCh Goodput: %f Mb/s\n", float64(m.BytesReceived)/float64(time.Since(start).Microseconds())*8)
124+
fmt.Printf("senderCh Network.BytesReceived: %d, Network.BytesSent: %d\n",
125+
m.Network.BytesReceived, m.Network.BytesSent)
126+
fmt.Printf("senderCh Network throughput: %f Mb/s\n",
127+
float64(m.Network.BytesReceived)/float64(time.Since(start).Microseconds())*8)
126128
case <-receiverCh:
127129

128130
case err := <-errCh:

0 commit comments

Comments
 (0)