Skip to content

Commit 972e0d6

Browse files
author
cobeq
committed
fix(packet_pool): reducing latency for single-packet PES by using a heuristic to determine when a single-packet PES is received and to send eventually
1 parent 5181545 commit 972e0d6

File tree

3 files changed

+78
-1
lines changed

3 files changed

+78
-1
lines changed

data.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,26 @@ func isPSIComplete(ps []*Packet) bool {
182182

183183
return i.Len() >= i.Offset()
184184
}
185+
186+
// isPESComplete checks whether payload fully contains PES packet
187+
func isPESComplete(payload []byte) bool {
188+
i := astikit.NewBytesIterator(payload)
189+
190+
i.Seek(4)
191+
192+
// Get next bytes
193+
var bs []byte
194+
var err error
195+
if bs, err = i.NextBytesNoCopy(2); err != nil {
196+
return false
197+
}
198+
199+
pesLength := uint16(bs[0])<<8 | uint16(bs[1])
200+
201+
if pesLength == 0 {
202+
// any length
203+
return false
204+
}
205+
206+
return int(pesLength)+pesHeaderLength <= len(payload)
207+
}

packet_pool.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func newPacketAccumulator(pid uint16, programMap *programMap) *packetAccumulator
2323
func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
2424
mps := b.q
2525

26+
var needSkipBufferMPS bool
2627
// Empty buffer if we detect a discontinuity
2728
if hasDiscontinuity(mps, p) {
2829
// Reset current slice or make new
@@ -31,6 +32,8 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
3132
} else {
3233
mps = make([]*Packet, 0, 10)
3334
}
35+
} else {
36+
needSkipBufferMPS = len(mps) == 0 || isPacketsAlreadySent(mps)
3437
}
3538

3639
// Throw away packet if it's the same as the previous one
@@ -40,7 +43,9 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
4043

4144
// Flush buffer if new payload starts here
4245
if p.Header.PayloadUnitStartIndicator {
43-
ps = mps
46+
if !needSkipBufferMPS {
47+
ps = mps
48+
}
4449
mps = make([]*Packet, 0, cap(mps))
4550
}
4651

@@ -52,6 +57,8 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
5257
isPSIComplete(mps) {
5358
ps = mps
5459
mps = nil
60+
} else if needSkipBufferMPS && isPayloadCompletePES(p) {
61+
ps = mps
5562
}
5663

5764
b.q = mps
@@ -110,6 +117,10 @@ func (b *packetPool) dumpUnlocked() (ps []*Packet) {
110117
ps = b.b[uint32(k)].q
111118
delete(b.b, uint32(k))
112119
if len(ps) > 0 {
120+
if isPacketsAlreadySent(ps) {
121+
ps = nil
122+
continue
123+
}
113124
return
114125
}
115126
}
@@ -150,3 +161,11 @@ func isSameAsPrevious(ps []*Packet, p *Packet) bool {
150161
l := len(ps)
151162
return l > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[l-1].Header.ContinuityCounter
152163
}
164+
165+
func isPayloadCompletePES(packet *Packet) bool {
166+
return packet.Header.PayloadUnitStartIndicator && isPESPayload(packet.Payload) && isPESComplete(packet.Payload)
167+
}
168+
169+
func isPacketsAlreadySent(ps []*Packet) bool {
170+
return len(ps) == 1 && isPayloadCompletePES(ps[0])
171+
}

packet_pool_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,38 @@ func TestPacketPool(t *testing.T) {
4949
ps = b.dumpUnlocked()
5050
assert.Len(t, ps, 0)
5151
}
52+
53+
func TestPacketPoolWithRarePackets(t *testing.T) {
54+
payloadDVBTeletext := hexToBytes(`000001bd00b2848024293c972af5ffffffffffffffffffffffffffffffff
55+
ffffffffffffffffffffffffffffff10032cf5e4a8a80b0ba80ba80b2692
56+
040404040404040404040404040404040404040404040404040404040404
57+
0404032cd5e4a8a85757a8a8a80b26a80404040404040404040404040404
58+
040404040404040404040404040404040404ff2cffffffffffffffffffff
59+
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
60+
ffffffff`)
61+
b := newPacketPool(nil)
62+
ps := b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
63+
assert.Len(t, ps, 1)
64+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
65+
assert.Len(t, ps, 1)
66+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
67+
assert.Len(t, ps, 1)
68+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
69+
assert.Len(t, ps, 1)
70+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
71+
assert.Len(t, ps, 0)
72+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 4, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
73+
assert.Len(t, ps, 1)
74+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
75+
assert.Len(t, ps, 0)
76+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
77+
assert.Len(t, ps, 1)
78+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
79+
assert.Len(t, ps, 0)
80+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 9, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
81+
assert.Len(t, ps, 0)
82+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 10, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
83+
assert.Len(t, ps, 1)
84+
ps = b.dumpUnlocked()
85+
assert.Len(t, ps, 0)
86+
}

0 commit comments

Comments
 (0)