Skip to content

Commit 7f3729e

Browse files
committed
Resolve issues with concurrent message sends, and improve transport decoupling
1 parent 28e18bb commit 7f3729e

File tree

7 files changed

+313
-180
lines changed

7 files changed

+313
-180
lines changed

Sources/ErlangActorSystem/ErlangActorSystem.swift

Lines changed: 121 additions & 134 deletions
Large diffs are not rendered by default.

Sources/ErlangActorSystem/RemoteCallAdapter.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,6 @@ public protocol ResultHandlerAdapter {
7979
func encode(throwing error: some Error) throws -> Message
8080
}
8181

82-
public protocol ContinuationAdapter {
82+
public protocol ContinuationAdapter: Sendable {
8383
func decode(_ message: ErlangTermBuffer) throws -> Result<ErlangTermBuffer, any Error>
8484
}

Sources/ErlangActorSystem/RemoteCallAdapters/GenServerRemoteCallAdapter.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public struct GenServerRemoteCallAdapter: RemoteCallAdapter {
251251
let monitorReference: ErlangTermBuffer
252252

253253
/// `{<ref>, <reply>}`
254-
func encode(returning value: Value) throws -> Message {
254+
func encode(returning value: Value) throws -> Self.Message {
255255
let result = ErlangTermBuffer()
256256
result.newWithVersion()
257257

@@ -263,7 +263,7 @@ public struct GenServerRemoteCallAdapter: RemoteCallAdapter {
263263
}
264264

265265
/// `{<ref>, :ok}`
266-
func encodeVoid() throws -> Message {
266+
func encodeVoid() throws -> Self.Message {
267267
let result = ErlangTermBuffer()
268268
result.newWithVersion()
269269

@@ -275,7 +275,7 @@ public struct GenServerRemoteCallAdapter: RemoteCallAdapter {
275275
}
276276

277277
/// `{<ref>, {:error, "msg"}}`
278-
func encode(throwing error: some Error) throws -> Message {
278+
func encode(throwing error: some Error) throws -> Self.Message {
279279
let result = ErlangTermBuffer()
280280
result.newWithVersion()
281281

Sources/ErlangActorSystem/RemoteCallAdapters/ProcessGroupRemoteCallAdapter.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public distributed actor ProcessGroups: HasRemoteCallAdapter {
175175

176176
// send "discover" to all nodes in the cluster
177177
try await broadcast(
178-
to: actorSystem.remoteNodes.keys.map({ .name(scope, node: $0) })
178+
to: actorSystem.remoteNodes.withLock { $0.keys.map({ .name(scope, node: $0) }) }
179179
) { remote in
180180
try await remote.discover(self.id)
181181
}

Sources/ErlangActorSystem/Transport.swift

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import Synchronization
2+
13
public protocol Transport {
24
typealias ListenSocket = Int32
35
typealias AcceptSocket = Int32
@@ -26,10 +28,88 @@ public protocol Transport {
2628

2729
mutating func connect(to ipAddress: String, port: Int) async throws -> AcceptSocket
2830

29-
mutating func send(_ message: ErlangTermBuffer, to pid: Term.PID, on socket: AcceptSocket) throws
31+
mutating func send(_ message: SendMessage, on socket: AcceptSocket) async throws
3032

31-
mutating func send(_ message: ErlangTermBuffer, to name: String, on socket: AcceptSocket) throws
33+
func receive(on socket: AcceptSocket) throws -> ReceiveResult
3234

3335
mutating func makePID() -> Term.PID
3436
mutating func makeReference() -> Term.Reference
3537
}
38+
39+
/// This type only exists to make `ErlangTermBuffer` sendable. It is not sendable, and should not be
40+
/// modified concurrently. However, `send(_:on:)` hands the value off to the `Transport`, so it should
41+
/// be safe.
42+
///
43+
/// Ideally, we should be able to specify `sending ErlangTermBuffer` as an argument in a protocol.
44+
/// However, there is a bug in Swift 6.1 (and possibly 6.2) that prevents this from working.
45+
/// It looks like it has been fixed on `main` though, so we should be able to drop it in the next Swift version.
46+
///
47+
/// https://github.com/swiftlang/swift/issues/78588 (merged April 26, 2025)
48+
public struct SendMessage: @unchecked Sendable {
49+
let content: ErlangTermBuffer
50+
let recipient: Recipient
51+
52+
enum Recipient: Sendable {
53+
case pid(Term.PID)
54+
case name(String)
55+
}
56+
}
57+
58+
public enum ReceiveResult {
59+
case tick
60+
case success(Message)
61+
}
62+
63+
public struct Message {
64+
public let info: Info
65+
public let content: ErlangTermBuffer
66+
67+
public struct Info: Sendable {
68+
public let kind: Kind
69+
public let sender: Term.PID
70+
public let recipient: Term.PID
71+
public let namedRecipient: String
72+
73+
public init(
74+
kind: Kind,
75+
sender: Term.PID,
76+
recipient: Term.PID,
77+
namedRecipient: String
78+
) {
79+
self.kind = kind
80+
self.sender = sender
81+
self.recipient = recipient
82+
self.namedRecipient = namedRecipient
83+
}
84+
}
85+
86+
public enum Kind: Int, Sendable {
87+
case link = 1
88+
case send = 2
89+
case exit = 3
90+
case unlink = 4
91+
case nodeLink = 5
92+
case regSend = 6
93+
case groupLeader = 7
94+
case exit2 = 8
95+
case passThrough = 112
96+
97+
case sendTraceToken = 12
98+
case exitTraceToken = 13
99+
case regSendTraceToken = 16
100+
case exit2TraceToken = 18
101+
case monitorProcess = 19
102+
case demonitorProcess = 20
103+
case monitorProcessExit = 21
104+
105+
case unknown = -1
106+
}
107+
108+
public init(
109+
info: Info,
110+
content: ErlangTermBuffer
111+
) {
112+
self.info = info
113+
self.content = content
114+
}
115+
}

Sources/ErlangActorSystem/Transports/ErlInterfaceTransport.swift

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import CErlInterface
33
import Glibc
44
#endif
55

6-
public struct ErlInterfaceTransport: Transport {
6+
public final class ErlInterfaceTransport: Transport {
77
private var node: ei_cnode = ei_cnode()
88

99
public var name: String {
@@ -32,12 +32,12 @@ public struct ErlInterfaceTransport: Transport {
3232
self.backlog = backlog
3333
}
3434

35-
public mutating func setup(name: String, cookie: String) throws {
35+
public func setup(name: String, cookie: String) throws {
3636
guard ei_connect_init(&node, name, cookie, UInt32(time(nil) + 1)) >= 0
3737
else { throw TransportError.initFailed }
3838
}
3939

40-
public mutating func setup(
40+
public func setup(
4141
hostName: String,
4242
aliveName: String,
4343
nodeName: String,
@@ -58,20 +58,20 @@ public struct ErlInterfaceTransport: Transport {
5858
else { throw TransportError.initFailed }
5959
}
6060

61-
public mutating func listen(port: Int) throws -> (socket: ListenSocket, port: Int) {
61+
public func listen(port: Int) throws -> (socket: ListenSocket, port: Int) {
6262
var port = Int32(port)
6363
let listen = ei_listen(&node, &port, 5)
6464
guard listen > 0
6565
else { throw TransportError.listenFailed }
6666
return (socket: listen, port: Int(port))
6767
}
6868

69-
public mutating func publish(port: Int) throws {
69+
public func publish(port: Int) throws {
7070
guard ei_publish(&node, Int32(port)) != -1
7171
else { throw TransportError.publishFailed }
7272
}
7373

74-
public mutating func accept(from listen: ListenSocket) throws -> (socket: AcceptSocket, name: String) {
74+
public func accept(from listen: ListenSocket) throws -> (socket: AcceptSocket, name: String) {
7575
var conn = ErlConnect()
7676
let fileDescriptor = ei_accept(&node, listen, &conn)
7777
guard fileDescriptor >= 0,
@@ -80,7 +80,7 @@ public struct ErlInterfaceTransport: Transport {
8080
return (socket: fileDescriptor, name: nodeName)
8181
}
8282

83-
public mutating func connect(to nodeName: String) throws -> AcceptSocket {
83+
public func connect(to nodeName: String) throws -> AcceptSocket {
8484
let fileDescriptor = nodeName.withCString { nodeName in
8585
ei_connect(&node, UnsafeMutablePointer(mutating: nodeName))
8686
}
@@ -89,7 +89,7 @@ public struct ErlInterfaceTransport: Transport {
8989
return fileDescriptor
9090
}
9191

92-
public mutating func connect(to ipAddress: String, port: Int) throws -> AcceptSocket {
92+
public func connect(to ipAddress: String, port: Int) throws -> AcceptSocket {
9393
var addr = in_addr()
9494
_ = ipAddress.withCString { ipAddress in
9595
inet_aton(ipAddress, &addr)
@@ -102,37 +102,69 @@ public struct ErlInterfaceTransport: Transport {
102102
return fileDescriptor
103103
}
104104

105-
public mutating func send(_ message: ErlangTermBuffer, to pid: Term.PID, on socket: AcceptSocket) throws {
106-
var pid = pid.pid
107-
guard ei_send(
108-
socket,
109-
&pid,
110-
message.buff,
111-
message.buffsz
112-
) >= 0
113-
else { throw TransportError.sendFailed }
114-
}
115-
116-
public mutating func send(_ message: ErlangTermBuffer, to name: String, on socket: AcceptSocket) throws {
117-
try name.withCString { name in
118-
guard ei_reg_send(
119-
&self.node,
105+
@MainActor
106+
public func send(_ message: SendMessage, on socket: AcceptSocket) async throws {
107+
switch message.recipient {
108+
case let .pid(pid):
109+
var pid = pid.pid
110+
guard ei_send(
120111
socket,
121-
UnsafeMutablePointer(mutating: name),
122-
message.buff,
123-
message.index
112+
&pid,
113+
message.content.buff,
114+
message.content.buffsz
124115
) >= 0
125-
else { throw ErlangActorSystemError.sendFailed }
116+
else { throw TransportError.sendFailed }
117+
case let .name(name):
118+
try name.withCString { name in
119+
guard ei_reg_send(
120+
&self.node,
121+
socket,
122+
UnsafeMutablePointer(mutating: name),
123+
message.content.buff,
124+
message.content.index
125+
) >= 0
126+
else { throw ErlangActorSystemError.sendFailed }
127+
}
126128
}
127129
}
128130

129-
public mutating func makePID() -> Term.PID {
131+
public func receive(on socket: AcceptSocket) throws -> ReceiveResult {
132+
var message = erlang_msg()
133+
let content = ErlangTermBuffer()
134+
content.new()
135+
136+
switch ei_xreceive_msg(socket, &message, &content.buffer) {
137+
case ERL_TICK:
138+
return .tick
139+
case ERL_ERROR:
140+
throw ReceiveError.receiveFailed
141+
case ERL_MSG:
142+
return .success(Message(
143+
info: Message.Info(
144+
kind: Message.Kind(rawValue: message.msgtype) ?? .unknown,
145+
sender: Term.PID(pid: message.from),
146+
recipient: Term.PID(pid: message.to),
147+
namedRecipient: String(cString: Array(tuple: message.toname, start: \.0), encoding: .utf8)!
148+
),
149+
content: content
150+
))
151+
case let messageKind:
152+
throw ReceiveError.unknownMessageKind(messageKind)
153+
}
154+
}
155+
156+
enum ReceiveError: Error {
157+
case receiveFailed
158+
case unknownMessageKind(Int32)
159+
}
160+
161+
public func makePID() -> Term.PID {
130162
var pid = erlang_pid()
131163
ei_make_pid(&node, &pid)
132164
return Term.PID(pid: pid)
133165
}
134166

135-
public mutating func makeReference() -> Term.Reference {
167+
public func makeReference() -> Term.Reference {
136168
var ref = erlang_ref()
137169
ei_make_ref(&node, &ref)
138170
return Term.Reference(ref: ref)

Tests/ErlangActorSystemBenchmarks/ErlangActorSystemBenchmarks.swift

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,60 @@ import ErlangActorSystem
33

44
@main
55
struct ErlangActorSystemBenchmarks {
6+
static let clock = ContinuousClock()
7+
68
static func main() async throws {
7-
let actorSystem = try await ErlangActorSystem(name: "benchmark", cookie: "cookie")
9+
let actorSystem1 = try await ErlangActorSystem(name: "a", cookie: "cookie")
10+
let actorSystem2 = try await ErlangActorSystem(name: "b", cookie: "cookie")
11+
try await actorSystem1.connect(to: actorSystem2.name)
812

9-
var actors = [PingPongActor]()
10-
let clock = ContinuousClock()
11-
let start = clock.now
13+
let actors = await benchmark(label: "create 1_000_000 actors") {
14+
var actors = [PingPongActor]()
15+
for _ in 0..<1_000_000 {
16+
let actor = PingPongActor(actorSystem: actorSystem1)
17+
actors.append(actor)
18+
}
19+
return actors
20+
}
1221

13-
for _ in 0..<1_000_000 {
14-
let actor = PingPongActor(actorSystem: actorSystem)
15-
actors.append(actor)
22+
let remoteActors = try await benchmark(label: "resolve 1_000_000 remote actors") {
23+
try actors.map { localActor in
24+
try PingPongActor.resolve(id: localActor.id, using: actorSystem2)
25+
}
1626
}
1727

18-
let result = start.duration(to: .now)
19-
print(result)
28+
try await benchmark(label: "ping 1_000_000 remote actors") {
29+
try await withThrowingDiscardingTaskGroup { group in
30+
for actor in remoteActors {
31+
group.addTask {
32+
_ = try await actor.ping()
33+
}
34+
}
35+
}
36+
}
37+
}
38+
39+
static func benchmark<T>(
40+
label: String,
41+
_ block: @Sendable () async throws -> T
42+
) async rethrows -> T {
43+
let start = clock.now
44+
let result = try await block()
45+
let duration = start.duration(to: .now)
46+
print("'\(label)' took \(duration)")
47+
return result
2048
}
2149
}
2250

23-
distributed actor PingPongActor {
51+
@StableNames
52+
distributed actor PingPongActor: HasRemoteCallAdapter {
2453
typealias ActorSystem = ErlangActorSystem
2554

55+
nonisolated var remoteCallAdapter: some RemoteCallAdapter {
56+
.genServer
57+
}
58+
59+
@StableName("ping")
2660
distributed func ping() -> String {
2761
return "pong"
2862
}

0 commit comments

Comments
 (0)