Skip to content

Commit 3103474

Browse files
authored
Merge pull request #1301 from jehiah/topology_aware_msg_delivery_1301
nsqd: add topology region/zone aware message consumption
2 parents 8e7f8d1 + 1cd6297 commit 3103474

File tree

26 files changed

+717
-172
lines changed

26 files changed

+717
-172
lines changed

apps/nsqd/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
142142
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
143143
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
144144
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")
145+
flagSet.String("topology-region", opts.TopologyRegion, "A region represents a larger domain, made up of one or more zones for preferring closer consumer")
146+
flagSet.String("topology-zone", opts.TopologyZone, "A zone represents a logical failure domain for preferring closer consumer")
145147

146148
// diskqueue options
147149
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
@@ -197,5 +199,12 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
197199
flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
198200
flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")
199201

202+
experiments := app.StringArray{}
203+
var validExperiments []string
204+
for _, e := range nsqd.AllExperiments {
205+
validExperiments = append(validExperiments, fmt.Sprintf("%q", string(e)))
206+
}
207+
flagSet.Var(&experiments, "enable-experiment", fmt.Sprintf("enable experimental feature (may be given multiple times) (valid options: %s)", strings.Join(validExperiments, ", ")))
208+
200209
return flagSet
201210
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
)
1818

1919
require (
20-
github.com/davecgh/go-spew v1.1.1 // indirect
20+
github.com/stretchr/testify v1.9.0 // indirect
2121
golang.org/x/sys v0.10.0 // indirect
2222
)
2323

go.sum

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
2-
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
31
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
42
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
53
github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0=
@@ -29,13 +27,20 @@ github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQT
2927
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3028
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3129
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
32-
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
30+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
31+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
32+
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
3333
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
34+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
35+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
36+
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
37+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
38+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
3439
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
35-
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70 h1:SeSEfdIxyvwGJliREIJhRPPXvW6sDlLT+UQ3B0hD0NA=
36-
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
3740
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
3841
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
3942
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
40-
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
4143
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
44+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
45+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
46+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/clusterinfo/data.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,8 @@ func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error
352352
Hostname string `json:"hostname"`
353353
HTTPPort int `json:"http_port"`
354354
TCPPort int `json:"tcp_port"`
355+
TopologyZone string `json:"topology_zone,omitempty"`
356+
TopologyRegion string `json:"topology_region,omitempty"`
355357
}
356358

357359
type statsRespType struct {
@@ -409,6 +411,8 @@ func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error
409411
HTTPPort: infoResp.HTTPPort,
410412
TCPPort: infoResp.TCPPort,
411413
Topics: producerTopics,
414+
TopologyZone: infoResp.TopologyZone,
415+
TopologyRegion: infoResp.TopologyRegion,
412416
})
413417
}(addr)
414418
}
@@ -437,6 +441,8 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
437441
Hostname string `json:"hostname"`
438442
HTTPPort int `json:"http_port"`
439443
TCPPort int `json:"tcp_port"`
444+
TopologyZone string `json:"topology_zone,omitempty"`
445+
TopologyRegion string `json:"topology_region,omitempty"`
440446
}
441447

442448
type statsRespType struct {
@@ -508,6 +514,8 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
508514
HTTPPort: infoResp.HTTPPort,
509515
TCPPort: infoResp.TCPPort,
510516
Topics: producerTopics,
517+
TopologyZone: infoResp.TopologyZone,
518+
TopologyRegion: infoResp.TopologyRegion,
511519
})
512520
lock.Unlock()
513521

