Skip to content

Conversation

@mzyatkov
Copy link
Contributor

@mzyatkov mzyatkov commented Dec 3, 2025

Motivation / Problem

Currently, the demuxer buffers a PES packet and only emits it when it encounters the PayloadUnitStartIndicator of the next packet.

While this works well for high-frequency streams (like Video/Audio), it causes significant issues for sparse streams, such as DVB Teletext subtitles. These packets may arrive minutes apart.
Consequence: A subtitle frame arriving at T=0 is held in the buffer until T=4min (when the next subtitle arrives). This results in severe timestamp handling issues during transcoding, often causing the subtitles to be dropped or desynchronized because they effectively arrive "from the past."

Solution

This PR introduces a heuristic to reduce latency without breaking existing buffer logic:

  1. Length Check: When a packet arrives with PayloadUnitStartIndicator, we check the PES header length.
  2. Immediate Flush: If the PES packet length indicates that the entire PES payload is contained within this single TS packet, we emit it immediately.
  3. Safety Mechanism:
    • The packet remains in the internal accumulator (b.q) after being sent.
    • This ensures that standard checks for Discontinuity and Duplicates (sameAsPrevious) continue to work correctly when the next packet eventually arrives.
    • We added a flag check to ensure the packet is not emitted a second time when the standard "next packet arrival" flush occurs.

Re-produce example

We have source with following dvb_teletext sequence: source_dvb.txt.

Current upstream version will delay the last packet until there are no more packets, and instead send the previous one with a timestamp from 4 minutes ago.

…euristic to determine when a single-packet PES is received and to send eventually
@asticode
Copy link
Owner

asticode commented Dec 4, 2025

First off, thanks for the PR ❤️

I like the idea but I find the proposal a bit overly complicated. I'd rather have only the following:

        // Check if PES payload is complete
	if isPESPayload(mps[0].Payload) && isPESComplete(mps) {
		ps = mps
		mps = nil
	}
  • add a fun isPESComplete(ps []*Packet) bool but that uses parsePESHeader to re-implement the least amount of stuff, something like:
// Get payload length
	var l int
	for _, p := range ps {
		l += len(p.Payload)
	}

	// Get the slice for payload from pool
	payload := bytesPool.get(l)
	defer bytesPool.put(payload)

	// Append payload
	var o int
	for _, p := range ps {
		o += copy(payload.s[o:], p.Payload)
	}

	// Create reader
	i := astikit.NewBytesIterator(payload.s)

        // Skip first 3 bytes that are there to identify the PES payload
	i.Seek(3)

	// Parse header
	var dataEnd int
	if h, _, dataEnd, err = parsePESHeader(i); err != nil {
		err = fmt.Errorf("astits: parsing PES header failed: %w", err)
		return
	}

        // TODO Only return true if h.PacketLength > 0 since otherwise there's no other way to know whether the packet is complete
        // TODO Check whether i.Len() >= dataEnd

Only those 2 things + the tests should be enough 👍

@mzyatkov
Copy link
Contributor Author

mzyatkov commented Dec 4, 2025

Thank you for reply! Let me clarify,

  1. do we really need to check isPESComplete every time a new packet arrives?
    It seems like this creates unnecessary computational overhead. Wouldn't it be distinct enough to perform this check only when the mps buffer contains exactly one packet with the PayloadUnitStartIndicator?
  2. Is it acceptable that we effectively skip the discontinuity and sameAsPrevious checks for the subsequent packet? Since we flush the accumulator (mps = nil) immediately, the next incoming packet won't have the previous one to compare against

#72 (comment)

@asticode
Copy link
Owner

asticode commented Dec 4, 2025

do we really need to check isPESComplete every time a new packet arrives?
It seems like this creates unnecessary computational overhead. Wouldn't it be distinct enough to perform this check only when the mps buffer contains exactly one packet with the PayloadUnitStartIndicator?

  • thing is PES data can span over multiple packets. Therefore even if you can get the PES data length when getting a packet with the PayloadUnitStartIndicator set to true, you still need to check whether all data has arrived every time a new packet arrive.
  • also that wouldn't be for every packets but for packets containing PES data

Is it acceptable that we effectively skip the discontinuity and sameAsPrevious checks for the subsequent packet? Since we flush the accumulator (mps = nil) immediately, the next incoming packet won't have the previous one to compare against

That is already what is implemented right now for all packets. If we were to change that, that would be in a different PR and for all packets (not juste PES ones)

@mzyatkov
Copy link
Contributor Author

mzyatkov commented Dec 4, 2025

fixed

@asticode asticode merged commit eb293e7 into asticode:master Dec 5, 2025
1 check passed
@asticode
Copy link
Owner

asticode commented Dec 5, 2025

Thanks again for the PR ❤️

Let me know whether you need a tag 👍

robot-piglet pushed a commit to yandex/perforator that referenced this pull request Dec 5, 2025
asticode/go-astits#72

Проблема: в go-astits демуксер отправляет пакет только когда увидел начало предыдущего. Это аффектит прод, так как на первом канале субтитры приходят иногда раз в 4 минуты, что вносит задержку при транскодировании.
Решение: патч библиотеки, в котором отправляем PES сразу, если он состоит ровно из одного пакета. При этом оставляем его в буффере для проверок на disontinuity и sameAsPrevious, но не отправляем его дважды.
commit_hash:c560c4d07b88493144887611689ae477b71b11d6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants