Skip to content

Commit 056c8d7

Browse files
MacOMNIxlc
andauthored
Update reconnect attempt (#216)
* Adjust test cases * update test * udpate test cases * update test cases * update test cases * update test cases * update test cases * update test cases * update reconnect maxRetryAttempts * update * Update Networking/Sources/Networking/Peer.swift Co-authored-by: Xiliang Chen <[email protected]> * Update Networking/Sources/Networking/Peer.swift Co-authored-by: Xiliang Chen <[email protected]> * update reconnect attempt --------- Co-authored-by: Xiliang Chen <[email protected]>
1 parent 8035d63 commit 056c8d7

File tree

3 files changed

+72
-44
lines changed

3 files changed

+72
-44
lines changed

Networking/Sources/Networking/Peer.swift

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,27 @@ public enum PeerRole: Sendable, Hashable {
1616
// case proxy // not yet specified
1717
}
1818

19+
struct ReconnectState {
20+
var attempt: Int
21+
var delay: TimeInterval
22+
23+
init() {
24+
attempt = 0
25+
delay = 1
26+
}
27+
28+
// Initializer with custom values
29+
init(attempt: Int = 0, delay: TimeInterval = 1) {
30+
self.attempt = attempt
31+
self.delay = delay
32+
}
33+
34+
mutating func applyBackoff() {
35+
attempt += 1
36+
delay *= 2
37+
}
38+
}
39+
1940
public struct PeerOptions<Handler: StreamHandler>: Sendable {
2041
public var role: PeerRole
2142
public var listenAddress: NetAddr
@@ -50,7 +71,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
5071
}
5172
}
5273

53-
// TODO: reconnects, reopen UP stream, peer reputation system to ban peers not following the protocol
74+
// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
5475
public final class Peer<Handler: StreamHandler>: Sendable {
5576
private let impl: PeerImpl<Handler>
5677

@@ -61,7 +82,6 @@ public final class Peer<Handler: StreamHandler>: Sendable {
6182
}
6283

6384
public let publicKey: Data
64-
6585
public init(options: PeerOptions<Handler>) throws {
6686
let logger = Logger(label: "Peer".uniqueId)
6787

@@ -214,7 +234,9 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
214234

215235
fileprivate let connections: ThreadSafeContainer<ConnectionStorage> = .init(.init())
216236
fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream<Handler>]> = .init([:])
237+
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:])
217238

239+
let reconnectMaxRetryAttempts = 5
218240
let presistentStreamHandler: Handler.PresistentHandler
219241
let ephemeralStreamHandler: Handler.EphemeralHandler
220242

@@ -271,29 +293,43 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
271293
}
272294
}
273295

274-
// TODO: Add reconnection attempts & Apply exponential backoff delay
275296
func reconnect(to address: NetAddr, role: PeerRole) throws {
276-
logger.debug("reconnecting", metadata: ["to address": "\(address)", "role": "\(role)"])
277-
try connections.write { connections in
278-
if connections.byAddr[address] != nil {
279-
logger.warning("reconnecting to \(address) already connected")
280-
return
297+
let state = reconnectStates.read { reconnectStates in
298+
reconnectStates[address] ?? .init()
299+
}
300+
if state.attempt < reconnectMaxRetryAttempts {
301+
reconnectStates.write { reconnectStates in
302+
if var state = reconnectStates[address] {
303+
state.applyBackoff()
304+
reconnectStates[address] = state
305+
}
281306
}
282-
let quicConn = try QuicConnection(
283-
handler: PeerEventHandler(self),
284-
registration: clientConfiguration.registration,
285-
configuration: clientConfiguration
286-
)
287-
try quicConn.connect(to: address)
288-
let conn = Connection(
289-
quicConn,
290-
impl: self,
291-
role: role,
292-
remoteAddress: address,
293-
initiatedByLocal: true
294-
)
295-
connections.byAddr[address] = conn
296-
connections.byId[conn.id] = conn
307+
Task {
308+
try await Task.sleep(for: .seconds(state.delay))
309+
try connections.write { connections in
310+
if connections.byAddr[address] != nil {
311+
logger.warning("reconnecting to \(address) already connected")
312+
return
313+
}
314+
let quicConn = try QuicConnection(
315+
handler: PeerEventHandler(self),
316+
registration: clientConfiguration.registration,
317+
configuration: clientConfiguration
318+
)
319+
try quicConn.connect(to: address)
320+
let conn = Connection(
321+
quicConn,
322+
impl: self,
323+
role: role,
324+
remoteAddress: address,
325+
initiatedByLocal: true
326+
)
327+
connections.byAddr[address] = conn
328+
connections.byId[conn.id] = conn
329+
}
330+
}
331+
} else {
332+
logger.warning("reconnect attempt exceeded max attempts")
297333
}
298334
}
299335

@@ -417,6 +453,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
417453
)
418454
return
419455
}
456+
// Check if the connection is already reconnected
457+
impl.reconnectStates.write { reconnectStates in
458+
reconnectStates[conn.remoteAddress] = nil
459+
}
420460

