Skip to content

Commit 7d05237

Browse files
authored
Add support for thread safe updates in ObservableObjectAtom (#171)
1 parent a01cbb4 commit 7d05237

File tree

2 files changed

+109
-14
lines changed

2 files changed

+109
-14
lines changed

Sources/Atoms/Atom/ObservableObjectAtom.swift

+72-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import Combine
12
import Foundation
23

34
/// An atom type that instantiates an observable object.
@@ -70,24 +71,85 @@ public extension ObservableObjectAtom {
7071
AtomProducer { context in
7172
context.transaction(object)
7273
} manageValue: { object, context in
73-
var task: Task<Void, Never>?
7474
let cancellable = object
7575
.objectWillChange
76-
.sink { [weak object] _ in
77-
// Wait until the object's property is set, because `objectWillChange`
78-
// emits an event before the property is updated.
79-
task?.cancel()
80-
task = Task { @MainActor in
81-
if let object, !Task.isCancelled, !context.isTerminated {
82-
context.update(with: object)
83-
}
76+
.map { @Sendable _ in }
77+
.sinkLatest { [weak object] _ in
78+
// A custom subscriber is used here, encompassing the following
79+
// three behaviours.
80+
//
81+
// 1. It ensures that updates are performed on the main actor because `ObservableObject`
82+
// is not constrained to be isolated to the main actor.
83+
// 2. It always performs updates asynchronously to ensure the object to be updated as
84+
// `objectWillChange` emits events before the update.
85+
// 3. It adopts the latest event and cancels the previous update when successive events
86+
// arrive.
87+
if let object, !context.isTerminated {
88+
context.update(with: object)
8489
}
8590
}
8691

8792
context.onTermination = {
88-
task?.cancel()
8993
cancellable.cancel()
9094
}
9195
}
9296
}
9397
}
98+
99+
private extension Publisher where Output: Sendable, Failure == Never {
100+
func sinkLatest(receiveValue: @MainActor @escaping (Output) -> Void) -> AnyCancellable {
101+
let subscriber = Subscribers.SinkLatestOnMainActor(receiveValue: receiveValue)
102+
receive(subscriber: subscriber)
103+
return AnyCancellable(subscriber)
104+
}
105+
}
106+
107+
private extension Subscribers {
108+
final class SinkLatestOnMainActor<Input: Sendable>: Combine.Subscriber, Cancellable {
109+
private var receiveValue: (@MainActor (Input) -> Void)?
110+
private var currentTask: Task<Void, Never>?
111+
private var lock = os_unfair_lock_s()
112+
113+
init(receiveValue: @MainActor @escaping (Input) -> Void) {
114+
self.receiveValue = receiveValue
115+
}
116+
117+
func receive(subscription: any Combine.Subscription) {
118+
subscription.request(.unlimited)
119+
}
120+
121+
func receive(_ input: Input) -> Demand {
122+
withLock {
123+
guard let receiveValue else {
124+
return .none
125+
}
126+
127+
currentTask?.cancel()
128+
currentTask = Task { @MainActor in
129+
guard !Task.isCancelled else {
130+
return
131+
}
132+
receiveValue(input)
133+
}
134+
135+
return .unlimited
136+
}
137+
}
138+
139+
func receive(completion: Completion<Never>) {}
140+
141+
func cancel() {
142+
withLock {
143+
currentTask?.cancel()
144+
currentTask = nil
145+
receiveValue = nil
146+
}
147+
}
148+
149+
func withLock<R>(_ body: () -> R) -> R {
150+
os_unfair_lock_lock(&lock)
151+
defer { os_unfair_lock_unlock(&lock) }
152+
return body()
153+
}
154+
}
155+
}

Tests/AtomsTests/Atom/ObservableObjectAtomTests.swift

+37-4
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,46 @@ final class ObservableObjectAtomTests: XCTestCase {
147147
}
148148

149149
object.update()
150-
151-
await context.wait(for: atom) {
152-
$0.value0 == 1 && $0.value1 == 1
153-
}
150+
await context.waitForUpdate()
154151

155152
XCTAssertEqual(updatedCount, 1)
156153
XCTAssertEqual(object.value0, 1)
157154
XCTAssertEqual(object.value1, 1)
158155
}
156+
157+
@MainActor
158+
func testUpdateOnNonIsolatedContext() async {
159+
final class TestObject: ObservableObject, @unchecked Sendable {
160+
@Published
161+
var value = 0
162+
163+
func update() {
164+
value += 1
165+
}
166+
}
167+
168+
struct TestAtom: ObservableObjectAtom, Hashable {
169+
func object(context: Context) -> TestObject {
170+
TestObject()
171+
}
172+
}
173+
174+
let atom = TestAtom()
175+
let context = AtomTestContext()
176+
let object = context.watch(atom)
177+
var updatedCount = 0
178+
179+
context.onUpdate = {
180+
updatedCount += 1
181+
}
182+
183+
Task.detached {
184+
object.update()
185+
}
186+
187+
await context.waitForUpdate()
188+
189+
XCTAssertEqual(updatedCount, 1)
190+
XCTAssertEqual(object.value, 1)
191+
}
159192
}

0 commit comments

Comments
 (0)