This repository was archived by the owner on Jun 30, 2026. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathBinding.swift
More file actions
272 lines (231 loc) · 8.33 KB
/
Copy pathBinding.swift
File metadata and controls
272 lines (231 loc) · 8.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import Foundation
// MARK: - Unbound Channel Types
/// Sending half of a channel pair created by `channel()`; inert until it is bound.
///
/// A fresh handle has channel ID `0` and no sender closure.
/// `bind(channelId:taskTx:)` makes it usable for sending, whereas
/// `setChannelIdOnly(channelId:)` records only the ID. Both mark the handle as
/// bound, so `isBound == true` does not by itself mean `send(_:)` will succeed.
///
/// All mutable state is guarded by a lock so the `@unchecked Sendable`
/// conformance holds when binding and sending happen on different threads.
public final class UnboundTx<T: Sendable>: @unchecked Sendable {
    /// Protects `_channelId`, `taskTx` and `bound`.
    private let lock = NSLock()
    private var _channelId: ChannelId = 0
    private var taskTx: (@Sendable (TaskMessage) -> Void)?
    private let serialize: @Sendable (T) -> [UInt8]
    private var bound = false

    /// Creates an unbound sender.
    /// - Parameter serialize: Encodes a value into the payload bytes passed to the sender closure.
    public init(serialize: @escaping @Sendable (T) -> [UInt8]) {
        self.serialize = serialize
    }

    /// The channel ID assigned at bind time; `0` until then.
    public var channelId: ChannelId { locked { _channelId } }

    /// `true` once either `bind(channelId:taskTx:)` or `setChannelIdOnly(channelId:)` has run.
    public var isBound: Bool { locked { bound } }

    /// Bind for sending (client-side outgoing).
    ///
    /// Traps if the handle has already been bound.
    /// - Parameters:
    ///   - channelId: ID stamped on every message this handle emits.
    ///   - taskTx: Closure that receives the `.data` / `.close` messages.
    func bind(channelId: ChannelId, taskTx: @escaping @Sendable (TaskMessage) -> Void) {
        locked {
            precondition(!bound, "UnboundTx already bound")
            _channelId = channelId
            self.taskTx = taskTx
            bound = true
        }
    }

    /// Set channel ID only (when paired Rx is bound).
    ///
    /// Marks the handle as bound without installing a sender closure, so
    /// `send(_:)` keeps throwing `ChannelError.notBound`. Traps if already bound.
    func setChannelIdOnly(channelId: ChannelId) {
        locked {
            precondition(!bound, "UnboundTx already bound")
            _channelId = channelId
            bound = true
        }
    }

    /// Serializes `value` and hands it to the bound sender closure as a `.data` message.
    /// - Throws: `ChannelError.notBound` when no sender closure has been installed.
    public func send(_ value: T) throws {
        // Copy the state out and call the closure outside the lock, so a closure
        // that touches this handle again cannot deadlock on the non-recursive lock.
        let (id, sender) = locked { (_channelId, taskTx) }
        guard let sender = sender else {
            throw ChannelError.notBound
        }
        sender(.data(channelId: id, payload: serialize(value)))
    }

    /// Emits a `.close` message for this channel; does nothing when no sender closure is installed.
    public func close() {
        let (id, sender) = locked { (_channelId, taskTx) }
        sender?(.close(channelId: id))
    }

    /// Runs `body` while holding `lock`.
    private func locked<R>(_ body: () -> R) -> R {
        lock.lock()
        defer { lock.unlock() }
        return body()
    }
}
/// Receiving half of a channel pair created by `channel()`; inert until it is bound.
///
/// A fresh handle has channel ID `0` and no receiver.
/// `bind(channelId:receiver:)` makes it usable for receiving, whereas
/// `setChannelIdOnly(channelId:)` records only the ID. Both mark the handle as
/// bound, so `isBound == true` does not by itself mean `recv()` will succeed.
///
/// The channel ID, receiver and bound flag are guarded by a lock so the
/// `@unchecked Sendable` conformance holds when binding and receiving happen on
/// different threads.
public final class UnboundRx<T: Sendable>: @unchecked Sendable {
    /// Protects `_channelId`, `receiver` and `bound`.
    private let lock = NSLock()
    private var _channelId: ChannelId = 0
    private var receiver: ChannelReceiver?
    private let deserialize: @Sendable ([UInt8]) throws -> T
    private var bound = false

