From cf20241603f3e695b97de35cfe3a189f76dacd9b Mon Sep 17 00:00:00 2001 From: Andrew Heard Date: Wed, 29 Apr 2026 13:20:57 -0400 Subject: [PATCH 1/5] [AI] Test "LiveSession Socket Leak Analysis" proposed fixes --- .../Types/Internal/Live/AsyncWebSocket.swift | 54 ++++-- .../Internal/Live/LiveSessionService.swift | 104 ++++++++--- .../Types/Public/Live/LiveSession.swift | 9 +- .../project.pbxproj | 4 +- .../Tests/Integration/LiveSessionTests.swift | 169 +++++++++++++----- 5 files changed, 262 insertions(+), 78 deletions(-) diff --git a/FirebaseAI/Sources/Types/Internal/Live/AsyncWebSocket.swift b/FirebaseAI/Sources/Types/Internal/Live/AsyncWebSocket.swift index 38503be2ef5..ce216828f77 100644 --- a/FirebaseAI/Sources/Types/Internal/Live/AsyncWebSocket.swift +++ b/FirebaseAI/Sources/Types/Internal/Live/AsyncWebSocket.swift @@ -29,6 +29,9 @@ final class AsyncWebSocket: Sendable { private let continuationFinished = UnfairLock(false) private let closeError: UnfairLock + private let receivingTask = UnfairLock?>(nil) + private let pingTask = UnfairLock?>(nil) + init(urlSession: URLSession = GenAIURLSession.default, urlRequest: URLRequest) { webSocketTask = urlSession.webSocketTask(with: urlRequest) (stream, continuation) = AsyncThrowingStream @@ -45,6 +48,7 @@ final class AsyncWebSocket: Sendable { webSocketTask.resume() closeError.withLock { $0 = nil } startReceiving() + startPinging() return stream } @@ -66,20 +70,44 @@ final class AsyncWebSocket: Sendable { } private func startReceiving() { - Task { - while !Task.isCancelled && self.webSocketTask.isOpen && self.closeError.value() == nil { - do { - let message = try await webSocketTask.receive() - continuation.yield(message) - } catch { - if let error = webSocketTask.error as? NSError { - close( - code: webSocketTask.closeCode, - reason: webSocketTask.closeReason, + receivingTask.withLock { [weak self] task in + task?.cancel() + task = Task { [weak self] in + while let self, + !Task.isCancelled && self.webSocketTask.isOpen && self.closeError.value() == nil { + do { + let message = try await self.webSocketTask.receive() + self.continuation.yield(message) + } catch { + self.close( + code: self.webSocketTask.closeCode, + reason: self.webSocketTask.closeReason, underlyingError: error ) - } else { - close(code: webSocketTask.closeCode, reason: webSocketTask.closeReason) + } + } + } + } + } + + private func startPinging() { + pingTask.withLock { [weak self] task in + task?.cancel() + task = Task { [weak self] in + while let self, + !Task.isCancelled && self.webSocketTask.isOpen && self.closeError.value() == nil { + try? await Task.sleep(nanoseconds: 30 * 1_000_000_000) + guard !Task.isCancelled && self.webSocketTask.isOpen && self.closeError.value() == nil + else { return } + + self.webSocketTask.sendPing { [weak self] error in + if let error { + self?.close( + code: .abnormalClosure, + reason: nil, + underlyingError: error + ) + } } } } @@ -99,6 +127,8 @@ final class AsyncWebSocket: Sendable { } webSocketTask.cancel(with: code, reason: reason) + receivingTask.value()?.cancel() + pingTask.value()?.cancel() continuationFinished.withLock { isFinished in guard !isFinished else { return } diff --git a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift index 3f4f0ef287e..a609e2d12a2 100644 --- a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift +++ b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift @@ -20,6 +20,7 @@ import os.log // https://forums.swift.org/t/why-does-sending-a-sendable-value-risk-causing-data-races/73074 @preconcurrency import FirebaseAppCheckInterop @preconcurrency import FirebaseAuthInterop +private import FirebaseCoreInternal /// Facilitates communication with the backend for a ``LiveSession``. /// @@ -31,9 +32,16 @@ import os.log /// session is being reloaded. @available(watchOS, unavailable) actor LiveSessionService { - let responses: AsyncThrowingStream - private let responseContinuation: AsyncThrowingStream - .Continuation + private typealias StreamState = ( + responses: AsyncThrowingStream, + continuation: AsyncThrowingStream.Continuation, + isFinished: Bool + ) + private let streamState: UnfairLock + + nonisolated var responses: AsyncThrowingStream { + streamState.value().responses + } // to ensure messages are sent in order, since swift actors are reentrant private var messageQueue: AsyncStream @@ -71,7 +79,13 @@ actor LiveSessionService { toolConfig: ToolConfig?, systemInstruction: ModelContent?, requestOptions: RequestOptions) { - (responses, responseContinuation) = AsyncThrowingStream.makeStream() + let (responses, responseContinuation) = AsyncThrowingStream + .makeStream() + streamState = UnfairLock(( + responses: responses, + continuation: responseContinuation, + isFinished: false + )) (messageQueue, messageQueueContinuation) = AsyncStream.makeStream() self.modelResourceName = modelResourceName self.generationConfig = generationConfig @@ -92,7 +106,10 @@ actor LiveSessionService { // we only finish the streams when the actor deinits; while the actor is still in scope, the // user could continue using the streams via resumeSession (even after calling close) messageQueueContinuation.finish() - responseContinuation.finish() + streamState.withLock { state in + state.continuation.finish() + state.isFinished = true + } webSocket = nil responsesTask = nil @@ -115,7 +132,15 @@ actor LiveSessionService { /// /// This function will yield until the websocket is ready to communicate with the client. func connect(sessionResumption: SessionResumptionConfig? = nil) async throws { - close() + close(finishingStream: false) + + streamState.withLock { state in + if state.isFinished { + let (responses, responseContinuation) = AsyncThrowingStream + .makeStream() + state = (responses: responses, continuation: responseContinuation, isFinished: false) + } + } let stream = try await setupWebsocket() try await waitForSetupComplete(stream: stream, sessionResumption: sessionResumption) @@ -125,7 +150,10 @@ actor LiveSessionService { /// Cancel any running tasks and close the websocket. /// /// This method is idempotent; if it's already ran once, it will effectively be a no-op. - func close() { + /// + /// - Parameters: + /// - finishingStream: Whether to also finish the public ``responses`` stream. + func close(finishingStream: Bool = false) { responsesTask?.cancel() messageQueueTask?.cancel() webSocket?.disconnect() @@ -133,6 +161,13 @@ actor LiveSessionService { webSocket = nil responsesTask = nil messageQueueTask = nil + + if finishingStream { + streamState.withLock { state in + state.continuation.finish() + state.isFinished = true + } + } } /// Performs the initial setup procedure for the model. @@ -163,7 +198,7 @@ actor LiveSessionService { try await webSocket.send(.data(data)) } catch { let error = LiveSessionSetupError(underlyingError: error) - close() + close(finishingStream: true) throw error } @@ -181,7 +216,7 @@ actor LiveSessionService { } } catch { if let error = mapWebsocketError(error) { - close() + close(finishingStream: true) throw error } // the user called close while setup was running @@ -221,7 +256,7 @@ actor LiveSessionService { } } catch { let error = LiveSessionSetupError(underlyingError: error) - close() + close(finishingStream: true) throw error } } @@ -233,20 +268,21 @@ actor LiveSessionService { /// - `responsesTask`: Listen to messages from the server and yield them through `responses`. /// - `messageQueueTask`: Listen to messages from the client and send them through the websocket. private func spawnMessageTasks(stream: MappedStream) { - guard let webSocket else { return } + guard webSocket != nil else { return } // we create a new messageQueue since the iterator below will cancel the old one when the // task is cancelled. this will cause issues when trying to restart a session via resumeSession (messageQueue, messageQueueContinuation) = AsyncStream.makeStream() - responsesTask = Task { + responsesTask = Task { [weak self] in do { for try await message in stream { + guard let self else { return } #if DEBUG if #available(macOS 11.0, *) { - logServerMessage(message) + await self.logServerMessage(message) } #endif - let response = try decodeServerMessage(message) + let response = try await self.decodeServerMessage(message) if case .setupComplete = response.messageType { AILog.debug( @@ -262,25 +298,51 @@ actor LiveSessionService { ) } - responseContinuation.yield(liveMessage) + self.streamState.withLock { state in + state.continuation.yield(liveMessage) + } + } + } + // loop finished normally (websocket closed normally) + guard let self = self else { return } + if !Task.isCancelled { + self.streamState.withLock { state in + state.continuation.finish() + state.isFinished = true } } } catch { - if let error = mapWebsocketError(error) { - close() - responseContinuation.finish(throwing: error) + guard let self = self else { return } + if Task.isCancelled { return } + + if let error = await self.mapWebsocketError(error) { + await self.close(finishingStream: true) + self.streamState.withLock { state in + state.continuation.finish(throwing: error) + state.isFinished = true + } + } else { + // normal closure (mapped to nil) + await self.close(finishingStream: true) } } } - messageQueueTask = Task { + let messageQueue = self.messageQueue + messageQueueTask = Task { [weak self] in for await message in messageQueue { - guard let data = encodeClientMessage(message) else { continue } + guard let self = self else { return } + guard let data = await self.encodeClientMessage(message) else { continue } do { - try await webSocket.send(.data(data)) + try await self.webSocket?.send(.data(data)) } catch { AILog.error(code: .liveSessionFailedToSendClientMessage, error.localizedDescription) + await self.close(finishingStream: true) + self.streamState.withLock { state in + state.continuation.finish(throwing: error) + state.isFinished = true + } } } } diff --git a/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift b/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift index 8dd3fa813e9..84fde9b1199 100644 --- a/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift +++ b/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift @@ -35,6 +35,13 @@ public final class LiveSession: Sendable { self.service = service } + deinit { + let service = self.service + Task { + await service.close() + } + } + /// Response to a ``LiveServerToolCall`` received from the server. /// /// This method is used both for the realtime API and the incremental API. @@ -142,7 +149,7 @@ public final class LiveSession: Sendable { /// Attempting to receive content from a closed session will cause a /// ``LiveSessionUnexpectedClosureError`` error to be thrown. public func close() async { - await service.close() + await service.close(finishingStream: true) } /// Resumes an existing live session with the server. diff --git a/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj b/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj index c40918762a4..1b28e0caf2b 100644 --- a/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj +++ b/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj @@ -437,7 +437,7 @@ CURRENT_PROJECT_VERSION = 1; DEAD_CODE_STRIPPING = YES; GENERATE_INFOPLIST_FILE = YES; - IPHONEOS_DEPLOYMENT_TARGET = 15.0; + IPHONEOS_DEPLOYMENT_TARGET = 16.0; MACOSX_DEPLOYMENT_TARGET = 12.0; MARKETING_VERSION = 1.0; PRODUCT_BUNDLE_IDENTIFIER = com.google.firebase.FirebaseAITestAppTests; @@ -459,7 +459,7 @@ CURRENT_PROJECT_VERSION = 1; DEAD_CODE_STRIPPING = YES; GENERATE_INFOPLIST_FILE = YES; - IPHONEOS_DEPLOYMENT_TARGET = 15.0; + IPHONEOS_DEPLOYMENT_TARGET = 16.0; MACOSX_DEPLOYMENT_TARGET = 12.0; MARKETING_VERSION = 1.0; PRODUCT_BUNDLE_IDENTIFIER = com.google.firebase.FirebaseAITestAppTests; diff --git a/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift b/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift index d213fa51cb0..4cf5f2cd410 100644 --- a/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift +++ b/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift @@ -19,7 +19,7 @@ import Testing @testable import struct FirebaseAILogic.APIConfig -@Suite(.serialized) +@Suite(.serialized, .timeLimit(.minutes(1))) struct LiveSessionTests { private static let arguments = InstanceConfig.liveConfigs.flatMap { config in switch config.apiConfig.service { @@ -128,7 +128,13 @@ struct LiveSessionTests { // The model can't infer that we're done speaking until we send null bytes await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) - let text = try await session.collectNextAudioOutputTranscript() + let text: String + do { + text = try await session.collectNextAudioOutputTranscript() + } catch { + await session.close() + throw error + } await session.close() let modelResponse = text @@ -151,6 +157,7 @@ struct LiveSessionTests { let session = try await model.connect() guard let videoFile = NSDataAsset(name: "cat") else { Issue.record("Missing video file 'cat' in Assets") + await session.close() return } @@ -164,12 +171,22 @@ struct LiveSessionTests { // (they both respond with audio though) guard let audioFile = NSDataAsset(name: "hello") else { Issue.record("Missing audio file 'hello.wav' in Assets") + await session.close() return } await session.sendAudioRealtime(audioFile.data) await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) - let text = try await session.collectNextAudioOutputTranscript() + var text: String + do { + text = try await session.collectNextAudioOutputTranscript() + if text.isEmpty { + text = try await session.collectNextAudioOutputTranscript() + } + } catch { + await session.close() + throw error + } await session.close() let modelResponse = text @@ -193,7 +210,15 @@ struct LiveSessionTests { let session = try await model.connect() await session.sendTextRealtime("Alex") - guard let toolCall = try await session.collectNextToolCall() else { + let toolCall: LiveServerToolCall? + do { + toolCall = try await session.collectNextToolCall() + } catch { + await session.close() + throw error + } + guard let toolCall else { + await session.close() return } @@ -204,6 +229,7 @@ struct LiveSessionTests { #expect(functionCall.name == "getLastName") guard let response = getLastName(args: functionCall.args) else { + await session.close() return } await session.sendFunctionResponses([ @@ -213,9 +239,15 @@ struct LiveSessionTests { functionId: functionCall.functionId ), ]) - var text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { + var text: String + do { text = try await session.collectNextAudioOutputTranscript() + if text.isEmpty { + text = try await session.collectNextAudioOutputTranscript() + } + } catch { + await session.close() + throw error } await session.close() @@ -241,7 +273,15 @@ struct LiveSessionTests { await session.sendTextRealtime("My name is Alex.") - guard let newHandle = try await session.collectNextSessionHandle() else { + let newHandle: String? + do { + newHandle = try await session.collectNextSessionHandle() + } catch { + await session.close() + throw error + } + await session.close() + guard let newHandle else { return } @@ -249,11 +289,17 @@ struct LiveSessionTests { sessionResumption: SessionResumptionConfig(handle: newHandle) ) - await session.sendTextRealtime("What is my name?") + var text: String + do { + await session.sendTextRealtime("What is my name?") - var text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { text = try await session.collectNextAudioOutputTranscript() + if text.isEmpty { + text = try await session.collectNextAudioOutputTranscript() + } + } catch { + await session.close() + throw error } await session.close() @@ -283,15 +329,22 @@ struct LiveSessionTests { await session.sendTextRealtime("My name is Alex.") // re-connect without specifying the new handle (so it should be a new session) + await session.close() session = try await model.connect( sessionResumption: SessionResumptionConfig() ) - await session.sendTextRealtime("What is my name?") + var text: String + do { + await session.sendTextRealtime("What is my name?") - var text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { text = try await session.collectNextAudioOutputTranscript() + if text.isEmpty { + text = try await session.collectNextAudioOutputTranscript() + } + } catch { + await session.close() + throw error } await session.close() @@ -318,18 +371,32 @@ struct LiveSessionTests { await session.sendTextRealtime("My name is Alex.") - guard let newHandle = try await session.collectNextSessionHandle() else { - return + let newHandle: String? + do { + newHandle = try await session.collectNextSessionHandle() + } catch { + await session.close() + throw error } await session.close() + guard let newHandle else { + return + } + try await session.resumeSession(sessionResumption: SessionResumptionConfig(handle: newHandle)) - await session.sendTextRealtime("What is my name?") + var text: String + do { + await session.sendTextRealtime("What is my name?") - var text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { text = try await session.collectNextAudioOutputTranscript() + if text.isEmpty { + text = try await session.collectNextAudioOutputTranscript() + } + } catch { + await session.close() + throw error } await session.close() @@ -365,6 +432,7 @@ struct LiveSessionTests { await session.sendTextRealtime("Alex") guard let toolCall = try await session.collectNextToolCall() else { + await session.close() return } @@ -374,11 +442,16 @@ struct LiveSessionTests { let functionCall = try #require(functionCalls.first) let id = try #require(functionCall.functionId) - await session.sendTextRealtime("Actually, I don't care about the last name of Alex anymore.") + do { + await session.sendTextRealtime("Actually, I don't care about the last name of Alex anymore.") - for try await cancellation in session.responsesOf(LiveServerToolCallCancellation.self) { - #expect(cancellation.ids == [id]) - break + for try await cancellation in session.responsesOf(LiveServerToolCallCancellation.self) { + #expect(cancellation.ids == [id]) + break + } + } catch { + await session.close() + throw error } await session.close() @@ -398,23 +471,29 @@ struct LiveSessionTests { try await retry(times: 3, delayInSeconds: 2.0) { let session = try await model.connect() - await session.sendAudioRealtime(audioFile.data) - await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) - - // Wait a second to allow the model to start generating (and cause a proper interruption) - try await Task.sleep(nanoseconds: oneSecondInNanoseconds) - await session.sendAudioRealtime(audioFile.data) - await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) - - for try await content in session.responsesOf(LiveServerContent.self) { - if content.wasInterrupted { - break - } - - if content.isTurnComplete { - throw NoInterruptionError() + do { + await session.sendAudioRealtime(audioFile.data) + await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) + + // Wait a second to allow the model to start generating (and cause a proper interruption) + try await Task.sleep(nanoseconds: oneSecondInNanoseconds) + await session.sendAudioRealtime(audioFile.data) + await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) + + for try await content in session.responsesOf(LiveServerContent.self) { + if content.wasInterrupted { + break + } + + if content.isTurnComplete { + throw NoInterruptionError() + } } + } catch { + await session.close() + throw error } + await session.close() } } @@ -427,13 +506,19 @@ struct LiveSessionTests { ) let session = try await model.connect() - await session.sendContent("Does five plus") - await session.sendContent(" five equal ten?", turnComplete: true) + var text: String + do { + await session.sendContent("Does five plus") + await session.sendContent(" five equal ten?", turnComplete: true) - var text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - // The model sometimes sends an empty text response first text = try await session.collectNextAudioOutputTranscript() + if text.isEmpty { + // The model sometimes sends an empty text response first + text = try await session.collectNextAudioOutputTranscript() + } + } catch { + await session.close() + throw error } await session.close() From 91cd2274e2376ee0863d837e443ef2a0521d6b87 Mon Sep 17 00:00:00 2001 From: Andrew Heard Date: Thu, 30 Apr 2026 17:59:02 -0400 Subject: [PATCH 2/5] Remove unneeded actor isolation --- .../Sources/Types/Internal/Live/LiveSessionService.swift | 4 ++-- FirebaseAI/Sources/Types/Public/Live/LiveSession.swift | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift index a609e2d12a2..5cea2e5c5c6 100644 --- a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift +++ b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift @@ -315,7 +315,7 @@ actor LiveSessionService { guard let self = self else { return } if Task.isCancelled { return } - if let error = await self.mapWebsocketError(error) { + if let error = self.mapWebsocketError(error) { await self.close(finishingStream: true) self.streamState.withLock { state in state.continuation.finish(throwing: error) @@ -368,7 +368,7 @@ actor LiveSessionService { /// /// Some errors have public api alternatives. This function will ensure they're mapped /// accordingly. - private func mapWebsocketError(_ error: Error) -> Error? { + private nonisolated func mapWebsocketError(_ error: Error) -> Error? { if let error = error as? WebSocketClosedError { // only raise an error if the session didn't close normally (ie; the user calling close) if error.closeCode == .goingAway { diff --git a/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift b/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift index 84fde9b1199..8ffdc2806ff 100644 --- a/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift +++ b/FirebaseAI/Sources/Types/Public/Live/LiveSession.swift @@ -38,7 +38,7 @@ public final class LiveSession: Sendable { deinit { let service = self.service Task { - await service.close() + await service.close(finishingStream: true) } } @@ -160,7 +160,7 @@ public final class LiveSession: Sendable { /// /// To optain a valid handle, ensure you pass an instance of /// ``SessionResumptionConfig`` to ``LiveGenerativeModel/connect(sessionResumption:)``, - /// and then listen for the hande provided from a ``LiveSessionResumptionUpdate`` + /// and then listen for the handle provided from a ``LiveSessionResumptionUpdate`` /// server message. /// /// - Parameters: From 7acf501994ae8f6490ce65f0677332116e4a4b7d Mon Sep 17 00:00:00 2001 From: Andrew Heard Date: Thu, 30 Apr 2026 17:59:57 -0400 Subject: [PATCH 3/5] Use `close(finishingStream: false)` on error paths --- .../Types/Internal/Live/LiveSessionService.swift | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift index 5cea2e5c5c6..8ee087393cc 100644 --- a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift +++ b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift @@ -162,6 +162,10 @@ actor LiveSessionService { responsesTask = nil messageQueueTask = nil + // Finish the message queue so the messageQueueTask's `for await` loop exits immediately, + // rather than waiting for task cancellation to take effect. + messageQueueContinuation.finish() + if finishingStream { streamState.withLock { state in state.continuation.finish() @@ -269,8 +273,8 @@ actor LiveSessionService { /// - `messageQueueTask`: Listen to messages from the client and send them through the websocket. private func spawnMessageTasks(stream: MappedStream) { guard webSocket != nil else { return } - // we create a new messageQueue since the iterator below will cancel the old one when the - // task is cancelled. this will cause issues when trying to restart a session via resumeSession + // Create a fresh messageQueue so the new task iterates its own stream. The old stream was + // finished in close(), and the old task was cancelled, so they are both done at this point. (messageQueue, messageQueueContinuation) = AsyncStream.makeStream() responsesTask = Task { [weak self] in @@ -316,7 +320,7 @@ actor LiveSessionService { if Task.isCancelled { return } if let error = self.mapWebsocketError(error) { - await self.close(finishingStream: true) + await self.close(finishingStream: false) self.streamState.withLock { state in state.continuation.finish(throwing: error) state.isFinished = true @@ -338,7 +342,7 @@ actor LiveSessionService { try await self.webSocket?.send(.data(data)) } catch { AILog.error(code: .liveSessionFailedToSendClientMessage, error.localizedDescription) - await self.close(finishingStream: true) + await self.close(finishingStream: false) self.streamState.withLock { state in state.continuation.finish(throwing: error) state.isFinished = true From c3404a93b1a5a9d83e87281776380bd0bdc19eec Mon Sep 17 00:00:00 2001 From: Andrew Heard Date: Thu, 30 Apr 2026 18:24:58 -0400 Subject: [PATCH 4/5] Address review comments --- .../Internal/Live/LiveSessionService.swift | 14 ++++--- .../Tests/Integration/LiveSessionTests.swift | 37 ++++++------------- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift index 8ee087393cc..5a595b3052e 100644 --- a/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift +++ b/FirebaseAI/Sources/Types/Internal/Live/LiveSessionService.swift @@ -283,10 +283,10 @@ actor LiveSessionService { guard let self else { return } #if DEBUG if #available(macOS 11.0, *) { - await self.logServerMessage(message) + self.logServerMessage(message) } #endif - let response = try await self.decodeServerMessage(message) + let response = try self.decodeServerMessage(message) if case .setupComplete = response.messageType { AILog.debug( @@ -336,7 +336,7 @@ actor LiveSessionService { messageQueueTask = Task { [weak self] in for await message in messageQueue { guard let self = self else { return } - guard let data = await self.encodeClientMessage(message) else { continue } + guard let data = self.encodeClientMessage(message) else { continue } do { try await self.webSocket?.send(.data(data)) @@ -354,7 +354,7 @@ actor LiveSessionService { #if DEBUG @available(macOS 11.0, *) - private func logServerMessage(_ message: Data) { + private nonisolated func logServerMessage(_ message: Data) { guard AILog.additionalLoggingEnabled() else { return } guard let message = JSONSerialization.prettyString(with: message) else { return } @@ -397,7 +397,8 @@ actor LiveSessionService { /// Decodes a message from the server's websocket into a valid `BidiGenerateContentServerMessage`. /// /// Will throw an error if decoding fails. - private func decodeServerMessage(_ message: Data) throws -> BidiGenerateContentServerMessage { + private nonisolated func decodeServerMessage(_ message: Data) throws + -> BidiGenerateContentServerMessage { do { return try jsonDecoder.decode( BidiGenerateContentServerMessage.self, @@ -423,7 +424,8 @@ actor LiveSessionService { /// Encodes a message from the client into `Data` that can be sent through a websocket data frame. /// /// Will return `nil` if decoding fails, and log an error describing why. - private func encodeClientMessage(_ message: BidiGenerateContentClientMessage) -> Data? { + private nonisolated func encodeClientMessage(_ message: BidiGenerateContentClientMessage) + -> Data? { do { return try jsonEncoder.encode(message) } catch { diff --git a/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift b/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift index 4cf5f2cd410..b2a2d64ff8d 100644 --- a/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift +++ b/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift @@ -177,12 +177,9 @@ struct LiveSessionTests { await session.sendAudioRealtime(audioFile.data) await session.sendAudioRealtime(Data(repeating: 0, count: audioFile.data.count)) - var text: String + let text: String do { text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - text = try await session.collectNextAudioOutputTranscript() - } } catch { await session.close() throw error @@ -239,12 +236,9 @@ struct LiveSessionTests { functionId: functionCall.functionId ), ]) - var text: String + let text: String do { text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - text = try await session.collectNextAudioOutputTranscript() - } } catch { await session.close() throw error @@ -289,14 +283,11 @@ struct LiveSessionTests { sessionResumption: SessionResumptionConfig(handle: newHandle) ) - var text: String + let text: String do { await session.sendTextRealtime("What is my name?") text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - text = try await session.collectNextAudioOutputTranscript() - } } catch { await session.close() throw error @@ -334,14 +325,11 @@ struct LiveSessionTests { sessionResumption: SessionResumptionConfig() ) - var text: String + let text: String do { await session.sendTextRealtime("What is my name?") text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - text = try await session.collectNextAudioOutputTranscript() - } } catch { await session.close() throw error @@ -386,14 +374,11 @@ struct LiveSessionTests { try await session.resumeSession(sessionResumption: SessionResumptionConfig(handle: newHandle)) - var text: String + let text: String do { await session.sendTextRealtime("What is my name?") text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - text = try await session.collectNextAudioOutputTranscript() - } } catch { await session.close() throw error @@ -506,16 +491,12 @@ struct LiveSessionTests { ) let session = try await model.connect() - var text: String + let text: String do { await session.sendContent("Does five plus") await session.sendContent(" five equal ten?", turnComplete: true) text = try await session.collectNextAudioOutputTranscript() - if text.isEmpty { - // The model sometimes sends an empty text response first - text = try await session.collectNextAudioOutputTranscript() - } } catch { await session.close() throw error @@ -554,6 +535,12 @@ private extension LiveSession { /// Once the model signals that its turn is complete, the function will return /// a string concatenated of all the `LiveAudioTranscription`s. func collectNextAudioOutputTranscript() async throws -> String { + // The model sometimes sends an empty text response first + let text = try await collectNextTurn() + return text.isEmpty ? try await collectNextTurn() : text + } + + private func collectNextTurn() async throws -> String { var text = "" for try await content in responsesOf(LiveServerContent.self) { From 512257dd5ac19e5310cf4834cda95b634b23feb5 Mon Sep 17 00:00:00 2001 From: Andrew Heard Date: Thu, 30 Apr 2026 18:39:22 -0400 Subject: [PATCH 5/5] Revert integration test time limit --- .../Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj | 4 ++-- .../Tests/TestApp/Tests/Integration/LiveSessionTests.swift | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj b/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj index 1b28e0caf2b..c40918762a4 100644 --- a/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj +++ b/FirebaseAI/Tests/TestApp/FirebaseAITestApp.xcodeproj/project.pbxproj @@ -437,7 +437,7 @@ CURRENT_PROJECT_VERSION = 1; DEAD_CODE_STRIPPING = YES; GENERATE_INFOPLIST_FILE = YES; - IPHONEOS_DEPLOYMENT_TARGET = 16.0; + IPHONEOS_DEPLOYMENT_TARGET = 15.0; MACOSX_DEPLOYMENT_TARGET = 12.0; MARKETING_VERSION = 1.0; PRODUCT_BUNDLE_IDENTIFIER = com.google.firebase.FirebaseAITestAppTests; @@ -459,7 +459,7 @@ CURRENT_PROJECT_VERSION = 1; DEAD_CODE_STRIPPING = YES; GENERATE_INFOPLIST_FILE = YES; - IPHONEOS_DEPLOYMENT_TARGET = 16.0; + IPHONEOS_DEPLOYMENT_TARGET = 15.0; MACOSX_DEPLOYMENT_TARGET = 12.0; MARKETING_VERSION = 1.0; PRODUCT_BUNDLE_IDENTIFIER = com.google.firebase.FirebaseAITestAppTests; diff --git a/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift b/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift index b2a2d64ff8d..6f0396e072e 100644 --- a/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift +++ b/FirebaseAI/Tests/TestApp/Tests/Integration/LiveSessionTests.swift @@ -19,7 +19,7 @@ import Testing @testable import struct FirebaseAILogic.APIConfig -@Suite(.serialized, .timeLimit(.minutes(1))) +@Suite(.serialized) struct LiveSessionTests { private static let arguments = InstanceConfig.liveConfigs.flatMap { config in switch config.apiConfig.service {