Skip to content

Commit 152d08a

Browse files
committed
add outClientMsgs and Bytes
1 parent 6079769 commit 152d08a

File tree

5 files changed

+106
-35
lines changed

5 files changed

+106
-35
lines changed

server/client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5000,6 +5000,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
50005000
var dlvExtraSize int64
50015001
var dlvRouteMsgs int64
50025002
var dlvLeafMsgs int64
5003+
var dlvClientMsgs int64
50035004

50045005
// We need to know if this is a MQTT producer because they send messages
50055006
// without CR_LF (we otherwise remove the size of CR_LF from message size).
@@ -5013,12 +5014,14 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
50135014
totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSize
50145015
routeBytes := dlvRouteMsgs*int64(len(msg)) + dlvExtraSize
50155016
leafBytes := dlvLeafMsgs*int64(len(msg)) + dlvExtraSize
5017+
clientBytes := dlvClientMsgs*int64(len(msg)) + dlvExtraSize
50165018

50175019
// For non MQTT producers, remove the CR_LF * number of messages
50185020
if !prodIsMQTT {
50195021
totalBytes -= dlvMsgs * int64(LEN_CR_LF)
50205022
routeBytes -= dlvRouteMsgs * int64(LEN_CR_LF)
50215023
leafBytes -= dlvLeafMsgs * int64(LEN_CR_LF)
5024+
clientBytes -= dlvClientMsgs * int64(LEN_CR_LF)
50225025
}
50235026

50245027
if acc != nil {
@@ -5039,6 +5042,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
50395042
if srv := c.srv; srv != nil {
50405043
atomic.AddInt64(&srv.outMsgs, dlvMsgs)
50415044
atomic.AddInt64(&srv.outBytes, totalBytes)
5045+
5046+
atomic.AddInt64(&srv.outClientMsgs, dlvClientMsgs)
5047+
atomic.AddInt64(&srv.outClientBytes, clientBytes)
50425048
}
50435049
}
50445050

@@ -5134,6 +5140,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
51345140
// We don't count internal deliveries, so do only when sub.icb is nil.
51355141
if sub.icb == nil {
51365142
dlvMsgs++
5143+
if sub.client.kind == CLIENT {
5144+
dlvClientMsgs++
5145+
}
51375146
}
51385147
didDeliver = true
51395148
}
@@ -5361,6 +5370,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
53615370
dlvRouteMsgs++
53625371
case LEAF:
53635372
dlvLeafMsgs++
5373+
case CLIENT:
5374+
dlvClientMsgs++
53645375
}
53655376
}
53665377
// Do the rest even when message delivery was skipped.
@@ -5459,6 +5470,8 @@ sendToRoutesOrLeafs:
54595470
dlvRouteMsgs++
54605471
case LEAF:
54615472
dlvLeafMsgs++
5473+
case CLIENT:
5474+
dlvClientMsgs++
54625475
}
54635476
dlvExtraSize += int64(len(dmsg) - len(msg))
54645477
}