@@ -582,6 +590,7 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
582590
topic.Node = addr
583591
topic.Hostname = p.Hostname
584592
topic.MemoryDepth = topic.Depth - topic.BackendDepth
593+
topic.DeliveryMsgCount = topic.ZoneLocalMsgCount + topic.RegionLocalMsgCount + topic.GlobalMsgCount
585594
if selectedTopic != "" && topic.TopicName != selectedTopic {
586595
continue
587596
}
@@ -592,6 +601,7 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
592601
channel.Hostname = p.Hostname
593602
channel.TopicName = topic.TopicName
594603
channel.MemoryDepth = channel.Depth - channel.BackendDepth
604+
channel.DeliveryMsgCount = channel.ZoneLocalMsgCount + channel.RegionLocalMsgCount + channel.GlobalMsgCount
595605
key := channel.ChannelName
596606
if selectedTopic == "" {
597607
key = fmt.Sprintf("%s:%s", topic.TopicName, channel.ChannelName)
@@ -607,6 +617,8 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
607617
}
608618
for _, c := range channel.Clients {
609619
c.Node = addr
620+
c.NodeTopologyRegion = p.TopologyRegion
621+
c.NodeTopologyZone = p.TopologyZone
610622
}
611623
channelStats.Add(channel)
612624
}

internal/clusterinfo/types.go

