1
1
package decoder
2
2
3
3
import (
4
+ "bytes"
4
5
"hash"
5
6
"os"
7
+ "strconv"
8
+ "time"
6
9
10
+ "github.com/allegro/bigcache"
7
11
"github.com/cespare/xxhash"
8
12
"github.com/google/gopacket"
9
13
"github.com/google/gopacket/layers"
@@ -24,7 +28,10 @@ type Decoder struct {
24
28
tcpCount int
25
29
dnsCount int
26
30
unknownCount int
27
- lru * lru.ARCCache
31
+ IPFlow gopacket.Flow
32
+ UDPFlow gopacket.Flow
33
+ lru * lru.Cache
34
+ bigcache * bigcache.BigCache
28
35
hash hash.Hash64
29
36
}
30
37
@@ -40,19 +47,48 @@ type Packet struct {
40
47
Dport uint16
41
48
CorrelationID []byte
42
49
Payload []byte
50
+ Type byte
43
51
}
44
52
45
53
func NewDecoder () * Decoder {
46
-
47
54
host , err := os .Hostname ()
48
55
if err != nil {
49
56
host = "sniffer"
50
57
}
51
- l , err := lru .NewARC (8192 )
58
+
59
+ la , err := lru .New (8000 )
52
60
if err != nil {
53
61
logp .Err ("lru %v" , err )
54
62
}
55
- h := xxhash .New ()
63
+
64
+ xh := xxhash .New ()
65
+
66
+ bConf := bigcache.Config {
67
+ // number of shards (must be a power of 2)
68
+ Shards : 1024 ,
69
+ // time after which entry can be evicted
70
+ LifeWindow : 180 * time .Minute ,
71
+ // rps * lifeWindow, used only in initial memory allocation
72
+ MaxEntriesInWindow : 1000 * 180 * 60 ,
73
+ // max entry size in bytes, used only in initial memory allocation
74
+ MaxEntrySize : 300 ,
75
+ // prints information about additional memory allocation
76
+ Verbose : true ,
77
+ // cache will not allocate more memory than this limit, value in MB
78
+ // if value is reached then the oldest entries can be overridden for the new ones
79
+ // 0 value means no size limit
80
+ HardMaxCacheSize : 512 ,
81
+ // callback fired when the oldest entry is removed because of its
82
+ // expiration time or no space left for the new entry. Default value is nil which
83
+ // means no callback and it prevents from unwrapping the oldest entry.
84
+ OnRemove : nil ,
85
+ }
86
+
87
+ bc , err := bigcache .NewBigCache (bConf )
88
+ if err != nil {
89
+ logp .Err ("bigcache %v" , err )
90
+ }
91
+
56
92
d := & Decoder {
57
93
Host : host ,
58
94
defragger : ip4defrag .NewIPv4Defragmenter (),
@@ -63,8 +99,9 @@ func NewDecoder() *Decoder {
63
99
tcpCount : 0 ,
64
100
dnsCount : 0 ,
65
101
unknownCount : 0 ,
66
- lru : l ,
67
- hash : h ,
102
+ lru : la ,
103
+ hash : xh ,
104
+ bigcache : bc ,
68
105
}
69
106
go d .flushFrag ()
70
107
go d .printStats ()
@@ -101,16 +138,18 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
101
138
}
102
139
}
103
140
141
+ d .IPFlow = ip4 .NetworkFlow ()
104
142
d .ip4Count ++
143
+
105
144
pkt .Version = ip4 .Version
106
145
pkt .Protocol = uint8 (ip4 .Protocol )
107
146
pkt .Srcip = ip2int (ip4 .SrcIP )
108
147
pkt .Dstip = ip2int (ip4 .DstIP )
109
148
110
149
ip4New , err := d .defragger .DefragIPv4 (ip4 )
111
150
if err != nil {
112
- logp .Err ("Error while de-fragmenting" , err )
113
- return nil , err
151
+ logp .Warn ("Error while de-fragmenting" , err )
152
+ return nil , nil
114
153
} else if ip4New == nil {
115
154
d .fragCount ++
116
155
return nil , nil
@@ -133,21 +172,28 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
133
172
nextDecoder := ip4New .NextLayerType ()
134
173
nextDecoder .Decode (ip4New .Payload , pb )
135
174
}
136
- // TODO: generate a more meaningful CorrelationID
137
- if config .Cfg .Mode == "DNS" || config .Cfg .Mode == "LOG" || config .Cfg .Mode == "TLS" {
138
- pkt .CorrelationID = []byte (config .Cfg .Mode )
139
- }
140
175
}
141
176
142
177
if udpLayer := packet .Layer (layers .LayerTypeUDP ); udpLayer != nil {
143
178
udp , ok := udpLayer .(* layers.UDP )
144
179
if ! ok {
145
180
return nil , nil
146
181
}
182
+
183
+ d .UDPFlow = udp .TransportFlow ()
147
184
d .udpCount ++
185
+
148
186
pkt .Sport = uint16 (udp .SrcPort )
149
187
pkt .Dport = uint16 (udp .DstPort )
150
188
pkt .Payload = udp .Payload
189
+ pkt .Type = 1
190
+
191
+ if config .Cfg .Mode == "SIPRTCP" {
192
+ d .cacheSDPIPPort (udp .Payload )
193
+ if (udp .Payload [0 ]& 0xc0 )>> 6 == 2 && (udp .Payload [1 ] == 200 || udp .Payload [1 ] == 201 ) {
194
+ pkt .Payload , pkt .CorrelationID , pkt .Type = d .correlateRTCP (udp .Payload )
195
+ }
196
+ }
151
197
152
198
} else if tcpLayer := packet .Layer (layers .LayerTypeTCP ); tcpLayer != nil {
153
199
tcp , ok := tcpLayer .(* layers.TCP )
@@ -158,6 +204,11 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
158
204
pkt .Sport = uint16 (tcp .SrcPort )
159
205
pkt .Dport = uint16 (tcp .DstPort )
160
206
pkt .Payload = tcp .Payload
207
+ pkt .Type = 1
208
+
209
+ if config .Cfg .Mode == "SIPRTCP" {
210
+ d .cacheSDPIPPort (tcp .Payload )
211
+ }
161
212
}
162
213
163
214
if dnsLayer := packet .Layer (layers .LayerTypeDNS ); dnsLayer != nil {
@@ -186,3 +237,62 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
186
237
d .unknownCount ++
187
238
return nil , nil
188
239
}
240
+
241
+ func (d * Decoder ) cacheSDPIPPort (payload []byte ) {
242
+ var SDPIP , RTCPPort string
243
+ var callID []byte
244
+
245
+ if posSDPIP , posSDPPort := bytes .Index (payload , []byte ("c=IN IP4 " )), bytes .Index (payload , []byte ("m=audio " )); posSDPIP >= 0 && posSDPPort >= 0 {
246
+ restIP := payload [posSDPIP :]
247
+ if posRestIP := bytes .Index (restIP , []byte ("\r \n " )); posRestIP >= 0 {
248
+ SDPIP = string (restIP [len ("c=IN IP4 " ):bytes .Index (restIP , []byte ("\r \n " ))])
249
+ } else {
250
+ logp .Warn ("Couldn't find end of SDP IP in '%s'" , string (restIP ))
251
+ }
252
+
253
+ restPort := payload [posSDPPort :]
254
+ if posRestPort := bytes .Index (restIP , []byte (" RTP" )); posRestPort >= 0 {
255
+ SDPPort , err := strconv .Atoi (string (restPort [len ("m=audio " ):bytes .Index (restPort , []byte (" RTP" ))]))
256
+ if err != nil {
257
+ logp .Warn ("%v" , err )
258
+ }
259
+ RTCPPort = strconv .Itoa (SDPPort + 1 )
260
+ } else {
261
+ logp .Warn ("Couldn't find end of SDP Port in '%s'" , string (restPort ))
262
+ }
263
+
264
+ if posCallID := bytes .Index (payload , []byte ("Call-ID: " )); posCallID >= 0 {
265
+ restCallID := payload [posCallID :]
266
+ if posRestCallID := bytes .Index (restIP , []byte ("\r \n " )); posRestCallID >= 0 {
267
+ callID = restCallID [len ("Call-ID: " ):bytes .Index (restCallID , []byte ("\r \n " ))]
268
+ } else {
269
+ logp .Warn ("Couldn't find end of Call-ID in '%s'" , string (restCallID ))
270
+ }
271
+ } else if posID := bytes .Index (payload , []byte ("i: " )); posID >= 0 {
272
+ restID := payload [posID :]
273
+ if posRestID := bytes .Index (restIP , []byte ("\r \n " )); posRestID >= 0 {
274
+ callID = restID [len ("i: " ):bytes .Index (restID , []byte ("\r \n " ))]
275
+ } else {
276
+ logp .Warn ("Couldn't find end of Call-ID in '%s'" , string (restID ))
277
+ }
278
+ }
279
+ d .bigcache .Set (SDPIP + RTCPPort , callID )
280
+ }
281
+ }
282
+
283
+ func (d * Decoder ) correlateRTCP (payload []byte ) ([]byte , []byte , byte ) {
284
+ jsonRTCP , err := protos .ParseRTCP (payload )
285
+ if err != nil {
286
+ logp .Warn ("%v" , err )
287
+ return nil , nil , 0
288
+ }
289
+
290
+ corrID , err := d .bigcache .Get (d .IPFlow .Src ().String () + d .UDPFlow .Src ().String ())
291
+ if err != nil {
292
+ logp .Warn ("%v" , err )
293
+ return nil , nil , 0
294
+ }
295
+
296
+ logp .Debug ("decoder" , "RTCP JSON payload: %s" , string (jsonRTCP ))
297
+ return jsonRTCP , corrID , 5
298
+ }
0 commit comments