Skip to content

Commit 0be89c3

Browse files
authored
Avoid eager MQTT response buffer access (#2515)
1 parent a69f4c1 commit 0be89c3

2 files changed

Lines changed: 30 additions & 3 deletions

File tree

pkg/ebpf/common/mqtt_detect_transform.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,16 @@ func packetTypeToMethod(packetType mqttparser.PacketType) string {
5757
// should be ignored for span creation (e.g., control packets like CONNECT).
5858
func ProcessPossibleMQTTEvent(event *TCPRequestInfo, pkt *largebuf.LargeBuffer, rpkt *largebuf.LargeBuffer) (*MQTTInfo, bool, error) {
5959
pktView := pkt.UnsafeView()
60-
rpktView := rpkt.UnsafeView()
6160

6261
m, ignore, err := ProcessMQTTEvent(pktView)
6362
if err != nil {
63+
if rpkt == nil {
64+
return m, ignore, err
65+
}
66+
6467
// If we are getting the information in the response buffer, the event
6568
// must be reversed and that's how we captured it.
66-
m, ignore, err = ProcessMQTTEvent(rpktView)
69+
m, ignore, err = ProcessMQTTEvent(rpkt.UnsafeView())
6770
if err == nil && !ignore {
6871
reverseTCPEvent(event)
6972
}

pkg/ebpf/common/mqtt_detect_transform_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,21 @@ func TestProcessPossibleMQTTEvent(t *testing.T) {
269269
PacketID: 0,
270270
},
271271
},
272+
{
273+
name: "PUBLISH in request buffer without response buffer",
274+
request: []byte{
275+
0x30, // PUBLISH, QoS 0
276+
0x0c, // Remaining length: 12
277+
0x00, 0x0a, // Topic length: 10
278+
't', 'e', 's', 't', '/', 't', 'o', 'p', 'i', 'c',
279+
},
280+
expected: &MQTTInfo{
281+
PacketType: mqttparser.PacketTypePUBLISH,
282+
Topic: "test/topic",
283+
QoS: mqttparser.QoSAtMostOnce,
284+
PacketID: 0,
285+
},
286+
},
272287
{
273288
name: "PUBLISH in response buffer (reversed)",
274289
request: []byte{},
@@ -291,12 +306,21 @@ func TestProcessPossibleMQTTEvent(t *testing.T) {
291306
response: []byte{0x00, 0x00, 0x00},
292307
err: true,
293308
},
309+
{
310+
name: "Invalid request without response buffer",
311+
request: []byte{0x00, 0x00, 0x00},
312+
err: true,
313+
},
294314
}
295315

296316
for _, tt := range tests {
297317
t.Run(tt.name, func(t *testing.T) {
298318
event := &TCPRequestInfo{}
299-
res, _, err := ProcessPossibleMQTTEvent(event, largebuf.NewLargeBufferFrom(tt.request), largebuf.NewLargeBufferFrom(tt.response))
319+
var response *largebuf.LargeBuffer
320+
if tt.response != nil {
321+
response = largebuf.NewLargeBufferFrom(tt.response)
322+
}
323+
res, _, err := ProcessPossibleMQTTEvent(event, largebuf.NewLargeBufferFrom(tt.request), response)
300324
if tt.err {
301325
assert.Error(t, err)
302326
return

0 commit comments

Comments
 (0)