Skip to content

Commit b556c24

Browse files
authored
message: degrade quoted-parent to client fallback on transient history outage (#400)
1 parent d2931f0 commit b556c24

16 files changed

Lines changed: 881 additions & 65 deletions

docs/client-api.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4530,7 +4530,7 @@ The same subject and request body cover three send variants: plain message, thre
45304530
| `attachments` | string[] | no | Optional. Each entry is base64-encoded bytes — the JSON of one [Attachment](#attachment) from the upload endpoint ([§2.3](#23-http--protected-image-uploaddownload)). Max 1 entry, ≤ 8 KiB total. Stored opaquely and returned **decoded** (as `Attachment[]`) in message payloads. |
45314531
| `threadParentMessageId` | string | no | Set when posting a thread reply. Must be a valid 20-char base62 message ID. |
45324532
| `tshow` | boolean | no | The "Also send to channel" option. Only meaningful on a thread reply (`threadParentMessageId` set): the reply is persisted into the parent room's channel timeline as well as the thread (dual-write into `messages_by_room` in addition to `thread_messages_by_thread` + `messages_by_id`), and is surfaced with `tshow: true` on the persisted message. On a non-thread send the flag is **ignored and normalized to `false`** — the request is not rejected. |
4533-
| `quotedParentMessageId` | string | no | Set when posting a quoted message. The gatekeeper fetches the parent and embeds a snapshot in the persisted message; the client does not send the snapshot itself. |
4533+
| `quotedParentMessageId` | string | no | Set when posting a quoted message. The gatekeeper fetches the authoritative parent snapshot from message history and embeds it in the persisted message. If that fetch fails *transiently* (history briefly unavailable), the message is not dropped: the gatekeeper inserts a placeholder snapshot for live delivery (body `"Content temporarily unavailable"`), and `message-worker` re-projects the authoritative snapshot from history (or drops the quote) before the durable write, so the placeholder never persists. A genuinely missing or forbidden parent is still rejected. |
45344534

45354535
##### Plain message
45364536

@@ -4564,6 +4564,8 @@ The same subject and request body cover three send variants: plain message, thre
45644564
}
45654565
```
45664566

4567+
The client sends `quotedParentMessageId`; the server fetches and embeds the authoritative quoted-parent snapshot. During a transient history outage the server fills the live copy with a `"Content temporarily unavailable"` placeholder and re-projects the authoritative quote (or drops it) before persisting, so the placeholder never persists.
4568+
45674569
##### Thread reply quoting the thread-starter
45684570

45694571
A thread reply may quote the thread's own parent message (the message that started the thread) by setting both `threadParentMessageId` and `quotedParentMessageId` to the same ID. The quoted snapshot is embedded in the response like any other quote.

message-gatekeeper/debug_log_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TestHandler_processMessage_DebugBreadcrumbs(t *testing.T) {
101101
store.EXPECT().GetSubscription(gomock.Any(), account, roomID).Return(sub, nil).AnyTimes()
102102
store.EXPECT().GetRoomMeta(gomock.Any(), roomID).Return(roommetacache.Meta{ID: roomID, UserCount: 1}, nil).AnyTimes()
103103

104-
h := NewHandler(store, nil, makePublishFunc(nil, nil), func(context.Context, *nats.Msg) error { return nil }, siteID, nil, 500, 1, 8192)
104+
h := NewHandler(store, nil, makePublishFunc(nil, nil), func(context.Context, *nats.Msg) error { return nil }, siteID, nil, 500, 1, 8192, "")
105105

106106
newReq := func() model.SendMessageRequest {
107107
return model.SendMessageRequest{ID: idgen.GenerateMessageID(), Content: "hello world", RequestID: reqID}

message-gatekeeper/fetcher_history.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (f *historyParentFetcher) FetchQuotedParent(
9898
Msg: parent.Msg,
9999
Mentions: parent.Mentions,
100100
DecodedAttachments: parent.DecodedAttachments,
101-
MessageLink: fmt.Sprintf("%s/%s/%s", f.chatBaseURL, parent.RoomID, messageID),
101+
MessageLink: messageLink(f.chatBaseURL, parent.RoomID, messageID),
102102
ThreadParentID: parent.ThreadParentID,
103103
ThreadParentCreatedAt: parent.ThreadParentCreatedAt,
104104
}, nil

message-gatekeeper/handler.go

Lines changed: 119 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ import (
2727

2828
const maxContentBytes = 20 * 1024 // 20 KB
2929

30+
// quotedParentUnavailablePlaceholder is the degraded-mode quoted-parent body used
31+
// when the authoritative fetch fails transiently. It never persists —
32+
// message-worker re-projects the real snapshot before the durable write.
33+
const quotedParentUnavailablePlaceholder = "Content temporarily unavailable"
34+
3035
// replyFunc is the function signature for publishing a reply to a NATS subject.
3136
type replyFunc func(ctx context.Context, msg *nats.Msg) error
3237

@@ -51,12 +56,16 @@ type Handler struct {
5156
largeRoomThreshold int
5257
maxAttachments int
5358
maxAttachmentBytes int
59+
// chatBaseURL builds the messageLink on the degraded-mode placeholder quoted
60+
// snapshot, from trusted inputs (the send room + the validated quoted message
61+
// ID) so the link is correct even on the outage path.
62+
chatBaseURL string
5463
}
5564

5665
// NewHandler constructs a new Handler with the given dependencies.
5766
// users may be nil; when nil, sender display-name resolution is skipped and
5867
// downstream consumers fall back to UserAccount.
59-
func NewHandler(store Store, users UserGetter, publish publishFunc, reply replyFunc, siteID string, parentFetcher ParentMessageFetcher, largeRoomThreshold, maxAttachments, maxAttachmentBytes int) *Handler {
68+
func NewHandler(store Store, users UserGetter, publish publishFunc, reply replyFunc, siteID string, parentFetcher ParentMessageFetcher, largeRoomThreshold, maxAttachments, maxAttachmentBytes int, chatBaseURL string) *Handler {
6069
return &Handler{
6170
store: store,
6271
users: users,
@@ -67,6 +76,7 @@ func NewHandler(store Store, users UserGetter, publish publishFunc, reply replyF
6776
largeRoomThreshold: largeRoomThreshold,
6877
maxAttachments: maxAttachments,
6978
maxAttachmentBytes: maxAttachmentBytes,
79+
chatBaseURL: chatBaseURL,
7080
}
7181
}
7282

@@ -230,6 +240,13 @@ func (h *Handler) processMessage(ctx context.Context, account, roomID, siteID st
230240
return nil, errcode.BadRequest(fmt.Sprintf("invalid thread parent message ID %q: must be a 20-char base62 string", req.ThreadParentMessageID))
231241
}
232242

243+
// Validate the quoted parent ID at the boundary too: on the degrade path it is
244+
// copied verbatim into the snapshot MessageID and messageLink, so a malformed
245+
// value must fail fast rather than leak into the canonical event.
246+
if req.QuotedParentMessageID != "" && !idgen.IsValidMessageID(req.QuotedParentMessageID) {
247+
return nil, errcode.BadRequest(fmt.Sprintf("invalid quoted parent message ID %q: must be a 20-char base62 string", req.QuotedParentMessageID))
248+
}
249+
233250
// A message with attachments may carry empty content.
234251
if req.Content == "" && len(req.Attachments) == 0 {
235252
return nil, errcode.BadRequest("content must not be empty")
@@ -307,20 +324,20 @@ func (h *Handler) processMessage(ctx context.Context, account, roomID, siteID st
307324
// Build Message
308325
now := time.Now().UTC()
309326

310-
quotedSnapshot, err := h.resolveQuoteSnapshot(ctx, account, roomID, siteID, req.QuotedParentMessageID, req.ThreadParentMessageID)
327+
quotedSnapshot, quotedUnverified, err := h.resolveQuoteSnapshot(ctx, account, roomID, siteID, req.QuotedParentMessageID, req.ThreadParentMessageID, now)
311328
if err != nil {
312329
return nil, err
313330
}
314331
if req.QuotedParentMessageID != "" {
315332
// debug: quote passed the same-conversation-context check.
316-
slog.DebugContext(ctx, "gatekeeper quote resolved", "request_id", req.RequestID, "quoted_id", req.QuotedParentMessageID)
333+
slog.DebugContext(ctx, "gatekeeper quote resolved", "request_id", req.RequestID, "quoted_id", req.QuotedParentMessageID, "unverified", quotedUnverified)
317334
}
318335

319336
// #322: resolve the thread parent's createdAt server-side. The
320337
// server-resolved value always wins over any client-sent value — a wrong
321338
// client value must not corrupt downstream consumers. Done after the quote
322339
// resolution so a quote-context failure short-circuits the extra fetch.
323-
threadParentCreatedAt, err := h.resolveThreadParentCreatedAt(ctx, account, roomID, siteID, req.ThreadParentMessageID, req.QuotedParentMessageID, quotedSnapshot)
340+
threadParentCreatedAt, err := h.resolveThreadParentCreatedAt(ctx, account, roomID, siteID, req.ThreadParentMessageID, req.QuotedParentMessageID, quotedSnapshot, quotedUnverified)
324341
if err != nil {
325342
return nil, err
326343
}
@@ -363,8 +380,11 @@ func (h *Handler) processMessage(ctx context.Context, account, roomID, siteID st
363380
Attachments: req.Attachments,
364381
}
365382

366-
// Publish MessageEvent to MESSAGES_CANONICAL
367-
evt := model.MessageEvent{Event: model.EventCreated, Message: msg, SiteID: siteID, Timestamp: now.UnixMilli()}
383+
// Publish MessageEvent to MESSAGES_CANONICAL. QuotedParentUnverified rides the
384+
// envelope (not the persisted Message) so message-worker knows to re-project
385+
// the authoritative snapshot before the durable write when the gatekeeper had
386+
// to fall back to the degraded placeholder snapshot.
387+
evt := model.MessageEvent{Event: model.EventCreated, Message: msg, SiteID: siteID, Timestamp: now.UnixMilli(), QuotedParentUnverified: quotedUnverified}
368388
evtData, err := sonic.Marshal(evt)
369389
if err != nil {
370390
return nil, fmt.Errorf("marshal message event: %w", err)
@@ -391,13 +411,17 @@ func (h *Handler) resolveThreadParentCreatedAt(
391411
ctx context.Context,
392412
account, roomID, siteID, threadParentMessageID, quotedParentMessageID string,
393413
quotedSnapshot *cassandra.QuotedParentMessage,
414+
quotedUnverified bool,
394415
) (*time.Time, error) {
395416
if threadParentMessageID == "" {
396417
return nil, nil
397418
}
398419

399-
// Reuse the quote snapshot when the quoted parent is the thread parent.
400-
if quotedSnapshot != nil && quotedParentMessageID == threadParentMessageID {
420+
// Reuse the quote snapshot's CreatedAt only when authoritative. The unverified
421+
// placeholder carries a synthetic timestamp, and the thread parent's createdAt
422+
// (#322) isn't re-resolved downstream, so fetch it authoritatively instead
423+
// (NAKs if history is still down).
424+
if quotedSnapshot != nil && !quotedUnverified && quotedParentMessageID == threadParentMessageID {
401425
t := quotedSnapshot.CreatedAt.UTC()
402426
return &t, nil
403427
}
@@ -419,40 +443,101 @@ func (h *Handler) resolveThreadParentCreatedAt(
419443
return &t, nil
420444
}
421445

422-
// resolveQuoteSnapshot fetches the quoted parent and returns its snapshot.
423-
// The strict same-conversation-context rule rejects cross-thread quotes:
424-
// main-room messages may only quote main-room parents, and thread-T messages
425-
// may only quote other thread-T messages — including the thread's own parent.
426-
func (h *Handler) resolveQuoteSnapshot(ctx context.Context, account, roomID, siteID, quotedParentMessageID, newMessageThreadID string) (*cassandra.QuotedParentMessage, error) {
446+
// resolveQuoteSnapshot resolves the quoted parent into a snapshot, preferring the
447+
// authoritative history fetch and enforcing the same-conversation rule (a message
448+
// may only quote parents in its own thread or main room). The bool is the
449+
// "unverified" marker — true when the snapshot is the degraded placeholder.
450+
//
451+
// Fetch failures are tiered:
452+
// - terminal (not_found, forbidden, bad_request): reject — a quote must never
453+
// resurrect a missing parent or bypass access control.
454+
// - transient (history unavailable/internal, NATS timeout): degrade to a
455+
// placeholder (marked unverified) with the fixed "Content temporarily
456+
// unavailable" body — which message-worker re-projects (or drops) before the
457+
// durable write.
458+
func (h *Handler) resolveQuoteSnapshot(ctx context.Context, account, roomID, siteID, quotedParentMessageID, newMessageThreadID string, now time.Time) (*cassandra.QuotedParentMessage, bool, error) {
427459
if quotedParentMessageID == "" {
428-
return nil, nil
460+
return nil, false, nil
429461
}
430462
snap, err := h.parentFetcher.FetchQuotedParent(ctx, account, roomID, siteID, quotedParentMessageID)
431-
switch {
432-
case err != nil:
433-
// Preserve upstream errcode classification (transient → Unavailable,
434-
// real 404 → NotFound). For non-errcode infra failures (NATS timeout,
435-
// no-responders, unmarshal), classify as Unavailable — a transient
436-
// quoted-parent fetch failure shouldn't surface to the client as 404.
463+
if err == nil && snap == nil {
464+
// A nil snapshot with no error is a fetcher contract violation, not a
465+
// genuine missing parent. Synthesize a transient error so we degrade to the
466+
// placeholder rather than treating it as authoritative-empty.
467+
err = fmt.Errorf("fetch quoted parent %s: fetcher returned nil snapshot", quotedParentMessageID)
468+
}
469+
if err != nil {
437470
var ee *errcode.Error
438-
if errors.As(err, &ee) {
439-
return nil, ee
471+
if quoteFetchErrIsTerminal(err) && errors.As(err, &ee) {
472+
// Terminal typed *errcode.Error → reply + Ack; preserves the upstream
473+
// category (not_found, forbidden, …) for the client. The errors.As guard
474+
// is belt-and-suspenders: quoteFetchErrIsTerminal only reports true for a
475+
// typed errcode today, but guarding the bool means a future predicate
476+
// change can't fall through here returning a nil *errcode.Error.
477+
return nil, false, ee
440478
}
441-
return nil, errcode.Unavailable(fmt.Sprintf("fetch quoted parent %s", quotedParentMessageID), errcode.WithCause(err))
442-
case snap == nil:
443-
// A nil snapshot with no error is a fetcher contract violation, not a
444-
// genuine missing parent. Return a bare error so the caller's branch
445-
// classifies this as infra (Nak for redelivery + log) rather than
446-
// permanently dropping the message via a 404 reply+Ack.
447-
return nil, fmt.Errorf("fetch quoted parent %s: fetcher returned nil snapshot", quotedParentMessageID)
448-
case snap.ThreadParentID != newMessageThreadID &&
479+
// Transient failure: degrade to a placeholder snapshot rather than NAK, so
480+
// the message still flows through the outage. message-worker re-projects the
481+
// authoritative snapshot (or drops the quote) before the durable write.
482+
ph := h.placeholderQuoteSnapshot(roomID, quotedParentMessageID, newMessageThreadID, now)
483+
slog.WarnContext(ctx, "quoted-parent fetch failed; using placeholder snapshot",
484+
"request_id", natsutil.RequestIDFromContext(ctx), "quoted_id", quotedParentMessageID, "error", err)
485+
return ph, true, nil
486+
}
487+
if cerr := checkQuoteThreadContext(snap, quotedParentMessageID, newMessageThreadID); cerr != nil {
488+
return nil, false, cerr
489+
}
490+
return snap, false, nil
491+
}
492+
493+
// quoteFetchErrIsTerminal reports whether a quoted-parent fetch error is a
494+
// permanent reason not to quote (reject) vs a transient infra failure (degrade
495+
// to the placeholder). Only unavailable/internal errcodes and non-errcode infra
496+
// failures (NATS timeout, no-responders, unmarshal) are transient; every other
497+
// errcode category (not_found, forbidden, bad_request, …) is terminal.
498+
// history-service collapses a Cassandra read failure to code=internal, so
499+
// internal is treated as transient here.
500+
func quoteFetchErrIsTerminal(err error) bool {
501+
var ee *errcode.Error
502+
if errors.As(err, &ee) {
503+
switch ee.Code {
504+
case errcode.CodeUnavailable, errcode.CodeInternal:
505+
return false
506+
default:
507+
return true
508+
}
509+
}
510+
return false
511+
}
512+
513+
// placeholderQuoteSnapshot builds the degraded-mode quoted-parent snapshot for a
514+
// transient fetch failure. Only identity/link fields are real; the body is always
515+
// the fixed "Content temporarily unavailable" placeholder (the client cannot
516+
// supply the quoted-parent content). ThreadParentID mirrors the quoting message's
517+
// thread to satisfy the same-conversation rule, and CreatedAt is cosmetic. The
518+
// caller marks it unverified (see resolveQuoteSnapshot); the body is re-projected
519+
// authoritatively by message-worker before the durable write.
520+
func (h *Handler) placeholderQuoteSnapshot(roomID, messageID, newMessageThreadID string, now time.Time) *cassandra.QuotedParentMessage {
521+
return &cassandra.QuotedParentMessage{
522+
MessageID: messageID,
523+
RoomID: roomID,
524+
CreatedAt: now,
525+
Msg: quotedParentUnavailablePlaceholder,
526+
MessageLink: messageLink(h.chatBaseURL, roomID, messageID),
527+
ThreadParentID: newMessageThreadID,
528+
}
529+
}
530+
531+
// checkQuoteThreadContext enforces the same-conversation rule between the quoted
532+
// parent and the quoting message.
533+
func checkQuoteThreadContext(snap *cassandra.QuotedParentMessage, quotedParentMessageID, newMessageThreadID string) error {
534+
if snap.ThreadParentID != newMessageThreadID &&
449535
// Thread-root quote: starter is a main-room msg (ThreadParentID=="") whose ID is the thread parent — allowed.
450-
(snap.ThreadParentID != "" || quotedParentMessageID != newMessageThreadID):
451-
return nil, errcode.BadRequest(fmt.Sprintf("quoted parent %s thread context mismatch: parent thread %q, new message thread %q",
536+
(snap.ThreadParentID != "" || quotedParentMessageID != newMessageThreadID) {
537+
return errcode.BadRequest(fmt.Sprintf("quoted parent %s thread context mismatch: parent thread %q, new message thread %q",
452538
quotedParentMessageID, snap.ThreadParentID, newMessageThreadID))
453-
default:
454-
return snap, nil
455539
}
540+
return nil
456541
}
457542

458543
// canBypassLargeRoomCap reports whether the subscriber is exempt from the

0 commit comments

Comments
 (0)