Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/reconnect-errors
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Reconnect sequence stuck in failed state"
26 changes: 19 additions & 7 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ extension Room {
primary: !isSubscriberPrimary,
delegate: self)

await publisher.set { [weak self] offer in
await publisher.set { [weak self] offer, offerId in
guard let self else { return }
log("Publisher onOffer \(offer.sdp)")
try await signalClient.send(offer: offer)
log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)")
try await signalClient.send(offer: offer, offerId: offerId)
}

// data over pub channel for backwards compatibility
Expand Down Expand Up @@ -318,7 +318,13 @@ extension Room {

log("[Connect] Waiting for subscriber to connect...")
// Wait for primary transport to connect (if not already)
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
do {
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
log("[Connect] Subscriber transport connected")
} catch {
log("[Connect] Subscriber transport failed to connect, error: \(error)", .error)
throw error
}
try Task.checkCancellation()

// send SyncState before offer
Expand All @@ -330,7 +336,13 @@ extension Room {
// Only if published, wait for publisher to connect...
log("[Connect] Waiting for publisher to connect...")
try await publisher.createAndSendOffer(iceRestart: true)
try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
do {
try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
log("[Connect] Publisher transport connected")
} catch {
log("[Connect] Publisher transport failed to connect, error: \(error)", .error)
throw error
}
}
}

Expand Down Expand Up @@ -462,8 +474,8 @@ extension Room {
$0.subscribe = !autoSubscribe
}

