Skip to content

Commit ed9452d

Browse files
committed
recent_senders: Handle moved messages
The tests do not have to use PerAccountStore, but this setup makes it a bit more integrated. This is similar to how we add support to handling moves for unreads (commit 34e6201), also with optimizations to avoid making unnecessary copies of message IDs when the entire conversation is moved (e.g. resolve/unresolve topic). An alternative approach to this that we didn't take is extracting helpers from handleMessages and handleDeleteMessageEvent and combining the two to handle moves, like web does (https://github.com/zulip/zulip/blob/bd04a30b/web/src/recent_senders.ts#L165-L190), but creating a dedicated helper makes it more sraightforward to optimize for our performance need. Fixes: zulip#901
1 parent f617c1a commit ed9452d

File tree

3 files changed

+246
-0
lines changed

3 files changed

+246
-0
lines changed

lib/model/recent_senders.dart

+79
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,52 @@ class RecentSenders {
6868
[senderId] ??= MessageIdTracker()).add(messageId);
6969
}
7070

71+
/// Handles channel/topic updates to the data resulted from a move.
72+
///
73+
/// This is a no-op if no message move happened.
74+
void handleUpdateMessageEvent(UpdateMessageEvent event, Map<int, Message> cachedMessages) {
75+
if (event.moveData == null) {
76+
return;
77+
}
78+
final UpdateMessageMoveData(
79+
:origStreamId, :newStreamId, :origTopic, :newTopic) = event.moveData!;
80+
81+
final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages);
82+
final sendersInChannel = streamSenders[origStreamId];
83+
final topicsInChannel = topicSenders[origStreamId];
84+
final sendersInTopic = topicsInChannel?[origTopic];
85+
for (final MapEntry(key: senderId, value: messages) in messagesBySender.entries) {
86+
// The later `popAll` calls require the message IDs to be sorted in
87+
// ascending order. Only sort as many as we need: the message IDs
88+
// with the same sender, instead of all of them in `event.messageIds`.
89+
messages.sort();
90+
91+
if (newStreamId != origStreamId) {
92+
final streamTracker = sendersInChannel?[senderId];
93+
final movedMessagesInStreamTracker = streamTracker?.popAll(messages);
94+
if (streamTracker?.maxId == null) sendersInChannel?.remove(senderId);
95+
if (movedMessagesInStreamTracker != null) {
96+
((streamSenders[newStreamId] ??= {})
97+
[senderId] ??= MessageIdTracker()).addAll(movedMessagesInStreamTracker);
98+
}
99+
}
100+
101+
// This does not need a check like the stream trackers one above,
102+
// because the conversation is guaranteed to have moved. This is an
103+
// invariant [UpdateMessageMoveData] offers.
104+
final topicTracker = sendersInTopic?[senderId];
105+
final movedMessagesInTopicTracker = topicTracker?.popAll(messages);
106+
if (topicTracker?.maxId == null) sendersInTopic?.remove(senderId);
107+
if (movedMessagesInTopicTracker != null) {
108+
(((topicSenders[newStreamId] ??= {})[newTopic] ??= {})
109+
[senderId] ??= MessageIdTracker()).addAll(movedMessagesInTopicTracker);
110+
}
111+
}
112+
if (sendersInChannel?.isEmpty ?? false) streamSenders.remove(origStreamId);
113+
if (sendersInTopic?.isEmpty ?? false) topicsInChannel?.remove(origTopic);
114+
if (topicsInChannel?.isEmpty ?? false) topicSenders.remove(origStreamId);
115+
}
116+
71117
void handleDeleteMessageEvent(DeleteMessageEvent event, Map<int, Message> cachedMessages) {
72118
if (event.messageType != MessageType.stream) return;
73119

@@ -153,6 +199,39 @@ class MessageIdTracker {
153199
ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1);
154200
}
155201

202+
/// Remove message IDs found in [idsToRemove] from the tracker list.
203+
///
204+
/// Returns the removed message IDs sorted in ascending order, or `null` if
205+
/// nothing is removed.
206+
///
207+
/// [idsToRemove] should be sorted ascending.
208+
///
209+
/// Consider using [removeAll] if the returned message IDs are not needed.
210+
// Part of this is adapted from [ListBase.removeWhere].
211+
QueueList<int>? popAll(List<int> idsToRemove) {
212+
assert(isSortedWithoutDuplicates(idsToRemove));
213+
final retainedMessageIds =
214+
ids.where((id) => binarySearch(idsToRemove, id) == -1).toList();
215+
216+
if (retainedMessageIds.isEmpty) {
217+
// All message IDs in this tracker are removed; this is an optimization
218+
// to clear all ids and return the removed ones without making a new copy.
219+
final result = ids;
220+
ids = QueueList();
221+
return result;
222+
}
223+
224+
QueueList<int>? poppedMessageIds;
225+
if (retainedMessageIds.length != ids.length) {
226+
poppedMessageIds = QueueList.from(
227+
ids.where((id) => binarySearch(idsToRemove, id) != -1));
228+
ids.setRange(0, retainedMessageIds.length, retainedMessageIds);
229+
ids.length = retainedMessageIds.length;
230+
assert(isSortedWithoutDuplicates(poppedMessageIds));
231+
}
232+
return poppedMessageIds;
233+
}
234+
156235
@override
157236
String toString() => ids.toString();
158237
}

