Skip to content

Commit d686aae

Browse files
committed
Simple LRU cache, more checks
1 parent 5190bdd commit d686aae

File tree

3 files changed

+123
-76
lines changed

3 files changed

+123
-76
lines changed

decoder/decoder.go

+23-37
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package decoder
22

33
import (
44
"bytes"
5+
"fmt"
56
"hash"
67
"os"
78
"strconv"
89

910
"github.com/cespare/xxhash"
1011
"github.com/google/gopacket"
1112
"github.com/google/gopacket/layers"
12-
"github.com/hashicorp/golang-lru"
1313
"github.com/negbie/heplify/config"
1414
"github.com/negbie/heplify/ip4defrag"
1515
"github.com/negbie/heplify/logp"
@@ -26,12 +26,12 @@ type Decoder struct {
2626
tcpCount int
2727
dnsCount int
2828
unknownCount int
29-
IPFlow gopacket.Flow
30-
UDPFlow gopacket.Flow
29+
FlowSrcIP string
30+
FlowSrcPort string
3131
SIPHash hash.Hash64
32-
SIPCache *lru.Cache
33-
SDPCache *lru.Cache
34-
RTCPCache *lru.Cache
32+
SIPCache *Cache
33+
SDPCache *Cache
34+
RTCPCache *Cache
3535
}
3636

3737
type Packet struct {
@@ -56,21 +56,9 @@ func NewDecoder() *Decoder {
5656
}
5757

5858
hSIP := xxhash.New()
59-
60-
cSIP, err := lru.New(2000)
61-
if err != nil {
62-
logp.Err("SIPCache %v", err)
63-
}
64-
65-
cSDP, err := lru.New(10000)
66-
if err != nil {
67-
logp.Err("SDPCache %v", err)
68-
}
69-
70-
cRTCP, err := lru.New(10000)
71-
if err != nil {
72-
logp.Err("RTCPCache %v", err)
73-
}
59+
cSIP := NewLRUCache(4000)
60+
cSDP := NewLRUCache(10000)
61+
cRTCP := NewLRUCache(100000)
7462

7563
d := &Decoder{
7664
Host: host,
@@ -112,7 +100,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
112100
if config.Cfg.Dedup {
113101
d.SIPHash.Write(ip4.Payload)
114102
//key := fastHash(ip4.Payload)
115-
key := d.SIPHash.Sum64()
103+
key := strconv.FormatUint(d.SIPHash.Sum64(), 10)
116104
d.SIPHash.Reset()
117105
_, dup := d.SIPCache.Get(key)
118106
d.SIPCache.Add(key, nil)
@@ -122,7 +110,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
122110
}
123111
}
124112

125-
d.IPFlow = ip4.NetworkFlow()
113+
d.FlowSrcIP = ip4.NetworkFlow().Src().String()
126114
d.ip4Count++
127115

128116
pkt.Version = ip4.Version
@@ -164,7 +152,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
164152
return nil, nil
165153
}
166154

167-
d.UDPFlow = udp.TransportFlow()
155+
d.FlowSrcPort = udp.TransportFlow().Src().String()
168156
d.udpCount++
169157

170158
pkt.Sport = uint16(udp.SrcPort)
@@ -265,27 +253,25 @@ func (d *Decoder) cacheSDPIPPort(payload []byte) {
265253
}
266254

267255
func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) {
268-
jsonRTCP, err := protos.ParseRTCP(payload)
269-
if err != nil {
270-
logp.Warn("%v", err)
256+
jsonRTCP, info := protos.ParseRTCP(payload)
257+
if info != "" {
258+
logp.Info("%v", info)
271259
if jsonRTCP == nil {
272260
return nil, nil, 0
273261
}
274262
}
275263

276-
if corrID, ok := d.SDPCache.Get(d.IPFlow.Src().String() + d.UDPFlow.Src().String()); ok {
264+
if corrID, ok := d.SDPCache.Get(d.FlowSrcIP + d.FlowSrcPort); ok {
277265
logp.Debug("decoder", "SDPCache RTCP JSON payload: %s", string(jsonRTCP))
278-
d.RTCPCache.Add(d.IPFlow.Src().String()+d.UDPFlow.Src().String(), corrID)
279-
//fmt.Println(string(jsonRTCP))
280-
//fmt.Println(string(corrID.([]byte)))
281-
return jsonRTCP, corrID.([]byte), 5
282-
} else if corrID, ok := d.RTCPCache.Get(d.IPFlow.Src().String() + d.UDPFlow.Src().String()); ok {
266+
d.RTCPCache.Add(d.FlowSrcIP+d.FlowSrcPort, corrID)
267+
fmt.Println(string(jsonRTCP))
268+
fmt.Println(string(corrID))
269+
return jsonRTCP, corrID, 5
270+
} else if corrID, ok := d.RTCPCache.Get(d.FlowSrcIP + d.FlowSrcPort); ok {
283271
logp.Debug("decoder", "RTCPCache RTCP JSON payload: %s", string(jsonRTCP))
284-
d.RTCPCache.Add(d.IPFlow.Src().String()+d.UDPFlow.Src().String(), corrID)
285-
return jsonRTCP, corrID.([]byte), 5
286-
} else {
287-
logp.Warn("Couldn't find RTCP correlation value for key=%v", d.IPFlow.Src().String()+d.UDPFlow.Src().String())
272+
return jsonRTCP, corrID, 5
288273
}
289274

275+
logp.Info("Couldn't find RTCP correlation value for key=%v", d.FlowSrcIP+d.FlowSrcPort)
290276
return nil, nil, 0
291277
}

decoder/util.go

+67
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package decoder
22

33
import (
4+
"container/list"
45
"encoding/binary"
56
"net"
7+
"sync"
68
"time"
79

810
"github.com/negbie/heplify/logp"
@@ -46,3 +48,68 @@ func (d *Decoder) printStats() {
4648
}()
4749
}
4850
}
51+
52+
type cacheValue struct {
53+
key string
54+
bytes []byte
55+
}
56+
57+
// Just an estimate
58+
func (v *cacheValue) size() uint64 {
59+
return uint64(len([]byte(v.key)) + len(v.bytes))
60+
}
61+
62+
type Cache struct {
63+
sync.Mutex
64+
Size uint64
65+
capacity uint64
66+
list *list.List
67+
table map[string]*list.Element
68+
}
69+
70+
// NewLRUCache with a maximum size of capacity bytes.
71+
func NewLRUCache(capacity uint64) *Cache {
72+
return &Cache{
73+
capacity: capacity,
74+
list: list.New(),
75+
table: make(map[string]*list.Element),
76+
}
77+
}
78+
79+
// Set some {key, document} into the cache. Doesn't do anything if the key is already present.
80+
func (c *Cache) Add(key string, document []byte) {
81+
c.Lock()
82+
defer c.Unlock()
83+
84+
_, ok := c.table[key]
85+
if ok {
86+
return
87+
}
88+
v := &cacheValue{key, document}
89+
elt := c.list.PushFront(v)
90+
c.table[key] = elt
91+
c.Size += v.size()
92+
for c.Size > c.capacity {
93+
elt := c.list.Back()
94+
if elt == nil {
95+
return
96+
}
97+
v := c.list.Remove(elt).(*cacheValue)
98+
delete(c.table, v.key)
99+
c.Size -= v.size()
100+
}
101+
}
102+
103+
// Get retrieves a value from the cache and returns the value and an indicator boolean to show whether it was
104+
// present.
105+
func (c *Cache) Get(key string) (document []byte, ok bool) {
106+
c.Lock()
107+
defer c.Unlock()
108+
109+
elt, ok := c.table[key]
110+
if !ok {
111+
return nil, false
112+
}
113+
c.list.MoveToFront(elt)
114+
return elt.Value.(*cacheValue).bytes, true
115+
}

protos/rtcp.go

+33-39
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"encoding/hex"
66
"encoding/json"
77
"fmt"
8-
9-
"github.com/negbie/heplify/logp"
108
)
119

