Add Serialized FIFO Network Operations & Completion-Based APIs
Introduction & Motivation:
In AtomNetworking, we need to support both async/await and completion-based APIs for flexibility in iOS apps, especially in legacy or UIKit contexts where call sites are synchronous and cannot `await`. Because Swift actors are reentrant, direct calls to `SessionActor` can interleave at every `await`, which opens potential races in concurrent flows (e.g., multiple updates or resumes interleaving). To address this, we introduce a serialized queue for FIFO processing, so that tasks execute one at a time in enqueue order.
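To make the reentrancy concern concrete, here is a minimal sketch. `TokenStore` is a hypothetical actor invented for this illustration, not a type from AtomNetworking; it shows how two concurrent calls can both pass a check before either one finishes its awaited work.

```swift
// Hypothetical actor used only to illustrate reentrancy; not part of AtomNetworking.
actor TokenStore {
    private var token: String?
    private(set) var refreshCount = 0

    func validToken() async -> String {
        // Check-then-act: the check happens before the suspension point below.
        if let token { return token }
        refreshCount += 1
        // Suspension point: while this call sleeps, the actor may start another call.
        try? await Task.sleep(for: .milliseconds(50))
        let fresh = "token-\(refreshCount)"
        token = fresh
        return fresh
    }
}

let store = TokenStore()
async let first = store.validToken()
async let second = store.validToken()
_ = await (first, second)
// Both calls usually pass the `if let` check before either one stores a token,
// so the refresh work typically runs twice instead of once.
print(await store.refreshCount)
```

Serializing the calls, or sharing a single in-flight `Task`, removes the duplicate work.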
Proposed Solution:
These changes are additive and enhance the API without breaking existing async flows:
- Change `Atom` from `actor` to `struct` to enable non-async `enqueue` returning `Service`.
- Add `withAtomCheckedContinuation` function for mapping `Error` to `AtomError` in checked continuations.
- Add extension on `Task` for `typedValue()` to map `Error` to `AtomError` during awaits.
- Improve `.bearer` token refresh in `applyAuthorizationHeader` with deduplication, polling for race windows, and typed error mapping.
- Introduce `RequestableQueueManager` class for GCD-based serial queuing with on-demand timer for stall recovery.
- Add async resume overloads to `Service` extension for bridging to completion-based APIs.
- Add completion-based resume overloads to `Service` extension for serialized execution.
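As a rough sketch of the two error-mapping helpers listed above: the actual `AtomError` cases and the helper signatures are not shown in this description, so the `AtomError.unexpected` case and both signatures below are assumptions for illustration only.

```swift
// Hypothetical stand-in: the real `AtomError` cases are not shown in this description.
enum AtomError: Error {
    case unexpected(any Error)
}

extension Task where Failure == any Error {
    /// Awaits the task's value and rethrows any failure as an `AtomError`.
    func typedValue() async throws(AtomError) -> Success {
        do {
            return try await value
        } catch let error as AtomError {
            // Already the expected type; pass it through unchanged.
            throw error
        } catch {
            // Wrap anything else (assumed mapping).
            throw AtomError.unexpected(error)
        }
    }
}

/// Wraps `withCheckedThrowingContinuation` so callers only ever see `AtomError`.
func withAtomCheckedContinuation<T: Sendable>(
    _ body: (CheckedContinuation<T, any Error>) -> Void
) async throws(AtomError) -> T {
    do {
        return try await withCheckedThrowingContinuation(body)
    } catch let error as AtomError {
        throw error
    } catch {
        throw AtomError.unexpected(error)
    }
}

// Usage: a task that fails with an unrelated error surfaces as `AtomError`.
struct Timeout: Error {}
let failing = Task<Int, any Error> { throw Timeout() }
do {
    _ = try await failing.typedValue()
} catch {
    print("mapped:", error)
}
```

Typed throws (`throws(AtomError)`) requires Swift 6; with an earlier toolchain the same mapping works with plain `throws`.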
For the improved .bearer implementation in SessionActor's applyAuthorizationHeader (example snippet):
case let .bearer(endpoint, credential, writable):
    // Ensure the requestable requires an authorization header to be applied to it. A client has a
    // way to opt out of including the authorization header on a per-request basis even if the `authenticationMethod` is set.
    guard requestable.requiresAuthorization else {
        return requestable
    }
    // Ensure the existing credential requires a refresh.
    if writable.tokenCredential.requiresRefresh {
        if let refreshTask {
            // Await the ongoing refresh Task to ensure completion before proceeding.
            writable.tokenCredential = try await refreshTask.typedValue()
        }
        else {
            // If isRefreshing but no task yet (race window), wait briefly.
            var pollCount = 0
            // Poll loop to handle potential race where flag is set but Task not yet assigned.
            while isRefreshing, refreshTask == nil, pollCount < 10 {
                pollCount += 1
                // Sleep briefly to allow the other call to assign the refreshTask.
                try? await Task.sleep(for: .milliseconds(10))
            }
            // After polling, check if the refreshTask is now available and await it if so.
            if let refreshTask {
                writable.tokenCredential = try await refreshTask.typedValue()
            }
            // If no Task after polling, initiate a new refresh.
            else {
                // Set the flag to block other concurrent calls from starting a duplicate refresh.
                isRefreshing = true
                // Create a new Task for the background refresh operation.
                refreshTask = Task {
                    // Ensure the flag and task reference are reset after completion or failure.
                    defer {
                        isRefreshing = false
                        refreshTask = nil
                    }
                    // Perform the actual token refresh and return the new credential.
                    return try await refreshAccessToken(using: endpoint, credential: credential, writable: writable)
                }
                // Await the new Task's result and assign if successful, or throw on failure.
                if let value = try await refreshTask?.typedValue() {
                    writable.tokenCredential = value
                }
            }
        }
    }
    // Apply access token/credential authorization header.
    return AuthorizedRequestable(requestable: requestable, authorizationHeaderItems: [method.authorizationHeaderItem])

For the RequestableQueueManager class (full code with documentation):
import Foundation

/// A manager for serializing and processing a FIFO queue of asynchronous tasks in Swift 6 iOS apps.
///
/// This class uses a serial `DispatchQueue` for thread-safe mutations to shared state (`pendingTasks`, `isProcessing`), ensuring no data races. Tasks are enqueued synchronously (non-blocking for callers beyond brief sync), processed asynchronously in a detached `Task` for background execution, and drained serially with `await` to maintain FIFO order. An on-demand timer provides a "nudge" to recover from potential stalls (e.g., if the processor Task fails to reset due to errors or suspension).
///
/// Use this for network-heavy flows (e.g., in Atom) where order matters but callers need non-async APIs. `@unchecked Sendable` is used as the queue protects state for cross-concurrency safety.
///
/// - Note: Enqueues apply backpressure via `sync` (minor blocking on contention), suitable for moderate loads. For high-throughput, monitor with Instruments to avoid overload.
final class RequestableQueueManager: @unchecked Sendable {
    // MARK: - Properties

    /// A flag indicating if the processing loop is active. Mutated only within the serialized queue context to prevent races.
    private var isProcessing: Bool
    /// An array of pending async tasks (closures), appended in FIFO order and drained serially.
    private var pendingTasks: [() async -> Void]
    /// A serial dispatch queue for synchronizing access to `pendingTasks` and `isProcessing`, ensuring atomicity in Swift 6.
    private let queue: DispatchQueue
    /// An optional timer for periodic checks to nudge draining if the queue stalls (e.g., due to unhandled Task issues). Started on-demand and stopped when idle to minimize battery impact in iOS.
    private var checkTimer: DispatchSourceTimer?

    // MARK: - Lifecycle

    /// Initializes the queue manager with empty state.
    init() {
        self.isProcessing = false
        self.pendingTasks = .init()
        self.queue = .init(label: "com.alaskaair.atom.requestable.queue.manager")
    }

    /// Cleans up the timer on deallocation to prevent leaks or continued firing.
    deinit {
        checkTimer?.cancel()
    }

    // MARK: - Functions

    /// Enqueues a task for FIFO processing (callable synchronously from non-actors).
    ///
    /// Appends the task atomically via `queue.sync` and spawns a detached processor if not already processing. Starts the timer on-demand if the queue was empty before appending, to handle potential stalls.
    ///
    /// - Parameter task: The async closure to enqueue (e.g., network operations).
    func enqueue(_ task: @escaping @Sendable () async -> Void) {
        queue.sync {
            let wasEmpty = pendingTasks.isEmpty
            pendingTasks.append(task)
            if !isProcessing {
                isProcessing = true
                Task.detached { [weak self] in
                    await self?.processQueue()
                }
            }
            if wasEmpty {
                startCheckTimer()
            }
        }
    }
}
// MARK: - Private Properties and Methods

extension RequestableQueueManager {
    /// Processes the queue asynchronously, draining tasks in FIFO order.
    ///
    /// Runs in a detached Task, awaiting each task serially to maintain order. Resets `isProcessing` after draining and stops the timer if the queue is now empty.
    private func processQueue() async {
        while let nextTask = dequeue() {
            await nextTask()
        }
        queue.sync {
            isProcessing = false
            if pendingTasks.isEmpty {
                stopCheckTimer()
            }
        }
    }

    /// Safely dequeues the first task, or nil if empty.
    ///
    /// Uses `queue.sync` for atomic access, ensuring no races in Swift 6.
    ///
    /// - Returns: The next task closure, or nil if the queue is empty.
    private func dequeue() -> (() async -> Void)? {
        queue.sync {
            pendingTasks.isEmpty ? nil : pendingTasks.removeFirst()
        }
    }

    /// Starts the on-demand timer for periodic queue checks.
    ///
    /// Creates a DispatchSourceTimer on a background queue for power efficiency, scheduling checks every 1 second (adjustable). Uses leeway to allow iOS to optimize timing.
    private func startCheckTimer() {
        guard checkTimer == nil else {
            return
        }
        let timer = DispatchSource.makeTimerSource(queue: .global(qos: .background))
        timer.schedule(deadline: .now(), repeating: .milliseconds(1000), leeway: .milliseconds(100))
        timer.setEventHandler { [weak self] in
            self?.checkAndDrainIfNeeded()
        }
        timer.resume()
        checkTimer = timer
    }

    /// Stops and clears the timer to prevent unnecessary firing when the queue is idle.
    private func stopCheckTimer() {
        checkTimer?.cancel()
        checkTimer = nil
    }

    /// Checks if tasks are pending and not processing, then spawns a processor if needed.
    ///
    /// Called periodically by the timer to recover from potential stalls (e.g., if a previous processor Task failed to reset the flag).
    private func checkAndDrainIfNeeded() {
        queue.sync {
            if !pendingTasks.isEmpty, !isProcessing {
                isProcessing = true
                Task.detached { [weak self] in
                    await self?.processQueue()
                }
            }
        }
    }
}

Detailed Design
See changed files.
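One detail worth noting in `processQueue()`: the final `dequeue()` and the reset of `isProcessing` run in two separate `queue.sync` blocks, so a task enqueued between them is not picked up until the timer nudge fires. A possible variation, sketched here under assumed names (`DrainState`, `append`, `next`, and `drain` are hypothetical and not part of this PR), clears the flag in the same critical section that observes the empty queue.

```swift
import Foundation

// Hypothetical helper, not part of the PR: holds the pending jobs and the processing flag.
final class DrainState: @unchecked Sendable {
    typealias Job = @Sendable () async -> Void

    private let queue = DispatchQueue(label: "example.drain.state")
    private var pending: [Job] = []
    private var isProcessing = false

    /// Appends a job. Returns `true` when the caller must start a processor.
    func append(_ job: @escaping Job) -> Bool {
        queue.sync { () -> Bool in
            pending.append(job)
            if isProcessing { return false }
            isProcessing = true
            return true
        }
    }

    /// Returns the next job, or clears `isProcessing` atomically when none is left.
    func next() -> Job? {
        queue.sync { () -> Job? in
            if pending.isEmpty {
                isProcessing = false
                return nil
            }
            return pending.removeFirst()
        }
    }
}

/// Drains jobs in FIFO order until `next()` reports the queue is empty.
func drain(_ state: DrainState) async {
    while let job = state.next() {
        await job()
    }
}
```

With this shape, an enqueue that lands after the last job was removed either sees `isProcessing == true` before `next()` runs again (and its job is drained by the current processor) or sees `false` and starts a new processor, so no job waits on the timer.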
Source compatibility
This implementation is additive and does not introduce breaking changes. It enhances the existing Service API with a wrapper for serialized operations, maintaining compatibility with current source code. Consumers can opt-in to the wrapper for completion-based flows without affecting async/await usage.
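A consumer-side sketch of the opt-in wrapper described above. `FIFORunner` and both `resume` functions are hypothetical stand-ins, since the real `Service` overloads and `RequestableQueueManager` live in the changed files; the runner uses `AsyncStream` instead of GCD purely to keep the example short.

```swift
// Hypothetical FIFO runner: jobs are yielded into an AsyncStream and awaited one at a time.
final class FIFORunner: Sendable {
    typealias Job = @Sendable () async -> Void

    private let continuation: AsyncStream<Job>.Continuation

    init() {
        let (stream, continuation) = AsyncStream.makeStream(of: Job.self)
        self.continuation = continuation
        // A single consumer task preserves enqueue order.
        Task {
            for await job in stream {
                await job()
            }
        }
    }

    deinit {
        continuation.finish()
    }

    func enqueue(_ job: @escaping Job) {
        continuation.yield(job)
    }
}

/// Completion-based entry point: runs `operation` in FIFO order and reports a `Result`.
func resume<T: Sendable>(
    on runner: FIFORunner,
    _ operation: @escaping @Sendable () async throws -> T,
    completion: @escaping @Sendable (Result<T, any Error>) -> Void
) {
    runner.enqueue {
        do {
            let value = try await operation()
            completion(.success(value))
        } catch {
            completion(.failure(error))
        }
    }
}

/// Async overload that bridges to the completion-based version above.
func resume<T: Sendable>(
    on runner: FIFORunner,
    _ operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        resume(on: runner, operation) { result in
            continuation.resume(with: result)
        }
    }
}

// Usage from an async context; existing async/await call sites are unaffected.
let runner = FIFORunner()
let name = try await resume(on: runner) { "profile" }
print(name)
```

Callers that cannot `await` use the completion-based overload, while async callers get the same FIFO ordering through the bridging overload.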
Source Compatibility:
Please check the box to indicate the impact of this proposal on source compatibility.
- [x] This change is additive and does not impact existing source code.
- [ ] This change breaks existing source code.