Skip to content
135 changes: 32 additions & 103 deletions pkg/ipeye/ipeye.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
package ipeye

import (
"bytes"
"math/rand"
"net/http"
"strings"
"time"

"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/iso"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)

type Producer struct {
Expand Down Expand Up @@ -66,7 +62,6 @@ func Dial(source string) (core.Producer, error) {

// probe waits for init with avcC and extracts SPS/PPS
func (p *Producer) probe() error {
log := app.GetLogger("ipeye")

for {
mType, b, err := p.conn.ReadMessage()
Expand All @@ -80,14 +75,12 @@ func (p *Producer) probe() error {
continue
}

var trackID, timeScale uint32
var trackID uint32

for _, atom := range atoms {
switch atom := atom.(type) {
case *iso.AtomTkhd:
trackID = atom.TrackID
case *iso.AtomMdhd:
timeScale = atom.TimeScale
case *iso.AtomVideo:
if atom.Name == "avc1" {
codec := h264.AVCCToCodec(atom.Config)
Expand All @@ -101,17 +94,11 @@ func (p *Producer) probe() error {
})
p.videoTrackID = trackID
p.clockRate = codec.ClockRate

log.Info().
Uint32("trackID", trackID).
Uint32("timeScale", timeScale).
Msg("fMP4 video detected")
}
}
}

if len(p.Medias) > 0 {
log.Info().Int("medias", len(p.Medias)).Msg("fMP4 init complete")
return nil
}
}
Expand All @@ -120,7 +107,6 @@ func (p *Producer) probe() error {

// Start runs the main fragment reading loop
func (p *Producer) Start() error {
log := app.GetLogger("ipeye")

receivers := make(map[uint32]*core.Receiver)
if p.videoTrackID != 0 {
Expand All @@ -131,23 +117,18 @@ func (p *Producer) Start() error {
}
}

// RTP packetizer
h264Pay := &codecs.H264Payloader{}
seq := rtp.NewRandomSequencer()
h264pkt := rtp.NewPacketizer(1200, 96, 0, h264Pay, seq, p.clockRate)

// global counters
// RTP счётчики
rtpStart := rand.Uint32()
seq := uint16(rand.Uint32())
var dts uint64
var defaultDur uint32
var initialized bool

const wrapPeriod = uint64(1) << 32 // RTP TS wraps (mod 2^32)
const wrapPeriod = uint64(1) << 32 // RTP TS wrap (mod 2^32)

for {
mType, b, err := p.conn.ReadMessage()
if err != nil {
log.Error().Err(err).Msg("read error")
return err
}
if mType != websocket.BinaryMessage {
Expand All @@ -171,10 +152,6 @@ func (p *Producer) Start() error {

case *iso.AtomTfdt:
if !initialized {
log.Info().
Uint64("decodeTime", atom.DecodeTime).
Uint32("rtpStart", rtpStart).
Msg("stream initialized")
dts = atom.DecodeTime
initialized = true
}
Expand All @@ -192,83 +169,35 @@ func (p *Producer) Start() error {
continue
}

// convert AVCC -> AnnexB
annexbData := annexb.DecodeAVCC(mdatData, true)
nalus := bytes.Split(annexbData, []byte{0, 0, 0, 1})

// iterate over NALUs
for i, nalu := range nalus {
if len(nalu) == 0 {
continue
}
typ := nalu[0] & 0x1F

// RTP TS for this sample (mod 2^32)
ts := rtpStart + uint32((dts*uint64(p.clockRate)/90000)%wrapPeriod)

// SPS/PPS before IDR
if typ == h264.NALUTypeIFrame {
if len(p.sps) > 0 {
for _, pkt := range h264pkt.Packetize(p.sps, ts) {
pkt.Timestamp = ts
recv.Input(pkt)
log.Debug().
Uint32("ts", pkt.Timestamp).
Uint16("seq", pkt.SequenceNumber).
Int("size", len(pkt.Payload)).
Str("type", "SPS").
Msg("RTP packet")
}
}
if len(p.pps) > 0 {
for _, pkt := range h264pkt.Packetize(p.pps, ts) {
pkt.Timestamp = ts
recv.Input(pkt)
log.Debug().
Uint32("ts", pkt.Timestamp).
Uint16("seq", pkt.SequenceNumber).
Int("size", len(pkt.Payload)).
Str("type", "PPS").
Msg("RTP packet")
}
}
}

// actual frame
for _, pkt := range h264pkt.Packetize(nalu, ts) {
pkt.Timestamp = ts
recv.Input(pkt)
log.Debug().
Uint32("ts", pkt.Timestamp).
Uint16("seq", pkt.SequenceNumber).
Int("size", len(pkt.Payload)).
Int("nalType", int(typ)).
Msg("RTP packet")
}

// duration selection
dur := defaultDur
if i < len(samplesDur) && samplesDur[i] != 0 {
dur = samplesDur[i]
}
if dur == 0 {
dur = p.clockRate / 25 // fallback
}
// RTP TS для этого sample
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments must be in English

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

ts := rtpStart + uint32((dts*uint64(p.clockRate)/90000)%wrapPeriod)

// Отправляем один RTP-пакет с полезной нагрузкой в AVCC
seq++
recv.Input(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: seq,
Timestamp: ts,
SSRC: 1,
},
Payload: mdatData,
})

// выбираем длительность
dur := defaultDur
if len(samplesDur) > 0 && samplesDur[0] != 0 {
dur = samplesDur[0]
}
if dur == 0 {
dur = p.clockRate / 25 // fallback
}

log.Trace().
Int("sample", i).
Uint32("dur", dur).
Uint64("dts", dts).
Uint32("rtpTS", ts).
Int("nalType", int(typ)).
Int("size", len(nalu)).
Msg("sample processed")

// increment DTS
dts += uint64(dur)
if dts >= wrapPeriod {
dts %= wrapPeriod
}
// инкремент DTS
dts += uint64(dur)
if dts >= wrapPeriod {
dts %= wrapPeriod
}
}
}
Expand Down