1210
/* RTCP header
@@ -134,11 +132,11 @@ const (
134132
)
135133

136134
type RTCP_header struct {
137-
Version uint8 `json:"version"` // 2 bit
138-
Padding uint8 `json:"padding"` // 1 bit
139-
ReceptionReportCount uint8 `json:"report_count"` // 5 bit
140-
RTCPType uint8 `json:"type"` // 8 bit
141-
Length uint16 `json:"length"` // 16 bit
135+
Version uint8 `json:"version"` // 2 bit
136+
Padding uint8 `json:"padding"` // 1 bit
137+
ReportCount uint8 `json:"report_count"` // 5 bit
138+
RTCPType uint8 `json:"type"` // 8 bit
139+
Length uint16 `json:"length"` // 16 bit
142140
}
143141

144142
type RTCP_Packet struct {
@@ -175,40 +173,37 @@ func (rp *RTCP_Packet) MarshalJSON() ([]byte, error) {
175173
return bytes, err
176174
}
177175

178-
func ParseRTCP(data []byte) ([]byte, error) {
176+
func ParseRTCP(data []byte) (rtcpPkt []byte, infoMsg string) {
179177
dataLen := len(data)
180178
if dataLen < 28 {
181-
return nil, fmt.Errorf("Fishy RTCP packet length=%d in packet:\n%v\n", dataLen, hex.Dump(data))
179+
return nil, fmt.Sprintf("Fishy RTCP dataLen=%d in packet:\n%v", dataLen, hex.Dump(data))
182180
}
183181
var err error
184182
pkt := &RTCP_Packet{}
185-
rtcpPkt := []byte{}
186183
offset := 0
187184

188185
for dataLen > 0 {
189186
if dataLen < 4 || dataLen > 576 {
190-
return nil, fmt.Errorf("Fishy RTCP packet length=%d in packet:\n%v\n", dataLen, hex.Dump(data))
187+
return rtcpPkt, fmt.Sprintf("Fishy RTCP dataLen=%d in packet:\n%v", dataLen, hex.Dump(data))
191188
}
192189

193-
//version := (data[offset] & 0xc0) >> 6
190+
RTCPVersion := int((data[offset] & 0xc0) >> 6)
194191
//padding := (data[offset] & 0x20) >> 5
195-
receptionReportCount := int(data[offset] & 0x1f)
192+
RTCPReportCount := int(data[offset] & 0x1f)
196193
RTCPType := int(data[offset+1])
197194
RTCPLength := int(binary.BigEndian.Uint16(data[offset+2:]) * 4)
198195
offset += 4
199196

200-
if receptionReportCount < 0 || receptionReportCount > 4 {
201-
return rtcpPkt, fmt.Errorf("Fishy RTCP receptionReportCount=%v type=%d length=%d offset=%d in packet:\n%v", receptionReportCount, RTCPType, dataLen, offset, hex.Dump(data))
202-
} else if RTCPLength > dataLen {
203-
return rtcpPkt, fmt.Errorf("Fishy RTCP report length=%d in packet:\n%v", RTCPLength, hex.Dump(data))
204-
} else if RTCPType < 200 || RTCPType > 207 {
205-
return rtcpPkt, fmt.Errorf("Fishy RTCP type=%d in packet:\n%v", RTCPType, hex.Dump(data))
197+
if RTCPVersion != 2 || RTCPReportCount < 0 || RTCPReportCount > 4 || RTCPType < 200 || RTCPType > 207 || RTCPLength > dataLen {
198+
return rtcpPkt, fmt.Sprintf("Fishy RTCPVersion=%d, RTCPReportCount=%d, RTCPType=%d, RTCPLength=%d, dataLen=%d, offset=%d in packet:\n%v",
199+
RTCPVersion, RTCPReportCount, RTCPType, RTCPLength, dataLen, offset, hex.Dump(data))
206200
}
207201

208202
switch RTCPType {
209203
case TYPE_RTCP_SR:
210204
if RTCPLength < 24 || offset+24 > len(data) {
211-
return rtcpPkt, fmt.Errorf("Fishy RTCP packet=%v length=%d type=%d offset=%d", data, RTCPLength, RTCPType, offset)
205+
return rtcpPkt, fmt.Sprintf("Fishy RTCPVersion=%d, RTCPReportCount=%d, RTCPType=%d, RTCPLength=%d, dataLen=%d, offset=%d in packet:\n%v",
206+
RTCPVersion, RTCPReportCount, RTCPType, RTCPLength, dataLen, offset, hex.Dump(data))
212207
}
213208

214209
pkt.Ssrc = binary.BigEndian.Uint32(data[offset:])
@@ -219,9 +214,9 @@ func ParseRTCP(data []byte) ([]byte, error) {
219214
pkt.SenderInformation.Octet_count = binary.BigEndian.Uint32(data[offset+20:])
220215
offset += 24
221216

222-
if receptionReportCount > 0 && RTCPLength >= 24 && offset+24 <= len(data) {
223-
tmpReportBlocks := make([]RTCP_report_block, receptionReportCount)
224-
for i := 0; i < receptionReportCount; i++ {
217+
if RTCPReportCount > 0 && RTCPLength >= 24 && offset+24 <= len(data) {
218+
tmpReportBlocks := make([]RTCP_report_block, RTCPReportCount)
219+
for i := 0; i < RTCPReportCount; i++ {
225220
tmpReportBlocks[i].SourceSsrc = binary.BigEndian.Uint32(data[offset:])
226221
tmpReportBlocks[i].Fraction_lost = data[offset+4]
227222
var cumBuf [4]byte
@@ -231,7 +226,7 @@ func ParseRTCP(data []byte) ([]byte, error) {
231226
tmpReportBlocks[i].Jitter = binary.BigEndian.Uint32(data[offset+12:])
232227
tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:])
233228
tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:])
234-
tmpReportBlocks[i].ReportCount = uint8(receptionReportCount)
229+
tmpReportBlocks[i].ReportCount = uint8(RTCPReportCount)
235230
tmpReportBlocks[i].RTCPType = uint8(RTCPType)
236231
offset += 24
237232
RTCPLength -= 24
@@ -240,20 +235,21 @@ func ParseRTCP(data []byte) ([]byte, error) {
240235
}
241236
rtcpPkt, err = pkt.MarshalJSON()
242237
if err != nil {
243-
return nil, err
238+
return nil, fmt.Sprintf("RTCP MarshalJSON %v", err)
244239
}
245240

246241
case TYPE_RTCP_RR:
247242
if RTCPLength < 4 || offset+4 > len(data) {
248-
return rtcpPkt, fmt.Errorf("Fishy RTCP packet=%v length=%d type=%d offset=%d", data, RTCPLength, RTCPType, offset)
243+
return rtcpPkt, fmt.Sprintf("Fishy RTCPVersion=%d, RTCPReportCount=%d, RTCPType=%d, RTCPLength=%d, dataLen=%d, offset=%d in packet:\n%v",
244+
RTCPVersion, RTCPReportCount, RTCPType, RTCPLength, dataLen, offset, hex.Dump(data))
249245
}
250246

251247
pkt.Ssrc = binary.BigEndian.Uint32(data[offset:])
252248
offset += 4
253249

254-
if receptionReportCount > 0 && RTCPLength >= 24 && offset+24 <= len(data) {
255-
tmpReportBlocks := make([]RTCP_report_block, receptionReportCount)
256-
for i := 0; i < receptionReportCount; i++ {
250+
if RTCPReportCount > 0 && RTCPLength >= 24 && offset+24 <= len(data) {
251+
tmpReportBlocks := make([]RTCP_report_block, RTCPReportCount)
252+
for i := 0; i < RTCPReportCount; i++ {
257253
tmpReportBlocks[i].SourceSsrc = binary.BigEndian.Uint32(data[offset:])
258254
tmpReportBlocks[i].Fraction_lost = data[offset+4]
259255
var cumBuf [4]byte
@@ -263,7 +259,7 @@ func ParseRTCP(data []byte) ([]byte, error) {
263259
tmpReportBlocks[i].Jitter = binary.BigEndian.Uint32(data[offset+12:])
264260
tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:])
265261
tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:])
266-
tmpReportBlocks[i].ReportCount = uint8(receptionReportCount)
262+
tmpReportBlocks[i].ReportCount = uint8(RTCPReportCount)
267263
tmpReportBlocks[i].RTCPType = uint8(RTCPType)
268264
offset += 24
269265
RTCPLength -= 24
@@ -272,27 +268,25 @@ func ParseRTCP(data []byte) ([]byte, error) {
272268
}
273269
rtcpPkt, err = pkt.MarshalJSON()
274270
if err != nil {
275-
return nil, err
271+
return nil, fmt.Sprintf("RTCP MarshalJSON %v", err)
276272
}
277273

278274
case TYPE_RTCP_SDES:
279-
logp.Debug("rtcp", "Discard RTCP_SDES packet type=%d", RTCPType)
275+
infoMsg = fmt.Sprintf("Discard RTCP_SDES packet type=%d", RTCPType)
280276
offset += RTCPLength
281277
case TYPE_RTCP_APP:
282-
logp.Debug("rtcp", "Discard RTCP_APP packet type=%d", RTCPType)
278+
infoMsg = fmt.Sprintf("Discard RTCP_APP packet type=%d", RTCPType)
283279
offset += RTCPLength
284280
case TYPE_RTCP_BYE:
285-
logp.Debug("rtcp", "Discard RTCP_BYE packet type=%d", RTCPType)
281+
infoMsg = fmt.Sprintf("Discard RTCP_BYE packet type=%d", RTCPType)
286282
offset += RTCPLength
287283
case TYPE_RTCP_XR:
288-
logp.Debug("rtcp", "Discard RTCP_XR packet type=%d", RTCPType)
284+
infoMsg = fmt.Sprintf("Discard RTCP_XR packet type=%d", RTCPType)
289285
offset += RTCPLength
290-
default:
291-
logp.Warn("rtcp", "Discard unsupported packet type=%d length=%d offset=%d in packet:\n%v", RTCPType, dataLen, offset, hex.Dump(data))
292-
return nil, fmt.Errorf("Discard unsupported packet type: %d", RTCPType)
293286
}
294287

295288
dataLen -= offset
296289
}
297-
return rtcpPkt, nil
290+
291+
return
298292
}

0 commit comments

Comments
 (0)