Skip to content

Commit eb293e7

Browse files
mzyatkovcobeq
andauthored
fix(packet_pool): reduce latency for single-packet PES by flushing immediately when complete (#72)
* fix(packet_pool): reducing latency for single-packet PES by using a heuristic to determine when a single-packet PES is received and to send eventually * fix issues and simplify --------- Co-authored-by: cobeq <[email protected]>
1 parent 5181545 commit eb293e7

File tree

3 files changed

+77
-0
lines changed

3 files changed

+77
-0
lines changed

data.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,42 @@ func isPSIComplete(ps []*Packet) bool {
182182

183183
return i.Len() >= i.Offset()
184184
}
185+
186+
// isPESComplete checks whether payload fully contains PES packet
187+
func isPESComplete(ps []*Packet) bool {
188+
// Get payload length
189+
var l int
190+
for _, p := range ps {
191+
l += len(p.Payload)
192+
}
193+
194+
// Get the slice for payload from pool
195+
payload := bytesPool.get(l)
196+
defer bytesPool.put(payload)
197+
198+
// Append payload
199+
var o int
200+
for _, p := range ps {
201+
o += copy(payload.s[o:], p.Payload)
202+
}
203+
204+
// Create reader
205+
i := astikit.NewBytesIterator(payload.s)
206+
207+
// Skip first 3 bytes that are there to identify the PES payload
208+
i.Seek(3)
209+
210+
// Parse header
211+
h, _, dataEnd, err := parsePESHeader(i)
212+
if err != nil {
213+
err = fmt.Errorf("astits: parsing PES header failed: %w", err)
214+
return false
215+
}
216+
217+
if h.PacketLength == 0 {
218+
// There's no other way to know whether the packet is complete
219+
return false
220+
}
221+
222+
return i.Len() >= dataEnd
223+
}

packet_pool.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
5252
isPSIComplete(mps) {
5353
ps = mps
5454
mps = nil
55+
} else if isPESPayload(mps[0].Payload) && isPESComplete(mps) { // Check if PES payload is complete
56+
ps = mps
57+
mps = nil
5558
}
5659

5760
b.q = mps

packet_pool_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,38 @@ func TestPacketPool(t *testing.T) {
4949
ps = b.dumpUnlocked()
5050
assert.Len(t, ps, 0)
5151
}
52+
53+
func TestPacketPoolWithRarePackets(t *testing.T) {
54+
payloadDVBTeletext := hexToBytes(`000001bd00b2848024293c972af5ffffffffffffffffffffffffffffffff
55+
ffffffffffffffffffffffffffffff10032cf5e4a8a80b0ba80ba80b2692
56+
040404040404040404040404040404040404040404040404040404040404
57+
0404032cd5e4a8a85757a8a8a80b26a80404040404040404040404040404
58+
040404040404040404040404040404040404ff2cffffffffffffffffffff
59+
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
60+
ffffffff`)
61+
b := newPacketPool(nil)
62+
ps := b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
63+
assert.Len(t, ps, 1)
64+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
65+
assert.Len(t, ps, 1)
66+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
67+
assert.Len(t, ps, 1)
68+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
69+
assert.Len(t, ps, 1)
70+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
71+
assert.Len(t, ps, 1)
72+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 4, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
73+
assert.Len(t, ps, 1)
74+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
75+
assert.Len(t, ps, 1)
76+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
77+
assert.Len(t, ps, 1)
78+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
79+
assert.Len(t, ps, 1)
80+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 9, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
81+
assert.Len(t, ps, 1)
82+
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 10, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
83+
assert.Len(t, ps, 1)
84+
ps = b.dumpUnlocked()
85+
assert.Len(t, ps, 0)
86+
}

0 commit comments

Comments
 (0)