diff --git a/.vscode/launch.json b/.vscode/launch.json index 3bf3fa05..d9318572 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,8 +6,6 @@ "type": "go", "request": "launch", "mode": "debug", - "remotePath": "", - "port": 2345, "host": "127.0.0.1", "program": "${fileDirname}", "env": {}, diff --git a/emitter.conf b/emitter.conf index 5c7daa21..d7da7e84 100644 --- a/emitter.conf +++ b/emitter.conf @@ -1,4 +1,5 @@ { + "license": "PfA8IFFbf_BHbe8gjlq7e2E7fHb26zdEP9OtQuxONRWlmRiaEMVZzVTm1KOGzNVmlXJ6BxGE5ZDWBNT36-QOAQ:3", "listen": ":8080", "tls": { "listen": ":443" diff --git a/internal/message/message.go b/internal/message/message.go index 224202ef..0abd103a 100644 --- a/internal/message/message.go +++ b/internal/message/message.go @@ -45,6 +45,11 @@ func (m *Message) Size() int64 { return int64(len(m.Payload)) } +// TotalSize returns the total byte size of the message, including the ID and channel. +func (m *Message) TotalSize() int64 { + return int64(len(m.Payload) + len(m.ID) + len(m.Channel)) +} + // Time gets the time of the key, adjusted. func (m *Message) Time() int64 { return m.ID.Time() @@ -145,6 +150,18 @@ func (f *Frame) Limit(n int) { } } +// LimitPayloadSize limits the payload size of the frame. +func (f *Frame) LimitPayloadSize(frame Frame, maxPayloadSize int64) { + var sum int64 + for i := 0; i < len(frame); i++ { + sum += frame[i].TotalSize() + if sum >= maxPayloadSize { + *f = frame[:i] + return + } + } +} + // Encode encodes the message frame func (f *Frame) Encode() []byte { diff --git a/internal/network/mqtt/mqtt.go b/internal/network/mqtt/mqtt.go index fde82aff..540937f2 100644 --- a/internal/network/mqtt/mqtt.go +++ b/internal/network/mqtt/mqtt.go @@ -21,8 +21,16 @@ import ( ) const ( - maxHeaderSize = 6 // max MQTT header size - MaxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len) + maxHeaderSize = 6 // max MQTT header size + + /* Original comment: "[65536] max MQTT message size is impossible to increase as per protocol (uint16 len)" + This is not true according the MQTT 3.1.1 spec, the max size is 256MB. + Official spec: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html + Also see this article for simplified explanation: https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/ + However, as noted in most sources, brokers could and should impose a much sensible limit. + 65536 is the original limit in Emitter and seems sensible. This number will be used as the payload limit. + */ + MaxMessageSize = 65536 ) // ErrMessageTooLarge occurs when a message encoded/decoded is larger than max MQTT frame. diff --git a/internal/provider/storage/ssd.go b/internal/provider/storage/ssd.go index 31cc81df..463aafa3 100644 --- a/internal/provider/storage/ssd.go +++ b/internal/provider/storage/ssd.go @@ -144,7 +144,8 @@ func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID messag } } - match.Limit(limit) + match.Limit(limit) // Limit the number of messages. + match.LimitPayloadSize(match, mqtt.MaxMessageSize) return match, nil } @@ -196,7 +197,7 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { it.Next() } - matchesSize := 0 + var matchesSize int64 = 0 // Seek the prefix and check the key so we can quickly exit the iteration. for ; it.Valid() && message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) && @@ -211,7 +212,12 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { continue } - if matchesSize += len(msg.Payload) + len(msg.ID) + len(msg.Channel); matchesSize > mqtt.MaxMessageSize { + // MaxMessageSize is the maximum size of the payload of an MQTT message in Emitter. See comment on mqtt.MaxMessageSize. + + // STILL BUGGY: messages are ingested based on the size of the payload, not including the size of the topic... REALLY? + // Technically, this algorithms would try to send a message longer than the maximum size of the Payload as defined in mqtt.MaxMessageSize + // This means, through History, you might not be able to retrieve a message whose Payload is still within the maximum size of the MQTT message. + if matchesSize += msg.TotalSize(); matchesSize > mqtt.MaxMessageSize { break } diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go index 03e7e569..d0e38376 100644 --- a/internal/service/history/history_test.go +++ b/internal/service/history/history_test.go @@ -15,10 +15,13 @@ package history import ( + "crypto/rand" "encoding/json" + "fmt" "testing" "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/network/mqtt" "github.com/emitter-io/emitter/internal/provider/storage" "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/emitter/internal/service/fake" @@ -82,3 +85,118 @@ func TestHistory(t *testing.T) { // The response should have returned the last 2 messages. assert.Equal(t, 2, len(response.(*Response).Messages)) } + +func TestLargeMessage(t *testing.T) { + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} + store := storage.NewInMemory(nil) + store.Configure(nil) + auth := &fake.Authorizer{ + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + + // The most basic request, on an empty store. + request := &Request{ + Key: "key", + Channel: "key/a/b/c/", + } + + // Store 1 long message + // Keep in mind the message will be composed of the ID and the channel size on top of the payload. + // So mqttMaxMessageSize is really smaller than the actual message size. + randomBytes := make([]byte, mqtt.MaxMessageSize) + rand.Read(randomBytes) + firstSSID := message.NewID(ssid) + store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: randomBytes, + TTL: 30, + }) + + reqBytes, _ := json.Marshal(request) + + // Issue the same request + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last message as per MQTT spec. + assert.Equal(t, 0, len(response.(*Response).Messages)) +} + +// ONLY PASSES BECAUSE OF THE BUG, THERE IS ONLY ONE SERVER SO NO GATHER +// match.Limit(limit) only limits based on the number of messages not the size of the frame +/*func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { + + // Construct a query and lookup locally first + query := newLookupQuery(ssid, from, until, startFromID, limit) + match := s.lookup(query) + + // Issue the message survey to the cluster + if req, err := binary.Marshal(query); err == nil && s.survey != nil { + if awaiter, err := s.survey.Query("ssdstore", req); err == nil { + + // Wait for all presence updates to come back (or a deadline) + for _, resp := range awaiter.Gather(2000 * time.Millisecond) { + if frame, err := message.DecodeFrame(resp); err == nil { + match = append(match, frame...) + } + } + } + } + + match.Limit(limit) + return match, nil +}*/ +func TestSumOfTwoExceedMaxSize(t *testing.T) { + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} + store := storage.NewInMemory(nil) + store.Configure(nil) + auth := &fake.Authorizer{ + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + + // The most basic request, on an empty store. + request := &Request{ + Key: "key", + Channel: "key/a/b/c/", + } + + // Store 2 messages + firstSSID := message.NewID(ssid) + fmt.Println(int(mqtt.MaxMessageSize - len(firstSSID) - len("test/") - 1)) // KEYSIZE??? + //randomBytes := make([]byte, int(mqtt.MaxMessageSize-len(firstSSID)-len("a/b/c/")-1)) // BUG: MaxMessageSize represents the maximum size of the payload, but the message is composed of the ID, the channel size and the payload. + randomBytes := make([]byte, int(mqtt.MaxMessageSize)) + rand.Read(randomBytes) + err := store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: randomBytes, + TTL: 30, + }) + assert.NoError(t, err) + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: randomBytes, + TTL: 30, + }) + reqBytes, _ := json.Marshal(request) + + request.Channel = "key/a/b/c/?last=2" + reqBytes, _ = json.Marshal(request) + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last 2 messages. + assert.Equal(t, 1, len(response.(*Response).Messages)) +}