diff --git a/.changes/reconnect-errors b/.changes/reconnect-errors new file mode 100644 index 000000000..2100ef714 --- /dev/null +++ b/.changes/reconnect-errors @@ -0,0 +1 @@ +patch type="fixed" "Reconnect sequence stuck in failed state" \ No newline at end of file diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f4932fa18..d9b0db65c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -61,11 +61,11 @@ jobs: # https://github.com/actions/runner-images/blob/main/images/macos/macos-26-arm64-Readme.md - os: macos-26 xcode: latest - platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.1" + platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.2" symbol-graph: true - os: macos-26 xcode: latest - platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.1" + platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.2" extension-api-only: true - os: macos-26 xcode: latest @@ -84,10 +84,10 @@ jobs: platform: "macOS,variant=Mac Catalyst" - os: macos-26 xcode: latest - platform: "visionOS Simulator,name=Apple Vision Pro,OS=26.1" + platform: "visionOS Simulator,name=Apple Vision Pro,OS=26.2" - os: macos-26 xcode: latest - platform: "tvOS Simulator,name=Apple TV,OS=26.1" + platform: "tvOS Simulator,name=Apple TV,OS=26.2" runs-on: ${{ matrix.os }} timeout-minutes: 60 diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index d2b50acc6..94211a7bc 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -161,10 +161,10 @@ extension Room { primary: !isSubscriberPrimary, delegate: self) - await publisher.set { [weak self] offer in + await publisher.set { [weak self] offer, offerId in guard let self else { return } - log("Publisher onOffer \(offer.sdp)") - try await signalClient.send(offer: offer) + log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)") + try await signalClient.send(offer: offer, offerId: offerId) } // data over pub channel for backwards compatibility @@ -318,7 +318,13 @@ extension Room { log("[Connect] Waiting for subscriber to connect...") // Wait for primary transport to connect (if not already) - try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout) + do { + try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout) + log("[Connect] Subscriber transport connected") + } catch { + log("[Connect] Subscriber transport failed to connect, error: \(error)", .error) + throw error + } try Task.checkCancellation() // send SyncState before offer @@ -330,7 +336,13 @@ extension Room { // Only if published, wait for publisher to connect... log("[Connect] Waiting for publisher to connect...") try await publisher.createAndSendOffer(iceRestart: true) - try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout) + do { + try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout) + log("[Connect] Publisher transport connected") + } catch { + log("[Connect] Publisher transport failed to connect, error: \(error)", .error) + throw error + } } } @@ -462,8 +474,8 @@ extension Room { $0.subscribe = !autoSubscribe } - try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(), - offer: previousOffer?.toPBType(), + try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(offerId: 0), + offer: previousOffer?.toPBType(offerId: 0), subscription: subscription, publishTracks: localParticipant.publishedTracksInfo(), dataChannels: publisherDataChannel.infos(), diff --git a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift index 3298e72c4..a7d569c0d 100644 --- a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift @@ -32,23 +32,32 @@ extension Room: SignalClientDelegate { // engine is currently connected state case .connected = _state.connectionState { - do { - try await startReconnect(reason: .websocket) - } catch { - log("Failed calling startReconnect, error: \(error)", .error) + Task { + do { + try await startReconnect(reason: .websocket) + } catch { + log("Failed calling startReconnect, error: \(error)", .error) + } } } } - func signalClient(_: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async { - log("canReconnect: \(canReconnect), reason: \(reason)") + func signalClient(_: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason) async { + log("action: \(action), reason: \(reason)") - if canReconnect { - // force .full for next reconnect + let error = LiveKitError.from(reason: reason) + switch action { + case .reconnect: + // Force .full for next reconnect _state.mutate { $0.nextReconnectMode = .full } - } else { - // Server indicates it's not recoverable - await cleanUp(withError: LiveKitError.from(reason: reason)) + fallthrough + case .resume: + // Abort current attempt + await signalClient.cleanUp(withError: error) + case .disconnect: + await cleanUp(withError: error) + default: + log("Unknown leave action: \(action), ignoring", .warning) } } @@ -319,17 +328,19 @@ extension Room: SignalClientDelegate { } } - func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async { + func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async { + log("Received answer for offerId: \(offerId)") + do { let publisher = try requirePublisher() - try await publisher.set(remoteDescription: answer) + try await publisher.set(remoteDescription: answer, offerId: offerId) } catch { - log("Failed to set remote description, error: \(error)", .error) + log("Failed to set remote description with offerId: \(offerId), error: \(error)", .error) } } - func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async { - log("Received offer, creating & sending answer...") + func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async { + log("Received offer with offerId: \(offerId), creating & sending answer...") guard let subscriber = _state.subscriber else { log("Failed to send answer, subscriber is nil", .error) @@ -340,9 +351,9 @@ extension Room: SignalClientDelegate { try await subscriber.set(remoteDescription: offer) let answer = try await subscriber.createAnswer() try await subscriber.set(localDescription: answer) - try await signalClient.send(answer: answer) + try await signalClient.send(answer: answer, offerId: offerId) } catch { - log("Failed to send answer with error: \(error)", .error) + log("Failed to send answer for offerId: \(offerId), error: \(error)", .error) } } diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 5df326a08..13429990c 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -126,13 +126,15 @@ actor SignalClient: Loggable { participantSid: participantSid, adaptiveStream: adaptiveStream) - if reconnectMode != nil { - log("[Connect] with url: \(url)") + let isReconnect = reconnectMode != nil + + if isReconnect { + log("Reconnecting with url: \(url)") } else { log("Connecting with url: \(url)") } - _state.mutate { $0.connectionState = (reconnectMode != nil ? .reconnecting : .connecting) } + _state.mutate { $0.connectionState = (isReconnect ? .reconnecting : .connecting) } do { let socket = try await WebSocket(url: url, @@ -279,10 +281,12 @@ private extension SignalClient { await _restartPingTimer() case let .answer(sd): - _delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: sd.toRTCType()) } + let (rtcDescription, offerId) = sd.toRTCType() + _delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: rtcDescription, offerId: offerId) } case let .offer(sd): - _delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: sd.toRTCType()) } + let (rtcDescription, offerId) = sd.toRTCType() + _delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: rtcDescription, offerId: offerId) } case let .trickle(trickle): guard let rtcCandidate = try? RTC.createIceCandidate(fromJsonString: trickle.candidateInit) else { @@ -315,7 +319,7 @@ private extension SignalClient { _delegate.notifyDetached { await $0.signalClient(self, didUpdateRemoteMute: Track.Sid(from: mute.sid), muted: mute.muted) } case let .leave(leave): - _delegate.notifyDetached { await $0.signalClient(self, didReceiveLeave: leave.canReconnect, reason: leave.reason) } + _delegate.notifyDetached { await $0.signalClient(self, didReceiveLeave: leave.action, reason: leave.reason) } case let .streamStateUpdate(states): _delegate.notifyDetached { await $0.signalClient(self, didUpdateTrackStreamStates: states.streamStates) } @@ -358,17 +362,17 @@ extension SignalClient { // MARK: - Send methods extension SignalClient { - func send(offer: LKRTCSessionDescription) async throws { + func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws { let r = Livekit_SignalRequest.with { - $0.offer = offer.toPBType() + $0.offer = offer.toPBType(offerId: offerId) } try await _sendRequest(r) } - func send(answer: LKRTCSessionDescription) async throws { + func send(answer: LKRTCSessionDescription, offerId: UInt32) async throws { let r = Livekit_SignalRequest.with { - $0.answer = answer.toPBType() + $0.answer = answer.toPBType(offerId: offerId) } try await _sendRequest(r) diff --git a/Sources/LiveKit/Core/Transport.swift b/Sources/LiveKit/Core/Transport.swift index 0b97d301a..e025a2b9b 100644 --- a/Sources/LiveKit/Core/Transport.swift +++ b/Sources/LiveKit/Core/Transport.swift @@ -21,7 +21,7 @@ internal import LiveKitWebRTC actor Transport: NSObject, Loggable { // MARK: - Types - typealias OnOfferBlock = @Sendable (LKRTCSessionDescription) async throws -> Void + typealias OnOfferBlock = @Sendable (LKRTCSessionDescription, UInt32) async throws -> Void // MARK: - Public @@ -56,6 +56,7 @@ actor Transport: NSObject, Loggable { private var _reNegotiate: Bool = false private var _onOffer: OnOfferBlock? private var _isRestartingIce: Bool = false + private var _latestOfferId: UInt32 = 0 // forbid direct access to PeerConnection private let _pc: LKRTCPeerConnection @@ -110,6 +111,20 @@ actor Transport: NSObject, Loggable { await _iceCandidatesQueue.process(candidate, if: remoteDescription != nil && !_isRestartingIce) } + func set(remoteDescription sd: LKRTCSessionDescription, offerId: UInt32) async throws { + if signalingState != .haveLocalOffer { + log("Received answer with unexpected signaling state: \(signalingState), expected .haveLocalOffer", .warning) + } + + if offerId == 0 { + log("Skipping validation for legacy server (missing offerId), latestOfferId: \(_latestOfferId)", .warning) + } else if offerId != _latestOfferId { + throw LiveKitError(.invalidState, message: "OfferId mismatch, expected \(_latestOfferId) but got \(offerId)") + } + + try await set(remoteDescription: sd) + } + func set(remoteDescription sd: LKRTCSessionDescription) async throws { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in _pc.setRemoteDescription(sd) { error in @@ -157,12 +172,14 @@ actor Transport: NSObject, Loggable { // Actually negotiate func _negotiateSequence() async throws { + _latestOfferId += 1 let offer = try await createOffer(for: constraints) try await set(localDescription: offer) - try await _onOffer(offer) + try await _onOffer(offer, _latestOfferId) } if signalingState == .haveLocalOffer, iceRestart, let sd = remoteDescription { + _reNegotiate = false // Clear flag to prevent double offer try await set(remoteDescription: sd) return try await _negotiateSequence() } diff --git a/Sources/LiveKit/Protocols/SignalClientDelegate.swift b/Sources/LiveKit/Protocols/SignalClientDelegate.swift index bff50161b..35b504e64 100644 --- a/Sources/LiveKit/Protocols/SignalClientDelegate.swift +++ b/Sources/LiveKit/Protocols/SignalClientDelegate.swift @@ -21,8 +21,8 @@ internal import LiveKitWebRTC protocol SignalClientDelegate: AnyObject, Sendable { func signalClient(_ signalClient: SignalClient, didUpdateConnectionState newState: ConnectionState, oldState: ConnectionState, disconnectError: LiveKitError?) async func signalClient(_ signalClient: SignalClient, didReceiveConnectResponse connectResponse: SignalClient.ConnectResponse) async - func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async - func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async + func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async + func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async func signalClient(_ signalClient: SignalClient, didReceiveIceCandidate iceCandidate: IceCandidate, target: Livekit_SignalTarget) async func signalClient(_ signalClient: SignalClient, didUnpublishLocalTrack localTrack: Livekit_TrackUnpublishedResponse) async func signalClient(_ signalClient: SignalClient, didUpdateParticipants participants: [Livekit_ParticipantInfo]) async @@ -34,6 +34,6 @@ protocol SignalClientDelegate: AnyObject, Sendable { func signalClient(_ signalClient: SignalClient, didUpdateSubscribedCodecs codecs: [Livekit_SubscribedCodec], qualities: [Livekit_SubscribedQuality], forTrackSid sid: String) async func signalClient(_ signalClient: SignalClient, didUpdateSubscriptionPermission permission: Livekit_SubscriptionPermissionUpdate) async func signalClient(_ signalClient: SignalClient, didUpdateToken token: String) async - func signalClient(_ signalClient: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async + func signalClient(_ signalClient: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason) async func signalClient(_ signalClient: SignalClient, didSubscribeTrack trackSid: Track.Sid) async } diff --git a/Sources/LiveKit/Types/SessionDescription.swift b/Sources/LiveKit/Types/SessionDescription.swift index 862f6f865..27a788a7b 100644 --- a/Sources/LiveKit/Types/SessionDescription.swift +++ b/Sources/LiveKit/Types/SessionDescription.swift @@ -17,9 +17,10 @@ internal import LiveKitWebRTC extension LKRTCSessionDescription { - func toPBType() -> Livekit_SessionDescription { + func toPBType(offerId: UInt32) -> Livekit_SessionDescription { var sd = Livekit_SessionDescription() sd.sdp = sdp + sd.id = offerId switch type { case .answer: sd.type = "answer" @@ -33,7 +34,7 @@ extension LKRTCSessionDescription { } extension Livekit_SessionDescription { - func toRTCType() -> LKRTCSessionDescription { + func toRTCType() -> (LKRTCSessionDescription, UInt32) { var sdpType: LKRTCSdpType switch type { case "answer": sdpType = .answer @@ -42,6 +43,6 @@ extension Livekit_SessionDescription { default: fatalError("Unknown state \(type)") // This should never happen } - return RTC.createSessionDescription(type: sdpType, sdp: sdp) + return (RTC.createSessionDescription(type: sdpType, sdp: sdp), id) } }