Skip to content

Commit 1b613b6

Browse files
committed
refactor: use snapshot reads for correctness paths (#4178)
1 parent b8a0348 commit 1b613b6

15 files changed

Lines changed: 142 additions & 178 deletions

File tree

logic/src/commonMain/kotlin/com/wire/kalium/logic/data/call/CallRepository.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import com.wire.kalium.common.functional.getOrNull
3030
import com.wire.kalium.common.functional.map
3131
import com.wire.kalium.common.functional.onFailure
3232
import com.wire.kalium.common.functional.onSuccess
33-
import com.wire.kalium.common.functional.onlyRight
3433
import com.wire.kalium.common.functional.right
3534
import com.wire.kalium.common.logger.callingLogger
3635
import com.wire.kalium.common.logger.logStructuredJson
@@ -44,7 +43,6 @@ import com.wire.kalium.logic.data.client.CryptoTransactionProvider
4443
import com.wire.kalium.logic.data.client.wrapInMLSContext
4544
import com.wire.kalium.logic.data.conversation.ClientId
4645
import com.wire.kalium.logic.data.conversation.Conversation
47-
import com.wire.kalium.logic.data.conversation.ConversationDetails
4846
import com.wire.kalium.logic.data.conversation.ConversationRepository
4947
import com.wire.kalium.logic.data.conversation.EpochChangesObserver
5048
import com.wire.kalium.logic.data.conversation.JoinSubconversationUseCase
@@ -238,8 +236,10 @@ internal class CallDataSource(
238236
)
239237
}
240238
) {
241-
val conversation: ConversationDetails =
242-
conversationRepository.observeConversationDetailsById(conversationId).onlyRight().first()
239+
val conversation = conversationRepository.getConversationById(conversationId).getOrNull() ?: run {
240+
callingLogger.w("[CallRepository][createCall] -> Conversation not found: ${conversationId.toLogString()}")
241+
return@withLock
242+
}
243243

244244
val caller = userRepository.getKnownUser(callerId).first()
245245
val team = caller?.teamId?.let { teamId -> teamRepository.getTeam(teamId).first() }
@@ -249,22 +249,22 @@ internal class CallDataSource(
249249
id = Uuid.random().toString(),
250250
type = type,
251251
status = status,
252-
conversationType = conversation.conversation.type,
252+
conversationType = conversation.type,
253253
callerId = callerId
254254
)
255255

256256
val metadata = CallMetadata(
257257
callerId = callerId,
258-
conversationName = conversation.conversation.name,
259-
conversationType = conversation.conversation.type,
258+
conversationName = conversation.name,
259+
conversationType = conversation.type,
260260
callerName = caller?.name,
261261
callerTeamName = team?.name,
262262
isMuted = isMuted,
263263
isCameraOn = isCameraOn,
264264
isCbrEnabled = isCbrEnabled,
265265
establishedTime = null,
266266
callStatus = status,
267-
protocol = conversation.conversation.protocol,
267+
protocol = conversation.protocol,
268268
activeSpeakers = mapOf()
269269
)
270270

logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/ConversationRepository.kt

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,17 @@ import kotlinx.coroutines.flow.Flow
7474
import kotlinx.coroutines.flow.combine
7575
import kotlinx.coroutines.flow.distinctUntilChanged
7676
import kotlinx.coroutines.flow.filterNotNull
77-
import kotlinx.coroutines.flow.first
7877
import kotlinx.coroutines.flow.flowOf
7978
import kotlinx.coroutines.flow.map
8079
import kotlinx.datetime.Instant
8180

81+
internal data class ConversationMemberCounts(
82+
val conversationSize: Int,
83+
val servicesCount: Int,
84+
val guestsCount: Int,
85+
val guestsProCount: Int
86+
)
87+
8288
@Suppress("TooManyFunctions")
8389
internal interface ConversationRepository {
8490
val extensions: ConversationRepositoryExtensions
@@ -88,6 +94,7 @@ internal interface ConversationRepository {
8894
suspend fun observeConversationById(conversationId: ConversationId): Flow<Either<StorageFailure, Conversation>>
8995
suspend fun observeConversationDetailsById(conversationID: ConversationId): Flow<Either<StorageFailure, ConversationDetails>>
9096
suspend fun getConversationById(conversationId: ConversationId): Either<StorageFailure, Conversation>
97+
suspend fun getConversationLastReadDate(conversationId: ConversationId): Either<StorageFailure, Instant>
9198
suspend fun getNonDeletedConversationById(conversationId: ConversationId): Either<StorageFailure, Conversation>
9299

93100
// endregion
@@ -145,6 +152,8 @@ internal interface ConversationRepository {
145152
suspend fun getConversationRecipientsForCalling(conversationId: ConversationId): Either<CoreFailure, List<Recipient>>
146153
suspend fun getConversationProtocolInfo(conversationId: ConversationId): Either<StorageFailure, Conversation.ProtocolInfo>
147154
suspend fun observeConversationMembers(conversationID: ConversationId): Flow<List<Conversation.Member>>
155+
suspend fun isConversationMemberAdmin(conversationId: ConversationId, userId: UserId): Either<StorageFailure, Boolean>
156+
suspend fun getConversationMemberCounts(conversationId: ConversationId): Either<StorageFailure, ConversationMemberCounts>
148157

149158
/**
150159
* Fetches a list of all members' IDs or a given conversation including self user
@@ -407,6 +416,10 @@ internal class ConversationDataSource internal constructor(
407416
}
408417
}
409418

419+
override suspend fun getConversationLastReadDate(conversationId: ConversationId): Either<StorageFailure, Instant> = wrapStorageRequest {
420+
conversationDAO.getConversationLastReadDate(conversationId.toDao())
421+
}
422+
410423
override suspend fun getNonDeletedConversationById(
411424
conversationId: ConversationId
412425
): Either<StorageFailure, Conversation> = wrapStorageRequest {
@@ -594,8 +607,27 @@ internal class ConversationDataSource internal constructor(
594607
members.map(memberMapper::fromDaoModel)
595608
}
596609

610+
override suspend fun isConversationMemberAdmin(
611+
conversationId: ConversationId,
612+
userId: UserId
613+
): Either<StorageFailure, Boolean> = wrapStorageRequest {
614+
memberDAO.isMemberAdmin(conversationId.toDao(), userId.toDao())
615+
}
616+
617+
override suspend fun getConversationMemberCounts(conversationId: ConversationId): Either<StorageFailure, ConversationMemberCounts> =
618+
wrapStorageRequest {
619+
memberDAO.getConversationMemberCounts(conversationId.toDao()).let {
620+
ConversationMemberCounts(
621+
conversationSize = it.conversationSize,
622+
servicesCount = it.servicesCount,
623+
guestsCount = it.guestsCount,
624+
guestsProCount = it.guestsProCount
625+
)
626+
}
627+
}
628+
597629
override suspend fun getConversationMembers(conversationId: ConversationId): Either<StorageFailure, List<UserId>> = wrapStorageRequest {
598-
memberDAO.observeConversationMembers(conversationId.toDao()).first().map { it.user.toModel() }
630+
memberDAO.getConversationMembers(conversationId.toDao()).map { it.toModel() }
599631
}
600632

601633
override suspend fun persistMembers(

logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,6 @@ import kotlinx.coroutines.flow.Flow
584584
import kotlinx.coroutines.flow.MutableSharedFlow
585585
import kotlinx.coroutines.flow.filterNotNull
586586
import kotlinx.coroutines.flow.first
587-
import kotlinx.coroutines.flow.firstOrNull
588587
import kotlinx.coroutines.flow.map
589588
import kotlinx.coroutines.launch
590589
import kotlinx.coroutines.sync.Mutex
@@ -732,10 +731,10 @@ public class UserSessionScope internal constructor(
732731
// this can depend directly on DAO it will make it easier to user
733732
// and remove any circular dependency when using this inside user repository
734733
wrapStorageNullableRequest {
735-
userStorage.database.userDAO.observeUserDetailsByQualifiedID(userId.toDao()).firstOrNull()
736-
}.map { userDetailsEntity ->
737-
_teamId = Either.Right(userDetailsEntity?.team?.let { TeamId(it) })
738-
userDetailsEntity?.team?.let { TeamId(it) }
734+
userStorage.database.userDAO.getTeamIdByQualifiedID(userId.toDao())
735+
}.map { teamId ->
736+
_teamId = Either.Right(teamId?.let { TeamId(it) })
737+
teamId?.let { TeamId(it) }
739738
}
740739
}
741740

@@ -2802,7 +2801,7 @@ public class UserSessionScope internal constructor(
28022801
private val createAndPersistRecentlyEndedCallMetadata: CreateAndPersistRecentlyEndedCallMetadataUseCase
28032802
get() = CreateAndPersistRecentlyEndedCallMetadataUseCaseImpl(
28042803
callRepository = callRepository,
2805-
observeConversationMembers = conversations.observeConversationMembers,
2804+
conversationRepository = conversationRepository,
28062805
selfTeamIdProvider = selfTeamId
28072806
)
28082807

logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/call/ShouldRemoteMuteChecker.kt

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package com.wire.kalium.logic.feature.call
1919

20-
import com.wire.kalium.logic.data.conversation.Conversation
2120
import com.wire.kalium.logic.data.message.MessageContent
2221
import com.wire.kalium.logic.data.user.UserId
2322

@@ -28,28 +27,20 @@ import com.wire.kalium.logic.data.user.UserId
2827
*/
2928
internal interface ShouldRemoteMuteChecker {
3029
fun check(
31-
senderUserId: UserId,
30+
isSenderAdmin: Boolean,
3231
selfUserId: UserId,
3332
selfClientId: String,
34-
targets: MessageContent.Calling.Targets?,
35-
conversationMembers: List<Conversation.Member>
33+
targets: MessageContent.Calling.Targets?
3634
): Boolean
3735
}
3836

3937
internal class ShouldRemoteMuteCheckerImpl : ShouldRemoteMuteChecker {
4038
override fun check(
41-
senderUserId: UserId,
39+
isSenderAdmin: Boolean,
4240
selfUserId: UserId,
4341
selfClientId: String,
44-
targets: MessageContent.Calling.Targets?,
45-
conversationMembers: List<Conversation.Member>
46-
): Boolean = isSenderAnAdmin(senderUserId, conversationMembers) && isCurrentClientTargeted(selfUserId, selfClientId, targets)
47-
48-
// Only admins can remotely mute other participants, so we need to check if the sender is an admin or not.
49-
private fun isSenderAnAdmin(senderUserId: UserId, conversationMembers: List<Conversation.Member>): Boolean =
50-
conversationMembers.any { member ->
51-
member.id == senderUserId && member.role == Conversation.Member.Role.Admin
52-
}
42+
targets: MessageContent.Calling.Targets?
43+
): Boolean = isSenderAdmin && isCurrentClientTargeted(selfUserId, selfClientId, targets)
5344

5445
// Because of the nature of MLS (where all the MLS group members always receive a message),
5546
// we need to check if the current client is a target of the remote mute or not.

logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/call/usecase/CreateAndPersistRecentlyEndedCallMetadataUseCase.kt

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@ import com.wire.kalium.logic.data.call.CallRepository
2323
import com.wire.kalium.logic.data.call.CallScreenSharingMetadata
2424
import com.wire.kalium.logic.data.call.CallStatus
2525
import com.wire.kalium.logic.data.call.RecentlyEndedCallMetadata
26+
import com.wire.kalium.logic.data.conversation.ConversationRepository
2627
import com.wire.kalium.logic.data.id.ConversationId
2728
import com.wire.kalium.logic.data.id.SelfTeamIdProvider
28-
import com.wire.kalium.logic.data.user.type.isAppOrBot
29-
import com.wire.kalium.logic.data.user.type.isGuest
3029
import com.wire.kalium.logic.data.user.type.isOwner
31-
import com.wire.kalium.logic.feature.conversation.ObserveConversationMembersUseCase
3230
import com.wire.kalium.util.DateTimeUtil
33-
import kotlinx.coroutines.flow.first
3431

3532
/**
3633
* Given a call and raw call end reason create metadata containing all information regarding
@@ -42,7 +39,7 @@ internal interface CreateAndPersistRecentlyEndedCallMetadataUseCase {
4239

4340
internal class CreateAndPersistRecentlyEndedCallMetadataUseCaseImpl internal constructor(
4441
private val callRepository: CallRepository,
45-
private val observeConversationMembers: ObserveConversationMembersUseCase,
42+
private val conversationRepository: ConversationRepository,
4643
private val selfTeamIdProvider: SelfTeamIdProvider,
4744
) : CreateAndPersistRecentlyEndedCallMetadataUseCase {
4845
override suspend fun invoke(conversationId: ConversationId, callEndedReason: Int) {
@@ -54,12 +51,9 @@ internal class CreateAndPersistRecentlyEndedCallMetadataUseCaseImpl internal con
5451
}
5552
}
5653

57-
private suspend fun CallMetadata.createMetadata(conversationId: ConversationId, callEndedReason: Int): RecentlyEndedCallMetadata {
54+
private suspend fun CallMetadata.createMetadata(conversationId: ConversationId, callEndedReason: Int): RecentlyEndedCallMetadata? {
5855
val selfCallUser = getFullParticipants().firstOrNull { participant -> participant.userType.isOwner() }
59-
val conversationMembers = observeConversationMembers(conversationId).first()
60-
val conversationServicesCount = conversationMembers.count { member -> member.user.userType.isAppOrBot() }
61-
val guestsCount = conversationMembers.count { member -> member.user.userType.isGuest() }
62-
val guestsProCount = conversationMembers.count { member -> member.user.userType.isGuest() && member.user.teamId != null }
56+
val memberCounts = conversationRepository.getConversationMemberCounts(conversationId).getOrNull() ?: return null
6357
val isOutgoingCall = callStatus == CallStatus.STARTED
6458
val callDurationInSeconds = establishedTime?.let {
6559
DateTimeUtil.calculateMillisDifference(it, DateTimeUtil.currentIsoDateTimeString()) / MILLIS_IN_SECOND
@@ -74,15 +68,15 @@ internal class CreateAndPersistRecentlyEndedCallMetadataUseCaseImpl internal con
7468
isOutgoingCall = isOutgoingCall,
7569
callDurationInSeconds = callDurationInSeconds,
7670
callParticipantsCount = participants.size,
77-
conversationServices = conversationServicesCount,
71+
conversationServices = memberCounts.servicesCount,
7872
callAVSwitchToggle = selfCallUser?.isCameraOn ?: false,
7973
callVideoEnabled = isCameraOn
8074
),
8175
conversationDetails = RecentlyEndedCallMetadata.ConversationDetails(
8276
conversationType = conversationType,
83-
conversationSize = conversationMembers.size,
84-
conversationGuests = guestsCount,
85-
conversationGuestsPro = guestsProCount
77+
conversationSize = memberCounts.conversationSize,
78+
conversationGuests = memberCounts.guestsCount,
79+
conversationGuestsPro = memberCounts.guestsProCount
8680
),
8781
isTeamMember = selfTeamIdProvider().getOrNull() != null
8882
)

logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/conversation/UpdateConversationReadDateUseCase.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import com.wire.kalium.messaging.hooks.ConversationLastReadEventData
4343
import com.wire.kalium.messaging.hooks.PersistenceEventHookNotifier
4444
import kotlinx.coroutines.coroutineScope
4545
import kotlinx.coroutines.currentCoroutineContext
46-
import kotlinx.coroutines.flow.first
4746
import kotlinx.coroutines.launch
4847
import kotlinx.coroutines.NonCancellable
4948
import kotlinx.coroutines.withContext
@@ -95,13 +94,13 @@ public class UpdateConversationReadDateUseCase internal constructor(
9594

9695
private suspend fun doWork(conversationId: QualifiedID, time: Instant) {
9796
coroutineScope {
98-
conversationRepository.observeConversationById(conversationId).first().onFailure {
97+
conversationRepository.getConversationLastReadDate(conversationId).onFailure {
9998
logger.w("Failed to update conversation read date; StorageFailure $it")
100-
}.onSuccess { conversation ->
101-
if (conversation.lastReadDate >= time) {
99+
}.onSuccess { lastReadDate ->
100+
if (lastReadDate >= time) {
102101
logger.d(
103102
"Skipping last-read update for '${conversationId.toLogString()}': " +
104-
"stored=${conversation.lastReadDate} >= requested=$time"
103+
"stored=$lastReadDate >= requested=$time"
105104
)
106105
// Skipping, as current lastRead is already newer than the scheduled one
107106
return@onSuccess
@@ -111,7 +110,7 @@ public class UpdateConversationReadDateUseCase internal constructor(
111110

112111
if (shouldRunOptionalSideEffects) {
113112
launch {
114-
sendConfirmation(conversationId, conversation.lastReadDate, time)
113+
sendConfirmation(conversationId, lastReadDate, time)
115114
}
116115
}
117116

logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/confirmation/ConfirmationDeliveryHandler.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import kotlinx.coroutines.channels.BufferOverflow
3535
import kotlinx.coroutines.flow.MutableSharedFlow
3636
import kotlinx.coroutines.flow.collectLatest
3737
import kotlinx.coroutines.flow.debounce
38-
import kotlinx.coroutines.flow.first
3938

4039
/**
4140
* Internal: Handles the send of delivery confirmation of messages.
@@ -81,7 +80,7 @@ internal class ConfirmationDeliveryHandlerImpl(
8180
kaliumLogger.d("Started collecting pending messages for delivery confirmation")
8281
val messagesToSend = pendingConfirmationMessages.block { it.toMap() }
8382
messagesToSend.forEach { (conversationId, messages) ->
84-
conversationRepository.observeConversationById(conversationId).first().flatMap { conversation ->
83+
conversationRepository.getConversationById(conversationId).flatMap { conversation ->
8584
if (conversation.type == Conversation.Type.OneOnOne) {
8685
sendDeliverSignalUseCase(
8786
conversation = conversation,

logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/handler/CallingMessageHandler.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import com.wire.kalium.logic.feature.call.CallManager
3232
import com.wire.kalium.logic.feature.call.ShouldRemoteMuteChecker
3333
import com.wire.kalium.logic.feature.call.ShouldRemoteMuteCheckerImpl
3434
import com.wire.kalium.logic.feature.call.usecase.MuteCallUseCase
35-
import kotlinx.coroutines.flow.first
3635
import kotlinx.serialization.json.Json
3736

3837
internal interface CallingMessageHandler {
@@ -81,13 +80,14 @@ internal class CallingMessageHandlerImpl internal constructor(
8180
return
8281
}
8382

84-
val conversationMembers = conversationRepository.observeConversationMembers(targetConversationId).first()
83+
val isSenderAdmin = conversationRepository
84+
.isConversationMemberAdmin(targetConversationId, message.senderUserId)
85+
.getOrNull() ?: false
8586
val shouldRemoteMute = shouldRemoteMuteChecker.check(
86-
senderUserId = message.senderUserId,
87+
isSenderAdmin = isSenderAdmin,
8788
selfUserId = selfUserId,
8889
selfClientId = clientId.value,
89-
targets = callingValue.targets,
90-
conversationMembers = conversationMembers
90+
targets = callingValue.targets
9191
)
9292
callingLogger.i("$tagWithUserId: Calling $REMOTE_MUTE_TYPE message received for conversationId: $targetConversationId.")
9393
if (shouldRemoteMute) {

logic/src/commonTest/kotlin/com/wire/kalium/logic/data/call/CallRepositoryTest.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ import kotlinx.coroutines.flow.Flow
9090
import kotlinx.coroutines.flow.MutableSharedFlow
9191
import kotlinx.coroutines.flow.MutableStateFlow
9292
import kotlinx.coroutines.flow.emptyFlow
93+
import kotlinx.coroutines.flow.first
9394
import kotlinx.coroutines.flow.flowOf
9495
import kotlinx.coroutines.joinAll
9596
import kotlinx.coroutines.launch
@@ -2126,9 +2127,13 @@ class CallRepositoryTest {
21262127
}
21272128

21282129
suspend fun givenObserveConversationDetailsByIdReturns(flow: Flow<Either<StorageFailure, ConversationDetails>>) = apply {
2130+
val result = when (val details = flow.first()) {
2131+
is Either.Left -> Either.Left(details.value)
2132+
is Either.Right -> Either.Right(details.value.conversation)
2133+
}
21292134
everySuspend {
2130-
conversationRepository.observeConversationDetailsById(any())
2131-
} returns flow
2135+
conversationRepository.getConversationById(any())
2136+
} returns result
21322137
}
21332138

21342139
suspend fun givenGetKnownUserSucceeds() = apply {

0 commit comments

Comments
 (0)