Skip to content

Commit 04832ec

Browse files
committed
fix: Send pre-message after successful sending of post-message (#8063)
On the receiving side, download post messages w/o known pre-message after 30s to protect from lost messages. This isn't easy to test in Rust though because the `smtp` sending code requires an SMTP connection, so it's only tested (by already existing tests) that messages are queued in the right order. Another problem is that the `smtp` sending logic doesn't try to send any messages following a failed-to-send message until the retry limit is reached, so if there's smth wrong with a post-message, other unrelated messages are delayed, but this problem has already existed.
1 parent ca70fb9 commit 04832ec

10 files changed

Lines changed: 89 additions & 48 deletions

File tree

src/chat.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2970,22 +2970,23 @@ WHERE id=?
29702970
)?;
29712971
for recipients_chunk in recipients.chunks(chunk_size) {
29722972
let recipients_chunk = recipients_chunk.join(" ");
2973-
if let Some(pre_msg) = &rendered_pre_msg {
2974-
let row_id = stmt.execute((
2975-
&pre_msg.rfc724_mid,
2976-
&recipients_chunk,
2977-
&pre_msg.message,
2978-
msg.id,
2979-
))?;
2980-
row_ids.push(row_id.try_into()?);
2981-
}
29822973
let row_id = stmt.execute((
29832974
&rendered_msg.rfc724_mid,
29842975
&recipients_chunk,
29852976
&rendered_msg.message,
29862977
msg.id,
29872978
))?;
29882979
row_ids.push(row_id.try_into()?);
2980+
let Some(pre_msg) = &rendered_pre_msg else {
2981+
continue;
2982+
};
2983+
let row_id = stmt.execute((
2984+
&pre_msg.rfc724_mid,
2985+
&recipients_chunk,
2986+
&pre_msg.message,
2987+
msg.id,
2988+
))?;
2989+
row_ids.push(row_id.try_into()?);
29892990
}
29902991
Ok(row_ids)
29912992
};

src/download.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::context::Context;
1010
use crate::imap::session::Session;
1111
use crate::log::warn;
1212
use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13+
use crate::tools;
1314
use crate::{EventType, chatlist_events};
1415

