-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathhandler.go
More file actions
2278 lines (2086 loc) · 81.7 KB
/
Copy pathhandler.go
File metadata and controls
2278 lines (2086 loc) · 81.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package main
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"github.com/hmchangw/chat/pkg/displayfmt"
"github.com/hmchangw/chat/pkg/errcode"
"github.com/hmchangw/chat/pkg/idgen"
"github.com/hmchangw/chat/pkg/logctx"
"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/msgraph"
"github.com/hmchangw/chat/pkg/natsrouter"
"github.com/hmchangw/chat/pkg/natsutil"
"github.com/hmchangw/chat/pkg/roomkeystore"
"github.com/hmchangw/chat/pkg/subject"
)
// Handler holds the dependencies and per-site limits shared by the room-service
// RPC handlers in this file; Register wires its methods onto a natsrouter.Router.
// NewHandler populates the core fields only: dekProvisioner and the Microsoft
// Teams fields are not constructor arguments and stay at their zero value unless
// set separately.
type Handler struct {
// store is the data-access dependency the handlers use for user, room,
// subscription and membership lookups, and for the owner-role write in
// updateRole.
store RoomStore
// keyStore reads/writes room keys in the rooms collection (always wired in
// production; tests may pass nil).
keyStore RoomKeyStore
// dekProvisioner is set in main when ATREST_ENABLED; nil disables eager
// at-rest DEK creation at room-create time (message-worker's lazy create
// still covers remote sites and pre-rollout rooms). Injected as a field
// rather than a constructor arg to avoid churning every NewHandler caller.
dekProvisioner DEKProvisioner
memberListClient MemberListClient
msgReader MessageReader
// siteID scopes every subject built in Register and the canonical-event
// subjects, and is stamped as SiteID on outbound inbox events.
siteID string
// maxRoomSize is the membership cap enforced at channel creation; the
// implicit creator/owner row counts toward it.
maxRoomSize int
maxBatchSize int
memberListTimeout time.Duration
// publishToStream publishes data on subj to the stream; it carries canonical
// room events and cross-site inbox events. The handlers visible alongside
// this type always pass an empty msgID.
publishToStream func(ctx context.Context, subj string, data []byte, msgID string) error
// publishCore publishes over core NATS; publishSubscriptionUpdate uses it
// best-effort (a failure is logged, not returned).
publishCore func(ctx context.Context, subj string, data []byte) error
restrictedRoomMinMembers int
siteURL *url.URL
maxResponseBytes int64
// Microsoft Teams integration. graphClient is nil-safe: only the meetings
// RPC uses it (the deep-link RPCs are pure string building). teamsEmailDomain
// derives a member's email as account@domain. teamsMeetingStore backs the
// per-room idempotency record (Mongo unique key on roomId+siteId).
// roomMembersLimit / roomMembersCallLimit cap the member set for meetings and
// calls respectively.
graphClient msgraph.Client
teamsMeetingStore TeamsMeetingStore
teamsEmailDomain string
roomMembersLimit int
roomMembersCallLimit int
}
// NewHandler returns a Handler populated with the given stores, clients, limits
// and publish functions. Fields that are not constructor arguments
// (dekProvisioner and the Teams integration fields) are left at their zero
// value.
func NewHandler(store RoomStore, keyStore RoomKeyStore, memberListClient MemberListClient, msgReader MessageReader, siteID string, maxRoomSize, maxBatchSize int, memberListTimeout time.Duration, restrictedRoomMinMembers int, publishToStream func(context.Context, string, []byte, string) error, publishCore func(context.Context, string, []byte) error, siteURL *url.URL, maxResponseBytes int64) *Handler {
	h := new(Handler)

	// Stores and upstream clients.
	h.store = store
	h.keyStore = keyStore
	h.memberListClient = memberListClient
	h.msgReader = msgReader

	// Site identity.
	h.siteID = siteID
	h.siteURL = siteURL

	// Limits and timeouts.
	h.maxRoomSize = maxRoomSize
	h.maxBatchSize = maxBatchSize
	h.memberListTimeout = memberListTimeout
	h.restrictedRoomMinMembers = restrictedRoomMinMembers
	h.maxResponseBytes = maxResponseBytes

	// Outbound publishers.
	h.publishToStream = publishToStream
	h.publishCore = publishCore

	return h
}
// Register subscribes every room-service RPC handler on r, using the
// site-scoped subject builders from the subject package.
//
// The router's base middleware (RequestID) mints an X-Request-ID when the caller
// did not send one, so every handler can log a request ID and header-less
// server-to-server calls are not rejected. Handlers that derive dedup keys from
// the request ID (e.g. roomRestricted) depend on callers sending a stable
// X-Request-ID across retries — see docs/client-api.md.
//
// Register and RegisterNoBody panic if a subscription fails, which is fatal at
// startup.
func (h *Handler) Register(r *natsrouter.Router) {
	site := h.siteID

	// Handlers that take only the router context; any request body is decoded
	// by the handler itself.
	natsrouter.RegisterNoBody(r, subject.MuteTogglePattern(site), h.muteToggle)
	natsrouter.RegisterNoBody(r, subject.FavoriteTogglePattern(site), h.favoriteToggle)
	natsrouter.RegisterNoBody(r, subject.RoomAppTabsPattern(site), h.getRoomAppTabs)
	natsrouter.RegisterNoBody(r, subject.RoomAppCmdMenuPattern(site), h.getRoomAppCommandMenu)
	natsrouter.RegisterNoBody(r, subject.OrgMembersPattern(site), h.listOrgMembers)
	natsrouter.RegisterNoBody(r, subject.MemberListPattern(site), h.listMembers)
	natsrouter.RegisterNoBody(r, subject.MemberStatusesPattern(site), h.listMemberStatuses)
	natsrouter.RegisterNoBody(r, subject.MentionableSubscriptionsPattern(site), h.listMentionableSubscriptions)
	natsrouter.RegisterNoBody(r, subject.RoomKeyGetPattern(site), h.getRoomKey)
	natsrouter.RegisterNoBody(r, subject.MessageReadPattern(site), h.messageRead)

	// Handlers that receive a typed request value: read tracking.
	natsrouter.Register(r, subject.MessageReadReceiptPattern(site), h.messageReadReceipt)
	natsrouter.Register(r, subject.MessageThreadReadPattern(site), h.messageThreadRead)

	// Membership management.
	natsrouter.Register(r, subject.MemberRoleUpdatePattern(site), h.updateRole)
	natsrouter.Register(r, subject.MemberRemovePattern(site), h.removeMember)
	natsrouter.Register(r, subject.MemberAddPattern(site), h.addMembers)

	// Room lifecycle, info and keys.
	natsrouter.Register(r, subject.RoomRenamePattern(site), h.roomRename)
	natsrouter.Register(r, subject.RoomRestricted(site), h.roomRestricted)
	natsrouter.Register(r, subject.RoomsInfoBatchSubscribe(site), h.roomsInfoBatch)
	natsrouter.Register(r, subject.ThreadUnreadSummarySubscribe(site), h.threadUnreadSummary)
	natsrouter.Register(r, subject.RoomKeyEnsure(site), h.ensureRoomKey)
	natsrouter.Register(r, subject.RoomCreatePattern(site), h.createRoom)

	// Microsoft Teams integration.
	natsrouter.Register(r, subject.TeamsRoomCallPattern(site), h.teamsRoomCall)
	natsrouter.Register(r, subject.TeamsUserCallPattern(site), h.teamsUserCall)
	natsrouter.Register(r, subject.TeamsMeetingPattern(site), h.teamsMeeting)
}
// createRoom is the room-create RPC entry point. It validates and classifies
// the request without touching the database, loads the requester (who must have
// both an English and a Chinese name on record), and then hands off to the
// channel or DM/botDM creation path according to the classified room type.
//
// A missing requester maps to a NotFound error with reason RoomUserNotFound;
// any other lookup failure is wrapped and returned.
func (h *Handler) createRoom(c *natsrouter.Context, req model.CreateRoomRequest) (*model.CreateRoomReply, error) { //nolint:gocritic // hugeParam: req is passed by value to satisfy the natsrouter.Register handler signature
	var ctx context.Context = c
	account := c.Param("account")

	kind, err := classifyAndValidate(&req, account)
	if err != nil {
		return nil, err
	}
	// debug: every downstream routing decision hangs off the classified type.
	slog.DebugContext(ctx, "room-service createRoom classified",
		"request_id", natsutil.RequestIDFromContext(ctx), "type", kind)

	requester, err := h.store.GetUser(ctx, account)
	switch {
	case errors.Is(err, ErrUserNotFound):
		return nil, errcode.NotFound("user not found", errcode.WithReason(errcode.RoomUserNotFound))
	case err != nil:
		return nil, fmt.Errorf("get requester: %w", err)
	}
	if hasBothNames := requester.EngName != "" && requester.ChineseName != ""; !hasBothNames {
		return nil, errInvalidUserData
	}

	if kind == model.RoomTypeChannel {
		return h.handleCreateRoomChannel(ctx, &req, requester, account, kind)
	}
	if kind == model.RoomTypeDM || kind == model.RoomTypeBotDM {
		return h.handleCreateRoomDMOrBotDM(ctx, &req, requester, kind)
	}
	return nil, fmt.Errorf("unknown room type: %s", kind)
}
// classifyAndValidate performs every input-only check on a create-room request
// and returns the room type the request classifies as. It makes no DB calls.
//
// Checks run in this priority order: empty request → self-DM → channel name
// required → channel name too long → bot / platform-admin account in a channel.
//
// As a side effect req.Users is replaced by its de-duplicated form with the
// requester's own account stripped. The de-duplicated length is captured before
// stripping so that "users == [requester]" (a self-DM) can be recognised
// without a second pass over the list.
func classifyAndValidate(req *model.CreateRoomRequest, requesterAccount string) (model.RoomType, error) {
	// True when the request names nothing but (possibly zero) users. Neither
	// dedup nor stripAccount touches these fields, so it is computed once.
	usersOnly := req.Name == "" && len(req.Orgs) == 0 && len(req.Channels) == 0

	if usersOnly && len(req.Users) == 0 {
		return "", errEmptyCreateRequest
	}

	unique := dedup(req.Users)
	uniqueCount := len(unique)
	req.Users = stripAccount(unique, requesterAccount)

	// Exactly one distinct user before stripping and none after means that the
	// single user was the requester.
	if usersOnly && uniqueCount == 1 && len(req.Users) == 0 {
		return "", errSelfDM
	}

	kind := determineRoomType(req)
	if kind != model.RoomTypeChannel {
		return kind, nil
	}

	// Channel-only validations.
	switch {
	case strings.TrimSpace(req.Name) == "":
		return "", errChannelNameRequired
	case utf8.RuneCountInString(req.Name) > maxChannelNameRunes:
		return "", errChannelNameTooLong
	}
	for _, account := range req.Users {
		if model.IsBot(account) || model.IsPlatformAdminAccount(account) {
			return "", errBotInChannel
		}
	}
	return kind, nil
}
// maxChannelNameRunes caps the rune length of a client-supplied channel name.
// classifyAndValidate compares the name's rune count (not its byte length)
// against this value and rejects only when the count is strictly greater, so a
// name of exactly this many runes is accepted.
const maxChannelNameRunes = 100
// handleCreateRoomDMOrBotDM creates (or re-opens) a DM or botDM between the
// requester and the single counterpart in req.Users.
//
// Flow:
//  1. Load the counterpart; an unknown account maps to NotFound
//     (RoomUserNotFound). For a plain DM the counterpart must have both names.
//  2. Derive the deterministic room ID from the two user IDs and record the
//     resolved user set on req.
//  3. If a DM subscription already exists, return CreateRoomStatusExists with
//     the existing room ID ("open-or-create").
//  4. For a botDM, require that the app exists and has an enabled assistant.
//  5. Publish the canonical create event via publishCreateRoom.
//
// req is mutated: RoomID and ResolvedUsers are set here, and publishCreateRoom
// fills in the requester fields and timestamp.
func (h *Handler) handleCreateRoomDMOrBotDM(ctx context.Context, req *model.CreateRoomRequest, requester *model.User, roomType model.RoomType) (*model.CreateRoomReply, error) {
	// Defensive guard: the counterpart is read by index below. Classification
	// is expected to route only requests with a counterpart here, but that
	// invariant lives in another function; an empty list must surface as a
	// request error rather than an index-out-of-range panic.
	if len(req.Users) == 0 {
		return nil, errEmptyCreateRequest
	}
	otherAccount := req.Users[0]

	other, err := h.store.GetUser(ctx, otherAccount)
	if err != nil {
		if errors.Is(err, ErrUserNotFound) {
			return nil, errcode.NotFound("user not found", errcode.WithReason(errcode.RoomUserNotFound))
		}
		return nil, fmt.Errorf("get counterpart: %w", err)
	}
	if roomType == model.RoomTypeDM && (other.EngName == "" || other.ChineseName == "") {
		// botDMs counterpart is an app/bot whose users-collection record
		// typically has empty name fields; the GetApp + Assistant.Enabled
		// check below is the right validation for that case.
		return nil, errInvalidUserData
	}

	req.RoomID = idgen.BuildDMRoomID(requester.ID, other.ID)
	// DM/BotDM resolved set matches the literal counterpart list — there is no
	// expansion. Copy so later mutation of one slice cannot affect the other.
	req.ResolvedUsers = append([]string(nil), req.Users...)

	// Dedup BEFORE the bot-availability check so an existing botDM still
	// resolves to the existing roomId even if the bot was later disabled —
	// preserves the deterministic "open-or-create" contract for DMs.
	existing, err := h.store.FindDMSubscription(ctx, requester.Account, other.Account)
	if err == nil && existing != nil {
		// debug: open-or-create short-circuit — the DM already exists.
		slog.DebugContext(ctx, "room-service DM exists, returning existing",
			"request_id", natsutil.RequestIDFromContext(ctx), "room_id", existing.RoomID)
		// DM already exists: this is a success ("open-or-create"), not an
		// error. Return the existing room ID so the client opens it. RoomType
		// is left empty on this branch, matching the prior error-reply
		// behaviour.
		return &model.CreateRoomReply{
			Status: model.CreateRoomStatusExists,
			RoomID: existing.RoomID,
		}, nil
	}
	// A "not found" result simply means there is nothing to dedup against.
	if err != nil && !errors.Is(err, model.ErrSubscriptionNotFound) {
		return nil, fmt.Errorf("dm dedup check: %w", err)
	}

	if roomType == model.RoomTypeBotDM {
		app, err := h.store.GetApp(ctx, other.Account)
		if err != nil {
			if errors.Is(err, ErrAppNotFound) {
				return nil, errBotNotAvailable
			}
			return nil, fmt.Errorf("get app: %w", err)
		}
		if app.Assistant == nil || !app.Assistant.Enabled {
			return nil, errBotNotAvailable
		}
	}

	return h.publishCreateRoom(ctx, req, requester, roomType)
}
// handleCreateRoomChannel validates and publishes a channel-creation request.
//
// It expands req.Channels into org IDs and accounts (bots filtered out), merges
// them with the literal req.Orgs / req.Users, rejects unknown orgs or users,
// and enforces maxRoomSize with the creator counted as one member. On success
// the resolved sets and a freshly generated room ID are stored on req and the
// canonical create event is published.
//
// errEmptyCreateRequest is returned when the merged sets are empty or when
// nobody other than the requester would be added.
func (h *Handler) handleCreateRoomChannel(ctx context.Context, req *model.CreateRoomRequest, requester *model.User, requesterAccount string, roomType model.RoomType) (*model.CreateRoomReply, error) {
	refOrgIDs, refAccounts, err := h.expandChannelRefs(ctx, requester.Account, req.Channels)
	if err != nil {
		return nil, fmt.Errorf("expand channels: %w", err)
	}
	// Bots pulled in through a channel reference must not leak into the new
	// channel.
	refAccounts = filterBots(refAccounts)

	// Merge into fresh slices so the literal request slices stay untouched.
	orgRefs := make([]string, 0, len(req.Orgs)+len(refOrgIDs))
	orgRefs = append(orgRefs, req.Orgs...)
	orgRefs = append(orgRefs, refOrgIDs...)
	allOrgs := dedup(orgRefs)

	userRefs := make([]string, 0, len(req.Users)+len(refAccounts))
	userRefs = append(userRefs, req.Users...)
	userRefs = append(userRefs, refAccounts...)
	allUsers := stripAccount(dedup(userRefs), requesterAccount)

	if len(allOrgs) == 0 && len(allUsers) == 0 {
		return nil, errEmptyCreateRequest
	}

	// Phantom orgs and users are rejected before sizing or publishing, for the
	// same reason as in addMembers: the worker writes room_members and the
	// system message without re-validating them.
	if err := h.validateMembershipRefs(ctx, allOrgs, allUsers); err != nil {
		return nil, err
	}

	// requesterAccount goes in as the excluded account: it was stripped from
	// allUsers, but org expansion can bring it back when the requester belongs
	// to one of the resolved orgs. Excluding it here lets the owner row be
	// added as exactly +1 below without double-counting.
	newCount, err := h.store.CountNewMembers(ctx, allOrgs, allUsers, "", requesterAccount)
	if err != nil {
		return nil, fmt.Errorf("count new members: %w", err)
	}
	if newCount == 0 {
		return nil, errEmptyCreateRequest
	}

	// The creator joins implicitly as owner and counts toward the cap, so
	// maxRoomSize=N bounds the materialized room at N members, not N+1.
	if total := newCount + 1; total > h.maxRoomSize {
		return nil, errcode.Conflict(
			fmt.Sprintf("exceeds maximum capacity (%d): would create %d members", h.maxRoomSize, total),
			errcode.WithReason(errcode.RoomMaxSizeReached),
			errcode.WithMetadata("maxRoomSize", strconv.Itoa(h.maxRoomSize), "attempted", strconv.Itoa(total)),
		)
	}

	// req.Users / req.Orgs stay as the literal request for room_created; the
	// worker materializes from ResolvedUsers / ResolvedOrgs and uses them for
	// the members_added system message.
	req.ResolvedOrgs = allOrgs
	req.ResolvedUsers = allUsers
	req.RoomID = idgen.GenerateID()

	return h.publishCreateRoom(ctx, req, requester, roomType)
}
// publishCreateRoom stamps the requester identity and a UTC millisecond
// timestamp onto req, optionally provisions the room's at-rest DEK, and
// publishes req as the canonical "create" event for this site. The reply
// reports the request as accepted; it does not mean the room has been
// materialized yet.
func (h *Handler) publishCreateRoom(ctx context.Context, req *model.CreateRoomRequest, requester *model.User, roomType model.RoomType) (*model.CreateRoomReply, error) {
	req.RequesterID = requester.ID
	req.RequesterAccount = requester.Account
	req.Timestamp = time.Now().UTC().UnixMilli()

	span := trace.SpanFromContext(ctx)
	if span.IsRecording() {
		span.SetAttributes(
			attribute.String("room.id", req.RoomID),
			attribute.String("room.type", string(roomType)),
			attribute.String("site.id", h.siteID),
		)
	}

	// The room encryption key is a field of the room document and is
	// provisioned by room-worker when it inserts the room, so nothing is
	// pre-provisioned for it here.
	//
	// The at-rest DEK, when a provisioner is configured, is ensured
	// synchronously BEFORE the canonical event so the first message write
	// doesn't pay the create cost. message-worker's lazy creation still covers
	// remote sites (the DEK is per-site) and rooms created before this rollout.
	if provisioner := h.dekProvisioner; provisioner != nil {
		if err := provisioner.EnsureDEK(ctx, req.RoomID); err != nil {
			return nil, fmt.Errorf("provision at-rest DEK: %w", err)
		}
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal canonical event: %w", err)
	}
	createSubject := subject.RoomCanonical(h.siteID, "create")
	if err := h.publishToStream(ctx, createSubject, payload, ""); err != nil {
		return nil, fmt.Errorf("publish canonical: %w", err)
	}

	// flow: the sync RPC accepted the create and handed it off to room-worker.
	slog.Log(ctx, logctx.LevelFlow, "room-service create handoff", "phase", "published",
		"request_id", natsutil.RequestIDFromContext(ctx), "room_id", req.RoomID, "type", roomType)

	reply := &model.CreateRoomReply{
		Status:   model.CreateRoomReplyAccepted,
		RoomID:   req.RoomID,
		RoomType: string(roomType),
	}
	return reply, nil
}
// listOrgMembers returns the members of the org named by the "orgID" subject
// parameter. A store error carrying reason RoomInvalidOrg is re-issued as a
// BadRequest with the same reason; any other failure is wrapped and returned.
func (h *Handler) listOrgMembers(c *natsrouter.Context) (*model.ListOrgMembersResponse, error) {
	var ctx context.Context = c

	members, err := h.store.ListOrgMembers(ctx, c.Param("orgID"))
	switch {
	case err == nil:
		return &model.ListOrgMembersResponse{Members: members}, nil
	case errcode.HasReason(err, errcode.RoomInvalidOrg):
		return nil, errcode.BadRequest("invalid org", errcode.WithReason(errcode.RoomInvalidOrg))
	default:
		return nil, fmt.Errorf("get org members: %w", err)
	}
}
// listMembers returns the members of a room to a requester who belongs to it.
//
// Membership is checked before the optional JSON body is parsed, so a
// non-member gets errNotRoomMember even when the body is malformed. When
// present, Limit must be positive and Offset must be non-negative.
func (h *Handler) listMembers(c *natsrouter.Context) (*model.ListRoomMembersResponse, error) {
	var ctx context.Context = c
	account, roomID := c.Param("account"), c.Param("roomID")

	if err := h.store.CheckMembership(ctx, account, roomID); err != nil {
		if errors.Is(err, model.ErrSubscriptionNotFound) {
			return nil, errNotRoomMember
		}
		return nil, fmt.Errorf("check room membership: %w", err)
	}

	// The body is optional; an absent or empty payload leaves req at its zero
	// value.
	var req model.ListRoomMembersRequest
	if msg := c.Msg; msg != nil && len(msg.Data) > 0 {
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			return nil, errcode.BadRequest("invalid request")
		}
	}

	switch {
	case req.Limit != nil && *req.Limit <= 0:
		return nil, errListLimitInvalid
	case req.Offset != nil && *req.Offset < 0:
		return nil, errListOffsetInvalid
	}

	members, err := h.store.ListRoomMembers(ctx, roomID, req.Limit, req.Offset, req.Enrich)
	if err != nil {
		return nil, fmt.Errorf("get room members: %w", err)
	}
	return &model.ListRoomMembersResponse{Members: members}, nil
}
// getRoomKey delivers a room's private key to a requester who is a member of
// the room. When the optional body carries a Version, that specific key version
// is returned; otherwise the key store's current key is returned together with
// its version.
//
// Errors: a nil key store is reported as a configuration error,
// errNotRoomMember for non-members, BadRequest for an undecodable body, and
// errRoomKeyAbsent when the store has no matching key.
func (h *Handler) getRoomKey(c *natsrouter.Context) (*model.RoomKeyGetResponse, error) {
	var ctx context.Context = c
	if h.keyStore == nil {
		return nil, fmt.Errorf("get room key: key store not configured")
	}

	account, roomID := c.Param("account"), c.Param("roomID")
	if err := h.store.CheckMembership(ctx, account, roomID); err != nil {
		if errors.Is(err, model.ErrSubscriptionNotFound) {
			return nil, errNotRoomMember
		}
		return nil, fmt.Errorf("check room membership: %w", err)
	}

	// The body is optional; without it req.Version stays nil.
	var req model.RoomKeyGetRequest
	if msg := c.Msg; msg != nil && len(msg.Data) > 0 {
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			return nil, errcode.BadRequest("invalid request")
		}
	}

	// An explicit version was requested.
	if req.Version != nil {
		versioned, err := h.keyStore.GetByVersion(ctx, roomID, *req.Version)
		if err != nil {
			return nil, fmt.Errorf("get room key: %w", err)
		}
		if versioned == nil {
			return nil, errRoomKeyAbsent
		}
		// #nosec G117 -- RoomKeyGetResponse.PrivateKey is the intended payload: on-demand key delivery to the authorized room member over an auth-callout-gated per-user NATS subject, not a leak
		return &model.RoomKeyGetResponse{
			RoomID:     roomID,
			Version:    *req.Version,
			PrivateKey: versioned.PrivateKey,
		}, nil
	}

	// No version given: serve whatever the store holds as the room's key.
	current, err := h.keyStore.Get(ctx, roomID)
	if err != nil {
		return nil, fmt.Errorf("get room key: %w", err)
	}
	if current == nil {
		return nil, errRoomKeyAbsent
	}
	// #nosec G117 -- RoomKeyGetResponse.PrivateKey is the intended payload: on-demand key delivery to the authorized room member over an auth-callout-gated per-user NATS subject, not a leak
	return &model.RoomKeyGetResponse{
		RoomID:     roomID,
		Version:    current.Version,
		PrivateKey: current.KeyPair.PrivateKey,
	}, nil
}
// Fallback limits applied when a list request carries no explicit Limit.
const (
// defaultMemberStatusesLimit is what listMemberStatuses uses when the request
// omits Limit; the handler clamps it down to the room's UserCount.
defaultMemberStatusesLimit = 3
// defaultMentionableLimit is what listMentionableSubscriptions uses when the
// request omits Limit; the handler clamps it down to UserCount + AppCount.
defaultMentionableLimit = 3
)
// requireMembershipAndGetRoom gates a request on room membership and returns
// the room document. The membership check and the room fetch are independent
// reads, so they run concurrently to avoid paying a second round trip on the
// happy path.
//
// A plain sync.WaitGroup is used instead of errgroup.WithContext on purpose:
// with a shared cancellable context, a fast GetRoom failure would cancel the
// membership check, which would then surface as context.Canceled and hide the
// real not-member sentinel.
//
// Error precedence is fixed regardless of which goroutine finishes first: a
// non-member always gets errNotRoomMember, then any other membership error,
// then any room-fetch error.
func (h *Handler) requireMembershipAndGetRoom(ctx context.Context, account, roomID string) (*model.Room, error) {
	var (
		wg            sync.WaitGroup
		membershipErr error
		fetchErr      error
		room          *model.Room
	)

	wg.Add(2)
	go func() {
		defer wg.Done()
		membershipErr = h.store.CheckMembership(ctx, account, roomID)
	}()
	go func() {
		defer wg.Done()
		room, fetchErr = h.store.GetRoom(ctx, roomID)
	}()
	// Each goroutine writes only its own variables; Wait orders those writes
	// before the reads below.
	wg.Wait()

	switch {
	case errors.Is(membershipErr, model.ErrSubscriptionNotFound):
		return nil, errNotRoomMember
	case membershipErr != nil:
		return nil, fmt.Errorf("check room membership: %w", membershipErr)
	case fetchErr != nil:
		return nil, fmt.Errorf("get room: %w", fetchErr)
	}
	return room, nil
}
// listMemberStatuses returns up to `limit` member statuses for a room the
// requester belongs to.
//
// Limit handling:
//   - omitted: defaultMemberStatusesLimit clamped to the room's UserCount, or
//     an empty (non-nil) list straight away when UserCount is zero;
//   - supplied: must satisfy 0 < limit <= UserCount, else
//     errMemberStatusesLimitInvalid.
//
// The body is decoded before the membership check, so a malformed body is
// rejected as BadRequest without touching the store.
func (h *Handler) listMemberStatuses(c *natsrouter.Context) (*model.ListMemberStatusesResponse, error) {
	var ctx context.Context = c
	account, roomID := c.Param("account"), c.Param("roomID")

	span := trace.SpanFromContext(ctx)
	if span.IsRecording() {
		span.SetAttributes(
			attribute.String("room.id", roomID),
			attribute.String("site.id", h.siteID),
		)
	}

	var req model.ListMemberStatusesRequest
	if msg := c.Msg; msg != nil && len(msg.Data) > 0 {
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			return nil, errcode.BadRequest("invalid request")
		}
	}

	room, err := h.requireMembershipAndGetRoom(ctx, account, roomID)
	if err != nil {
		return nil, err
	}

	// The default is clamped to the room cap so that a small room queried
	// without a limit doesn't trip the explicit-limit guard. Client-supplied
	// values stay strictly validated.
	var limit int
	switch {
	case req.Limit != nil:
		limit = *req.Limit
		if limit <= 0 || limit > room.UserCount {
			return nil, errMemberStatusesLimitInvalid
		}
	case room.UserCount == 0:
		return &model.ListMemberStatusesResponse{Members: []model.MemberStatus{}}, nil
	default:
		limit = min(defaultMemberStatusesLimit, room.UserCount)
	}

	statuses, err := h.store.ListMemberStatuses(ctx, roomID, limit)
	if err != nil {
		return nil, fmt.Errorf("list member statuses: %w", err)
	}
	return &model.ListMemberStatusesResponse{Members: statuses}, nil
}
// listMentionableSubscriptions returns up to `limit` mentionable subscriptions
// of a room the requester belongs to, optionally narrowed by a literal
// substring filter.
//
// The cap on limit is room.UserCount + room.AppCount. When Limit is omitted the
// handler uses defaultMentionableLimit clamped to that cap (or returns an empty,
// non-nil list when the cap is zero); a supplied Limit must satisfy
// 0 < limit <= cap, else errMentionableLimitInvalid.
func (h *Handler) listMentionableSubscriptions(c *natsrouter.Context) (*model.MentionableSubscriptionsResponse, error) {
	var ctx context.Context = c
	account, roomID := c.Param("account"), c.Param("roomID")

	span := trace.SpanFromContext(ctx)
	if span.IsRecording() {
		span.SetAttributes(
			attribute.String("room.id", roomID),
			attribute.String("site.id", h.siteID),
		)
	}

	var req model.MentionableSubscriptionsRequest
	if msg := c.Msg; msg != nil && len(msg.Data) > 0 {
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			return nil, errcode.BadRequest("invalid request")
		}
	}

	room, err := h.requireMembershipAndGetRoom(ctx, account, roomID)
	if err != nil {
		return nil, err
	}

	maxMentionable := room.UserCount + room.AppCount
	var limit int
	switch {
	case req.Limit != nil:
		limit = *req.Limit
		if limit <= 0 || limit > maxMentionable {
			return nil, errMentionableLimitInvalid
		}
	case maxMentionable == 0:
		return &model.MentionableSubscriptionsResponse{Subscriptions: []model.MentionableSubscription{}}, nil
	default:
		limit = min(defaultMentionableLimit, maxMentionable)
	}

	// The filter is a literal substring. QuoteMeta escapes regex metacharacters
	// so that typing "a.b" doesn't match every "a<any>b" account; an empty
	// filter stays empty.
	literalFilter := regexp.QuoteMeta(req.Filter)
	subs, err := h.store.ListMentionableSubscriptions(ctx, roomID, account, literalFilter, limit)
	if err != nil {
		return nil, fmt.Errorf("list mentionable subscriptions: %w", err)
	}
	return &model.MentionableSubscriptionsResponse{Subscriptions: subs}, nil
}
// removeMember validates a request to remove either a single account or a whole
// org from a channel and, when it passes, publishes it on the canonical
// "member.remove" subject for room-worker to execute.
//
// Validation order:
//  1. a body RoomID, if present, must match the subject's roomID;
//  2. the room must be a channel (DM/botDM removals are unsupported);
//  3. exactly one of Account / OrgID must be set;
//  4. account removal: the target must not be an org-only member; removing
//     someone else requires the requester to be an owner; the last member and
//     the last owner cannot be removed;
//     org removal: the requester must be an owner.
//
// Dual-membership and "nothing actually removed" detection belong to
// room-worker, which owns the deletion.
func (h *Handler) removeMember(c *natsrouter.Context, req model.RemoveMemberRequest) (*model.StatusReply, error) { //nolint:gocritic // hugeParam: req is passed by value to satisfy the natsrouter.Register handler signature
	var ctx context.Context = c
	requesterAccount := c.Param("account")
	roomID := c.Param("roomID")

	if bodyRoomID := req.RoomID; bodyRoomID != "" && bodyRoomID != roomID {
		return nil, errRoomIDMismatch
	}
	req.RoomID = roomID
	req.Requester = requesterAccount

	// Channel-only operation.
	room, err := h.store.GetRoom(ctx, roomID)
	if err != nil {
		return nil, fmt.Errorf("get room: %w", err)
	}
	if room.Type != model.RoomTypeChannel {
		// %w keeps the sentinel matchable with errors.Is while the message
		// carries the actual room type for client-side context.
		return nil, fmt.Errorf("%w (got %s)", errRemoveChannelOnly, room.Type)
	}
	// Forward the room type so room-worker can skip its own GetRoom round trip.
	req.RoomType = room.Type

	// Exactly one target kind must be named.
	byAccount, byOrg := req.Account != "", req.OrgID != ""
	if byAccount == byOrg {
		return nil, errRemoveTargetAmbiguous
	}

	if byOrg {
		// Owner-removes-org: only the requester's owner role matters here; the
		// org's members are resolved downstream.
		requesterSub, err := h.store.GetSubscription(ctx, requesterAccount, roomID)
		if err != nil {
			return nil, fmt.Errorf("get requester subscription: %w", err)
		}
		if !hasRole(requesterSub.Roles, model.RoleOwner) {
			return nil, errOnlyOwnersCanRemove
		}
	} else {
		// Single-account removal: permission and last-member checks.
		target, err := h.store.GetSubscriptionWithMembership(ctx, roomID, req.Account)
		if err != nil {
			return nil, fmt.Errorf("get target subscription: %w", err)
		}
		if orgOnly := target.HasOrgMembership && !target.HasIndividualMembership; orgOnly {
			return nil, errOrgMemberCannotLeaveSolo
		}

		if removingSomeoneElse := req.Account != requesterAccount; removingSomeoneElse {
			requesterSub, err := h.store.GetSubscription(ctx, requesterAccount, roomID)
			if err != nil {
				return nil, fmt.Errorf("get requester subscription: %w", err)
			}
			if !hasRole(requesterSub.Roles, model.RoleOwner) {
				return nil, errOnlyOwnersCanRemove
			}
		}

		counts, err := h.store.CountMembersAndOwners(ctx, roomID)
		if err != nil {
			return nil, fmt.Errorf("count members: %w", err)
		}
		if counts.MemberCount <= 1 {
			return nil, errCannotRemoveLastMember
		}
		if counts.OwnerCount <= 1 && hasRole(target.Subscription.Roles, model.RoleOwner) {
			return nil, errLastOwnerCannotLeave
		}
	}

	// Stable seed for room-worker's deterministic system-message IDs across
	// JetStream redeliveries.
	req.Timestamp = time.Now().UTC().UnixMilli()

	// Hand off to room-worker via the ROOMS stream.
	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal remove member request: %w", err)
	}
	removeSubject := subject.RoomCanonical(h.siteID, "member.remove")
	if err := h.publishToStream(ctx, removeSubject, payload, ""); err != nil {
		return nil, fmt.Errorf("publish to stream: %w", err)
	}
	return &model.StatusReply{Status: "accepted"}, nil
}
// updateRole promotes a channel member to owner or demotes an owner to member.
// Unlike the create/remove paths, the write happens synchronously in this
// handler (store.SetOwnerRole); afterwards a subscription-update event is
// published best-effort and, for a target whose home site differs from this
// one, a "role_updated" inbox event is published to that site.
//
// Validation order: body/subject roomID match → role is owner or member → room
// is a channel → requester is an owner → target is a member → promote/demote
// preconditions → org-only members cannot be promoted → an owner cannot demote
// themselves when they are the last owner.
func (h *Handler) updateRole(c *natsrouter.Context, req model.UpdateRoleRequest) (*model.StatusReply, error) {
	var ctx context.Context = c
	requester := c.Param("account")
	roomID := c.Param("roomID")

	if req.RoomID != "" && req.RoomID != roomID {
		return nil, errRoomIDMismatch
	}
	req.RoomID = roomID

	promote := req.NewRole == model.RoleOwner
	demote := req.NewRole == model.RoleMember
	if !promote && !demote {
		return nil, errInvalidRole
	}

	room, err := h.store.GetRoom(ctx, roomID)
	if err != nil {
		return nil, fmt.Errorf("get room: %w", err)
	}
	if room.Type != model.RoomTypeChannel {
		return nil, errRoomTypeGuard
	}

	requesterSub, err := h.store.GetSubscription(ctx, requester, roomID)
	if err != nil {
		return nil, fmt.Errorf("requester not found: %w", err)
	}
	if !hasRole(requesterSub.Roles, model.RoleOwner) {
		return nil, errOnlyOwners
	}

	// One lookup serves both the role check and the membership-source guard; a
	// missing subscription means the target is not a member.
	target, err := h.store.GetSubscriptionWithMembership(ctx, roomID, req.Account)
	switch {
	case err == nil:
	case errors.Is(err, model.ErrSubscriptionNotFound), errors.Is(err, mongo.ErrNoDocuments):
		return nil, errTargetNotMember
	default:
		return nil, fmt.Errorf("get target subscription: %w", err)
	}

	targetIsOwner := hasRole(target.Subscription.Roles, model.RoleOwner)
	switch {
	case promote && targetIsOwner:
		// Promote requires that the target is not an owner yet.
		return nil, errAlreadyOwner
	case demote && !targetIsOwner:
		// Demote requires that the target currently is an owner.
		return nil, errNotOwner
	case promote && target.HasOrgMembership && !target.HasIndividualMembership:
		// Only provably org-only members are rejected; subscription-only
		// members (both flags false) remain promotable.
		return nil, errPromoteRequiresIndividual
	}

	// The last-owner guard is only needed for self-demotion: the requester is
	// already known to be an owner, so demoting anyone else leaves at least
	// the requester.
	if demote && req.Account == requester {
		owners, err := h.store.CountOwners(ctx, roomID)
		if err != nil {
			return nil, fmt.Errorf("count owners: %w", err)
		}
		if owners <= 1 {
			return nil, errCannotDemoteLast
		}
	}

	// A single instant is shared by the origin write and the published event:
	// the document's rolesUpdatedAt must equal the event timestamp so remote
	// replicas guard against the same high-water mark.
	now := time.Now().UTC()
	updatedSub, err := h.store.SetOwnerRole(ctx, roomID, req.Account, promote, now)
	if err != nil {
		if errors.Is(err, model.ErrSubscriptionNotFound) {
			return nil, errTargetNotMember // defensive: target removed between validation and mutate
		}
		return nil, fmt.Errorf("set owner role: %w", err)
	}

	// Role updates are channel-only (guarded above), so the channel name is
	// already in hand.
	eventPayload, err := h.publishSubscriptionUpdate(ctx, req.Account, "role_updated", updatedSub, room.Name, now)
	if err != nil {
		return nil, err
	}

	userSiteID, err := h.store.GetUserSiteID(ctx, req.Account)
	if err != nil {
		return nil, fmt.Errorf("get user siteId: %w", err)
	}
	// Unknown or same-site target: no cross-site inbox event is needed.
	if userSiteID == "" || userSiteID == h.siteID {
		return &model.StatusReply{Status: "ok"}, nil
	}

	inboxPayload, err := json.Marshal(model.InboxEvent{
		Type:       "role_updated",
		SiteID:     h.siteID,
		DestSiteID: userSiteID,
		Payload:    eventPayload, // inbox-worker.handleRoleUpdated decodes a SubscriptionUpdateEvent
		Timestamp:  now.UnixMilli(),
	})
	if err != nil {
		return nil, fmt.Errorf("marshal inbox event: %w", err)
	}
	if err := h.publishToStream(ctx, subject.InboxExternal(userSiteID, "role_updated"), inboxPayload, ""); err != nil {
		return nil, fmt.Errorf("publish role-updated inbox: %w", err)
	}
	return &model.StatusReply{Status: "ok"}, nil
}
// publishSubscriptionUpdate builds a SubscriptionUpdateEvent for sub (tagged
// with action, roomName and ts in Unix milliseconds), marshals it to JSON and
// publishes it via h.publishCore on subject.SubscriptionUpdate(account).
//
// Publishing is best-effort: a publish failure is logged and swallowed, since
// the DB write is the source of truth and clients reconcile on next refetch.
// Only a marshal failure is returned as an error.
//
// The marshaled event bytes are returned even when the publish failed, so
// callers can reuse them (e.g. as a cross-site inbox payload). sub is
// dereferenced unconditionally and must be non-nil.
func (h *Handler) publishSubscriptionUpdate(ctx context.Context, account, action string, sub *model.Subscription, roomName string, ts time.Time) ([]byte, error) {
	payload, err := json.Marshal(model.SubscriptionUpdateEvent{
		UserID:       sub.User.ID,
		Subscription: *sub,
		Action:       action,
		RoomName:     roomName,
		Timestamp:    ts.UnixMilli(),
	})
	if err != nil {
		return nil, fmt.Errorf("marshal subscription update event: %w", err)
	}

	pubErr := h.publishCore(ctx, subject.SubscriptionUpdate(account), payload)
	if pubErr == nil {
		return payload, nil
	}

	// Deliberately not returned: the caller still gets the payload.
	slog.ErrorContext(ctx, "subscription update publish failed",
		"request_id", natsutil.RequestIDFromContext(ctx), "error", pubErr, "account", account)
	return payload, nil
}
// addMembers validates an add-members request for a channel room and, when it
// passes, publishes the normalized request to the room stream for room-worker
// to apply. It does not write memberships itself.
//
// The requester and room come from the subject params "account" and "roomID".
// Checks run in the order below and the first failure wins, so reordering them
// changes which error a client sees:
//
//   - requester has no subscription in the room → errNotRoomMember
//   - room is not a channel → errAddMembersChannelOnly
//   - room is restricted and requester is not an owner → errOnlyOwnersCanAddToRes
//   - body RoomID is set and differs from the subject roomID → errRoomIDMismatch
//   - req.Users lists a bot or platform-admin account → errBotInChannel
//   - an org or account fails validateMembershipRefs → that error, unwrapped
//   - the room would exceed h.maxRoomSize → Conflict errcode with reason
//     errcode.RoomMaxSizeReached
//
// Store, marshal and publish failures are returned wrapped with context.
//
// On success the reply is Status "accepted" and the published request carries
// the merged, deduplicated Users/Orgs, the subject roomID, the requester's ID
// and account, and a UTC millisecond timestamp.
func (h *Handler) addMembers(c *natsrouter.Context, req model.AddMembersRequest) (*model.StatusReply, error) { //nolint:gocritic // hugeParam: req is passed by value to satisfy the natsrouter.Register handler signature
// The router context is used as the context.Context for all downstream calls.
var ctx context.Context = c
// 1. Subject params → requester, roomID
requester := c.Param("account")
roomID := c.Param("roomID")
// 2. Verify requester is in room. Distinguish "not a member" (typed
// forbidden — the user genuinely can't add members) from an infra failure
// (Mongo timeout etc. — must NOT collapse to a 403 user-error).
sub, err := h.store.GetSubscription(ctx, requester, roomID)
if err != nil {
if errors.Is(err, model.ErrSubscriptionNotFound) {
return nil, errNotRoomMember
}
return nil, fmt.Errorf("check requester room membership: %w", err)
}
// 3. Get room and guard on type
room, err := h.store.GetRoom(ctx, roomID)
if err != nil {
return nil, fmt.Errorf("get room: %w", err)
}
if room.Type != model.RoomTypeChannel {
return nil, errAddMembersChannelOnly
}
// Restricted rooms: only owners may add members.
if room.Restricted && !hasRole(sub.Roles, model.RoleOwner) {
return nil, errOnlyOwnersCanAddToRes
}
// 4. Cross-check optional body roomID against the subject roomID.
if req.RoomID != "" && req.RoomID != roomID {
return nil, errRoomIDMismatch
}
// Reject directly listed bots and platform-admin accounts up front — mirrors
// classifyAndValidate in create-channel: a client that explicitly lists one
// must see a hard error rather than a silent drop.
for _, a := range req.Users {
if model.IsBot(a) || model.IsPlatformAdminAccount(a) {
return nil, errBotInChannel
}
}
// 5. Expand channels
channelOrgIDs, channelAccounts, err := h.expandChannelRefs(ctx, requester, req.Channels)
if err != nil {
return nil, fmt.Errorf("expand channels: %w", err)
}
// Strip bots from channel-ref expansion so a source channel can never
// silently inject a bot into this channel. Mirrors create-channel.
channelAccounts = filterBots(channelAccounts)
// 6. Dedup orgs and direct accounts
allOrgs := dedup(append(req.Orgs, channelOrgIDs...))
allUsers := dedup(append(req.Users, channelAccounts...))
// 6a/6b. Reject phantom orgs and users up front (run concurrently). Without
// this, room-worker writes a room_members row for the bogus orgId/account
// and fans out a "members added" sys-msg even though no user matches.
if err := h.validateMembershipRefs(ctx, allOrgs, allUsers); err != nil {
return nil, err
}
// 7/8. Capacity check. Short-circuit: with no orgs, the request adds at most
// len(allUsers) new individuals (bot/dup/already-subscribed pruning can only
// make it fewer), so when the room has headroom for that upper bound we
// accept without the costlier CountNewMembers resolution. room.UserCount is
// kept current by room-worker; a rare transient undercount can only
// over-admit by the drift, and only matters near the cap — where the
// condition below falls through to the exact count. Org requests have no
// cheap upper bound, so they always take the precise path.
newCount := -1 // -1 records the short-circuited case (capacity satisfied by the upper bound, not counted)
if len(allOrgs) > 0 || room.UserCount+len(allUsers) > h.maxRoomSize {
// Count net-new members (count-only — actual list materialized in room-worker).
n, err := h.store.CountNewMembers(ctx, allOrgs, allUsers, roomID, "")
if err != nil {
return nil, fmt.Errorf("count new members: %w", err)
}
newCount = n
// debug: how the requested refs resolved and the capacity arithmetic.
slog.DebugContext(ctx, "room-service addMembers resolved",
"request_id", natsutil.RequestIDFromContext(ctx), "room_id", roomID,
"orgs", len(allOrgs), "users", len(allUsers), "new_count", newCount,
"current_count", room.UserCount, "max_size", h.maxRoomSize)
// Exact check: reject when current members plus net-new members exceed the cap.
if room.UserCount+newCount > h.maxRoomSize {
return nil, errcode.Conflict(
fmt.Sprintf("room is at maximum capacity (%d): cannot add %d members to room with %d existing", h.maxRoomSize, newCount, room.UserCount),
errcode.WithReason(errcode.RoomMaxSizeReached),
errcode.WithMetadata("maxRoomSize", strconv.Itoa(h.maxRoomSize),
"currentUserCount", strconv.Itoa(room.UserCount),
"attempted", strconv.Itoa(room.UserCount+newCount)),
)
}
}
// 9. Normalize and publish — Users and Orgs ship as merged-but-unresolved.
// room-worker's ListNewMembers reproduces resolution at write time.
// req is a by-value copy, so these field assignments only shape the
// published payload.
req.Users = allUsers
req.Orgs = allOrgs
req.RoomID = roomID
req.RequesterID = sub.User.ID
req.RequesterAccount = sub.User.Account
req.Timestamp = time.Now().UTC().UnixMilli()
normalized, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal add-members request: %w", err)
}
if err := h.publishToStream(ctx, subject.RoomCanonical(h.siteID, "member.add"), normalized, ""); err != nil {
return nil, fmt.Errorf("publish to stream: %w", err)
}
// flow: accepted and handed the member-add off to room-worker.
// new_count is -1 here when the capacity short-circuit skipped the exact count.
slog.Log(ctx, logctx.LevelFlow, "room-service member.add handoff", "phase", "published",
"request_id", natsutil.RequestIDFromContext(ctx), "room_id", roomID, "new_count", newCount)
// 10. Reply accepted
return &model.StatusReply{Status: "accepted"}, nil
}
// validateAccountsExist checks that every entry in accounts is reported by the
// store's FindExistingAccounts.
//
// It returns nil when accounts is empty or nothing is missing. Otherwise it
// returns a NotFound errcode naming the first account, in input order, that
// the store did not return; errcode.HasReason(err, errcode.RoomUserNotFound)
// holds for that error. A store failure is returned wrapped.
//
// Without this gate a typo'd account is silently dropped and the async job
// reports success.
func (h *Handler) validateAccountsExist(ctx context.Context, accounts []string) error {
	if len(accounts) == 0 {
		return nil
	}

	found, err := h.store.FindExistingAccounts(ctx, accounts)
	switch {
	case err != nil:
		return fmt.Errorf("validate accounts: %w", err)
	case len(found) == len(accounts):
		// Equal counts are treated as full coverage; skip building the set.
		return nil
	}

	known := make(map[string]bool, len(found))
	for _, acct := range found {
		known[acct] = true
	}
	for i := range accounts {
		if known[accounts[i]] {
			continue
		}
		return errcode.NotFound(fmt.Sprintf("user %q not found", accounts[i]), errcode.WithReason(errcode.RoomUserNotFound))
	}
	return nil
}
// validateOrgIDs checks that every entry in orgIDs is reported by the store's
// FindExistingOrgIDs (an org counts as existing when it has backing users,
// i.e. a user with sectId==orgID or deptId==orgID).
//
// It is a no-op for an empty orgIDs. When an org is missing it returns a
// BadRequest errcode naming the first such orgID in input order;
// errcode.HasReason(err, errcode.RoomInvalidOrg) holds for that error. A
// store failure is returned wrapped.
func (h *Handler) validateOrgIDs(ctx context.Context, orgIDs []string) error {
	if len(orgIDs) == 0 {
		return nil
	}

	found, err := h.store.FindExistingOrgIDs(ctx, orgIDs)
	if err != nil {
		return fmt.Errorf("validate org ids: %w", err)
	}

	// Only build the lookup set when the counts disagree; equal counts are
	// treated as every requested org being present.
	if len(found) != len(orgIDs) {
		present := make(map[string]bool, len(found))
		for _, orgID := range found {
			present[orgID] = true
		}
		for _, orgID := range orgIDs {
			if !present[orgID] {
				return errcode.BadRequest(fmt.Sprintf("invalid org %q", orgID), errcode.WithReason(errcode.RoomInvalidOrg))
			}
		}
	}
	return nil
}
// validateMembershipRefs runs validateOrgIDs and validateAccountsExist
// concurrently; the two checks are independent, so there is no reason to
// serialize them.
//
// A plain errgroup.Group is used (no derived context, hence no cancellation
// of the sibling on failure) so both checks always run to completion. The
// group's own result is ignored: each outcome is captured separately and the
// org error is returned in preference to the account error, preserving the
// priority of the earlier sequential implementation. Returns nil when both
// checks pass.
func (h *Handler) validateMembershipRefs(ctx context.Context, orgIDs, accounts []string) error {
	// Slots are ordered by reporting priority: org check first, then accounts.
	// Each goroutine writes only its own slot.
	var outcomes [2]error
	var checks errgroup.Group

	checks.Go(func() error {
		outcomes[0] = h.validateOrgIDs(ctx, orgIDs)
		return outcomes[0]
	})
	checks.Go(func() error {
		outcomes[1] = h.validateAccountsExist(ctx, accounts)
		return outcomes[1]
	})

	// Wait only as a barrier; which error it reports depends on timing, so
	// the result is read from outcomes instead.
	_ = checks.Wait()

	for _, err := range outcomes {
		if err != nil {
			return err
		}
	}
	return nil
}
func (h *Handler) expandChannelRefs(ctx context.Context, requester string, refs []model.ChannelRef) (orgIDs, accounts []string, err error) {
// maxRoomSize+1 is enough to distinguish "fits" from "exceeds the cap" without
// ever materializing an unbounded result set in memory.
listLimit := h.maxRoomSize + 1
for _, ref := range refs {
var members []model.RoomMember
// Per-ref deadline so a slow same-site Mongo query or unresponsive
// remote site cannot stall the create/add request indefinitely; a
// timeout here surfaces to the caller as an Unavailable errcode with
// site+roomId so the requester can see which channel stalled.
refCtx, cancel := h.contextWithMemberListTimeout(ctx)
if ref.SiteID == h.siteID {
if subErr := h.store.CheckMembership(refCtx, requester, ref.RoomID); subErr != nil {
cancel()
if errors.Is(subErr, context.DeadlineExceeded) {