@@ -3,30 +3,49 @@ package nsqd
33import (
44 "bytes"
55 "encoding/binary"
6+ "errors"
67 "fmt"
78 "io"
9+ "math"
810 "time"
11+
12+ "github.com/vmihailenco/msgpack"
13+ )
14+
15+ var (
16+ // First 4 bytes picked from hex representation of a day before epoch timestamp which should never exist in normal timestamp.
17+ // python3 -c 'import struct; import datetime; print(struct.pack(">Q", int((datetime.datetime(1990, 1, 1).timestamp() - 60*60*24) * 10**9)))'
18+ msgMagic = []byte {0x08 , 0xc1 , 0xe4 , 0xa0 }
19+
20+ metaKey = []byte ("meta" )
21+ bodyKey = []byte ("body" )
22+ metaLengthPlaceholder = []byte ("01" ) // 2 bytes
23+ bodyLengthPlaceholder = []byte ("01234567" ) // 8 bytes. Because `MaxMsgSize` is in int64 type.
24+
25+ // No const or reference directly are mainly used for unit test.
26+ maxMetaLen uint16 = math .MaxUint16
927)
1028
1129const (
1230 MsgIDLength = 16
13- minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
31+ minValidMsgLength = 4 + 4 + 2 + 4 + 8 // msgMagic + metaKey + metaLen + bodyKey + bodyLen
32+ maxValidMsgLength = minValidMsgLength + math .MaxUint16 // minValidMsgLength + maxMetaLength
1433)
1534
1635type MessageID [MsgIDLength ]byte
1736
1837type Message struct {
19- ID MessageID
20- Body []byte
21- Timestamp int64
22- Attempts uint16
38+ ID MessageID `msgpack:"message_id"`
39+ Body []byte `msgpack:"-"`
40+ Timestamp int64 `msgpack:"timestamp"`
41+ Attempts uint16 `msgpack:"attempts"`
42+ AbsTs int64 `msgpack:"abs_ts"`
2343
2444 // for in-flight handling
2545 deliveryTS time.Time
2646 clientID int64
2747 pri int64
2848 index int
29- deferred time.Duration
3049}
3150
3251func NewMessage (id MessageID , body []byte ) * Message {
@@ -38,19 +57,45 @@ func NewMessage(id MessageID, body []byte) *Message {
3857}
3958
4059func (m * Message ) WriteTo (w io.Writer ) (int64 , error ) {
41- var buf [10 ]byte
4260 var total int64
4361
44- binary .BigEndian .PutUint64 (buf [:8 ], uint64 (m .Timestamp ))
45- binary .BigEndian .PutUint16 (buf [8 :10 ], uint16 (m .Attempts ))
62+ // magic bytes
63+ n , err := w .Write (msgMagic )
64+ total += int64 (n )
65+ if err != nil {
66+ return total , err
67+ }
68+
69+ // meta bytes
70+ meta , err := msgpack .Marshal (m )
71+ if err != nil {
72+ return total , err
73+ }
74+
75+ if len (meta ) > int (maxMetaLen ) {
76+ return total , errors .New ("marshaled meta data length exceeds max meta length" )
77+ }
78+
79+ var metaPrefix = append (metaKey , metaLengthPlaceholder ... )
80+ binary .BigEndian .PutUint16 (metaPrefix [4 :4 + len (metaLengthPlaceholder )], uint16 (len (meta )))
81+
82+ n , err = w .Write (metaPrefix [:])
83+ total += int64 (n )
84+ if err != nil {
85+ return total , err
86+ }
4687
47- n , err : = w .Write (buf [:] )
88+ n , err = w .Write (meta )
4889 total += int64 (n )
4990 if err != nil {
5091 return total , err
5192 }
5293
53- n , err = w .Write (m .ID [:])
94+ // msg body
95+ bodyPrefix := append (bodyKey , bodyLengthPlaceholder ... )
96+ binary .BigEndian .PutUint64 (bodyPrefix [4 :4 + len (bodyLengthPlaceholder )], uint64 (len (m .Body )))
97+
98+ n , err = w .Write (bodyPrefix [:])
5499 total += int64 (n )
55100 if err != nil {
56101 return total , err
@@ -67,6 +112,8 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
67112
68113// decodeMessage deserializes data (as []byte) and creates a new Message
69114// message format:
115+ //
116+ // Old message format:
70117// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
71118// | (int64) || || (hex string encoded in ASCII) || (binary)
72119// | 8-byte || || 16-byte || N-byte
@@ -75,18 +122,45 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
75122// (uint16)
76123// 2-byte
77124// attempts
125+ //
126+ // New message format:
127+ // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
128+ // | ([]byte)|| (metaKey+metaLen+meta) || (bodyKey+bodyLen+body)
129+ // | 4-byte || (4+2+N)-byte || (4+8+N)-byte
130+ // ------------------------------------------------------------------------------------------...
131+ // message magic message meta message body
132+ //
78133func decodeMessage (b []byte ) (* Message , error ) {
79134 var msg Message
80135
81- if len (b ) < minValidMsgLength {
82- return nil , fmt .Errorf ("invalid message buffer size (%d)" , len (b ))
136+ prefixBytes := b [:len (msgMagic )]
137+ if bytes .Equal (prefixBytes , msgMagic ) {
138+ // New message format
139+ metaStartIndex := len (msgMagic )
140+ if ! bytes .Equal (b [metaStartIndex :metaStartIndex + len (metaKey )], metaKey ) {
141+ return nil , fmt .Errorf ("bad msg format. \" meta\" key should be after msg magic" )
142+ }
143+
144+ metaSize := binary .BigEndian .Uint16 (b [metaStartIndex + len (metaKey ) : metaStartIndex + len (metaKey )+ len (metaLengthPlaceholder )])
145+ err := msgpack .Unmarshal (b [metaStartIndex + len (metaKey )+ len (metaLengthPlaceholder ):metaStartIndex + len (metaKey )+ len (metaLengthPlaceholder )+ int (metaSize )], & msg )
146+ if err != nil {
147+ return nil , err
148+ }
149+
150+ bodyStartIndex := metaStartIndex + len (bodyKey ) + len (metaLengthPlaceholder ) + int (metaSize )
151+ if ! bytes .Equal (b [bodyStartIndex :bodyStartIndex + len (bodyKey )], bodyKey ) {
152+ return nil , fmt .Errorf ("bad msg format. \" body\" key should be after meta content" )
153+ }
154+ bodySize := binary .BigEndian .Uint64 (b [bodyStartIndex + len (bodyKey ) : bodyStartIndex + len (bodyKey )+ len (bodyLengthPlaceholder )])
155+ msg .Body = b [bodyStartIndex + len (bodyKey )+ len (bodyLengthPlaceholder ) : uint64 (bodyStartIndex + len (bodyKey )+ len (bodyLengthPlaceholder ))+ bodySize ]
156+ } else {
157+ // Old message format
158+ msg .Timestamp = int64 (binary .BigEndian .Uint64 (b [:8 ]))
159+ msg .Attempts = binary .BigEndian .Uint16 (b [8 :10 ])
160+ copy (msg .ID [:], b [10 :10 + MsgIDLength ])
161+ msg .Body = b [10 + MsgIDLength :]
83162 }
84163
85- msg .Timestamp = int64 (binary .BigEndian .Uint64 (b [:8 ]))
86- msg .Attempts = binary .BigEndian .Uint16 (b [8 :10 ])
87- copy (msg .ID [:], b [10 :10 + MsgIDLength ])
88- msg .Body = b [10 + MsgIDLength :]
89-
90164 return & msg , nil
91165}
92166
0 commit comments