lib/model/store.dart

+1
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, UserStore, Channel
754754

755755
case UpdateMessageEvent():
756756
assert(debugLog("server event: update_message ${event.messageId}"));
757+
recentSenders.handleUpdateMessageEvent(event, messages);
757758
_messages.handleUpdateMessageEvent(event);
758759
unreads.handleUpdateMessageEvent(event);
759760

test/model/recent_senders_test.dart

+166
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ import 'package:checks/checks.dart';
22
import 'package:flutter_test/flutter_test.dart';
33
import 'package:zulip/api/model/model.dart';
44
import 'package:zulip/model/recent_senders.dart';
5+
import 'package:zulip/model/store.dart';
56
import '../example_data.dart' as eg;
7+
import 'test_store.dart';
68

79
/// [messages] should be sorted by [id] ascending.
810
void checkMatchesMessages(RecentSenders model, List<Message> messages) {
911
final Map<int, Map<int, Set<int>>> messagesByUserInStream = {};
1012
final Map<int, Map<TopicName, Map<int, Set<int>>>> messagesByUserInTopic = {};
13+
messages.sort((a, b) => a.id - b.id);
1114
for (final message in messages) {
1215
if (message is! StreamMessage) {
1316
throw UnsupportedError('Message of type ${message.runtimeType} is not expected.');
@@ -142,6 +145,169 @@ void main() {
142145
});
143146
});
144147

