Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ final class ConfigurationService {
projectId: InputConfig.projectId,
socketFactory: DefaultSocketFactory()
)
Networking.instance.setLogging(level: .off)
Networking.instance.setLogging(level: .debug)

let metadata = AppMetadata(
name: "Example Wallet",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ extension NotifyClient {
}

public func register(deviceToken: String) async throws {
// try await pushClient.register(deviceToken: deviceToken)
try await pushClient.register(deviceToken: deviceToken)
}
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion Sources/WalletConnectRelay/PackageConfig.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version": "1.2.0"}
{"version": "1.2.1"}
101 changes: 73 additions & 28 deletions Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,87 @@ public final class RelayClient {
}

public func subscribe(topic: String) async throws {
logger.debug("Subscribing to topic: \(topic)")
logger.debug("[Subscribe] Subscribing to topic: \(topic)")

let rpc = Subscribe(params: .init(topic: topic))
let request = rpc
.asRPCRequest()
let message = try! request.asJSONEncodedString()
let request = rpc.asRPCRequest()
let message = try request.asJSONEncodedString()

try await dispatcher.protectedSend(message)
observeSubscription(requestId: request.id!, topics: [topic])

// Wait for relay's subscription response
try await waitForSubscriptionResponse(
requestId: request.id!,
topics: [topic],
logPrefix: "[Subscribe]"
)
}

public func batchSubscribe(topics: [String]) async throws {
guard !topics.isEmpty else { return }
logger.debug("Subscribing to topics: \(topics)")
logger.debug("[BatchSubscribe] Subscribing to topics: \(topics)")

let rpc = BatchSubscribe(params: .init(topics: topics))
let request = rpc
.asRPCRequest()
let message = try! request.asJSONEncodedString()
let request = rpc.asRPCRequest()
let message = try request.asJSONEncodedString()

try await dispatcher.protectedSend(message)
observeSubscription(requestId: request.id!, topics: topics)

// Same wait, but for multiple topics
try await waitForSubscriptionResponse(
requestId: request.id!,
topics: topics,
logPrefix: "[BatchSubscribe]"
)
}

private func waitForSubscriptionResponse(
requestId: RPCID,
topics: [String],
logPrefix: String
) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
var cancellable: AnyCancellable?

cancellable = subscriptionResponsePublisher
// Only handle responses matching this request ID
.filter { $0.0 == requestId }
// Convert Never to RelayError so we can throw on timeout
.setFailureType(to: RelayError.self)
// Enforce a 30-second timeout
.timeout(.seconds(30), scheduler: concurrentQueue, customError: { .requestTimeout })
.sink(
receiveCompletion: { [unowned self] completion in
switch completion {
case .failure(let error):
cancellable?.cancel()
logger.debug("\(logPrefix) Relay request timeout for topics: \(topics)")
continuation.resume(throwing: error)
case .finished:
// Not typically called in this pattern, but required by Combine
break
}
},
receiveValue: { [unowned self] (_, subscriptionIds) in
cancellable?.cancel()
logger.debug("\(logPrefix) Subscribed to topics: \(topics)")

// Check ID counts, warn if mismatch
guard topics.count == subscriptionIds.count else {
logger.warn("\(logPrefix) Number of returned subscription IDs != number of topics")
continuation.resume(returning: ())
return
}

// Track each subscription
for (i, topic) in topics.enumerated() {
subscriptionsTracker.setSubscription(for: topic, id: subscriptionIds[i])
}

continuation.resume(returning: ())
}
)
}
}

public func unsubscribe(topic: String) async throws {
Expand Down Expand Up @@ -219,24 +282,6 @@ public final class RelayClient {
}
}


private func observeSubscription(requestId: RPCID, topics: [String]) {
var cancellable: AnyCancellable?
cancellable = subscriptionResponsePublisher
.filter { $0.0 == requestId }
.sink { [unowned self] (_, subscriptionIds) in
cancellable?.cancel()
logger.debug("Subscribed to topics: \(topics)")
guard topics.count == subscriptionIds.count else {
logger.warn("Number of topics in (batch)subscribe does not match number of subscriptions")
return
}
for i in 0..<topics.count {
subscriptionsTracker.setSubscription(for: topics[i], id: subscriptionIds[i])
}
}
}

public func getClientId() throws -> String {
try clientIdStorage.getClientId()
}
Expand Down
11 changes: 0 additions & 11 deletions Tests/RelayerTests/RelayClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ final class RelayClientTests: XCTestCase {
waitForExpectations(timeout: 0.001, handler: nil)
}

func testSubscribeRequest() async {
try? await sut.subscribe(topic: "")
let request = dispatcher.getLastRequestSent()
XCTAssertNotNil(request)
}

func testUnsubscribeRequest() {
let topic = String.randomTopic()
subscriptionsTracker.setSubscription(for: topic, id: "")
Expand All @@ -75,11 +69,6 @@ final class RelayClientTests: XCTestCase {
waitForExpectations(timeout: 0.1, handler: nil)
}

func testSendOnSubscribe() async {
try? await sut.subscribe(topic: "")
XCTAssertTrue(dispatcher.sent)
}

func testSendOnUnsubscribe() {
let topic = "123"
subscriptionsTracker.setSubscription(for: topic, id: "")
Expand Down
Loading