Skip to content

Commit ace282d

Browse files
committed
nsqd: per-topic message IDs
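This changes message ID generation so that each topic maintains its own monotonically increasing counter, incremented atomically, to generate message IDs. It replaces the single nsqd-wide ID channel (`idChan`) fed by the `idPump` goroutine, and is a more scalable and performant approach.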
1 parent 6f99c0b commit ace282d

File tree

11 files changed

+58
-145
lines changed

11 files changed

+58
-145
lines changed

nsqd/channel_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestInFlightWorker(t *testing.T) {
8787
channel := topic.GetChannel("channel")
8888

8989
for i := 0; i < count; i++ {
90-
msg := NewMessage(<-nsqd.idChan, []byte("test"))
90+
msg := NewMessage(topic.GenerateID(), []byte("test"))
9191
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
9292
}
9393

@@ -129,7 +129,7 @@ func TestChannelEmpty(t *testing.T) {
129129

130130
msgs := make([]*Message, 0, 25)
131131
for i := 0; i < 25; i++ {
132-
msg := NewMessage(<-nsqd.idChan, []byte("test"))
132+
msg := NewMessage(topic.GenerateID(), []byte("test"))
133133
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
134134
msgs = append(msgs, msg)
135135
}
@@ -167,7 +167,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
167167
channel.AddClient(client.ID, client)
168168

169169
for i := 0; i < 25; i++ {
170-
msg := NewMessage(<-nsqd.idChan, []byte("test"))
170+
msg := NewMessage(topic.GenerateID(), []byte("test"))
171171
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
172172
client.SendingMessage()
173173
}
@@ -206,15 +206,15 @@ func TestChannelHealth(t *testing.T) {
206206

207207
channel.backend = &errorBackendQueue{}
208208

209-
msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
209+
msg := NewMessage(topic.GenerateID(), make([]byte, 100))
210210
err := channel.PutMessage(msg)
211211
equal(t, err, nil)
212212

213-
msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
213+
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
214214
err = channel.PutMessage(msg)
215215
equal(t, err, nil)
216216

217-
msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
217+
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
218218
err = channel.PutMessage(msg)
219219
nequal(t, err, nil)
220220

@@ -228,7 +228,7 @@ func TestChannelHealth(t *testing.T) {
228228

229229
channel.backend = &errorRecoveredBackendQueue{}
230230

231-
msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
231+
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
232232
err = channel.PutMessage(msg)
233233
equal(t, err, nil)
234234

nsqd/guid.go

Lines changed: 4 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,18 @@
11
package nsqd
22

3-
// the core algorithm here was borrowed from:
4-
// Blake Mizerany's `noeqd` https://github.com/bmizerany/noeqd
5-
// and indirectly:
6-
// Twitter's `snowflake` https://github.com/twitter/snowflake
7-
8-
// only minor cleanup and changes to introduce a type, combine the concept
9-
// of workerID + datacenterId into a single identifier, and modify the
10-
// behavior when sequences rollover for our specific implementation needs
11-
123
import (
134
"encoding/hex"
14-
"errors"
15-
"time"
5+
"sync/atomic"
166
)
177

18-
const (
19-
workerIDBits = uint64(10)
20-
sequenceBits = uint64(12)
21-
workerIDShift = sequenceBits
22-
timestampShift = sequenceBits + workerIDBits
23-
sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits)
24-
25-
// ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20
26-
twepoch = int64(1288834974288)
27-
)
28-
29-
var ErrTimeBackwards = errors.New("time has gone backwards")
30-
var ErrSequenceExpired = errors.New("sequence expired")
31-
var ErrIDBackwards = errors.New("ID went backward")
32-
338
type guid int64
349

3510
type guidFactory struct {
36-
sequence int64
37-
lastTimestamp int64
38-
lastID guid
11+
sequence int64
3912
}
4013

41-
func (f *guidFactory) NewGUID(workerID int64) (guid, error) {
42-
// divide by 1048576, giving pseudo-milliseconds
43-
ts := time.Now().UnixNano() >> 20
44-
45-
if ts < f.lastTimestamp {
46-
return 0, ErrTimeBackwards
47-
}
48-
49-
if f.lastTimestamp == ts {
50-
f.sequence = (f.sequence + 1) & sequenceMask
51-
if f.sequence == 0 {
52-
return 0, ErrSequenceExpired
53-
}
54-
} else {
55-
f.sequence = 0
56-
}
57-
58-
f.lastTimestamp = ts
59-
60-
id := guid(((ts - twepoch) << timestampShift) |
61-
(workerID << workerIDShift) |
62-
f.sequence)
63-
64-
if id <= f.lastID {
65-
return 0, ErrIDBackwards
66-
}
67-
68-
f.lastID = id
69-
70-
return id, nil
14+
func (f *guidFactory) NewGUID() guid {
15+
return guid(atomic.AddInt64(&f.sequence, 1))
7116
}
7217

7318
func (g guid) Hex() MessageID {

nsqd/guid_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ func BenchmarkGUIDUnsafe(b *testing.B) {
2525
func BenchmarkGUID(b *testing.B) {
2626
factory := &guidFactory{}
2727
for i := 0; i < b.N; i++ {
28-
guid, err := factory.NewGUID(0)
29-
if err != nil {
30-
continue
31-
}
32-
guid.Hex()
28+
factory.NewGUID().Hex()
3329
}
3430
}

nsqd/http.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
236236
}
237237
}
238238

239-
msg := NewMessage(<-s.ctx.nsqd.idChan, body)
239+
msg := NewMessage(topic.GenerateID(), body)
240240
msg.deferred = deferred
241241
err = topic.PutMessage(msg)
242242
if err != nil {
@@ -265,8 +265,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
265265
_, ok := reqParams["binary"]
266266
if ok {
267267
tmp := make([]byte, 4)
268-
msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan,
269-
s.ctx.nsqd.getOpts().MaxMsgSize)
268+
msgs, err = readMPUB(req.Body, tmp, topic, s.ctx.nsqd.getOpts().MaxMsgSize)
270269
if err != nil {
271270
return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
272271
}
@@ -304,7 +303,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
304303
return nil, http_api.Err{413, "MSG_TOO_BIG"}
305304
}
306305

307-
msg := NewMessage(<-s.ctx.nsqd.idChan, block)
306+
msg := NewMessage(topic.GenerateID(), block)
308307
msgs = append(msgs, msg)
309308
}
310309
}

nsqd/nsqd.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"net"
1212
"os"
1313
"path"
14-
"runtime"
1514
"strings"
1615
"sync"
1716
"sync/atomic"
@@ -61,7 +60,6 @@ type NSQD struct {
6160

6261
poolSize int
6362

64-
idChan chan MessageID
6563
notifyChan chan interface{}
6664
optsNotificationChan chan struct{}
6765
exitChan chan int
@@ -80,7 +78,6 @@ func New(opts *Options) *NSQD {
8078
n := &NSQD{
8179
startTime: time.Now(),
8280
topicMap: make(map[string]*Topic),
83-
idChan: make(chan MessageID, 4096),
8481
exitChan: make(chan int),
8582
notifyChan: make(chan interface{}),
8683
optsNotificationChan: make(chan struct{}, 1),
@@ -254,7 +251,6 @@ func (n *NSQD) Main() {
254251
})
255252

256253
n.waitGroup.Wrap(func() { n.queueScanLoop() })
257-
n.waitGroup.Wrap(func() { n.idPump() })
258254
n.waitGroup.Wrap(func() { n.lookupLoop() })
259255
if n.getOpts().StatsdAddress != "" {
260256
n.waitGroup.Wrap(func() { n.statsdLoop() })
@@ -419,8 +415,6 @@ func (n *NSQD) Exit() {
419415
}
420416
n.Unlock()
421417

422-
// we want to do this last as it closes the idPump (if closed first it
423-
// could potentially starve items in process and deadlock)
424418
close(n.exitChan)
425419
n.waitGroup.Wait()
426420

@@ -524,33 +518,6 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error {
524518
return nil
525519
}
526520

527-
func (n *NSQD) idPump() {
528-
factory := &guidFactory{}
529-
lastError := time.Unix(0, 0)
530-
workerID := n.getOpts().ID
531-
for {
532-
id, err := factory.NewGUID(workerID)
533-
if err != nil {
534-
now := time.Now()
535-
if now.Sub(lastError) > time.Second {
536-
// only print the error once/second
537-
n.logf("ERROR: %s", err)
538-
lastError = now
539-
}
540-
runtime.Gosched()
541-
continue
542-
}
543-
select {
544-
case n.idChan <- id.Hex():
545-
case <-n.exitChan:
546-
goto exit
547-
}
548-
}
549-
550-
exit:
551-
n.logf("ID: closing")
552-
}
553-
554521
func (n *NSQD) Notify(v interface{}) {
555522
// since the in-memory metadata is incomplete,
556523
// should not persist metadata while loading it.

nsqd/nsqd_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func TestStartup(t *testing.T) {
124124
body := make([]byte, 256)
125125
topic := nsqd.GetTopic(topicName)
126126
for i := 0; i < iterations; i++ {
127-
msg := NewMessage(<-nsqd.idChan, body)
127+
msg := NewMessage(topic.GenerateID(), body)
128128
topic.PutMessage(msg)
129129
}
130130

@@ -226,7 +226,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
226226
client := newClientV2(0, nil, &context{nsqd})
227227
ephemeralChannel.AddClient(client.ID, client)
228228

229-
msg := NewMessage(<-nsqd.idChan, body)
229+
msg := NewMessage(topic.GenerateID(), body)
230230
topic.PutMessage(msg)
231231
msg = <-ephemeralChannel.clientMsgChan
232232
equal(t, msg.Body, body)

nsqd/protocol_v2.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
761761
}
762762

763763
topic := p.ctx.nsqd.GetTopic(topicName)
764-
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
764+
msg := NewMessage(topic.GenerateID(), messageBody)
765765
err = topic.PutMessage(msg)
766766
if err != nil {
767767
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
@@ -783,6 +783,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
783783
fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName))
784784
}
785785

786+
if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil {
787+
return nil, err
788+
}
789+
790+
topic := p.ctx.nsqd.GetTopic(topicName)
791+
786792
bodyLen, err := readLen(client.Reader, client.lenSlice)
787793
if err != nil {
788794
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size")
@@ -798,18 +804,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
798804
fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize))
799805
}
800806

801-
messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan,
807+
messages, err := readMPUB(client.Reader, client.lenSlice, topic,
802808
p.ctx.nsqd.getOpts().MaxMsgSize)
803809
if err != nil {
804810
return nil, err
805811
}
806812

807-
if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil {
808-
return nil, err
809-
}
810-
811-
topic := p.ctx.nsqd.GetTopic(topicName)
812-
813813
// if we've made it this far we've validated all the input,
814814
// the only possible error is that the topic is exiting during
815815
// this next call (and no messages will be queued in that case)
@@ -873,7 +873,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
873873
}
874874

875875
topic := p.ctx.nsqd.GetTopic(topicName)
876-
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
876+
msg := NewMessage(topic.GenerateID(), messageBody)
877877
msg.deferred = timeoutDuration
878878
err = topic.PutMessage(msg)
879879
if err != nil {
@@ -910,7 +910,7 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) {
910910
return nil, nil
911911
}
912912

913-
func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int64) ([]*Message, error) {
913+
func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64) ([]*Message, error) {
914914
numMessages, err := readLen(r, tmp)
915915
if err != nil {
916916
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count")
@@ -945,7 +945,7 @@ func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int
945945
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body")
946946
}
947947

948-
messages = append(messages, NewMessage(<-idChan, msgBody))
948+
messages = append(messages, NewMessage(topic.GenerateID(), msgBody))
949949
}
950950

951951
return messages, nil

0 commit comments

Comments
 (0)