Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 46 additions & 72 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ private struct MessageSubscriptionWrapper {
var serial: String
}

#if DEBUG
extension ARTMessage: @retroactive @unchecked Sendable {}
#endif

// TODO: Don't have a strong understanding of why @MainActor is needed here. Revisit as part of https://github.com/ably-labs/ably-chat-swift/issues/83
@MainActor
internal final class DefaultMessages: Messages, EmitsDiscontinuities {
Expand Down Expand Up @@ -58,70 +54,60 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
// (CHA-M5k) Incoming realtime events that are malformed (unknown field should be ignored) shall not be emitted to subscribers.
let eventListener = channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in
Task {
do {
// TODO: Revisit errors thrown as part of https://github.com/ably-labs/ably-chat-swift/issues/32
guard let ablyCocoaData = message.data,
let data = JSONValue(ablyCocoaData: ablyCocoaData).objectValue,
let text = data["text"]?.stringValue
else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without data or text")
}

guard let ablyCocoaExtras = message.extras else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without extras")
}

let extras = JSONValue.objectFromAblyCocoaExtras(ablyCocoaExtras)
// TODO: Revisit errors thrown as part of https://github.com/ably-labs/ably-chat-swift/issues/32
guard let ablyCocoaData = message.data,
let data = JSONValue(ablyCocoaData: ablyCocoaData).objectValue,
let text = data["text"]?.stringValue
else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without data or text")
}

guard let serial = message.serial else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without serial")
}
guard let ablyCocoaExtras = message.extras else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without extras")
}

guard let clientID = message.clientId else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without clientId")
}
let extras = JSONValue.objectFromAblyCocoaExtras(ablyCocoaExtras)

guard let version = message.version else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without version")
}
guard let serial = message.serial else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without serial")
}

let metadata = try data.optionalObjectValueForKey("metadata")
guard let clientID = message.clientId else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without clientId")
}

let headers: Headers? = if let headersJSONObject = try extras.optionalObjectValueForKey("headers") {
try headersJSONObject.mapValues { try HeadersValue(jsonValue: $0) }
} else {
nil
}
guard let version = message.version else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without version")
}

guard let action = MessageAction.fromRealtimeAction(message.action) else {
return
}
let metadata = try data.optionalObjectValueForKey("metadata")

// `message.operation?.toChatOperation()` is throwing but the linter prefers putting the `try` on Message initialization instead of having it nested.
let message = try Message(
serial: serial,
action: action,
clientID: clientID,
roomID: self.roomID,
text: text,
createdAt: message.timestamp,
metadata: metadata ?? .init(),
headers: headers ?? .init(),
version: version,
timestamp: message.timestamp,
operation: message.operation?.toChatOperation()
)
let headers: Headers? = if let headersJSONObject = try extras.optionalObjectValueForKey("headers") {
try headersJSONObject.mapValues { try HeadersValue(jsonValue: $0) }
} else {
nil
}

messageSubscription.emit(message)
} catch {
self.logger.log(message: "Malformed message received: \(error)", level: .debug)
#if DEBUG
for subscription in self.malformedMessageSubscriptions {
subscription.emit(message)
}
#endif
throw error
guard let action = MessageAction.fromRealtimeAction(message.action) else {
return
}

// `message.operation?.toChatOperation()` is throwing but the linter prefers putting the `try` on Message initialization instead of having it nested.
let message = try Message(
serial: serial,
action: action,
clientID: clientID,
roomID: self.roomID,
text: text,
createdAt: message.timestamp,
metadata: metadata ?? .init(),
headers: headers ?? .init(),
version: version,
timestamp: message.timestamp,
operation: message.operation?.toChatOperation()
)

messageSubscription.emit(message)
}
}

Expand All @@ -140,18 +126,6 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
return messageSubscription
}

#if DEBUG
/// Subscription of malformed message events for testing purposes.
private var malformedMessageSubscriptions: [Subscription<ARTMessage>] = []

/// Returns a subscription which emits malformed message events for testing purposes.
internal func testsOnly_subscribeToMalformedMessageEvents() -> Subscription<ARTMessage> {
let subscription = Subscription<ARTMessage>(bufferingPolicy: .unbounded)
malformedMessageSubscriptions.append(subscription)
return subscription
}
#endif

// (CHA-M6a) A method must be exposed that accepts the standard Ably REST API query parameters. It shall call the “REST API”#rest-fetching-messages and return a PaginatedResult containing messages, which can then be paginated through.
internal func get(options: QueryOptions) async throws -> any PaginatedResult<Message> {
try await chatAPI.getMessages(roomId: roomID, params: options)
Expand Down Expand Up @@ -205,7 +179,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}
}

