Skip to content

Commit b8a0348

Browse files
committed
refactor: replace persistence flow cache with snapshot reads
1 parent 16da542 commit b8a0348

13 files changed

Lines changed: 163 additions & 295 deletions

File tree

data/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Members.sq

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,24 @@ SELECT * FROM Member;
4646
selectAllMembersByConversation:
4747
SELECT * FROM Member WHERE conversation = :conversation;
4848

49+
isMemberAdmin:
50+
SELECT EXISTS(
51+
SELECT 1 FROM Member
52+
WHERE conversation = :conversationId
53+
AND user = :userId
54+
AND role = 'wire_admin'
55+
);
56+
57+
selectConversationMemberCounts:
58+
SELECT
59+
COUNT(*) AS conversationSize,
60+
COALESCE(SUM(CASE WHEN usr.user_type IN ('APP', 'SERVICE') THEN 1 ELSE 0 END), 0) AS servicesCount,
61+
COALESCE(SUM(CASE WHEN usr.user_type = 'GUEST' THEN 1 ELSE 0 END), 0) AS guestsCount,
62+
COALESCE(SUM(CASE WHEN usr.user_type = 'GUEST' AND usr.team IS NOT NULL THEN 1 ELSE 0 END), 0) AS guestsProCount
63+
FROM Member AS mem
64+
LEFT JOIN User AS usr ON usr.qualified_id = mem.user
65+
WHERE mem.conversation = :conversationId;
66+
4967
selectConversationsByMember:
5068
SELECT
5169
Conversation.*

data/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Users.sq

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ SELECT active_one_on_one_conversation_id FROM User WHERE qualified_id = :userId;
297297
selectNamesAndHandle:
298298
SELECT name, handle FROM User WHERE qualified_id = :userId;
299299

300+
selectTeamByQualifiedId:
301+
SELECT team FROM User WHERE qualified_id = :userId;
302+
300303
updateTeamId:
301304
UPDATE User SET team = ? WHERE qualified_id = ?;
302305

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/cache/FlowCache.kt