Lines changed: 101 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type Producer struct {
3030
TCPPort int `json:"tcp_port"`
3131
HTTPPort int `json:"http_port"`
3232
Version string `json:"version"`
33+
TopologyZone string `json:"topology_zone,omitempty"`
34+
TopologyRegion string `json:"topology_region,omitempty"`
3335
VersionObj semver.Version `json:"-"`
3436
Topics ProducerTopics `json:"topics"`
3537
OutOfDate bool `json:"out_of_date"`
@@ -46,6 +48,8 @@ func (p *Producer) UnmarshalJSON(b []byte) error {
4648
Version string `json:"version"`
4749
Topics []string `json:"topics"`
4850
Tombstoned []bool `json:"tombstones"`
51+
TopologyZone string `json:"topology_zone,omitempty"`
52+
TopologyRegion string `json:"topology_region,omitempty"`
4953
}
5054
if err := json.Unmarshal(b, &r); err != nil {
5155
return err
@@ -57,6 +61,8 @@ func (p *Producer) UnmarshalJSON(b []byte) error {
5761
TCPPort: r.TCPPort,
5862
HTTPPort: r.HTTPPort,
5963
Version: r.Version,
64+
TopologyZone: r.TopologyZone,
65+
TopologyRegion: r.TopologyRegion,
6066
}
6167
for i, t := range r.Topics {
6268
p.Topics = append(p.Topics, ProducerTopic{Topic: t, Tombstoned: r.Tombstoned[i]})
@@ -92,16 +98,20 @@ func (p *Producer) IsInconsistent(numLookupd int) bool {
9298
}
9399

94100
type TopicStats struct {
95-
Node string `json:"node"`
96-
Hostname string `json:"hostname"`
97-
TopicName string `json:"topic_name"`
98-
Depth int64 `json:"depth"`
99-
MemoryDepth int64 `json:"memory_depth"`
100-
BackendDepth int64 `json:"backend_depth"`
101-
MessageCount int64 `json:"message_count"`
102-
NodeStats []*TopicStats `json:"nodes"`
103-
Channels []*ChannelStats `json:"channels"`
104-
Paused bool `json:"paused"`
101+
Node string `json:"node"`
102+
Hostname string `json:"hostname"`
103+
TopicName string `json:"topic_name"`
104+
Depth int64 `json:"depth"`
105+
MemoryDepth int64 `json:"memory_depth"`
106+
BackendDepth int64 `json:"backend_depth"`
107+
MessageCount int64 `json:"message_count"`
108+
DeliveryMsgCount int64 `json:"delivery_msg_count"`
109+
ZoneLocalMsgCount int64 `json:"zone_local_msg_count,omitempty"`
110+
RegionLocalMsgCount int64 `json:"region_local_msg_count,omitempty"`
111+
GlobalMsgCount int64 `json:"global_msg_count,omitempty"`
112+
NodeStats []*TopicStats `json:"nodes"`
113+
Channels []*ChannelStats `json:"channels"`
114+
Paused bool `json:"paused"`
105115

106116
E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
107117
}
@@ -112,6 +122,10 @@ func (t *TopicStats) Add(a *TopicStats) {
112122
t.MemoryDepth += a.MemoryDepth
113123
t.BackendDepth += a.BackendDepth
114124
t.MessageCount += a.MessageCount
125+
t.DeliveryMsgCount += a.DeliveryMsgCount
126+
t.ZoneLocalMsgCount += a.ZoneLocalMsgCount
127+
t.RegionLocalMsgCount += a.RegionLocalMsgCount
128+
t.GlobalMsgCount += a.GlobalMsgCount
115129
if a.Paused {
116130
t.Paused = a.Paused
117131
}
@@ -139,23 +153,27 @@ func (t *TopicStats) Add(a *TopicStats) {
139153
}
140154

141155
type ChannelStats struct {
142-
Node string `json:"node"`
143-
Hostname string `json:"hostname"`
144-
TopicName string `json:"topic_name"`
145-
ChannelName string `json:"channel_name"`
146-
Depth int64 `json:"depth"`
147-
MemoryDepth int64 `json:"memory_depth"`
148-
BackendDepth int64 `json:"backend_depth"`
149-
InFlightCount int64 `json:"in_flight_count"`
150-
DeferredCount int64 `json:"deferred_count"`
151-
RequeueCount int64 `json:"requeue_count"`
152-
TimeoutCount int64 `json:"timeout_count"`
153-
MessageCount int64 `json:"message_count"`
154-
ClientCount int `json:"client_count"`
155-
Selected bool `json:"-"`
156-
NodeStats []*ChannelStats `json:"nodes"`
157-
Clients []*ClientStats `json:"clients"`
158-
Paused bool `json:"paused"`
156+
Node string `json:"node"`
157+
Hostname string `json:"hostname"`
158+
TopicName string `json:"topic_name"`
159+
ChannelName string `json:"channel_name"`
160+
Depth int64 `json:"depth"`
161+
MemoryDepth int64 `json:"memory_depth"`
162+
BackendDepth int64 `json:"backend_depth"`
163+
InFlightCount int64 `json:"in_flight_count"`
164+
DeferredCount int64 `json:"deferred_count"`
165+
RequeueCount int64 `json:"requeue_count"`
166+
TimeoutCount int64 `json:"timeout_count"`
167+
MessageCount int64 `json:"message_count"`
168+
DeliveryMsgCount int64 `json:"delivery_msg_count,omitempty"`
169+
ZoneLocalMsgCount int64 `json:"zone_local_msg_count,omitempty"`
170+
RegionLocalMsgCount int64 `json:"region_local_msg_count,omitempty"`
171+
GlobalMsgCount int64 `json:"global_msg_count,omitempty"`
172+
ClientCount int `json:"client_count"`
173+
Selected bool `json:"-"`
174+
NodeStats []*ChannelStats `json:"nodes"`
175+
Clients []*ClientStats `json:"clients"`
176+
Paused bool `json:"paused"`
159177

160178
E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
161179
}
@@ -170,6 +188,10 @@ func (c *ChannelStats) Add(a *ChannelStats) {
170188
c.RequeueCount += a.RequeueCount
171189
c.TimeoutCount += a.TimeoutCount
172190
c.MessageCount += a.MessageCount
191+
c.DeliveryMsgCount += a.DeliveryMsgCount
192+
c.ZoneLocalMsgCount += a.ZoneLocalMsgCount
193+
c.RegionLocalMsgCount += a.RegionLocalMsgCount
194+
c.GlobalMsgCount += a.GlobalMsgCount
173195
c.ClientCount += a.ClientCount
174196
if a.Paused {
175197
c.Paused = a.Paused
@@ -189,25 +211,29 @@ func (c *ChannelStats) Add(a *ChannelStats) {
189211
}
190212

191213
type ClientStats struct {
192-
Node string `json:"node"`
193-
RemoteAddress string `json:"remote_address"`
194-
Version string `json:"version"`
195-
ClientID string `json:"client_id"`
196-
Hostname string `json:"hostname"`
197-
UserAgent string `json:"user_agent"`
198-
ConnectTs int64 `json:"connect_ts"`
199-
ConnectedDuration time.Duration `json:"connected"`
200-
InFlightCount int `json:"in_flight_count"`
201-
ReadyCount int `json:"ready_count"`
202-
FinishCount int64 `json:"finish_count"`
203-
RequeueCount int64 `json:"requeue_count"`
204-
MessageCount int64 `json:"message_count"`
205-
SampleRate int32 `json:"sample_rate"`
206-
Deflate bool `json:"deflate"`
207-
Snappy bool `json:"snappy"`
208-
Authed bool `json:"authed"`
209-
AuthIdentity string `json:"auth_identity"`
210-
AuthIdentityURL string `json:"auth_identity_url"`
214+
Node string `json:"node"`
215+
RemoteAddress string `json:"remote_address"`
216+
Version string `json:"version"`
217+
ClientID string `json:"client_id"`
218+
Hostname string `json:"hostname"`
219+
UserAgent string `json:"user_agent"`
220+
ConnectTs int64 `json:"connect_ts"`
221+
ConnectedDuration time.Duration `json:"connected"`
222+
InFlightCount int `json:"in_flight_count"`
223+
ReadyCount int `json:"ready_count"`
224+
FinishCount int64 `json:"finish_count"`
225+
RequeueCount int64 `json:"requeue_count"`
226+
MessageCount int64 `json:"message_count"`
227+
SampleRate int32 `json:"sample_rate"`
228+
Deflate bool `json:"deflate"`
229+
Snappy bool `json:"snappy"`
230+
Authed bool `json:"authed"`
231+
AuthIdentity string `json:"auth_identity"`
232+
AuthIdentityURL string `json:"auth_identity_url"`
233+
NodeTopologyRegion string `json:"node_topology_region,omitempty"`
234+
NodeTopologyZone string `json:"node_topology_zone,omitempty"`
235+
TopologyRegion string `json:"topology_region,omitempty"`
236+
TopologyZone string `json:"topology_zone,omitempty"`
211237

212238
TLS bool `json:"tls"`
213239
CipherSuite string `json:"tls_cipher_suite"`
@@ -262,6 +288,35 @@ func (c ClientsByHost) Less(i, j int) bool {
262288
return c.ClientStatsList[i].Hostname < c.ClientStatsList[j].Hostname
263289
}
264290

291+
type ClientStatsByNodeTopology struct {
292+
ClientStatsList
293+
}
294+
295+
func (c ClientStatsByNodeTopology) Less(i, j int) bool {
296+
// if its the same node, sort by topology
297+
if c.ClientStatsList[i].Node == c.ClientStatsList[j].Node {
298+
region := c.ClientStatsList[i].NodeTopologyRegion
299+
zone := c.ClientStatsList[i].NodeTopologyZone
300+
301+
switch {
302+
case c.ClientStatsList[i].TopologyRegion == region && c.ClientStatsList[i].TopologyZone == zone:
303+
return true
304+
case c.ClientStatsList[j].TopologyRegion == region && c.ClientStatsList[j].TopologyZone == zone:
305+
return false
306+
case c.ClientStatsList[i].TopologyRegion == region:
307+
return true
308+
case c.ClientStatsList[j].TopologyRegion == region:
309+
return false
310+
default:
311+
if c.ClientStatsList[i].TopologyRegion == c.ClientStatsList[j].TopologyRegion {
312+
return c.ClientStatsList[i].TopologyZone < c.ClientStatsList[j].TopologyZone
313+
}
314+
return c.ClientStatsList[i].TopologyRegion < c.ClientStatsList[j].TopologyRegion
315+
}
316+
}
317+
return c.ClientStatsList[i].Node < c.ClientStatsList[j].Node
318+
}
319+
265320
type TopicStatsList []*TopicStats
266321

267322
func (t TopicStatsList) Len() int { return len(t) }

nsqadmin/http.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414
"path"
1515
"reflect"
16+
"sort"
1617
"strings"
1718
"time"
1819

@@ -328,6 +329,8 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
328329
messages = append(messages, pe.Error())
329330
}
330331

332+
sort.Sort(clusterinfo.ClientStatsByNodeTopology{channelStats[channelName].Clients})
333+
331334
return struct {
332335
*clusterinfo.ChannelStats
333336
Message string `json:"message"`

0 commit comments

Comments
 (0)