    /// Weak reference to the paired Tx.
    ///
    /// Not covered by `lock`; `channel()` assigns it once before handing the pair out.
    weak var pairedTx: AnyObject?

    /// Creates an unbound receiver.
    /// - Parameter deserialize: Decodes received payload bytes into a value; may throw.
    public init(deserialize: @escaping @Sendable ([UInt8]) throws -> T) {
        self.deserialize = deserialize
    }

    /// The channel ID assigned at bind time; `0` until then.
    public var channelId: ChannelId { locked { _channelId } }

    /// `true` once either `bind(channelId:receiver:)` or `setChannelIdOnly(channelId:)` has run.
    public var isBound: Bool { locked { bound } }

    /// Bind for receiving (client-side incoming).
    ///
    /// Traps if the handle has already been bound.
    /// - Parameters:
    ///   - channelId: ID recorded for this channel.
    ///   - receiver: Source that `recv()` pulls payload bytes from.
    func bind(channelId: ChannelId, receiver: ChannelReceiver) {
        locked {
            precondition(!bound, "UnboundRx already bound")
            _channelId = channelId
            self.receiver = receiver
            bound = true
        }
    }

    /// Set channel ID only (when paired Tx is bound).
    ///
    /// Marks the handle as bound without installing a receiver, so `recv()`
    /// keeps throwing `ChannelError.notBound`. Traps if already bound.
    func setChannelIdOnly(channelId: ChannelId) {
        locked {
            precondition(!bound, "UnboundRx already bound")
            _channelId = channelId
            bound = true
        }
    }

    /// Receive the next value, or `nil` if the receiver reports no more data.
    /// - Throws: `ChannelError.notBound` when no receiver has been installed, or
    ///   whatever `deserialize` throws for the received bytes.
    public func recv() async throws -> T? {
        // The lock is taken inside the synchronous `locked` helper and released
        // before the suspension point below; it is never held across an `await`.
        guard let source = locked({ receiver }) else {
            throw ChannelError.notBound
        }
        guard let bytes = await source.recv() else {
            return nil
        }
        return try deserialize(bytes)
    }

    /// Runs `body` while holding `lock`.
    private func locked<R>(_ body: () -> R) -> R {
        lock.lock()
        defer { lock.unlock() }
        return body()
    }
}
// MARK: - AsyncSequence for UnboundRx
extension UnboundRx: AsyncSequence {
    /// Values produced when iterating with `for try await`.
    public typealias Element = T

    /// Iterator that forwards every step to `UnboundRx.recv()`.
    public struct AsyncIterator: AsyncIteratorProtocol {
        /// The receiver being drained.
        let rx: UnboundRx<T>

        /// Returns the next received value, or `nil` when `recv()` returns `nil`.
        /// Rethrows any error raised by `recv()`.
        public mutating func next() async throws -> T? {
            let value = try await rx.recv()
            return value
        }
    }