server/client_test.go

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3858,52 +3858,98 @@ func TestClientFlushOutboundWriteTimeoutPolicy(t *testing.T) {
38583858

38593859
func TestClientMsgsMetric(t *testing.T) {
38603860
o1 := DefaultOptions()
3861+
o1.ServerName = "S1"
38613862
s1 := RunServer(o1)
38623863
defer s1.Shutdown()
38633864

38643865
o2 := DefaultOptions()
3866+
o2.ServerName = "S2"
38653867
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
38663868
s2 := RunServer(o2)
38673869
defer s2.Shutdown()
38683870

38693871
checkClusterFormed(t, s1, s2)
38703872

3871-
nc1 := natsConnect(t, s1.ClientURL())
3872-
defer nc1.Close()
3873+
ncS1 := natsConnect(t, s1.ClientURL())
3874+
defer ncS1.Close()
38733875

3874-
nc2 := natsConnect(t, s2.ClientURL())
3875-
defer nc1.Close()
3876+
ncS2 := natsConnect(t, s2.ClientURL())
3877+
defer ncS1.Close()
38763878

3877-
natsSub(t, nc1, "foo", func(m *nats.Msg) {
3878-
m.Respond(m.Data)
3879-
})
3880-
natsSub(t, nc2, "bar", func(m *nats.Msg) {
3881-
m.Respond(m.Data)
3882-
})
3883-
nc1.Flush()
3884-
nc2.Flush()
3879+
natsSub(t, ncS1, "foo", func(m *nats.Msg) { m.Respond(m.Data) })
3880+
natsSub(t, ncS2, "bar", func(m *nats.Msg) { m.Respond(m.Data) })
3881+
ncS1.Flush()
3882+
ncS2.Flush()
38853883

3886-
_, err := nc2.Request("foo", []byte("6bytes"), 3*time.Second)
3884+
fooMsg := "6bytes"
3885+
// Requesting foo from S1 and bar from S2
3886+
// to verify to make sure client messages are counted correctly
3887+
_, err := ncS2.Request("foo", []byte(fooMsg), 3*time.Second)
38873888
if err != nil {
38883889
t.Fatalf("Error on receiving: %v", err)
38893890
}
3890-
_, err = nc1.Request("bar", []byte("ninebytes"), 3*time.Second)
3891+
barMsg := "ninebytes"
3892+
_, err = ncS1.Request("bar", []byte(barMsg), 3*time.Second)
38913893
if err != nil {
38923894
t.Fatalf("Error on receiving: %v", err)
38933895
}
3894-
nc1.Flush()
3895-
nc2.Flush()
3896+
ncS1.Flush()
3897+
ncS2.Flush()
38963898

3897-
// inMsgs and inBytes counts include routed messages
3899+
// in/out Msgs/Bytes counts include routed messages
38983900
require_Equal(t, atomic.LoadInt64(&s1.inMsgs), 4)
3899-
require_Equal(t, atomic.LoadInt64(&s1.inBytes), 6+6+9+9)
3901+
require_Equal(t, atomic.LoadInt64(&s1.inBytes), int64(len(fooMsg)*2+len(barMsg)*2))
39003902
require_Equal(t, atomic.LoadInt64(&s2.inMsgs), 4)
3901-
require_Equal(t, atomic.LoadInt64(&s2.inBytes), 6+6+9+9)
3903+
require_Equal(t, atomic.LoadInt64(&s2.inBytes), int64(len(fooMsg)*2+len(barMsg)*2))
3904+
3905+
require_Equal(t, atomic.LoadInt64(&s1.outMsgs), 4)
3906+
require_Equal(t, atomic.LoadInt64(&s1.outBytes), int64(len(fooMsg)*2+len(barMsg)*2))
3907+
require_Equal(t, atomic.LoadInt64(&s2.outMsgs), 4)
3908+
require_Equal(t, atomic.LoadInt64(&s2.outBytes), int64(len(fooMsg)*2+len(barMsg)*2))
39023909

3903-
// only count messages received from clients
3910+
// in/out ClientMsgs/Bytes only count client messages
39043911
require_Equal(t, atomic.LoadInt64(&s1.inClientMsgs), 2)
3905-
require_Equal(t, atomic.LoadInt64(&s1.inClientBytes), 6+9)
3912+
require_Equal(t, atomic.LoadInt64(&s1.inClientBytes), int64(len(fooMsg)+len(barMsg)))
39063913
require_Equal(t, atomic.LoadInt64(&s2.inClientMsgs), 2)
3907-
require_Equal(t, atomic.LoadInt64(&s2.inClientBytes), 6+9)
3914+
require_Equal(t, atomic.LoadInt64(&s2.inClientBytes), int64(len(fooMsg)+len(barMsg)))
3915+
3916+
require_Equal(t, atomic.LoadInt64(&s1.outClientMsgs), 2)
3917+
require_Equal(t, atomic.LoadInt64(&s1.outClientBytes), int64(len(fooMsg)+len(barMsg)))
3918+
require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 2)
3919+
require_Equal(t, atomic.LoadInt64(&s2.outClientBytes), int64(len(fooMsg)+len(barMsg)))
3920+
3921+
natsQueueSub(t, ncS1, "orders.new", "workers", func(m *nats.Msg) { m.Respond(m.Data) })
3922+
natsQueueSub(t, ncS2, "orders.new", "workers", func(m *nats.Msg) { m.Respond(m.Data) })
3923+
ncS1.Flush()
3924+
ncS2.Flush()
39083925

3926+
orderMsg := "order"
3927+
_, err = ncS1.Request("orders.new", []byte(orderMsg), 3*time.Second)
3928+
if err != nil {
3929+
t.Fatalf("Error on receiving: %v", err)
3930+
}
3931+
ncS1.Flush()
3932+
ncS2.Flush()
3933+
3934+
if atomic.LoadInt64(&s1.outClientMsgs) == 4 {
3935+
require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 2)
3936+
3937+
require_Equal(t, atomic.LoadInt64(&s1.outClientBytes),
3938+
int64(len(fooMsg)+len(barMsg)+len(orderMsg)*2))
3939+
3940+
require_Equal(t, atomic.LoadInt64(&s2.outClientBytes),
3941+
int64(len(fooMsg)+len(barMsg)))
3942+
3943+
} else if atomic.LoadInt64(&s1.outClientMsgs) == 3 {
3944+
require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 3)
3945+
3946+
require_Equal(t, atomic.LoadInt64(&s1.outClientBytes),
3947+
int64(len(fooMsg)+len(barMsg)+len(orderMsg)))
3948+
3949+
require_Equal(t, atomic.LoadInt64(&s2.outClientBytes),
3950+
int64(len(fooMsg)+len(barMsg)+len(orderMsg)))
3951+
3952+
} else {
3953+
t.Fatalf("Did not get expected outClientMsg/Bytes for message sent on qsub")
3954+
}
39093955
}