1516
pub(crate) mod post_msg_metadata;
@@ -320,12 +321,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message(
320321
context: &Context,
321322
session: &mut Session,
322323
) -> Result<()> {
324+
const PRE_MSG_WAIT_TIME: i64 = 30;
325+
let now = tools::time();
323326
let rfc724_mids = context
324327
.sql
325-
.query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
326-
let rfc724_mid: String = row.get(0)?;
327-
Ok(rfc724_mid)
328-
})
328+
.query_map_vec(
329+
"
330+
SELECT rfc724_mid FROM available_post_msgs
331+
WHERE timestamp<=? OR timestamp>?
332+
ORDER BY timestamp, rowid
333+
",
334+
(now.saturating_sub(PRE_MSG_WAIT_TIME), now),
335+
|row| {
336+
let rfc724_mid: String = row.get(0)?;
337+
Ok(rfc724_mid)
338+
},
339+
)
329340
.await?;
330341
for rfc724_mid in &rfc724_mids {
331342
if msg_is_downloaded_for(context, rfc724_mid).await? {

src/imap.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ impl Imap {
601601
let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
602602

603603
let mut uids_fetch: Vec<u32> = Vec::new();
604-
let mut available_post_msgs: Vec<String> = Vec::new();
604+
let mut available_post_msgs: Vec<(String, i64)> = Vec::new();
605605
let mut download_later: Vec<String> = Vec::new();
606606
let mut uid_message_ids = BTreeMap::new();
607607
let mut largest_uid_skipped = None;
@@ -689,7 +689,7 @@ impl Imap {
689689
.is_some()
690690
{
691691
info!(context, "{message_id:?} is a post-message.");
692-
available_post_msgs.push(message_id.clone());
692+
available_post_msgs.push((message_id.clone(), time()));
693693

694694
let is_bot = context.get_config_bool(Config::Bot).await?;
695695
if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
@@ -793,9 +793,10 @@ impl Imap {
793793
download_later.len(),
794794
);
795795
let trans_fn = |t: &mut rusqlite::Transaction| {
796-
let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
797-
for rfc724_mid in available_post_msgs {
798-
stmt.execute((rfc724_mid,))
796+
let mut stmt =
797+
t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?;
798+
for entry in available_post_msgs {
799+
stmt.execute(entry)
799800
.context("INSERT OR IGNORE INTO available_post_msgs")?;
800801
}
801802
let mut stmt =

src/receive_imf.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner(
525525
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
526526
);
527527

528-
// These checks must be done before processing of SecureJoin and other special messages.
529-
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
530-
// Post-Message just replaces the attachment and modifies Params, not the whole message.
531-
// This is done in the `handle_post_message` method.
532-
} else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? {
533-
info!(
534-
context,
535-
"Message {rfc724_mid} is already in some chat or deleted."
536-
);
537-
if mime_parser.incoming {
538-
return Ok(None);
539-
}
528+
let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?;
529+
if let Some(msg_id) = msg_id
530+
&& !mime_parser.incoming
531+
{
540532
// For the case if we missed a successful SMTP response. Be optimistic that the message is
541533
// delivered also.
542534
let self_addr = context.get_primary_self_addr().await?;
@@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner(
551543
if !msg_has_pending_smtp_job(context, msg_id).await? {
552544
msg_id.set_delivered(context).await?;
553545
}
546+
}
547+
// These checks must be done before processing of SecureJoin and other special messages.
548+
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
549+
// Post-Message just replaces the attachment and modifies Params, not the whole message.
550+
// This is done in the `handle_post_message` method.
551+
} else if msg_id.is_some() {
552+
info!(
553+
context,
554+
"Message {rfc724_mid} is already in some chat or deleted."
555+
);
554556
return Ok(None);
555557
}
556558

@@ -2504,7 +2506,10 @@ WHERE id=?
25042506
part.typ,
25052507
part.bytes as isize,
25062508
part.error.as_deref().unwrap_or_default(),
2507-
state,
2509+
match mime_parser.incoming {
2510+
true => state,
2511+
false => MessageState::Undefined,
2512+
},
25082513
DownloadState::Done as u32,
25092514
original_msg.id,
25102515
),

src/receive_imf/receive_imf_tests.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<()
55925592
.await
55935593
.unwrap();
55945594

5595-
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
5596-
assert_eq!(msg_id, pre_msg_id);
5597-
assert!(pre_msg_payload.len() < file_bytes.len());
5595+
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
5596+
assert_eq!(msg_id, post_msg_id);
5597+
assert!(post_msg_payload.len() > file_bytes.len());
55985598

55995599
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
5600-
// Alice receives her own pre-message because of bcc_self
5600+
// Alice receives her own post-message because of bcc_self
56015601
// This should not yet mark the message as delivered,
56025602
// because not everything was sent,
5603-
// but it does remove the pre-message from the SMTP queue
5604-
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
5603+
// but it does remove the post-message from the SMTP queue.
5604+
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
56055605
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
56065606

5607-
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
5608-
assert_eq!(msg_id, post_msg_id);
5609-
assert!(post_msg_payload.len() > file_bytes.len());
5607+
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
5608+
assert_eq!(msg_id, pre_msg_id);
5609+
assert!(pre_msg_payload.len() < file_bytes.len());
56105610

56115611
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
5612-
// Alice receives her own post-message because of bcc_self
5612+
// Alice receives her own pre-message because of bcc_self
56135613
// This should now mark the message as delivered,
56145614
// because everything was sent by now.
5615-
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
5615+
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
56165616
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
56175617

56185618
Ok(())

src/smtp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp(
379379
if retries > 6 {
380380
context
381381
.sql
382-
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
382+
.execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))
383383
.await
384384
.context("Failed to remove message with exceeded retry limit from smtp table")?;
385385
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {

src/sql/migrations.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2385,6 +2385,19 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
23852385
.await?;
23862386
}
23872387

2388+
inc_and_check(&mut migration_version, 153)?;
2389+
if dbversion < migration_version {
2390+
sql.execute_migration(
2391+
"
2392+
CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id);
2393+
ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL;
2394+
CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp);
2395+
",
2396+
migration_version,
2397+
)
2398+
.await?;
2399+
}
2400+
23882401
let new_version = sql
23892402
.get_raw_config_int(VERSION_CFG)
23902403
.await?

src/test_utils.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -722,12 +722,14 @@ ORDER BY id"
722722
})
723723
}
724724

725+
/// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned
726+
/// items go in reverse order for historical reasons.
725727
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
726728
let sent_msgs = self
727729
.ctx
728730
.sql
729731
.query_map_vec(
730-
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?",
732+
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC",
731733
(msg_id,),
732734
|row| {
733735
let _id: MsgId = row.get(0)?;
@@ -1055,9 +1057,17 @@ ORDER BY id"
10551057
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
10561058
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
10571059
/// the message.
1060+
///
1061+
/// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as
1062+
/// tracking of these jobs is probably already lost by the test code.
10581063
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
1064+
while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {}
10591065
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
1060-
let res = self.pop_sent_msg().await;
1066+
let rev_order = false;
1067+
let res = self
1068+
.pop_sent_msg_ex(rev_order, Duration::ZERO)
1069+
.await
1070+
.unwrap();
10611071
assert_eq!(
10621072
res.sender_msg_id, msg_id,
10631073
"Apparently the message was not actually sent out"

src/tests/pre_messages/forward_and_save.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> {
106106
assert_eq!(msg.text, "test".to_owned());
107107

108108
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
109-
let rev_order = false;
109+
let rev_order = true;
110110
let msg = bob
111111
.recv_msg(
112112
&alice

src/tests/pre_messages/receiving.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
547547
.await?;
548548

549549
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
550-
let post_message = alice.pop_sent_msg().await;
551550
let pre_message = alice.pop_sent_msg().await;
551+
let post_message = alice.pop_sent_msg().await;
552552

553553
let bob_instance = bob.recv_msg(&pre_message).await;
554554
assert_eq!(bob_instance.download_state, DownloadState::Available);
@@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()>
588588
.await?;
589589

590590
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
591-
let post_message = alice.pop_sent_msg().await;
592591
let pre_message = alice.pop_sent_msg().await;
592+
let post_message = alice.pop_sent_msg().await;
593593

594594
// Bob receives post-message first.
595595
let bob_instance = bob.recv_msg(&post_message).await;

0 commit comments

Comments
 (0)