Skip to content

Commit 24cece9

Browse files
committed
Read sender ssrc for RTCP RR, set ReadFile timeout
1 parent 311de57 commit 24cece9

File tree

5 files changed

+143
-140
lines changed

5 files changed

+143
-140
lines changed

config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type InterfacesConfig struct {
2323
WriteFile string `config:"write_file"`
2424
Snaplen int `config:"snaplen"`
2525
BufferSizeMb int `config:"buffer_size_mb"`
26-
TopSpeed bool `config:"top_speed"`
26+
ReadSpeed bool `config:"top_speed"`
2727
OneAtATime bool `config:"one_at_a_time"`
2828
Loop int `config:"loop"`
2929
}

decoder/decoder.go

+12-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package decoder
22

33
import (
44
"bytes"
5-
"errors"
65
"hash"
76
"os"
87
"strconv"
@@ -31,7 +30,7 @@ type Decoder struct {
3130
unknownCount int
3231
IPFlow gopacket.Flow
3332
UDPFlow gopacket.Flow
34-
lru *lru.ARCCache
33+
lru *lru.Cache
3534
bigcache *bigcache.BigCache
3635
hash hash.Hash64
3736
}
@@ -57,7 +56,7 @@ func NewDecoder() *Decoder {
5756
host = "sniffer"
5857
}
5958

60-
la, err := lru.NewARC(8192)
59+
la, err := lru.New(8000)
6160
if err != nil {
6261
logp.Err("lru %v", err)
6362
}
@@ -68,17 +67,17 @@ func NewDecoder() *Decoder {
6867
// number of shards (must be a power of 2)
6968
Shards: 1024,
7069
// time after which entry can be evicted
71-
LifeWindow: 10 * time.Minute,
70+
LifeWindow: 180 * time.Minute,
7271
// rps * lifeWindow, used only in initial memory allocation
7372
MaxEntriesInWindow: 1000 * 180 * 60,
7473
// max entry size in bytes, used only in initial memory allocation
75-
MaxEntrySize: 384,
74+
MaxEntrySize: 300,
7675
// prints information about additional memory allocation
7776
Verbose: true,
7877
// cache will not allocate more memory than this limit, value in MB
7978
// if value is reached then the oldest entries can be overridden for the new ones
8079
// 0 value means no size limit
81-
HardMaxCacheSize: 1024,
80+
HardMaxCacheSize: 512,
8281
// callback fired when the oldest entry is removed because of its
8382
// expiration time or no space left for the new entry. Default value is nil which
8483
// means no callback and it prevents from unwrapping the oldest entry.
@@ -239,7 +238,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
239238
return nil, nil
240239
}
241240

242-
func (d *Decoder) cacheSDPIPPort(payload []byte) error {
241+
func (d *Decoder) cacheSDPIPPort(payload []byte) {
243242
var SDPIP, RTCPPort string
244243
var callID []byte
245244

@@ -248,38 +247,37 @@ func (d *Decoder) cacheSDPIPPort(payload []byte) error {
248247
if posRestIP := bytes.Index(restIP, []byte("\r\n")); posRestIP >= 0 {
249248
SDPIP = string(restIP[len("c=IN IP4 "):bytes.Index(restIP, []byte("\r\n"))])
250249
} else {
251-
return errors.New("Couldn't find end of SDP IP")
250+
logp.Warn("Couldn't find end of SDP IP in '%s'", string(restIP))
252251
}
253252

254253
restPort := payload[posSDPPort:]
255254
if posRestPort := bytes.Index(restIP, []byte(" RTP")); posRestPort >= 0 {
256255
SDPPort, err := strconv.Atoi(string(restPort[len("m=audio "):bytes.Index(restPort, []byte(" RTP"))]))
257256
if err != nil {
258-
return err
257+
logp.Warn("%v", err)
259258
}
260259
RTCPPort = strconv.Itoa(SDPPort + 1)
261260
} else {
262-
return errors.New("Couldn't find end of SDP Port")
261+
logp.Warn("Couldn't find end of SDP Port in '%s'", string(restPort))
263262
}
264263

265264
if posCallID := bytes.Index(payload, []byte("Call-ID: ")); posCallID >= 0 {
266265
restCallID := payload[posCallID:]
267266
if posRestCallID := bytes.Index(restIP, []byte("\r\n")); posRestCallID >= 0 {
268267
callID = restCallID[len("Call-ID: "):bytes.Index(restCallID, []byte("\r\n"))]
269268
} else {
270-
return errors.New("Couldn't find end of Call-ID")
269+
logp.Warn("Couldn't find end of Call-ID in '%s'", string(restCallID))
271270
}
272271
} else if posID := bytes.Index(payload, []byte("i: ")); posID >= 0 {
273272
restID := payload[posID:]
274273
if posRestID := bytes.Index(restIP, []byte("\r\n")); posRestID >= 0 {
275274
callID = restID[len("i: "):bytes.Index(restID, []byte("\r\n"))]
276275
} else {
277-
return errors.New("Couldn't find end of Call-ID")
276+
logp.Warn("Couldn't find end of Call-ID in '%s'", string(restID))
278277
}
279278
}
280279
d.bigcache.Set(SDPIP+RTCPPort, callID)
281280
}
282-
return nil
283281
}
284282

285283
func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) {
@@ -295,6 +293,6 @@ func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) {
295293
return nil, nil, 0
296294
}
297295

298-
//fmt.Println(string(jsonRTCP))
296+
logp.Debug("decoder", "RTCP JSON payload: %s", string(jsonRTCP))
299297
return jsonRTCP, corrID, 5
300298
}

main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ func parseFlags() {
2828
flag.StringVar(&ifaceConfig.Type, "t", "pcap", "Capture types are [af_packet, pcap, file]")
2929
flag.StringVar(&ifaceConfig.ReadFile, "rf", "", "Read packets from file. Please use -t file")
3030
flag.StringVar(&ifaceConfig.WriteFile, "wf", "", "Write packets to file")
31-
flag.IntVar(&ifaceConfig.Loop, "lp", 0, "Loop count over ReadFile")
32-
flag.BoolVar(&ifaceConfig.TopSpeed, "ts", false, "Topspeed uses timestamps from packets")
31+
flag.IntVar(&ifaceConfig.Loop, "lp", 1, "Loop count over ReadFile")
32+
flag.BoolVar(&ifaceConfig.ReadSpeed, "rs", false, "Maximum read speed. Doesn't use packet timestamps")
3333
flag.IntVar(&ifaceConfig.Snaplen, "s", 32768, "Snap length")
3434
flag.IntVar(&ifaceConfig.BufferSizeMb, "b", 64, "Interface buffersize (MB)")
3535
flag.IntVar(&keepLogFiles, "kl", 4, "Rotate the number of log files")

0 commit comments

Comments
 (0)