server/monitor.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,12 +1258,14 @@ type Varz struct {
12581258
Routes int `json:"routes"` // Routes is the number of connected route servers
12591259
Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints
12601260
Leafs int `json:"leafnodes"` // Leafs is the number connected leafnode clients
1261-
InMsgs int64 `json:"in_msgs"` // InMsgs is the total number of messages this server received. This includes messages from the clients, routers, gateways and leaf nodes.
1262-
OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of message this server sent
1263-
InBytes int64 `json:"in_bytes"` // InBytes is the total number of bytes this server received. This includes messages from the clients, routers, gateways and leaf nodes.
1264-
OutBytes int64 `json:"out_bytes"` // OutMsgs is the number of bytes this server sent
1261+
InMsgs int64 `json:"in_msgs"` // InMsgs is the total number of messages this server received. This includes messages from the clients, routers, gateways and leaf nodes
1262+
InBytes int64 `json:"in_bytes"` // InBytes is the total number of bytes this server received. This includes messages from the clients, routers, gateways and leaf nodes
12651263
InClientMsgs int64 `json:"in_client_msgs"` // InClientMsgs is the number of messages this server received from the clients
12661264
InClientBytes int64 `json:"in_client_bytes"` // InClientBytes is the number of bytes this server received from the clients
1265+
OutMsgs int64 `json:"out_msgs"` // OutMsgs is the total number of message this server sent. This includes messages sent to the clients, routers, gateways and leaf nodes
1266+
OutBytes int64 `json:"out_bytes"` // OutMsgs is the total number of bytes this server sent. This includes messages sent to the clients, routers, gateways and leaf nodes
1267+
OutClientMsgs int64 `json:"out_client_msgs"` // OutClientMsgs is the number of messages this server sent to the clients
1268+
OutClientBytes int64 `json:"out_client_bytes"` // OutClientBytes is the number of bytes this server sent to the clients
12671269
SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers
12681270
StaleConnections int64 `json:"stale_connections"` // StaleConnections is the total count of stale connections that were detected
12691271
StalledClients int64 `json:"stalled_clients"` // StalledClients is the total number of times that clients have been stalled.
@@ -1856,11 +1858,13 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
18561858
v.Remotes = s.numRemotes()
18571859
v.Leafs = len(s.leafs)
18581860
v.InMsgs = atomic.LoadInt64(&s.inMsgs)
1859-
v.InClientMsgs = atomic.LoadInt64(&s.inClientMsgs)
18601861
v.InBytes = atomic.LoadInt64(&s.inBytes)
1861-
v.InClientBytes = atomic.LoadInt64(&s.inClientBytes)
18621862
v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
18631863
v.OutBytes = atomic.LoadInt64(&s.outBytes)
1864+
v.InClientMsgs = atomic.LoadInt64(&s.inClientMsgs)
1865+
v.InClientBytes = atomic.LoadInt64(&s.inClientBytes)
1866+
v.OutClientMsgs = atomic.LoadInt64(&s.outClientMsgs)
1867+
v.OutClientBytes = atomic.LoadInt64(&s.outClientBytes)
18641868
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
18651869
v.StalledClients = atomic.LoadInt64(&s.stalls)
18661870
v.SlowConsumersStats = &SlowConsumersStats{

server/monitor_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,20 +304,26 @@ func TestMonitorHandleVarz(t *testing.T) {
304304
if v.InMsgs != 1 {
305305
t.Fatalf("Expected InMsgs of 1, got %v\n", v.InMsgs)
306306
}
307-
if v.InClientMsgs != 1 {
308-
t.Fatalf("Expected InClientMsgs of 1, got %v\n", v.InClientMsgs)
307+
if v.InBytes != 5 {
308+
t.Fatalf("Expected InBytes of 5, got %v\n", v.InBytes)
309309
}
310310
if v.OutMsgs != 1 {
311311
t.Fatalf("Expected OutMsgs of 1, got %v\n", v.OutMsgs)
312312
}
313-
if v.InBytes != 5 {
314-
t.Fatalf("Expected InBytes of 5, got %v\n", v.InBytes)
313+
if v.OutBytes != 5 {
314+
t.Fatalf("Expected OutBytes of 5, got %v\n", v.OutBytes)
315+
}
316+
if v.InClientMsgs != 1 {
317+
t.Fatalf("Expected InClientMsgs of 1, got %v\n", v.InClientMsgs)
315318
}
316319
if v.InClientBytes != 5 {
317320
t.Fatalf("Expected InClientBytes of 5, got %v\n", v.InClientBytes)
318321
}
319-
if v.OutBytes != 5 {
320-
t.Fatalf("Expected OutBytes of 5, got %v\n", v.OutBytes)
322+
if v.OutClientMsgs != 1 {
323+
t.Fatalf("Expected InClientMsgs of 1, got %v\n", v.InClientMsgs)
324+
}
325+
if v.OutClientBytes != 5 {
326+
t.Fatalf("Expected InClientBytes of 5, got %v\n", v.InClientBytes)
321327
}
322328
if v.Subscriptions <= 10 {
323329
t.Fatalf("Expected Subscriptions of at least 10, got %v\n", v.Subscriptions)

server/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,11 +401,13 @@ type nodeInfo struct {
401401

402402
type stats struct {
403403
inMsgs int64
404-
outMsgs int64
405404
inBytes int64
405+
outMsgs int64
406406
outBytes int64
407407
inClientMsgs int64
408408
inClientBytes int64
409+
outClientMsgs int64
410+
outClientBytes int64
409411
slowConsumers int64
410412
staleConnections int64
411413
stalls int64

0 commit comments

Comments
 (0)