@@ -42,16 +42,24 @@ type publishFunc func(ctx context.Context, msg *nats.Msg, opts ...jetstream.Publ
4242// Handler processes messages from the MESSAGES stream and validates them
4343// before publishing to MESSAGES_CANONICAL.
4444type Handler struct {
45- store Store
46- publish publishFunc
47- reply replyFunc
48- siteID string
49- parentFetcher ParentMessageFetcher
45+ store Store
46+ publish publishFunc
47+ reply replyFunc
48+ siteID string
49+ parentFetcher ParentMessageFetcher
50+ largeRoomThreshold int
5051}
5152
5253// NewHandler constructs a new Handler with the given dependencies.
53- func NewHandler (store Store , publish publishFunc , reply replyFunc , siteID string , parentFetcher ParentMessageFetcher ) * Handler {
54- return & Handler {store : store , publish : publish , reply : reply , siteID : siteID , parentFetcher : parentFetcher }
54+ func NewHandler (store Store , publish publishFunc , reply replyFunc , siteID string , parentFetcher ParentMessageFetcher , largeRoomThreshold int ) * Handler {
55+ return & Handler {
56+ store : store ,
57+ publish : publish ,
58+ reply : reply ,
59+ siteID : siteID ,
60+ parentFetcher : parentFetcher ,
61+ largeRoomThreshold : largeRoomThreshold ,
62+ }
5563}
5664
5765// HandleJetStreamMsg processes a JetStream message from the MESSAGES stream.
@@ -75,7 +83,7 @@ func (h *Handler) HandleJetStreamMsg(ctx context.Context, msg jetstream.Msg) {
7583 }
7684 } else {
7785 // Validation error: reply with error and ack.
78- h .sendReply (ctx , account , msg .Data (), natsutil . MarshalError (err . Error () ))
86+ h .sendReply (ctx , account , msg .Data (), h . marshalErrorReply (err ))
7987 if err := msg .Ack (); err != nil {
8088 slog .Error ("failed to ack message" , "error" , err )
8189 }
@@ -156,6 +164,30 @@ func (h *Handler) processMessage(ctx context.Context, account, roomID, siteID st
156164 return nil , & infraError {cause : fmt .Errorf ("get subscription for user %s in room %s: %w" , account , roomID , err )}
157165 }
158166
167+ // Large-room post restriction: in rooms with more than the configured
168+ // threshold of members, only owners, admins, and bots may send top-level
169+ // messages. Thread replies are exempt regardless of room size; bypass-eligible
170+ // senders (owner/admin role, or bot account name) are exempt regardless of
171+ // room size. Both bypasses skip the Room fetch entirely (approach B —
172+ // owner fast-path generalized).
173+ isThreadReply := req .ThreadParentMessageID != ""
174+ if ! isThreadReply && ! canBypassLargeRoomCap (sub ) {
175+ userCount , err := h .store .GetRoomUserCount (ctx , roomID )
176+ if err != nil {
177+ return nil , & infraError {cause : fmt .Errorf ("get user count for room %s: %w" , roomID , err )}
178+ }
179+ if userCount > h .largeRoomThreshold {
180+ slog .Info ("send blocked" ,
181+ "reason" , codeLargeRoomPostRestricted ,
182+ "account" , account ,
183+ "roomID" , roomID ,
184+ "userCount" , userCount ,
185+ "threshold" , h .largeRoomThreshold ,
186+ )
187+ return nil , errLargeRoomPostRestricted
188+ }
189+ }
190+
159191 // Build Message
160192 now := time .Now ().UTC ()
161193
@@ -221,3 +253,29 @@ func (h *Handler) resolveQuoteSnapshot(ctx context.Context, account, roomID, sit
221253 return snap , nil
222254 }
223255}
256+
257+ // canBypassLargeRoomCap reports whether the subscriber is exempt from the
258+ // large-room post restriction. Owners, admins, and bots bypass.
259+ //
260+ // "Bot" is detected by account-name pattern (\.bot$|^p_) — see helper.go.
261+ // This single function is the edit point if/when the bypass policy changes
262+ // (e.g. promoting isBot to a shared package, adding new roles, etc.).
263+ func canBypassLargeRoomCap (sub * model.Subscription ) bool {
264+ for _ , r := range sub .Roles {
265+ if r == model .RoleOwner || r == model .RoleAdmin {
266+ return true
267+ }
268+ }
269+ return isBot (sub .User .Account )
270+ }
271+
272+ // marshalErrorReply produces the JSON reply payload for a validation error.
273+ // If the error is (or wraps) a *codedError, the reply carries the code;
274+ // otherwise the reply is the legacy uncoded shape.
275+ func (h * Handler ) marshalErrorReply (err error ) []byte {
276+ var ce * codedError
277+ if errors .As (err , & ce ) {
278+ return natsutil .MarshalErrorWithCode (ce .Message , ce .Code )
279+ }
280+ return natsutil .MarshalError (err .Error ())
281+ }
0 commit comments