@@ -5,13 +5,10 @@ import (
5
5
"hash"
6
6
"os"
7
7
"strconv"
8
- "time"
9
8
10
- "github.com/allegro/bigcache"
11
9
"github.com/cespare/xxhash"
12
10
"github.com/google/gopacket"
13
11
"github.com/google/gopacket/layers"
14
- "github.com/hashicorp/golang-lru"
15
12
"github.com/negbie/heplify/config"
16
13
"github.com/negbie/heplify/ip4defrag"
17
14
"github.com/negbie/heplify/logp"
@@ -28,11 +25,12 @@ type Decoder struct {
28
25
tcpCount int
29
26
dnsCount int
30
27
unknownCount int
31
- IPFlow gopacket. Flow
32
- UDPFlow gopacket. Flow
28
+ FlowSrcIP string
29
+ FlowSrcPort string
33
30
SIPHash hash.Hash64
34
- SIPCache * lru.Cache
35
- RTCPCache * bigcache.BigCache
31
+ SIPCache * Cache
32
+ SDPCache * Cache
33
+ RTCPCache * Cache
36
34
}
37
35
38
36
type Packet struct {
@@ -56,38 +54,10 @@ func NewDecoder() *Decoder {
56
54
host = "sniffer"
57
55
}
58
56
59
- sh := xxhash .New ()
60
-
61
- sc , err := lru .New (8000 )
62
- if err != nil {
63
- logp .Err ("lru %v" , err )
64
- }
65
-
66
- rcConf := bigcache.Config {
67
- // number of shards (must be a power of 2)
68
- Shards : 1024 ,
69
- // time after which entry can be evicted
70
- LifeWindow : 180 * time .Minute ,
71
- // rps * lifeWindow, used only in initial memory allocation
72
- MaxEntriesInWindow : 1000 * 180 * 60 ,
73
- // max entry size in bytes, used only in initial memory allocation
74
- MaxEntrySize : 300 ,
75
- // prints information about additional memory allocation
76
- Verbose : false ,
77
- // cache will not allocate more memory than this limit, value in MB
78
- // if value is reached then the oldest entries can be overridden for the new ones
79
- // 0 value means no size limit
80
- HardMaxCacheSize : 512 ,
81
- // callback fired when the oldest entry is removed because of its
82
- // expiration time or no space left for the new entry. Default value is nil which
83
- // means no callback and it prevents from unwrapping the oldest entry.
84
- OnRemove : nil ,
85
- }
86
-
87
- rc , err := bigcache .NewBigCache (rcConf )
88
- if err != nil {
89
- logp .Err ("bigcache %v" , err )
90
- }
57
+ hSIP := xxhash .New ()
58
+ cSIP := NewLRUCache (4000 )
59
+ cSDP := NewLRUCache (10000 )
60
+ cRTCP := NewLRUCache (100000 )
91
61
92
62
d := & Decoder {
93
63
Host : host ,
@@ -99,9 +69,10 @@ func NewDecoder() *Decoder {
99
69
tcpCount : 0 ,
100
70
dnsCount : 0 ,
101
71
unknownCount : 0 ,
102
- SIPHash : sh ,
103
- SIPCache : sc ,
104
- RTCPCache : rc ,
72
+ SIPHash : hSIP ,
73
+ SIPCache : cSIP ,
74
+ SDPCache : cSDP ,
75
+ RTCPCache : cRTCP ,
105
76
}
106
77
go d .flushFrag ()
107
78
go d .printStats ()
@@ -127,8 +98,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
127
98
128
99
if config .Cfg .Dedup {
129
100
d .SIPHash .Write (ip4 .Payload )
130
- //key := fastHash(ip4.Payload)
131
- key := d .SIPHash .Sum64 ()
101
+ key := strconv .FormatUint (d .SIPHash .Sum64 (), 10 )
132
102
d .SIPHash .Reset ()
133
103
_ , dup := d .SIPCache .Get (key )
134
104
d .SIPCache .Add (key , nil )
@@ -138,7 +108,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
138
108
}
139
109
}
140
110
141
- d .IPFlow = ip4 .NetworkFlow ()
111
+ d .FlowSrcIP = ip4 .NetworkFlow (). Src (). String ()
142
112
d .ip4Count ++
143
113
144
114
pkt .Version = ip4 .Version
@@ -180,7 +150,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
180
150
return nil , nil
181
151
}
182
152
183
- d .UDPFlow = udp .TransportFlow ()
153
+ d .FlowSrcPort = udp .TransportFlow (). Src (). String ()
184
154
d .udpCount ++
185
155
186
156
pkt .Sport = uint16 (udp .SrcPort )
@@ -190,7 +160,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
190
160
191
161
if config .Cfg .Mode == "SIPRTCP" {
192
162
d .cacheSDPIPPort (udp .Payload )
193
- if (udp .Payload [0 ]& 0xc0 )>> 6 == 2 && (udp .Payload [1 ] == 200 || udp .Payload [1 ] == 201 ) {
163
+ if (udp .Payload [0 ]& 0xc0 )>> 6 == 2 && udp . SrcPort % 2 != 0 && udp . DstPort % 2 != 0 && (udp .Payload [1 ] == 200 || udp .Payload [1 ] == 201 ) {
194
164
pkt .Payload , pkt .CorrelationID , pkt .Type = d .correlateRTCP (udp .Payload )
195
165
}
196
166
}
@@ -276,23 +246,28 @@ func (d *Decoder) cacheSDPIPPort(payload []byte) {
276
246
logp .Warn ("Couldn't find end of Call-ID in '%s'" , string (restID ))
277
247
}
278
248
}
279
- d .RTCPCache . Set (SDPIP + RTCPPort , callID )
249
+ d .SDPCache . Add (SDPIP + RTCPPort , callID )
280
250
}
281
251
}
282
252
283
253
func (d * Decoder ) correlateRTCP (payload []byte ) ([]byte , []byte , byte ) {
284
- jsonRTCP , err := protos .ParseRTCP (payload )
285
- if err != nil {
286
- logp .Warn ("%v" , err )
287
- return nil , nil , 0
254
+ jsonRTCP , info := protos .ParseRTCP (payload )
255
+ if info != "" {
256
+ logp .Info ("%v" , info )
257
+ if jsonRTCP == nil {
258
+ return nil , nil , 0
259
+ }
288
260
}
289
261
290
- corrID , err := d .RTCPCache .Get (d .IPFlow .Src ().String () + d .UDPFlow .Src ().String ())
291
- if err != nil {
292
- logp .Warn ("%v" , err )
293
- return nil , nil , 0
262
+ if corrID , ok := d .SDPCache .Get (d .FlowSrcIP + d .FlowSrcPort ); ok {
263
+ logp .Debug ("decoder" , "SDPCache RTCP JSON payload: %s" , string (jsonRTCP ))
264
+ d .RTCPCache .Add (d .FlowSrcIP + d .FlowSrcPort , corrID )
265
+ return jsonRTCP , corrID , 5
266
+ } else if corrID , ok := d .RTCPCache .Get (d .FlowSrcIP + d .FlowSrcPort ); ok {
267
+ logp .Debug ("decoder" , "RTCPCache RTCP JSON payload: %s" , string (jsonRTCP ))
268
+ return jsonRTCP , corrID , 5
294
269
}
295
270
296
- logp .Debug ( "decoder" , " RTCP JSON payload: %s " , string ( jsonRTCP ) )
297
- return jsonRTCP , corrID , 5
271
+ logp .Info ( "Couldn't find RTCP correlation value for key=%v " , d . FlowSrcIP + d . FlowSrcPort )
272
+ return nil , nil , 0
298
273
}
0 commit comments