// (CHA-M5d) If a channel UPDATE event is received and resumed=false, then it must be assumed that messages have been missed. The subscription point of any subscribers must be reset to the attachSerial.
// (CHA-M4d) If a channel UPDATE event is received and resumed=false, then it must be assumed that messages have been missed. The subscription point of any subscribers must be reset to the attachSerial.
channel.on(.update) { [weak self] stateChange in
Task {
do {
Expand Down
17 changes: 0 additions & 17 deletions Sources/AblyChat/DefaultRoomReactions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
subscription.emit(reaction)
} catch {
logger.log(message: "Error processing incoming reaction message: \(error)", level: .error)
#if DEBUG
for subscription in self.malformedMessageSubscriptions {
subscription.emit(message)
}
#endif
}
}
}
Expand All @@ -102,16 +97,4 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
private enum RoomReactionsError: Error {
case noReferenceToSelf
}

#if DEBUG
/// Subscription of malformed message events for testing purposes.
private var malformedMessageSubscriptions: [Subscription<ARTMessage>] = []

/// Returns a subscription which emits malformed message events for testing purposes.
internal func testsOnly_subscribeToMalformedMessageEvents() -> Subscription<ARTMessage> {
let subscription = Subscription<ARTMessage>(bufferingPolicy: .unbounded)
malformedMessageSubscriptions.append(subscription)
return subscription
}
#endif
}
81 changes: 5 additions & 76 deletions Sources/AblyChat/DefaultTyping.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ internal final class DefaultTyping: Typing {
private let clientID: String
private let logger: InternalLogger
private let timeout: TimeInterval
private let maxPresenceGetRetryDuration: TimeInterval // Max duration as specified in CHA-T6c1
private let timerManager = TimerManager()

internal init(featureChannel: FeatureChannel, roomID: String, clientID: String, logger: InternalLogger, timeout: TimeInterval, maxPresenceGetRetryDuration: TimeInterval = 30.0) {
internal init(featureChannel: FeatureChannel, roomID: String, clientID: String, logger: InternalLogger, timeout: TimeInterval) {
self.roomID = roomID
self.featureChannel = featureChannel
self.clientID = clientID
self.logger = logger
self.timeout = timeout
self.maxPresenceGetRetryDuration = maxPresenceGetRetryDuration
}

internal nonisolated var channel: any RealtimeChannelProtocol {
Expand All @@ -34,21 +32,18 @@ internal final class DefaultTyping: Typing {
logger.log(message: "Received presence message: \(message)", level: .debug)
Task {
let currentEventID = await eventTracker.updateEventID()
let maxRetryDuration: TimeInterval = 30.0 // Max duration as specified in CHA-T6c1
let baseDelay: TimeInterval = 1.0 // Initial retry delay
let maxDelay: TimeInterval = 5.0 // Maximum delay between retries

var totalElapsedTime: TimeInterval = 0
var delay: TimeInterval = baseDelay

while totalElapsedTime < maxPresenceGetRetryDuration {
while totalElapsedTime < maxRetryDuration {
do {
// (CHA-T6c) When a presence event is received from the realtime client, the Chat client will perform a presence.get() operation to get the current presence set. This guarantees that we get a fully synced presence set. This is then used to emit the typing clients to the subscriber.
let latestTypingMembers = try await get()
#if DEBUG
for subscription in testPresenceGetTypingEventSubscriptions {
subscription.emit(.init())
}
#endif

// (CHA-T6c2) If multiple presence events are received resulting in concurrent presence.get() calls, then we guarantee that only the “latest” event is emitted. That is to say, if presence event A and B occur in that order, then only the typing event generated by B’s call to presence.get() will be emitted to typing subscribers.
let isLatestEvent = await eventTracker.isLatestEvent(currentEventID)
guard isLatestEvent else {
Expand All @@ -72,14 +67,9 @@ internal final class DefaultTyping: Typing {

// Exponential backoff (double the delay)
delay = min(delay * 2, maxDelay)
#if DEBUG
for subscription in testPresenceGetRetryTypingEventSubscriptions {
subscription.emit(.init())
}
#endif
}
}
logger.log(message: "Failed to fetch presence set after \(maxPresenceGetRetryDuration) seconds. Giving up.", level: .error)
logger.log(message: "Failed to fetch presence set after \(maxRetryDuration) seconds. Giving up.", level: .error)
}
}

Expand Down Expand Up @@ -170,11 +160,6 @@ internal final class DefaultTyping: Typing {
// (CHA-T5b) If typing is in progress, he CHA-T3 timeout is cancelled. The client then leaves presence.
await timerManager.cancelTimer()
channel.presence.leaveClient(clientID, data: nil)
#if DEBUG
for subscription in testStopTypingEventSubscriptions {
subscription.emit(.init())
}
#endif
} else {
// (CHA-T5a) If typing is not in progress, this operation is no-op.
logger.log(message: "User is not typing. No need to leave presence.", level: .debug)
Expand Down Expand Up @@ -224,68 +209,12 @@ internal final class DefaultTyping: Typing {
try await stop()
}
}
#if DEBUG
for subscription in testStartTypingEventSubscriptions {
subscription.emit(.init())
}
#endif
}
}
}
}

#if DEBUG
/// The `DefaultTyping` emits a `TestTypingEvent` each time ``start`` or ``stop`` is called.
internal struct TestTypingEvent: Equatable {
internal let timestamp = Date()
}

/// Subscription of typing start events for testing purposes.
private var testStartTypingEventSubscriptions: [Subscription<TestTypingEvent>] = []

/// Subscription of typing stop events for testing purposes.
private var testStopTypingEventSubscriptions: [Subscription<TestTypingEvent>] = []

/// Subscription of presence get events for testing purposes.
private var testPresenceGetTypingEventSubscriptions: [Subscription<TestTypingEvent>] = []

/// Subscription of retry presence get events for testing purposes.
private var testPresenceGetRetryTypingEventSubscriptions: [Subscription<TestTypingEvent>] = []

/// Returns a subscription which emits typing start events for testing purposes.
internal func testsOnly_subscribeToStartTestTypingEvents() -> Subscription<TestTypingEvent> {
let subscription = Subscription<TestTypingEvent>(bufferingPolicy: .unbounded)
testStartTypingEventSubscriptions.append(subscription)
return subscription
}

/// Returns a subscription which emits typing stop events for testing purposes.
internal func testsOnly_subscribeToStopTestTypingEvents() -> Subscription<TestTypingEvent> {
let subscription = Subscription<TestTypingEvent>(bufferingPolicy: .unbounded)
testStopTypingEventSubscriptions.append(subscription)
return subscription
}

/// Returns a subscription which emits presence get events for testing purposes.
internal func testsOnly_subscribeToPresenceGetTypingEvents() -> Subscription<TestTypingEvent> {
let subscription = Subscription<TestTypingEvent>(bufferingPolicy: .unbounded)
testPresenceGetTypingEventSubscriptions.append(subscription)
return subscription
}

/// Returns a subscription which emits retry presence get events for testing purposes.
internal func testsOnly_subscribeToPresenceGetRetryTypingEvents() -> Subscription<TestTypingEvent> {
let subscription = Subscription<TestTypingEvent>(bufferingPolicy: .unbounded)
testPresenceGetRetryTypingEventSubscriptions.append(subscription)
return subscription
}
#endif
}

#if DEBUG
extension DefaultTyping: @unchecked Sendable {}
#endif

private final actor EventTracker {
private var latestEventID: UUID = .init()

Expand Down
2 changes: 1 addition & 1 deletion Sources/AblyChat/RoomFeature.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ internal protocol FeatureChannel: Sendable, EmitsDiscontinuities {

internal struct DefaultFeatureChannel: FeatureChannel {
internal var channel: any RealtimeChannelProtocol
internal var contributor: any RoomLifecycleContributor & EmitsDiscontinuities
internal var contributor: DefaultRoomLifecycleContributor
internal var roomLifecycleManager: RoomLifecycleManager

internal func onDiscontinuity(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
Expand Down
23 changes: 23 additions & 0 deletions Tests/AblyChatTests/ChatAPITests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,27 @@ struct ChatAPITests {
// Then
#expect(getMessagesResult == expectedPaginatedResult)
}

// @spec CHA-M5i
@Test
func getMessages_whenGetMessagesReturnsServerError_throwsARTError() async {
// Given
let paginatedResponse = MockHTTPPaginatedResponse.successGetMessagesWithNoItems
let artError = ARTErrorInfo.create(withCode: 50000, message: "Internal server error")
let realtime = MockRealtime {
(paginatedResponse, artError)
}
let chatAPI = ChatAPI(realtime: realtime)
let roomId = "basketball::$chat::$chatMessages"

await #expect(
performing: {
// When
try await chatAPI.getMessages(roomId: roomId, params: .init()) as? PaginatedResultWrapper<Message>
}, throws: { error in
// Then
error as? ARTErrorInfo == artError
}
)
}
}
Loading