421461
if conn.initiatedByLocal {
422462
for kind in Handler.PresistentHandler.StreamKind.allCases {

Networking/Tests/NetworkingTests/MockPeerEventTests.swift

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,8 @@ final class MockPeerEventTests {
147147
configuration: clientConfiguration
148148
)
149149

150-
// Attempt to connect
151150
try clientConnection.connect(to: listenAddress)
152-
let stream1 = try clientConnection.createStream()
153-
try stream1.send(data: Data("test data 1".utf8))
154-
155-
try? await Task.sleep(for: .milliseconds(100))
151+
try await Task.sleep(for: .milliseconds(100))
156152
let (_, reason) = clientHandler.events.value.compactMap {
157153
switch $0 {
158154
case let .shutdownInitiated(connection, reason):
@@ -212,7 +208,7 @@ final class MockPeerEventTests {
212208
let stream1 = try clientConnection.createStream()
213209
try stream1.send(data: Data("test data 1".utf8))
214210

215-
try? await Task.sleep(for: .milliseconds(100))
211+
try await Task.sleep(for: .milliseconds(100))
216212
let (_, info) = serverHandler.events.value.compactMap {
217213
switch $0 {
218214
case let .newConnection(_, connection, info):
@@ -266,14 +262,9 @@ final class MockPeerEventTests {
266262
registration: registration,
267263
configuration: clientConfiguration
268264
)
269-
270-
// Attempt to connect
271265
try clientConnection.connect(to: listenAddress)
272-
let stream1 = try clientConnection.createStream()
273-
try stream1.send(data: Data("test data 1".utf8))
274-
275-
try? await Task.sleep(for: .milliseconds(100))
276-
let (_, reason) = serverHandler.events.value.compactMap {
266+
try await Task.sleep(for: .milliseconds(100))
267+
let (_, reason) = clientHandler.events.value.compactMap {
277268
switch $0 {
278269
case let .shutdownInitiated(connection, reason):
279270
(connection, reason) as (QuicConnection, ConnectionCloseReason)?

Networking/Tests/NetworkingTests/PeerTests.swift

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,12 @@ struct PeerTests {
297297
try? await Task.sleep(for: .milliseconds(100))
298298
// Simulate abnormal shutdown of connections
299299
connection.close(abort: true)
300-
// Wait to simulate downtime
301-
try? await Task.sleep(for: .milliseconds(200))
300+
// Wait to simulate downtime & reconnected 3~5s
301+
try? await Task.sleep(for: .milliseconds(3000))
302302
peer1.broadcast(
303303
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
304304
)
305-
try? await Task.sleep(for: .milliseconds(1000))
305+
try await Task.sleep(for: .milliseconds(1000))
306306
let lastReceivedData = await handler2.lastReceivedData
307307
#expect(lastReceivedData == messageData)
308308
}
@@ -693,12 +693,9 @@ struct PeerTests {
693693
}
694694

695695
var connections = [Connection<MockStreamHandler>]()
696-
for i in 0 ..< peers.count {
696+
for i in 0 ..< peersCount {
697697
let peer = peers[i]
698-
for j in 0 ..< peers.count {
699-
if i >= j {
700-
continue
701-
}
698+
for j in i + 1 ..< peersCount {
702699
let otherPeer = peers[j]
703700
let conn = try peer.connect(
704701
to: otherPeer.listenAddress(),

0 commit comments

Comments
 (0)