try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(),
offer: previousOffer?.toPBType(),
try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(offerId: 0),
offer: previousOffer?.toPBType(offerId: 0),
subscription: subscription,
publishTracks: localParticipant.publishedTracksInfo(),
dataChannels: publisherDataChannel.infos(),
Expand Down
32 changes: 20 additions & 12 deletions Sources/LiveKit/Core/Room+SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,29 @@ extension Room: SignalClientDelegate {
// engine is currently connected state
case .connected = _state.connectionState
{
do {
try await startReconnect(reason: .websocket)
} catch {
log("Failed calling startReconnect, error: \(error)", .error)
Task {
Copy link
Contributor Author

@pblazej pblazej Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerialRunnerActor (inside SignalClient._delegate)
│
├─> [Task 1] didUpdateConnectionState
│   └─> await startReconnect()  ← blocking the actor
│       └─> waiting for offer...
│
└─> [Task 2] didReceiveOffer
    └─> Can't enter because actor is busy with Task 1

do {
try await startReconnect(reason: .websocket)
} catch {
log("Failed calling startReconnect, error: \(error)", .error)
}
}
}
}

func signalClient(_: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async {
log("canReconnect: \(canReconnect), reason: \(reason)")

let error = LiveKitError.from(reason: reason)

if canReconnect {
// force .full for next reconnect
_state.mutate { $0.nextReconnectMode = .full }
// Abort current connection attempt
await signalClient.cleanUp(withError: error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more or less equivalent to JS:

        case LeaveRequest_Action.RECONNECT:
          this.fullReconnectOnNext = true;
          // reconnect immediately instead of waiting for next attempt
          this.handleDisconnect(leaveReconnect);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (if-minor): maybe this is a good moment to adopt the .action of the leave request as well here? (backwards compatible of course in case it's unset/0).

We don't actually need a full reconnect on every leave request. But if it makes more sense as a follow-up, that also sounds good.

Copy link
Contributor Author

@pblazej pblazej Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, moved to the action param here 👍

Re: backwards compatibility, I discarded the legacy canReconnect param, which mimics JS, and an unknown action is treated as a no-op.

} else {
// Server indicates it's not recoverable
await cleanUp(withError: LiveKitError.from(reason: reason))
await cleanUp(withError: error)
}
}

Expand Down Expand Up @@ -319,17 +325,19 @@ extension Room: SignalClientDelegate {
}
}

func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async {
func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async {
log("Received answer for offerId: \(offerId)")

do {
let publisher = try requirePublisher()
try await publisher.set(remoteDescription: answer)
try await publisher.set(remoteDescription: answer, offerId: offerId)
} catch {
log("Failed to set remote description, error: \(error)", .error)
log("Failed to set remote description with offerId: \(offerId), error: \(error)", .error)
}
}

func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async {
log("Received offer, creating & sending answer...")
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async {
log("Received offer with offerId: \(offerId), creating & sending answer...")

guard let subscriber = _state.subscriber else {
log("Failed to send answer, subscriber is nil", .error)
Expand All @@ -340,9 +348,9 @@ extension Room: SignalClientDelegate {
try await subscriber.set(remoteDescription: offer)
let answer = try await subscriber.createAnswer()
try await subscriber.set(localDescription: answer)
try await signalClient.send(answer: answer)
try await signalClient.send(answer: answer, offerId: offerId)
} catch {
log("Failed to send answer with error: \(error)", .error)
log("Failed to send answer for offerId: \(offerId), error: \(error)", .error)
}
}

Expand Down
22 changes: 13 additions & 9 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,15 @@ actor SignalClient: Loggable {
participantSid: participantSid,
adaptiveStream: adaptiveStream)

if reconnectMode != nil {
log("[Connect] with url: \(url)")
let isReconnect = reconnectMode != nil

if isReconnect {
log("Reconnecting with url: \(url)")
} else {
log("Connecting with url: \(url)")
}

_state.mutate { $0.connectionState = (reconnectMode != nil ? .reconnecting : .connecting) }
_state.mutate { $0.connectionState = (isReconnect ? .reconnecting : .connecting) }

do {
let socket = try await WebSocket(url: url,
Expand Down Expand Up @@ -279,10 +281,12 @@ private extension SignalClient {
await _restartPingTimer()

case let .answer(sd):
_delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: sd.toRTCType()) }
let (rtcDescription, offerId) = sd.toRTCType()
_delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: rtcDescription, offerId: offerId) }

case let .offer(sd):
_delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: sd.toRTCType()) }
let (rtcDescription, offerId) = sd.toRTCType()
_delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: rtcDescription, offerId: offerId) }

case let .trickle(trickle):
guard let rtcCandidate = try? RTC.createIceCandidate(fromJsonString: trickle.candidateInit) else {
Expand Down Expand Up @@ -358,17 +362,17 @@ extension SignalClient {
// MARK: - Send methods

extension SignalClient {
func send(offer: LKRTCSessionDescription) async throws {
func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws {
let r = Livekit_SignalRequest.with {
$0.offer = offer.toPBType()
$0.offer = offer.toPBType(offerId: offerId)
}

try await _sendRequest(r)
}

func send(answer: LKRTCSessionDescription) async throws {
func send(answer: LKRTCSessionDescription, offerId: UInt32) async throws {
let r = Livekit_SignalRequest.with {
$0.answer = answer.toPBType()
$0.answer = answer.toPBType(offerId: offerId)
}

try await _sendRequest(r)
Expand Down
21 changes: 19 additions & 2 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal import LiveKitWebRTC
actor Transport: NSObject, Loggable {
// MARK: - Types

typealias OnOfferBlock = @Sendable (LKRTCSessionDescription) async throws -> Void
typealias OnOfferBlock = @Sendable (LKRTCSessionDescription, UInt32) async throws -> Void

// MARK: - Public

Expand Down Expand Up @@ -56,6 +56,7 @@ actor Transport: NSObject, Loggable {
private var _reNegotiate: Bool = false
private var _onOffer: OnOfferBlock?
private var _isRestartingIce: Bool = false
private var _latestOfferId: UInt32 = 0

// forbid direct access to PeerConnection
private let _pc: LKRTCPeerConnection
Expand Down Expand Up @@ -110,6 +111,20 @@ actor Transport: NSObject, Loggable {
await _iceCandidatesQueue.process(candidate, if: remoteDescription != nil && !_isRestartingIce)
}

/// Applies a remote answer after checking which local offer it responds to.
///
/// A warning is logged (and processing continues) when `signalingState` is not
/// `.haveLocalOffer`. An `offerId` of 0 skips the id check; any other value must
/// equal `_latestOfferId`.
///
/// - Parameters:
///   - sd: The remote session description to apply.
///   - offerId: Id of the offer this description answers; 0 means no id was supplied.
/// - Throws: `LiveKitError` with `.invalidState` when `offerId` is non-zero and differs
///   from `_latestOfferId`, or any error thrown by `set(remoteDescription:)`.
func set(remoteDescription sd: LKRTCSessionDescription, offerId: UInt32) async throws {
// An unexpected signaling state is only reported here; it does not abort the call.
if signalingState != .haveLocalOffer {
log("Received answer with unexpected signaling state: \(signalingState), expected .haveLocalOffer", .warning)
}

// offerId 0 is treated as "not provided" (legacy server), so no comparison is made.
if offerId == 0 {
log("Skipping validation for legacy server (missing offerId), latestOfferId: \(_latestOfferId)", .warning)
} else if offerId != _latestOfferId {
// The description answers a different offer than the latest one recorded; reject it.
throw LiveKitError(.invalidState, message: "OfferId mismatch, expected \(_latestOfferId) but got \(offerId)")
}

// Validation passed (or was skipped): hand over to the overload without an offerId.
try await set(remoteDescription: sd)
}

func set(remoteDescription sd: LKRTCSessionDescription) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
_pc.setRemoteDescription(sd) { error in
Expand Down Expand Up @@ -157,12 +172,14 @@ actor Transport: NSObject, Loggable {

// Actually negotiate
func _negotiateSequence() async throws {
_latestOfferId += 1
let offer = try await createOffer(for: constraints)
try await set(localDescription: offer)
try await _onOffer(offer)
try await _onOffer(offer, _latestOfferId)
}

if signalingState == .haveLocalOffer, iceRestart, let sd = remoteDescription {
_reNegotiate = false // Clear flag to prevent double offer
Copy link
Contributor Author

@pblazej pblazej Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JS around https://github.com/livekit/client-sdk-js/blob/65e0bfe38ba594bbfe73de60ce72dbc7b96be3a2/src/room/PCTransport.ts#L271

if (this._pc && this._pc.signalingState === 'have-local-offer') {
    const currentSD = this._pc.remoteDescription;
    if (options?.iceRestart && currentSD) {
        // 1. Rollback: Sets remote description
        await this._pc.setRemoteDescription(currentSD);
        // 2. DOES NOT set this.renegotiate = true
        // 3. Falls through to create offer (line 291)
    } else {
        // 1. Defer: Sets renegotiate = true
        this.renegotiate = true;
        // 2. Returns immediately (skips offer creation)
        return;
    }
}

vs Swift

if signalingState == .haveLocalOffer {
    if !(iceRestart && remoteDescription != nil) {
        // 1. Defer: Sets _reNegotiate = true
        _reNegotiate = true
        // 2. Returns immediately
        return
    }
    
    // Else: ICE Restart path falls through...
}

// ... (offer ID increment) ...

if signalingState == .haveLocalOffer, iceRestart, let sd = remoteDescription {
    // 1. Force clear _reNegotiate (Matches JS not setting it)
    _reNegotiate = false  
    // 2. Rollback: Sets remote description
    try await set(remoteDescription: sd)
    // 3. Creates new offer immediately
    return try await _negotiateSequence()
}

try await set(remoteDescription: sd)
return try await _negotiateSequence()
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Protocols/SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ internal import LiveKitWebRTC
protocol SignalClientDelegate: AnyObject, Sendable {
func signalClient(_ signalClient: SignalClient, didUpdateConnectionState newState: ConnectionState, oldState: ConnectionState, disconnectError: LiveKitError?) async
func signalClient(_ signalClient: SignalClient, didReceiveConnectResponse connectResponse: SignalClient.ConnectResponse) async
func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async
func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async
func signalClient(_ signalClient: SignalClient, didReceiveIceCandidate iceCandidate: IceCandidate, target: Livekit_SignalTarget) async
func signalClient(_ signalClient: SignalClient, didUnpublishLocalTrack localTrack: Livekit_TrackUnpublishedResponse) async
func signalClient(_ signalClient: SignalClient, didUpdateParticipants participants: [Livekit_ParticipantInfo]) async
Expand Down
7 changes: 4 additions & 3 deletions Sources/LiveKit/Types/SessionDescription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
internal import LiveKitWebRTC

extension LKRTCSessionDescription {
func toPBType() -> Livekit_SessionDescription {
func toPBType(offerId: UInt32) -> Livekit_SessionDescription {
var sd = Livekit_SessionDescription()
sd.sdp = sdp
sd.id = offerId

switch type {
case .answer: sd.type = "answer"
Expand All @@ -33,7 +34,7 @@ extension LKRTCSessionDescription {
}

extension Livekit_SessionDescription {
func toRTCType() -> LKRTCSessionDescription {
func toRTCType() -> (LKRTCSessionDescription, UInt32) {
var sdpType: LKRTCSdpType
switch type {
case "answer": sdpType = .answer
Expand All @@ -42,6 +43,6 @@ extension Livekit_SessionDescription {
default: fatalError("Unknown state \(type)") // This should never happen
}

return RTC.createSessionDescription(type: sdpType, sdp: sdp)
return (RTC.createSessionDescription(type: sdpType, sdp: sdp), id)
}
}
Loading