Skip to content

Commit e4d82a0

Browse files
committed
retry: add option to store outgoing messages in db
1 parent 52d69f8 commit e4d82a0

10 files changed

Lines changed: 164 additions & 9 deletions

File tree

client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ type Client struct {
150150
// PreRetryCallback is called before a retry receipt is accepted.
151151
// If it returns false, the accepting will be cancelled and the retry receipt will be ignored.
152152
PreRetryCallback func(receipt *events.Receipt, id types.MessageID, retryCount int, msg *waE2E.Message) bool
153+
// Should whatsmeow store recently sent messages in the database so that retry receipts can be accepted
154+
// even if the process is restarted? If false, only the in-memory cache and GetMessageForRetry will be used.
155+
UseRetryMessageStore bool
156+
lastRetryStoreClear time.Time
153157

154158
// PrePairCallback is called before pairing is completed. If it returns false, the pairing will be cancelled and
155159
// the client will disconnect.

internals.go

Lines changed: 19 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

retry.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,34 @@ func (rm RecentMessage) IsEmpty() bool {
4747
return rm.wa == nil && rm.fb == nil
4848
}
4949

50-
func (cli *Client) addRecentMessage(to types.JID, id types.MessageID, wa *waE2E.Message, fb *waMsgApplication.MessageApplication) {
50+
func (cli *Client) addRecentMessage(ctx context.Context, to types.JID, id types.MessageID, wa *waE2E.Message, fb *waMsgApplication.MessageApplication) error {
51+
if cli.UseRetryMessageStore {
52+
var buf []byte
53+
var format string
54+
var err error
55+
if wa != nil {
56+
buf, err = proto.Marshal(wa)
57+
format = "wa"
58+
} else if fb != nil {
59+
buf, err = proto.Marshal(fb)
60+
format = "fb"
61+
}
62+
if err != nil {
63+
return fmt.Errorf("failed to marshal message for retry store: %w", err)
64+
}
65+
if buf != nil {
66+
err = cli.Store.EventBuffer.AddOutgoingEvent(ctx, to, id, format, buf)
67+
if err != nil {
68+
return fmt.Errorf("failed to add message to retry store: %w", err)
69+
}
70+
if time.Since(cli.lastRetryStoreClear) > 12*time.Hour {
71+
err = cli.Store.EventBuffer.DeleteOldOutgoingEvents(ctx)
72+
if err != nil {
73+
return fmt.Errorf("failed to clear old messages from retry store: %w", err)
74+
}
75+
}
76+
}
77+
}
5178
cli.recentMessagesLock.Lock()
5279
key := recentMessageKey{to, id}
5380
if cli.recentMessagesList[cli.recentMessagesPtr].ID != "" {
@@ -60,6 +87,7 @@ func (cli *Client) addRecentMessage(to types.JID, id types.MessageID, wa *waE2E.
6087
cli.recentMessagesPtr = 0
6188
}
6289
cli.recentMessagesLock.Unlock()
90+
return nil
6391
}
6492

6593
func (cli *Client) getRecentMessage(to types.JID, id types.MessageID) RecentMessage {
@@ -91,6 +119,13 @@ func (cli *Client) getMessageForRetry(ctx context.Context, receipt *events.Recei
91119
return &msg, nil
92120
}
93121
}
122+
if cli.UseRetryMessageStore {
123+
format, buf, err := cli.Store.EventBuffer.GetOutgoingEvent(ctx, receipt.Chat, altChat, messageID)
124+
if err != nil {
125+
return nil, fmt.Errorf("failed to get message from retry store: %w", err)
126+
}
127+
return parseRecentMessage(format, buf)
128+
}
94129
waMsg := cli.GetMessageForRetry(receipt.Sender, receipt.Chat, messageID)
95130
if waMsg != nil {
96131
cli.Log.Debugf("Found message in GetMessageForRetry to accept retry receipt for %s/%s from %s", receipt.Chat, messageID, receipt.Sender)
@@ -99,6 +134,25 @@ func (cli *Client) getMessageForRetry(ctx context.Context, receipt *events.Recei
99134
return nil, nil
100135
}
101136

137+
func parseRecentMessage(format string, buf []byte) (*RecentMessage, error) {
138+
var rm RecentMessage
139+
var err error
140+
switch format {
141+
case "wa":
142+
rm.wa = &waE2E.Message{}
143+
err = proto.Unmarshal(buf, rm.wa)
144+
case "fb":
145+
rm.fb = &waMsgApplication.MessageApplication{}
146+
err = proto.Unmarshal(buf, rm.fb)
147+
default:
148+
err = fmt.Errorf("unknown format in retry store: %s", format)
149+
}
150+
if err != nil {
151+
return nil, fmt.Errorf("failed to unmarshal payload in retry store: %w", err)
152+
}
153+
return &rm, nil
154+
}
155+
102156
const recreateSessionTimeout = 1 * time.Hour
103157

104158
func (cli *Client) shouldRecreateSession(ctx context.Context, retryCount int, jid types.JID) (reason string, recreate bool) {

send.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,12 @@ func (cli *Client) SendMessage(ctx context.Context, to types.JID, message *waE2E
373373
resp.DebugTimings.Queue = time.Since(start)
374374
defer cli.messageSendLock.Unlock()
375375

376-
respChan := cli.waitResponse(req.ID)
377376
// Peer message retries aren't implemented yet
378377
if !req.Peer {
379-
cli.addRecentMessage(to, req.ID, message, nil)
378+
err = cli.addRecentMessage(ctx, to, req.ID, message, nil)
379+
if err != nil {
380+
return
381+
}
380382
}
381383

382384
if message.GetMessageContextInfo().GetMessageSecret() != nil {
@@ -387,6 +389,8 @@ func (cli *Client) SendMessage(ctx context.Context, to types.JID, message *waE2E
387389
cli.Log.Debugf("Stored message secret key for outgoing message %s", req.ID)
388390
}
389391
}
392+
393+
respChan := cli.waitResponse(req.ID)
390394
var phash string
391395
var data []byte
392396
switch to.Server {

sendfb.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,13 @@ func (cli *Client) SendFBMessage(
137137
resp.DebugTimings.Queue = time.Since(start)
138138
defer cli.messageSendLock.Unlock()
139139

140-
respChan := cli.waitResponse(req.ID)
141140
if !req.Peer {
142-
cli.addRecentMessage(to, req.ID, nil, messageAppProto)
141+
err = cli.addRecentMessage(ctx, to, req.ID, nil, messageAppProto)
142+
if err != nil {
143+
return
144+
}
143145
}
146+
respChan := cli.waitResponse(req.ID)
144147
var phash string
145148
var data []byte
146149
switch to.Server {

store/noop.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,15 @@ func (n *NoopStore) PutManyLIDMappings(ctx context.Context, mappings []LIDMappin
275275
func (n *NoopStore) PutLIDMapping(ctx context.Context, lid types.JID, jid types.JID) error {
276276
return n.Error
277277
}
278+
279+
func (n *NoopStore) DeleteOldOutgoingEvents(ctx context.Context) error {
280+
return nil
281+
}
282+
283+
func (n *NoopStore) GetOutgoingEvent(ctx context.Context, chatJID, altChatJID types.JID, id types.MessageID) (string, []byte, error) {
284+
return "", nil, nil
285+
}
286+
287+
func (n *NoopStore) AddOutgoingEvent(ctx context.Context, chatJID types.JID, id types.MessageID, format string, plaintext []byte) error {
288+
return nil
289+
}

store/sqlstore/store.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,3 +1028,33 @@ func (s *SQLStore) DeleteOldBufferedHashes(ctx context.Context) error {
10281028
_, err := s.db.Exec(ctx, deleteOldBufferedHashesQuery, time.Now().Add(-14*24*time.Hour).UnixMilli())
10291029
return err
10301030
}
1031+
1032+
const (
1033+
getOutgoingEventQuery = `
1034+
SELECT format, plaintext FROM whatsmeow_retry_buffer WHERE our_jid=$1 AND (chat_jid=$2 OR chat_jid=$3) AND message_id=$4
1035+
`
1036+
addOutgoingEventQuery = `
1037+
INSERT INTO whatsmeow_retry_buffer (our_jid, chat_jid, message_id, format, plaintext, timestamp)
1038+
VALUES ($1, $2, $3, $4, $5, $6)
1039+
ON CONFLICT (our_jid, chat_jid, message_id) DO UPDATE
1040+
SET format=excluded.format, plaintext=excluded.plaintext, timestamp=excluded.timestamp
1041+
`
1042+
deleteOldOutgoingEventsQuery = `
1043+
DELETE FROM whatsmeow_retry_buffer WHERE our_jid=$1 AND timestamp < $2
1044+
`
1045+
)
1046+
1047+
func (s *SQLStore) GetOutgoingEvent(ctx context.Context, chatJID, altChatJID types.JID, id types.MessageID) (format string, result []byte, err error) {
1048+
err = s.db.QueryRow(ctx, getOutgoingEventQuery, s.JID, chatJID, altChatJID, id).Scan(&format, &result)
1049+
return
1050+
}
1051+
1052+
func (s *SQLStore) AddOutgoingEvent(ctx context.Context, chatJID types.JID, id types.MessageID, format string, plaintext []byte) error {
1053+
_, err := s.db.Exec(ctx, addOutgoingEventQuery, s.JID, chatJID, id, format, plaintext, time.Now().UnixMilli())
1054+
return err
1055+
}
1056+
1057+
func (s *SQLStore) DeleteOldOutgoingEvents(ctx context.Context) error {
1058+
_, err := s.db.Exec(ctx, deleteOldOutgoingEventsQuery, s.JID, time.Now().Add(-7*24*time.Hour).UnixMilli())
1059+
return err
1060+
}

store/sqlstore/upgrades/00-latest-schema.sql

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
-- v0 -> v12 (compatible with v8+): Latest schema
1+
-- v0 -> v13 (compatible with v8+): Latest schema
22
CREATE TABLE whatsmeow_device (
33
jid TEXT PRIMARY KEY,
44
lid TEXT,
@@ -158,3 +158,17 @@ CREATE TABLE whatsmeow_event_buffer (
158158
PRIMARY KEY (our_jid, ciphertext_hash),
159159
FOREIGN KEY (our_jid) REFERENCES whatsmeow_device(jid) ON DELETE CASCADE ON UPDATE CASCADE
160160
);
161+
162+
CREATE TABLE whatsmeow_retry_buffer (
163+
our_jid TEXT NOT NULL,
164+
chat_jid TEXT NOT NULL,
165+
message_id TEXT NOT NULL,
166+
format TEXT NOT NULL,
167+
plaintext bytea NOT NULL,
168+
timestamp BIGINT NOT NULL,
169+
170+
PRIMARY KEY (our_jid, chat_jid, message_id),
171+
FOREIGN KEY (our_jid) REFERENCES whatsmeow_device(jid) ON DELETE CASCADE ON UPDATE CASCADE
172+
);
173+
174+
CREATE INDEX whatsmeow_retry_buffer_timestamp_idx ON whatsmeow_retry_buffer (our_jid, timestamp);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- v13 (compatible with v8+): Add buffer for outgoing events to accept retry receipts
2+
CREATE TABLE whatsmeow_retry_buffer (
3+
our_jid TEXT NOT NULL,
4+
chat_jid TEXT NOT NULL,
5+
message_id TEXT NOT NULL,
6+
format TEXT NOT NULL,
7+
plaintext bytea NOT NULL,
8+
timestamp BIGINT NOT NULL,
9+
10+
PRIMARY KEY (our_jid, chat_jid, message_id),
11+
FOREIGN KEY (our_jid) REFERENCES whatsmeow_device(jid) ON DELETE CASCADE ON UPDATE CASCADE
12+
);
13+
14+
CREATE INDEX whatsmeow_retry_buffer_timestamp_idx ON whatsmeow_retry_buffer (our_jid, timestamp);

store/store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ type EventBuffer interface {
158158
DoDecryptionTxn(ctx context.Context, fn func(context.Context) error) error
159159
ClearBufferedEventPlaintext(ctx context.Context, ciphertextHash [32]byte) error
160160
DeleteOldBufferedHashes(ctx context.Context) error
161+
162+
GetOutgoingEvent(ctx context.Context, chatJID, altChatJID types.JID, id types.MessageID) (string, []byte, error)
163+
AddOutgoingEvent(ctx context.Context, chatJID types.JID, id types.MessageID, format string, plaintext []byte) error
164+
DeleteOldOutgoingEvents(ctx context.Context) error
161165
}
162166

163167
type LIDMapping struct {

0 commit comments

Comments
 (0)