Skip to content

Commit 7fbab14

Browse files
committed
refactor nsqd storage engine
1 parent 1db3903 commit 7fbab14

File tree

8 files changed

+566
-28
lines changed

8 files changed

+566
-28
lines changed

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ require (
1010
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
1111
github.com/judwhite/go-svc v1.0.0
1212
github.com/julienschmidt/httprouter v1.2.0
13+
github.com/kr/pretty v0.1.0 // indirect
1314
github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6
1415
github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839
1516
github.com/nsqio/go-nsq v1.0.7
1617
github.com/pmezard/go-difflib v1.0.0 // indirect
1718
github.com/stretchr/testify v1.2.2 // indirect
18-
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect
19+
github.com/vmihailenco/msgpack v4.0.4+incompatible
20+
google.golang.org/appengine v1.6.1 // indirect
21+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
1922
)

go.sum

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,19 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDf
1010
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
1111
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1212
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
13+
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
14+
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
1315
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
1416
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
1517
github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw=
1618
github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE=
1719
github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
1820
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
21+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
22+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
23+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
24+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
25+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
1926
github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 h1:frRvTmIp7QT1RPaphBvr6zvEHfvdOX7jMO7rvicCH9Q=
2027
github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
2128
github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 h1:nZ0z0haJRzCXAWH9Jl+BUnfD2n2MCSbGRSl8VBX+zR0=
@@ -26,5 +33,24 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
2633
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2734
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
2835
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
29-
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo=
30-
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
36+
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
37+
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
38+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
39+
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
40+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
41+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
42+
golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ=
43+
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
44+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
45+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
46+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
47+
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aWCh3PIquJB2fYlv9wcs=
48+
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
49+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
50+
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
51+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
52+
golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
53+
google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
54+
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
55+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
56+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

nsqd/channel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func NewChannel(topicName string, channelName string, ctx *context,
106106
ctx.nsqd.getOpts().DataPath,
107107
ctx.nsqd.getOpts().MaxBytesPerFile,
108108
int32(minValidMsgLength),
109-
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
109+
int32(ctx.nsqd.getOpts().MaxMsgSize)+maxValidMsgLength,
110110
ctx.nsqd.getOpts().SyncEvery,
111111
ctx.nsqd.getOpts().SyncTimeout,
112112
dqLogf,

nsqd/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
223223
}
224224