Lines changed: 0 additions & 100 deletions
This file was deleted.

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAO.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ interface UserDAO {
341341
suspend fun getOneOnOnConversationId(userId: UserIDEntity): QualifiedIDEntity?
342342
suspend fun getUsersMinimizedByQualifiedIDs(qualifiedIDs: List<QualifiedIDEntity>): List<UserEntityMinimized>
343343
suspend fun getNameAndHandle(userId: UserIDEntity): NameAndHandleEntity?
344+
suspend fun getTeamIdByQualifiedID(qualifiedID: QualifiedIDEntity): String?
344345
suspend fun updateTeamId(userId: UserIDEntity, teamId: String)
345346
suspend fun countContactsAmount(selfUserId: QualifiedIDEntity): Int
346347
suspend fun countTeamMembersAmount(teamId: String): Int

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAOImpl.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull
2424

2525
import app.cash.sqldelight.coroutines.asFlow
2626
import com.wire.kalium.persistence.UsersQueries
27-
import com.wire.kalium.persistence.cache.FlowCache
2827
import com.wire.kalium.persistence.dao.conversation.NameAndHandleEntity
2928
import com.wire.kalium.persistence.db.ReadDispatcher
3029
import com.wire.kalium.persistence.db.WriteDispatcher
@@ -165,7 +164,6 @@ class UserMapper {
165164
@Suppress("TooManyFunctions")
166165
class UserDAOImpl internal constructor(
167166
private val userQueries: UsersQueries,
168-
private val userCache: FlowCache<UserIDEntity, UserDetailsEntity?>,
169167
private val readDispatcher: ReadDispatcher,
170168
private val writeDispatcher: WriteDispatcher,
171169
) : UserDAO {
@@ -287,13 +285,11 @@ class UserDAOImpl internal constructor(
287285
.flowOn(readDispatcher.value)
288286

289287
override suspend fun observeUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<UserDetailsEntity?> =
290-
userCache.get(qualifiedID) {
291-
userQueries.selectDetailsByQualifiedId(listOf(qualifiedID))
292-
.asFlow()
293-
.mapToOneOrNull()
294-
.map { it?.let { mapper.toDetailsModel(it) } }
295-
.flowOn(readDispatcher.value)
296-
}
288+
userQueries.selectDetailsByQualifiedId(listOf(qualifiedID))
289+
.asFlow()
290+
.mapToOneOrNull()
291+
.map { it?.let { mapper.toDetailsModel(it) } }
292+
.flowOn(readDispatcher.value)
297293

298294
override fun getUserDetailsWithTeamByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<Pair<UserDetailsEntity, TeamEntity?>?> =
299295
userQueries.selectWithTeamByQualifiedId(listOf(qualifiedID), mapper::toUserAndTeamPairModel)
@@ -532,6 +528,10 @@ class UserDAOImpl internal constructor(
532528
userQueries.selectNamesAndHandle(userId, ::NameAndHandleEntity).awaitAsOneOrNull()
533529
}
534530

531+
override suspend fun getTeamIdByQualifiedID(qualifiedID: QualifiedIDEntity): String? = withContext(readDispatcher.value) {
532+
userQueries.selectTeamByQualifiedId(qualifiedID).executeAsOneOrNull()?.team
533+
}
534+
535535
override suspend fun updateTeamId(userId: UserIDEntity, teamId: String) {
536536
userQueries.updateTeamId(teamId, userId)
537537
}

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/conversation/ConversationDAO.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ interface ConversationDAO {
3636

3737
suspend fun observeConversationById(qualifiedID: QualifiedIDEntity): Flow<ConversationEntity?>
3838
suspend fun getConversationById(qualifiedID: QualifiedIDEntity): ConversationEntity?
39+
suspend fun getConversationLastReadDate(qualifiedID: QualifiedIDEntity): Instant?
3940
suspend fun getNonDeletedConversationById(qualifiedID: QualifiedIDEntity): ConversationEntity?
4041
suspend fun getConversationDetailsById(qualifiedID: QualifiedIDEntity): ConversationViewEntity?
4142
suspend fun observeConversationDetailsById(conversationId: QualifiedIDEntity): Flow<ConversationViewEntity?>

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/conversation/ConversationDAOImpl.kt

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ import com.wire.kalium.persistence.ConversationDetailsWithEventsQueries
2929
import com.wire.kalium.persistence.ConversationsQueries
3030
import com.wire.kalium.persistence.MembersQueries
3131
import com.wire.kalium.persistence.UnreadEventsQueries
32-
import com.wire.kalium.persistence.cache.FlowCache
33-
import com.wire.kalium.persistence.dao.ConversationIDEntity
3432
import com.wire.kalium.persistence.dao.QualifiedIDEntity
3533
import com.wire.kalium.persistence.dao.UserIDEntity
3634
import com.wire.kalium.persistence.db.ReadDispatcher
@@ -42,7 +40,6 @@ import com.wire.kalium.persistence.util.mapToOneOrNull
4240
import com.wire.kalium.util.DateTimeUtil
4341
import com.wire.kalium.util.DateTimeUtil.toIsoDateTimeString
4442
import kotlinx.coroutines.flow.Flow
45-
import kotlinx.coroutines.flow.first
4643
import kotlinx.coroutines.flow.flowOn
4744
import kotlinx.coroutines.flow.map
4845
import kotlinx.coroutines.withContext
@@ -58,8 +55,6 @@ internal val MLS_DEFAULT_CIPHER_SUITE = ConversationEntity.CipherSuite.MLS_128_D
5855
// Even if they operate on the same table underneath, these DAOs can represent/do different things.
5956
@Suppress("TooManyFunctions", "LongParameterList", "LargeClass")
6057
internal class ConversationDAOImpl internal constructor(
61-
private val conversationDetailsCache: FlowCache<ConversationIDEntity, ConversationViewEntity?>,
62-
private val conversationCache: FlowCache<ConversationIDEntity, ConversationEntity?>,
6358
private val conversationQueries: ConversationsQueries,
6459
private val conversationDetailsQueries: ConversationDetailsQueries,
6560
private val conversationDetailsWithEventsQueries: ConversationDetailsWithEventsQueries,
@@ -78,16 +73,23 @@ internal class ConversationDAOImpl internal constructor(
7873
override suspend fun observeConversationById(
7974
qualifiedID: QualifiedIDEntity
8075
): Flow<ConversationEntity?> =
81-
conversationCache.get(qualifiedID) {
82-
conversationQueries.selectConversationByQualifiedId(qualifiedID, conversationMapper::fromViewToModel)
83-
.asFlow()
84-
.mapToOneOrNull()
85-
.flowOn(readDispatcher.value)
86-
}
76+
conversationQueries.selectConversationByQualifiedId(qualifiedID, conversationMapper::fromViewToModel)
77+
.asFlow()
78+
.mapToOneOrNull()
79+
.flowOn(readDispatcher.value)
8780

8881
override suspend fun getConversationById(
8982
qualifiedID: QualifiedIDEntity
90-
): ConversationEntity? = observeConversationById(qualifiedID).first()
83+
): ConversationEntity? = withContext(readDispatcher.value) {
84+
conversationQueries.selectConversationByQualifiedId(qualifiedID, conversationMapper::fromViewToModel)
85+
.executeAsOneOrNull()
86+
}
87+
88+
override suspend fun getConversationLastReadDate(
89+
qualifiedID: QualifiedIDEntity
90+
): Instant? = withContext(readDispatcher.value) {
91+
conversationQueries.getConversationLastReadDate(qualifiedID).executeAsOneOrNull()
92+
}
9193

9294
override suspend fun getNonDeletedConversationById(
9395
qualifiedID: QualifiedIDEntity
@@ -97,17 +99,18 @@ internal class ConversationDAOImpl internal constructor(
9799

98100
override suspend fun observeConversationDetailsById(
99101
conversationId: QualifiedIDEntity
100-
): Flow<ConversationViewEntity?> = conversationDetailsCache.get(conversationId) {
102+
): Flow<ConversationViewEntity?> =
101103
conversationDetailsQueries.selectConversationDetailsByQualifiedId(conversationId, conversationMapper::fromViewToModel)
102104
.asFlow()
103105
.mapToOneOrNull()
104106
.flowOn(readDispatcher.value)
105-
}
106107

107108
override suspend fun getConversationDetailsById(
108109
qualifiedID: QualifiedIDEntity
109-
): ConversationViewEntity? =
110-
observeConversationDetailsById(qualifiedID).first()
110+
): ConversationViewEntity? = withContext(readDispatcher.value) {
111+
conversationDetailsQueries.selectConversationDetailsByQualifiedId(qualifiedID, conversationMapper::fromViewToModel)
112+
.executeAsOneOrNull()
113+
}
111114

112115
// endregion
113116

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/member/MemberDAO.kt

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import app.cash.sqldelight.coroutines.asFlow
2525
import com.wire.kalium.persistence.ConversationsQueries
2626
import com.wire.kalium.persistence.MembersQueries
2727
import com.wire.kalium.persistence.UsersQueries
28-
import com.wire.kalium.persistence.cache.FlowCache
2928
import com.wire.kalium.persistence.dao.ConversationIDEntity
3029
import com.wire.kalium.persistence.dao.QualifiedIDEntity
3130
import com.wire.kalium.persistence.dao.UserIDEntity
@@ -41,6 +40,13 @@ import kotlinx.coroutines.flow.flowOn
4140
import kotlinx.coroutines.flow.map
4241
import kotlinx.coroutines.withContext
4342

43+
data class ConversationMemberCountsEntity(
44+
val conversationSize: Int,
45+
val servicesCount: Int,
46+
val guestsCount: Int,
47+
val guestsProCount: Int
48+
)
49+
4450
@Suppress("TooManyFunctions")
4551
interface MemberDAO {
4652
suspend fun insertMember(member: MemberEntity, conversationID: QualifiedIDEntity)
@@ -56,6 +62,8 @@ interface MemberDAO {
5662
*/
5763
suspend fun deleteMembersByQualifiedID(userIDList: List<QualifiedIDEntity>, conversationID: QualifiedIDEntity): Long
5864
suspend fun observeConversationMembers(qualifiedID: QualifiedIDEntity): Flow<List<MemberEntity>>
65+
suspend fun isMemberAdmin(conversationId: QualifiedIDEntity, userId: QualifiedIDEntity): Boolean
66+
suspend fun getConversationMemberCounts(conversationId: QualifiedIDEntity): ConversationMemberCountsEntity
5967
suspend fun updateConversationMemberRole(conversationId: QualifiedIDEntity, userId: UserIDEntity, role: MemberEntity.Role)
6068
suspend fun updateOrInsertOneOnOneMember(
6169
member: MemberEntity,
@@ -79,7 +87,6 @@ interface MemberDAO {
7987

8088
@Suppress("TooManyFunctions", "LongParameterList")
8189
internal class MemberDAOImpl internal constructor(
82-
private val membersCache: FlowCache<ConversationIDEntity, List<MemberEntity>>,
8390
private val memberQueries: MembersQueries,
8491
private val userQueries: UsersQueries,
8592
private val conversationsQueries: ConversationsQueries,
@@ -157,13 +164,29 @@ internal class MemberDAOImpl internal constructor(
157164

158165
override suspend fun observeConversationMembers(
159166
qualifiedID: QualifiedIDEntity
160-
): Flow<List<MemberEntity>> = membersCache.get(qualifiedID) {
167+
): Flow<List<MemberEntity>> =
161168
memberQueries.selectAllMembersByConversation(qualifiedID)
162169
.asFlow()
163170
.mapToList()
164171
.map { it.map(memberMapper::toModel) }
165172
.flowOn(readDispatcher.value)
166-
}
173+
174+
override suspend fun isMemberAdmin(conversationId: QualifiedIDEntity, userId: QualifiedIDEntity): Boolean =
175+
withContext(readDispatcher.value) {
176+
memberQueries.isMemberAdmin(conversationId, userId).executeAsOne()
177+
}
178+
179+
override suspend fun getConversationMemberCounts(conversationId: QualifiedIDEntity): ConversationMemberCountsEntity =
180+
withContext(readDispatcher.value) {
181+
memberQueries.selectConversationMemberCounts(conversationId) { conversationSize, servicesCount, guestsCount, guestsProCount ->
182+
ConversationMemberCountsEntity(
183+
conversationSize = conversationSize.toInt(),
184+
servicesCount = servicesCount.toInt(),
185+
guestsCount = guestsCount.toInt(),
186+
guestsProCount = guestsProCount.toInt()
187+
)
188+
}.executeAsOne()
189+
}
167190

168191
override suspend fun updateConversationMemberRole(conversationId: QualifiedIDEntity, userId: UserIDEntity, role: MemberEntity.Role) {
169192
withContext(writeDispatcher.value) {

0 commit comments

Comments
 (0)