148+
group('RecentSenders.handleUpdateMessageUpdate', () {
149+
late PerAccountStore store;
150+
late RecentSenders model;
151+
152+
final origChannel = eg.stream(); final newChannel = eg.stream();
153+
final origTopic = 'origTopic'; final newTopic = 'newTopic';
154+
final userX = eg.user(); final userY = eg.user();
155+
156+
Future<void> prepare(List<Message> messages) async {
157+
store = eg.store();
158+
await store.addMessages(messages);
159+
await store.addStreams([origChannel, newChannel]);
160+
await store.addUsers([userX, userY]);
161+
model = store.recentSenders;
162+
}
163+
164+
List<StreamMessage> copyMessagesWith(Iterable<StreamMessage> messages, {
165+
ZulipStream? newChannel,
166+
String? newTopic,
167+
}) {
168+
assert(newChannel != null || newTopic != null);
169+
return messages.map((message) => StreamMessage.fromJson(
170+
message.toJson()
171+
..['stream_id'] = newChannel?.streamId ?? message.streamId
172+
// See [StreamMessage.displayRecipient] for why this is needed.
173+
..['display_recipient'] = newChannel?.name ?? message.displayRecipient!
174+
175+
..['subject'] = newTopic ?? message.topic
176+
)).toList();
177+
}
178+
179+
test('move a conversation entirely, with additional unknown messages', () async {
180+
final messages = List.generate(10, (i) => eg.streamMessage(
181+
stream: origChannel, topic: origTopic, sender: userX));
182+
await prepare(messages);
183+
final unknownMessages = List.generate(10, (i) => eg.streamMessage(
184+
stream: origChannel, topic: origTopic, sender: userX));
185+
checkMatchesMessages(model, messages);
186+
187+
final streamSenderIdsBefore = model.streamSenders
188+
[origChannel.streamId]![userX.userId]!.ids;
189+
final topicSenderIdsBefore = model.topicSenders
190+
[origChannel.streamId]![TopicName(origTopic)]![userX.userId]!.ids;
191+
192+
await store.handleEvent(eg.updateMessageEventMoveFrom(
193+
origMessages: messages + unknownMessages,
194+
newStreamId: newChannel.streamId));
195+
checkMatchesMessages(model, copyMessagesWith(
196+
messages, newChannel: newChannel));
197+
198+
// Check we avoided creating a new list for the moved message IDs.
199+
final streamSenderIdsAfter = model.streamSenders
200+
[newChannel.streamId]![userX.userId]!.ids;
201+
final topicSenderIdsAfter = model.topicSenders
202+
[newChannel.streamId]![TopicName(origTopic)]![userX.userId]!.ids;
203+
check(streamSenderIdsBefore).identicalTo(streamSenderIdsAfter);
204+
check(topicSenderIdsBefore).identicalTo(topicSenderIdsAfter);
205+
});
206+
207+
test('move a conversation exactly', () async {
208+
final messages = List.generate(10, (i) => eg.streamMessage(
209+
stream: origChannel, topic: origTopic, sender: userX));
210+
await prepare(messages);
211+
212+
final streamSenderIdsBefore = model.streamSenders
213+
[origChannel.streamId]![userX.userId]!.ids;
214+
final topicSenderIdsBefore = model.topicSenders
215+
[origChannel.streamId]![TopicName(origTopic)]![userX.userId]!.ids;
216+
217+
await store.handleEvent(eg.updateMessageEventMoveFrom(
218+
origMessages: messages,
219+
newStreamId: newChannel.streamId,
220+
newTopicStr: newTopic));
221+
checkMatchesMessages(model, copyMessagesWith(
222+
messages, newChannel: newChannel, newTopic: newTopic));
223+
224+
// Check we avoided creating a new list for the moved message IDs.
225+
final streamSenderIdsAfter = model.streamSenders
226+
[newChannel.streamId]![userX.userId]!.ids;
227+
final topicSenderIdsAfter = model.topicSenders
228+
[newChannel.streamId]![TopicName(newTopic)]![userX.userId]!.ids;
229+
check(streamSenderIdsBefore).identicalTo(streamSenderIdsAfter);
230+
check(topicSenderIdsBefore).identicalTo(topicSenderIdsAfter);
231+
});
232+
233+
test('move a conversation partially to a different channel', () async {
234+
final messages = List.generate(10, (i) => eg.streamMessage(
235+
stream: origChannel, topic: origTopic));
236+
final movedMessages = messages.take(5).toList();
237+
final otherMessages = messages.skip(5);
238+
await prepare(messages);
239+
240+
await store.handleEvent(eg.updateMessageEventMoveFrom(
241+
origMessages: movedMessages,
242+
newStreamId: newChannel.streamId));
243+
checkMatchesMessages(model, [
244+
...copyMessagesWith(movedMessages, newChannel: newChannel),
245+
...otherMessages,
246+
]);
247+
});
248+
249+
test('move a conversation partially to a different topic, within the same channel', () async {
250+
final messages = List.generate(10, (i) => eg.streamMessage(
251+
stream: origChannel, topic: origTopic, sender: userX));
252+
final movedMessages = messages.take(5).toList();
253+
final otherMessages = messages.skip(5);
254+
await prepare(messages);
255+
256+
final streamSenderIdsBefore = model.streamSenders
257+
[origChannel.streamId]![userX.userId]!.ids;
258+
259+
await store.handleEvent(eg.updateMessageEventMoveFrom(
260+
origMessages: movedMessages,
261+
newTopicStr: newTopic));
262+
checkMatchesMessages(model, [
263+
...copyMessagesWith(movedMessages, newTopic: newTopic),
264+
...otherMessages,
265+
]);
266+
267+
// Check that we did not touch stream message IDs tracker
268+
// when there wasn't a stream move.
269+
final streamSenderIdsAfter = model.streamSenders
270+
[origChannel.streamId]![userX.userId]!.ids;
271+
check(streamSenderIdsBefore).identicalTo(streamSenderIdsAfter);
272+
});
273+
274+
test('move a conversation with multiple senders', () async {
275+
final messages = [
276+
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX),
277+
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX),
278+
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userY),
279+
];
280+
await prepare(messages);
281+
282+
await store.handleEvent(eg.updateMessageEventMoveFrom(
283+
origMessages: messages,
284+
newStreamId: newChannel.streamId));
285+
checkMatchesMessages(model, copyMessagesWith(
286+
messages, newChannel: newChannel));
287+
});
288+
289+
test('move a converstion, but message IDs from the event are not sorted in ascending order', () async {
290+
final messages = List.generate(10, (i) => eg.streamMessage(
291+
id: 100-i, stream: origChannel, topic: origTopic));
292+
await prepare(messages);
293+
294+
await store.handleEvent(eg.updateMessageEventMoveFrom(
295+
origMessages: messages,
296+
newStreamId: newChannel.streamId));
297+
checkMatchesMessages(model,
298+
copyMessagesWith(messages, newChannel: newChannel));
299+
});
300+
301+
test('message edit update without move', () async {
302+
final messages = List.generate(10, (i) => eg.streamMessage(
303+
stream: origChannel, topic: origTopic));
304+
await prepare(messages);
305+
306+
await store.handleEvent(eg.updateMessageEditEvent(messages[0]));
307+
checkMatchesMessages(model, messages);
308+
});
309+
});
310+
145311
test('RecentSenders.handleDeleteMessageEvent', () {
146312
final model = RecentSenders();
147313
final stream = eg.stream();

0 commit comments

Comments
 (0)