@@ -238,52 +238,6 @@ import Foundation
238238 }
239239 }
240240
241- @StableNames
242- distributed actor PubSubTest : HasRemoteCallAdapter {
243- typealias ActorSystem = ErlangActorSystem
244-
245- nonisolated var remoteCallAdapter : some RemoteCallAdapter {
246- . primitive
247- }
248-
249- @StableName ( " hello_world " )
250- distributed func helloWorld( ) {
251- print ( " Received hello world event " )
252- }
253- }
254-
255- @Test func pubSub( ) async throws {
256- let actorSystem = try await ErlangActorSystem ( name: " swift " , cookie: " LJTPNYYQIOIRKYDCWCQH " )
257- try await actorSystem. connect ( to: " iex@DCKYRD-NMXCKatri " )
258-
259- let actor = PubSubTest ( actorSystem: actorSystem)
260-
261- let pubSub = try await PubSub ( name: " test_pubsub " , actorSystem: actorSystem)
262-
263- await pubSub. whenLocal {
264- $0. subscribe ( actor , to: " topic " )
265- }
266-
267- print ( " joined " )
268-
269- try await pubSub. whenLocal { try await $0. broadcast ( Term . atom ( " hello_world " ) , to: " topic " ) }
270-
271- try await Task . sleep ( for: . seconds( 1024 ) )
272- }
273-
274- @Resolvable
275- @StableNames
276- protocol CounterProtocol : DistributedActor , HasStableNames where ActorSystem == ErlangActorSystem {
277- @StableName ( " count " )
278- distributed var count : Int { get }
279-
280- @StableName ( " increment " )
281- distributed func increment( )
282-
283- @StableName ( " decrement " )
284- distributed func decrement( )
285- }
286-
287241 @StableNames
288242 distributed actor Counter : CounterProtocol {
289243 var _count = 0
@@ -329,181 +283,15 @@ import Foundation
329283 }
330284}
331285
332- import CErlInterface
333-
334- /// ```elixir
335- /// {:forward_to_local, "<topic>", <message>, <dispatcher>}
336- /// ```
337- ///
338- /// 'Elixir.Phoenix.PubSub'
339- struct PubSubRemoteCallAdapter : RemoteCallAdapter {
340- let topic : String
341- let primitive = PrimitiveRemoteCallAdapter ( )
342-
343- func encode(
344- _ invocation: RemoteCallInvocation ,
345- for system: ActorSystem
346- ) throws -> EncodedRemoteCall {
347- let message = ErlangTermBuffer ( )
348- message. newWithVersion ( )
349- message. encode ( tupleHeader: 4 )
350- _ = Data ( topic. utf8) . withUnsafeBytes {
351- message. encode ( binary: $0. baseAddress!, len: Int32 ( $0. count) )
352- }
353-
354- message. append ( try primitive. encode ( invocation, for: system) . message)
355-
356- message. encode ( atom: " Elixir.Phoenix.PubSub " )
357- print ( message)
358-
359- return EncodedRemoteCall (
360- message: message,
361- continuationAdapter: nil
362- )
363- }
364-
365- func decode(
366- _ message: ErlangTermBuffer ,
367- for system: ActorSystem
368- ) throws -> LocalCallInvocation {
369- var index : Int32 = 0
370-
371- var version : Int32 = 0
372- message. decode ( version: & version, index: & index)
373-
374- var arity : Int32 = 0
375- message. decode ( tupleHeader: & arity, index: & index)
376-
377- message. skipTerm ( index: & index) // :forward_to_local
378-
379- var type : UInt32 = 0
380- var size : Int32 = 0
381- message. getType ( type: & type, size: & size, index: & index)
382- let binary : UnsafeMutableRawPointer = . allocate( byteCount: Int ( size) , alignment: 0 )
383- var length : Int = 0
384- message. decode ( binary: binary, len: & length, index: & index)
385- let topic = String ( data: Data ( bytes: binary, count: length) , encoding: . utf8) !
386- guard topic == self . topic else { throw PubSubError . unhandledTopic ( topic) }
387-
388- let messageStart = index
389- message. skipTerm ( index: & index)
390-
391- return try primitive. decode ( message [ messageStart..< index] , for: system)
392- }
393-
394- enum PubSubError : Error {
395- case unhandledTopic( String )
396- }
397- }
398-
399- extension RemoteCallAdapter where Self == ProcessGroupRemoteCallAdapter {
400- static func pubSub( topic: String ) -> ProcessGroupRemoteCallAdapter {
401- . processGroup( PubSubRemoteCallAdapter ( topic: topic) )
402- }
403- }
404-
286+ @Resolvable
405287@StableNames
406- public distributed actor PubSub : HasRemoteCallAdapter {
407- public typealias ActorSystem = ErlangActorSystem
408-
409- private let name : String
410- private var subscribers = [ String : Set < ActorSystem . ActorID > ] ( )
411-
412- public nonisolated var remoteCallAdapter : ProcessGroupRemoteCallAdapter {
413- . processGroup
414- }
415-
416- public init ( name: String , actorSystem: ActorSystem ) async throws {
417- self . name = name
418- self . actorSystem = actorSystem
419-
420- try await self . join (
421- scope: " Elixir.Phoenix.PubSub " ,
422- group: " Elixir. \( name) .Adapter "
423- )
424- }
425-
426- @StableName ( " forward_to_local " )
427- distributed func forwardToLocal( topic: String , message: Term , dispatcher: String ) async throws {
428- for subscriber in subscribers [ topic] ?? [ ] {
429- guard let actor = actorSystem. resolve ( id: subscriber)
430- else { continue }
431-
432- let remoteCallAdapter = ( actor as? any HasRemoteCallAdapter ) ? . remoteCallAdapter ?? actorSystem. remoteCallAdapter
433- let localCall = try remoteCallAdapter. decode ( message. makeBuffer ( ) , for: actorSystem)
434-
435- nonisolated ( unsafe) let handler = ActorSystem . ResultHandler (
436- sender: localCall. sender,
437- resultHandlerAdapter: localCall. resultHandler,
438- fileDescriptor: 0
439- )
440-
441- let decoder = TermDecoder ( )
442- nonisolated ( unsafe) var invocationDecoder = ActorSystem . InvocationDecoder (
443- buffer: localCall. arguments,
444- decoder: decoder,
445- index: 0
446- )
447-
448- if let stableNamed = actor as? any HasStableNames {
449- try ! await stableNamed. _executeStableName (
450- target: RemoteCallTarget ( localCall. identifier) ,
451- invocationDecoder: & invocationDecoder,
452- handler: handler
453- )
454- } else {
455- try await actorSystem. executeDistributedTarget (
456- on: actor ,
457- target: RemoteCallTarget ( localCall. identifier) ,
458- invocationDecoder: & invocationDecoder,
459- handler: handler
460- )
461- }
462- }
463- }
288+ protocol CounterProtocol : DistributedActor , HasStableNames where ActorSystem == ErlangActorSystem {
289+ @StableName ( " count " )
290+ distributed var count : Int { get }
464291
465- public func subscribe< Act> ( _ act: Act , to topic: String )
466- where Act: DistributedActor , Act. ID == ActorSystem . ActorID
467- {
468- subscribers [ topic, default: [ ] ] . insert ( act. id)
469- }
470-
471- public func unsubscribe< Act> ( _ act: Act , to topic: String )
472- where Act: DistributedActor , Act. ID == ActorSystem . ActorID
473- {
474- subscribers [ topic] ? . remove ( act. id)
475- }
292+ @StableName ( " increment " )
293+ distributed func increment( )
476294
477- public func localBroadcast< Act: DistributedActor > (
478- to topic: String ,
479- as actorType: Act . Type = Act . self,
480- _ body: @Sendable ( Act) async throws -> ( )
481- ) async throws where Act. ID == ActorSystem . ActorID {
482- for subscriber in subscribers [ " topic " ] ?? [ ] {
483- guard let actor = try actorSystem. resolve ( id: subscriber, as: actorType)
484- else { continue }
485- try await body ( actor )
486- }
487- }
488-
489- public func broadcast(
490- _ message: some Encodable ,
491- to topic: String
492- ) async throws {
493- let encoder = TermEncoder ( )
494- encoder. includeVersion = false
495- let message = try Term ( from: encoder. encode ( message) )
496-
497- try await forwardToLocal ( topic: topic, message: message, dispatcher: " Elixir.Phoenix.PubSub " )
498-
499- for member in try await ProcessGroups . remoteMembers (
500- scope: " Elixir.Phoenix.PubSub " ,
501- group: " Elixir. \( name) .Adapter " ,
502- using: actorSystem
503- ) {
504- print ( member)
505- let remote = try PubSub . resolve ( id: member, using: actorSystem)
506- try await remote. forwardToLocal ( topic: topic, message: message, dispatcher: " Elixir.Phoenix.Pubsub " )
507- }
508- }
509- }
295+ @StableName ( " decrement " )
296+ distributed func decrement( )
297+ }
0 commit comments