225225
msg := NewMessage(topic.GenerateID(), body)
226-
msg.deferred = deferred
226+
msg.AbsTs = time.Now().Add(deferred).UnixNano()
227227
err = topic.PutMessage(msg)
228228
if err != nil {
229229
return nil, http_api.Err{503, "EXITING"}

nsqd/message.go

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,49 @@ package nsqd
33
import (
44
"bytes"
55
"encoding/binary"
6+
"errors"
67
"fmt"
78
"io"
9+
"math"
810
"time"
11+
12+
"github.com/vmihailenco/msgpack"
13+
)
14+
15+
var (
16+
// First 4 bytes picked from hex representation of a day before epoch timestamp which should never exist in normal timestamp.
17+
// python3 -c 'import struct; import datetime; print(struct.pack(">Q", int((datetime.datetime(1990, 1, 1).timestamp() - 60*60*24) * 10**9)))'
18+
msgMagic = []byte{0x08, 0xc1, 0xe4, 0xa0}
19+
20+
metaKey = []byte("meta")
21+
bodyKey = []byte("body")
22+
metaLengthPlaceholder = []byte("01") // 2 bytes
23+
bodyLengthPlaceholder = []byte("01234567") // 8 bytes. Because `MaxMsgSize` is in int64 type.
24+
25+
// No const or reference directly are mainly used for unit test.
26+
maxMetaLen uint16 = math.MaxUint16
927
)
1028

1129
const (
1230
MsgIDLength = 16
13-
minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
31+
minValidMsgLength = 4 + 4 + 2 + 4 + 8 // msgMagic + metaKey + metaLen + bodyKey + bodyLen
32+
maxValidMsgLength = minValidMsgLength + math.MaxUint16 // minValidMsgLength + maxMetaLength
1433
)
1534

1635
type MessageID [MsgIDLength]byte
1736

1837
type Message struct {
19-
ID MessageID
20-
Body []byte
21-
Timestamp int64
22-
Attempts uint16
38+
ID MessageID `msgpack:"message_id"`
39+
Body []byte `msgpack:"-"`
40+
Timestamp int64 `msgpack:"timestamp"`
41+
Attempts uint16 `msgpack:"attempts"`
42+
AbsTs int64 `msgpack:"abs_ts"`
2343

2444
// for in-flight handling
2545
deliveryTS time.Time
2646
clientID int64
2747
pri int64
2848
index int
29-
deferred time.Duration
3049
}
3150

3251
func NewMessage(id MessageID, body []byte) *Message {
@@ -38,19 +57,45 @@ func NewMessage(id MessageID, body []byte) *Message {
3857
}
3958

4059
func (m *Message) WriteTo(w io.Writer) (int64, error) {
41-
var buf [10]byte
4260
var total int64
4361

44-
binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
45-
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
62+
// magic bytes
63+
n, err := w.Write(msgMagic)
64+
total += int64(n)
65+
if err != nil {
66+
return total, err
67+
}
68+
69+
// meta bytes
70+
meta, err := msgpack.Marshal(m)
71+
if err != nil {
72+
return total, err
73+
}
74+
75+
if len(meta) > int(maxMetaLen) {
76+
return total, errors.New("marshaled meta data length exceeds max meta length")
77+
}
78+
79+
var metaPrefix = append(metaKey, metaLengthPlaceholder...)
80+
binary.BigEndian.PutUint16(metaPrefix[4:4+len(metaLengthPlaceholder)], uint16(len(meta)))
81+
82+
n, err = w.Write(metaPrefix[:])
83+
total += int64(n)
84+
if err != nil {
85+
return total, err
86+
}
4687

47-
n, err := w.Write(buf[:])
88+
n, err = w.Write(meta)
4889
total += int64(n)
4990
if err != nil {
5091
return total, err
5192
}
5293

53-
n, err = w.Write(m.ID[:])
94+
// msg body
95+
bodyPrefix := append(bodyKey, bodyLengthPlaceholder...)
96+
binary.BigEndian.PutUint64(bodyPrefix[4:4+len(bodyLengthPlaceholder)], uint64(len(m.Body)))
97+
98+
n, err = w.Write(bodyPrefix[:])
5499
total += int64(n)
55100
if err != nil {
56101
return total, err
@@ -67,6 +112,8 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
67112

68113
// decodeMessage deserializes data (as []byte) and creates a new Message
69114
// message format:
115+
//
116+
// Old message format:
70117
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
71118
// | (int64) || || (hex string encoded in ASCII) || (binary)
72119
// | 8-byte || || 16-byte || N-byte
@@ -75,18 +122,45 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
75122
// (uint16)
76123
// 2-byte
77124
// attempts
125+
//
126+
// New message format:
127+
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
128+
// | ([]byte)|| (metaKey+metaLen+meta) || (bodyKey+bodyLen+body)
129+
// | 4-byte || (4+2+N)-byte || (4+8+N)-byte
130+
// ------------------------------------------------------------------------------------------...
131+
// message magic message meta message body
132+
//
78133
func decodeMessage(b []byte) (*Message, error) {
79134
var msg Message
80135

81-
if len(b) < minValidMsgLength {
82-
return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
136+
prefixBytes := b[:len(msgMagic)]
137+
if bytes.Equal(prefixBytes, msgMagic) {
138+
// New message format
139+
metaStartIndex := len(msgMagic)
140+
if !bytes.Equal(b[metaStartIndex:metaStartIndex+len(metaKey)], metaKey) {
141+
return nil, fmt.Errorf("bad msg format. \"meta\" key should be after msg magic")
142+
}
143+
144+
metaSize := binary.BigEndian.Uint16(b[metaStartIndex+len(metaKey) : metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)])
145+
err := msgpack.Unmarshal(b[metaStartIndex+len(metaKey)+len(metaLengthPlaceholder):metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)+int(metaSize)], &msg)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
bodyStartIndex := metaStartIndex + len(bodyKey) + len(metaLengthPlaceholder) + int(metaSize)
151+
if !bytes.Equal(b[bodyStartIndex:bodyStartIndex+len(bodyKey)], bodyKey) {
152+
return nil, fmt.Errorf("bad msg format. \"body\" key should be after meta content")
153+
}
154+
bodySize := binary.BigEndian.Uint64(b[bodyStartIndex+len(bodyKey) : bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder)])
155+
msg.Body = b[bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder) : uint64(bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder))+bodySize]
156+
} else {
157+
// Old message format
158+
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
159+
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
160+
copy(msg.ID[:], b[10:10+MsgIDLength])
161+
msg.Body = b[10+MsgIDLength:]
83162
}
84163

85-
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
86-
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
87-
copy(msg.ID[:], b[10:10+MsgIDLength])
88-
msg.Body = b[10+MsgIDLength:]
89-
90164
return &msg, nil
91165
}
92166

0 commit comments

Comments
 (0)