Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 104 additions & 30 deletions Moblin/Media/HaishinKit/Whip/WhipStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ private let whipQueue = DispatchQueue(label: "com.eerimoq.Moblin.whip")
private let h264PayloadType: UInt8 = 98
private let opusPayloadType: UInt8 = 111
private let rtpMtu = 1200
private let gatheringFallbackTimeout: TimeInterval = 2.0

private func makeSsrc() -> UInt32 {
var ssrc: UInt32 = 0
Expand Down Expand Up @@ -54,6 +55,25 @@ private enum ConnectionState {
}
}

private enum GatheringState {
case new
case inProgress
case complete

init?(cValue: rtcGatheringState) {
switch cValue {
case RTC_GATHERING_NEW:
self = .new
case RTC_GATHERING_INPROGRESS:
self = .inProgress
case RTC_GATHERING_COMPLETE:
self = .complete
default:
return nil
}
}
}

private func makeEndpointUrl(url: String) -> URL? {
guard var components = URLComponents(string: url) else {
return nil
Expand Down Expand Up @@ -108,14 +128,29 @@ private struct RtpPacket {
}
}

private func convertTimestamp(_ presentationTimeStamp: CMTime) -> UInt32 {
return UInt32(UInt64(presentationTimeStamp.seconds * 90000) & 0xFFFF_FFFF)
private struct RtpTimestamp {
private let rate: Double
private var startedAt: Double?

init(rate: Double) {
self.rate = rate
}

mutating func convert(_ time: CMTime) -> UInt32 {
let seconds = time.seconds
if startedAt == nil {
startedAt = seconds
}
let timestamp = UInt64(max((seconds - startedAt!) * rate, 0))
return UInt32(timestamp & 0xFFFF_FFFF)
}
}

private final class H264Packetizer {
let ssrc: UInt32
private let payloadType: UInt8
private var sequenceNumber: UInt16 = 0
private var timestamp = RtpTimestamp(rate: 90_000)
private var sps: Data?
private var pps: Data?

Expand All @@ -129,7 +164,7 @@ private final class H264Packetizer {
self.pps = pps
}

func packetize(sampleBuffer: CMSampleBuffer, presentationTimeStamp: CMTime) -> [Data] {
func packetize(sampleBuffer: CMSampleBuffer) -> [Data] {
var nalUnits = extractNalUnits(sampleBuffer: sampleBuffer)
guard !nalUnits.isEmpty else {
return []
Expand All @@ -142,7 +177,7 @@ private final class H264Packetizer {
nalUnits.insert(pps, at: min(1, nalUnits.count))
}
}
let packetTimestamp = convertTimestamp(presentationTimeStamp)
let packetTimestamp = timestamp.convert(sampleBuffer.presentationTimeStamp)
var packets: [Data] = []
for (index, nalUnit) in nalUnits.enumerated() {
let isLastNal = index == nalUnits.count - 1
Expand Down Expand Up @@ -239,31 +274,32 @@ private final class OpusPacketizer {
}
packets.append(allData.subdata(in: offset ..< offset + size))
}
return packets.isEmpty ? [allData] : packets
return packets
}
}

private final class OpusRtpPacketizer {
private let ssrc: UInt32
private let payloadType: UInt8
private var sequenceNumber: UInt16 = 0
private var timestamp: UInt32 = 0

init(ssrc: UInt32, payloadType: UInt8) {
self.ssrc = ssrc
self.payloadType = payloadType
}

func packetize(payload: Data, presentationTimeStamp: CMTime) -> Data {
let packetTimestamp = convertTimestamp(presentationTimeStamp)
func packetize(payload: Data) -> Data {
let packet = RtpPacket(
marker: false,
payloadType: payloadType,
sequenceNumber: sequenceNumber,
timestamp: packetTimestamp,
timestamp: timestamp,
ssrc: ssrc,
payload: payload
)
sequenceNumber &+= 1
timestamp &+= 960
return packet.data()
}
}
Expand Down Expand Up @@ -349,6 +385,7 @@ private struct RtcTrackConfig {

private protocol PeerConnectionDelegate: AnyObject {
func peerConnectionOnConnectionStateChanged(state: ConnectionState)
func peerConnectionOnGatheringStateChanged(state: GatheringState)
}

private func toPeerConnection(pointer: UnsafeMutableRawPointer?) -> PeerConnection? {
Expand Down Expand Up @@ -376,6 +413,9 @@ private final class PeerConnection {
try checkOk(rtcSetStateChangeCallback(peerConnectionId) { _, state, pointer in
toPeerConnection(pointer: pointer)?.handleStateChange(state: state)
})
try checkOk(rtcSetGatheringStateChangeCallback(peerConnectionId) { _, state, pointer in
toPeerConnection(pointer: pointer)?.handleGatheringStateChange(state: state)
})
} catch {
rtcDeletePeerConnection(peerConnectionId)
throw error
Expand Down Expand Up @@ -416,10 +456,10 @@ private final class PeerConnection {
try checkOk(rtcSetLocalDescription(peerConnectionId, "offer"))
}

func createOffer() throws -> String {
let size = try checkOkReturnResult(rtcCreateOffer(peerConnectionId, nil, 0))
func getLocalDescription() throws -> String {
let size = try checkOkReturnResult(rtcGetLocalDescription(peerConnectionId, nil, 0))
var buffer = [CChar](repeating: 0, count: Int(size))
try checkOk(rtcCreateOffer(peerConnectionId, &buffer, Int32(size)))
try checkOk(rtcGetLocalDescription(peerConnectionId, &buffer, Int32(size)))
return String(cString: buffer)
}

Expand All @@ -437,6 +477,13 @@ private final class PeerConnection {
}
delegate?.peerConnectionOnConnectionStateChanged(state: state)
}

private func handleGatheringStateChange(state: rtcGatheringState) {
guard let state = GatheringState(cValue: state) else {
return
}
delegate?.peerConnectionOnGatheringStateChanged(state: state)
}
}

protocol WhipStreamDelegate: AnyObject {
Expand Down Expand Up @@ -515,8 +562,10 @@ final class WhipStream {
)
self.peerConnection = peerConnection
try peerConnection.setLocalDescriptionOffer()
let offer = try peerConnection.createOffer()
sendOffer(endpointUrl: endpointUrl, offer: offer)
// Fallback: if gathering callback is not delivered, send local SDP after a short delay.
whipQueue.asyncAfter(deadline: .now() + gatheringFallbackTimeout) { [weak self] in
self?.sendCurrentLocalOfferIfNeeded()
}
} catch {
stopInternal(reason: "WHIP start failed")
}
Expand Down Expand Up @@ -560,6 +609,33 @@ final class WhipStream {
}
}

private func handleGatheringStateChanged(state: GatheringState) {
logger.info("whip: Gathering state \(state)")
switch state {
case .complete:
sendCurrentLocalOfferIfNeeded()
case .new, .inProgress:
break
}
}

private func sendCurrentLocalOfferIfNeeded() {
guard !offerSent, let endpointUrl, let peerConnection else {
return
}
do {
let offer = try peerConnection.getLocalDescription()
guard !offer.isEmpty else {
return
}
logger.info("whip: Sending local offer after ICE gathering")
offerSent = true
sendOffer(endpointUrl: endpointUrl, offer: offer)
} catch {
logger.info("whip: Failed to get local offer: \(error)")
}
}

private func sendOffer(endpointUrl: URL, offer: String) {
var request = URLRequest(url: endpointUrl)
request.httpMethod = "POST"
Expand Down Expand Up @@ -637,20 +713,14 @@ final class WhipStream {
delegate?.whipStreamOnDisconnected(reason: reason)
}

private func handleAudioEncoderOutputBuffer(_ buffer: AVAudioCompressedBuffer,
_ presentationTimeStamp: CMTime)
private func handleAudioEncoderOutputBuffer(_ buffer: AVAudioCompressedBuffer)
{
guard connected, let packets = audioPacketizer?.packetize(buffer: buffer) else {
return
}
for packet in packets {
let outgoingPacket: Data
if let rtpPacketizer = audioRtpPacketizer {
outgoingPacket = rtpPacketizer.packetize(payload: packet,
presentationTimeStamp: presentationTimeStamp)
} else {
continue
}
guard let rtpPacketizer = audioRtpPacketizer else { continue }
let outgoingPacket = rtpPacketizer.packetize(payload: packet)
if audioTrack?.send(packet: outgoingPacket) == true {
totalByteCount += Int64(outgoingPacket.count)
}
Expand All @@ -664,12 +734,10 @@ final class WhipStream {
videoPacketizer?.setParameterSets(sps: config.sequenceParameterSet, pps: config.pictureParameterSet)
}

private func handleVideoEncoderOutputSampleBuffer(_ sampleBuffer: CMSampleBuffer,
_ presentationTimeStamp: CMTime)
private func handleVideoEncoderOutputSampleBuffer(_ sampleBuffer: CMSampleBuffer)
{
guard connected,
let packets = videoPacketizer?.packetize(sampleBuffer: sampleBuffer,
presentationTimeStamp: presentationTimeStamp)
let packets = videoPacketizer?.packetize(sampleBuffer: sampleBuffer)
else {
return
}
Expand All @@ -685,14 +753,20 @@ extension WhipStream: PeerConnectionDelegate {
self.handleConnectionStateChanged(state: state)
}
}

fileprivate func peerConnectionOnGatheringStateChanged(state: GatheringState) {
whipQueue.async {
self.handleGatheringStateChanged(state: state)
}
}
}

extension WhipStream: AudioEncoderDelegate {
func audioEncoderOutputFormat(_: AVAudioFormat) {}

func audioEncoderOutputBuffer(_ buffer: AVAudioCompressedBuffer, _ presentationTimeStamp: CMTime) {
func audioEncoderOutputBuffer(_ buffer: AVAudioCompressedBuffer, _: CMTime) {
whipQueue.async {
self.handleAudioEncoderOutputBuffer(buffer, presentationTimeStamp)
self.handleAudioEncoderOutputBuffer(buffer)
}
}
}
Expand All @@ -706,10 +780,10 @@ extension WhipStream: VideoEncoderDelegate {

func videoEncoderOutputSampleBuffer(_: VideoEncoder,
_ sampleBuffer: CMSampleBuffer,
_ presentationTimeStamp: CMTime)
_: CMTime)
{
whipQueue.async {
self.handleVideoEncoderOutputSampleBuffer(sampleBuffer, presentationTimeStamp)
self.handleVideoEncoderOutputSampleBuffer(sampleBuffer)
}
}
}
Loading