    /// Creates an iterator over this receiver.
    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(rx: self)
    }
}
// MARK: - Channel Factory
/// Creates an unbound sender/receiver pair.
///
/// The receiver keeps a weak reference to the sender in `pairedTx`; no reference
/// is stored in the other direction.
/// - Parameters:
///   - serialize: Encoder handed to the `UnboundTx`.
///   - deserialize: Decoder handed to the `UnboundRx`.
/// - Returns: The `(tx, rx)` pair, both still unbound.
public func channel<T: Sendable>(
    serialize: @escaping @Sendable (T) -> [UInt8],
    deserialize: @escaping @Sendable ([UInt8]) throws -> T
) -> (UnboundTx<T>, UnboundRx<T>) {
    let sender = UnboundTx<T>(serialize: serialize)
    let receiver = UnboundRx<T>(deserialize: deserialize)
    // Link rx -> tx so that binding the Rx can reach its Tx later.
    receiver.pairedTx = sender
    return (sender, receiver)
}
// MARK: - Task Sender
/// Closure type that accepts a `TaskMessage` for delivery.
///
/// This is the same function type that `UnboundTx.bind(channelId:taskTx:)` takes,
/// and it is what `bindChannels` threads through to the channels it binds.
public typealias TaskSender = @Sendable (TaskMessage) -> Void
// MARK: - Incoming Channel Registry
/// Alternate name for `ChannelRegistry`.
///
/// `bindChannels` in this file spells the parameter type as `ChannelRegistry`
/// directly; the alias itself is not referenced here.
public typealias IncomingChannelRegistry = ChannelRegistry
// MARK: - Bind Channels
/// Walks method arguments alongside their schemas and binds every channel found.
///
/// Schemas and arguments are paired positionally with `zip`, so when the two
/// arrays differ in length the surplus entries of the longer one are ignored.
/// - Parameters:
///   - schemas: One schema per argument, in argument order.
///   - args: The argument values to inspect.
///   - allocator: Source of fresh channel IDs.
///   - incomingRegistry: Registry in which receivers for `.tx` slots are registered.
///   - taskSender: Closure given to channels bound for sending.
///   - serializers: Forwarded unchanged to the per-value binding step.
public func bindChannels(
    schemas: [Schema],
    args: [Any],
    allocator: ChannelIdAllocator,
    incomingRegistry: ChannelRegistry,
    taskSender: @escaping TaskSender,
    serializers: any BindingSerializers
) async {
    let pairs = zip(schemas, args)
    for pair in pairs {
        await bindValue(
            schema: pair.0,
            value: pair.1,
            allocator: allocator,
            incomingRegistry: incomingRegistry,
            taskSender: taskSender,
            serializers: serializers
        )
    }
}
/// Binds the channels reachable from a single value, guided by its schema.
///
/// Containers (`.vec`, `.option`, `.struct`) are traversed recursively. A value
/// whose runtime shape does not match its schema (failed cast, non-optional under
/// `.option`, missing struct field) is skipped silently.
/// - Parameters:
///   - schema: Describes the expected shape of `value`.
///   - value: The argument, or nested part of an argument, to inspect.
///   - allocator: Source of fresh channel IDs.
///   - incomingRegistry: Registry in which receivers for `.tx` slots are registered.
///   - taskSender: Closure given to channels bound for sending.
///   - serializers: Passed along on recursion; not otherwise used in this function.
private func bindValue(
    schema: Schema,
    value: Any,
    allocator: ChannelIdAllocator,
    incomingRegistry: ChannelRegistry,
    taskSender: @escaping TaskSender,
    serializers: any BindingSerializers
) async {
    // Recurse into a nested value with the same binding context.
    func descend(_ childSchema: Schema, _ childValue: Any) async {
        await bindValue(
            schema: childSchema,
            value: childValue,
            allocator: allocator,
            incomingRegistry: incomingRegistry,
            taskSender: taskSender,
            serializers: serializers
        )
    }

    switch schema {
    case .rx:
        // Schema Rx: the client passes the Rx to the method and sends through
        // the paired Tx, so the Tx side is what gets bound for outgoing data.
        // The ID is allocated only after the cast succeeds.
        guard let rx = value as? AnyUnboundRx else { return }
        let id = allocator.allocate()
        rx.bindForSchema(channelId: id, taskSender: taskSender)

    case .tx:
        // Schema Tx: the client passes the Tx to the method and receives through
        // the paired Rx, so a receiver is registered for the incoming data.
        guard let tx = value as? AnyUnboundTx else { return }
        let id = allocator.allocate()
        let receiver = await incomingRegistry.register(id)
        tx.bindForSchema(channelId: id, receiver: receiver)

    case .vec(let element):
        guard let items = value as? [Any] else { return }
        for item in items {
            await descend(element, item)
        }

    case .option(let inner):
        // Reflection distinguishes Some(x) from None: an Optional holding a
        // value exposes it as its first (and only) mirror child.
        let reflected = Mirror(reflecting: value)
        guard reflected.displayStyle == .optional,
              let wrapped = reflected.children.first else { return }
        await descend(inner, wrapped.value)

    case .struct(let fields):
        // Struct fields are looked up by label through reflection.
        let children = Mirror(reflecting: value).children
        for (fieldName, fieldSchema) in fields {
            guard let match = children.first(where: { $0.label == fieldName }) else {
                continue
            }
            await descend(fieldSchema, match.value)
        }

    default:
        // Primitives and other types - no channels to bind
        break
    }
}
// MARK: - Type Erasure for Binding
/// Type-erased view of `UnboundRx` used while binding call arguments.
///
/// `bindValue` casts an `Any` argument to this protocol so that it can bind the
/// channel without knowing the element type.
protocol AnyUnboundRx: AnyObject {
/// Binds the channel found in a schema `.rx` slot.
/// - Parameters:
///   - channelId: The ID allocated for this channel.
///   - taskSender: Closure that outgoing messages are handed to.
func bindForSchema(channelId: ChannelId, taskSender: @escaping TaskSender)
}
/// Type-erased view of `UnboundTx` used while binding call arguments.
///
/// `bindValue` casts an `Any` argument to this protocol so that it can bind the
/// channel without knowing the element type.
protocol AnyUnboundTx: AnyObject {
/// Binds the channel found in a schema `.tx` slot.
/// - Parameters:
///   - channelId: The ID allocated for this channel.
///   - receiver: The receiver registered for `channelId`.
///     NOTE(review): the `UnboundTx` conformance in this file does not use it.
func bindForSchema(channelId: ChannelId, receiver: ChannelReceiver)
}
extension UnboundRx: AnyUnboundRx {
    /// Binds the paired Tx for sending, then records the channel ID on this Rx.
    ///
    /// For a schema `.rx` slot the client sends through the Tx, so the Tx is the
    /// half that receives `taskSender`; this Rx only gets the ID.
    ///
    /// NOTE(review): `pairedTx` is weak. If the Tx is already gone, or does not
    /// conform to `AnyUnboundTxSender`, nothing is bound for sending but this Rx
    /// is still marked as bound.
    func bindForSchema(channelId: ChannelId, taskSender: @escaping TaskSender) {
        let sender = pairedTx as? AnyUnboundTxSender
        sender?.bindForSending(channelId: channelId, taskSender: taskSender)
        setChannelIdOnly(channelId: channelId)
    }
}
extension UnboundTx: AnyUnboundTx {
    /// Records the channel ID on this Tx for a schema `.tx` slot.
    ///
    /// For a schema `.tx` slot the client receives through the paired Rx, so this
    /// Tx only gets the ID and no sender closure.
    ///
    /// NOTE(review): `receiver` is not used. The Tx holds no reference to its
    /// paired Rx, so the Rx is not bound here; the client has to track and bind
    /// the Rx separately.
    func bindForSchema(channelId: ChannelId, receiver: ChannelReceiver) {
        setChannelIdOnly(channelId: channelId)
    }
}
/// Type-erased hook for binding an `UnboundTx` for sending.
///
/// `UnboundRx.bindForSchema` casts its weak `pairedTx` reference to this protocol
/// in order to bind the Tx half.
protocol AnyUnboundTxSender: AnyObject {
/// Binds the sender to `channelId` and installs `taskSender` as its outgoing message closure.
func bindForSending(channelId: ChannelId, taskSender: @escaping TaskSender)
}
extension UnboundTx: AnyUnboundTxSender {
    /// Forwards to `bind(channelId:taskTx:)`, passing `taskSender` as the sender closure.
    ///
    /// Traps if this Tx has already been bound, because `bind` enforces that with
    /// a precondition.
    func bindForSending(channelId: ChannelId, taskSender: @escaping TaskSender) {
        bind(channelId: channelId, taskTx: taskSender)
    }
}