-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstore_mongo.go
More file actions
1814 lines (1722 loc) · 71.7 KB
/
Copy pathstore_mongo.go
File metadata and controls
1814 lines (1722 loc) · 71.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"regexp"
"time"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"github.com/hmchangw/chat/pkg/errcode"
"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/pipelines"
)
// botAccountRegex matches bot/app accounts by the ".bot" suffix only — it excludes
// "p_" platform-admin accounts, which have user records and are looked up as users here.
const botAccountRegex = `\.bot$`
var botAccountPattern = regexp.MustCompile(botAccountRegex)
type MongoStore struct {
rooms *mongo.Collection
subscriptions *mongo.Collection
threadSubscriptions *mongo.Collection
threadRooms *mongo.Collection
roomMembers *mongo.Collection
users *mongo.Collection
apps *mongo.Collection
botCmdMenus *mongo.Collection
teamsMeetings *mongo.Collection
}
func NewMongoStore(db *mongo.Database) *MongoStore {
return &MongoStore{
rooms: db.Collection("rooms"),
subscriptions: db.Collection("subscriptions"),
threadSubscriptions: db.Collection("thread_subscriptions"),
threadRooms: db.Collection("thread_rooms"),
roomMembers: db.Collection("room_members"),
users: db.Collection("users"),
apps: db.Collection("apps"),
botCmdMenus: db.Collection("bot_cmd_menu"),
teamsMeetings: db.Collection("teams_meetings"),
}
}
// EnsureIndexes creates the indexes that back the read paths in this service
// and the unique constraints required for retry-safe writes by room-worker.
// Must be invoked once at startup. Mongo treats index creation as idempotent
// when the key spec and options match.
func (s *MongoStore) EnsureIndexes(ctx context.Context) error {
if _, err := s.roomMembers.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "rid", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure room_members (rid) index: %w", err)
}
// Unique logical key — retries from room-worker generate fresh _id values
// (see processAddMembers), so without this constraint a redelivered
// member.add would silently insert duplicate room_members. The bulk-insert
// path in room-worker already ignores mongo.IsDuplicateKeyError, so this
// makes redelivery idempotent.
if _, err := s.roomMembers.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "rid", Value: 1}, {Key: "member.type", Value: 1}, {Key: "member.id", Value: 1}},
Options: options.Index().SetUnique(true),
}); err != nil {
return fmt.Errorf("ensure room_members (rid,member.type,member.id) unique index: %w", err)
}
// Unique logical key for subscriptions. Same retry-idempotency rationale as room_members above.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "u.account", Value: 1}},
Options: options.Index().SetUnique(true),
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,u.account) unique index: %w", err)
}
// Unique: account is a user's identity, so at most one users doc per account.
// findUsersForDisplay already folds results into a map keyed by account, and
// user-service declares this index unique on the shared collection — both must
// agree or the second service's CreateOne hits IndexOptionsConflict.
if _, err := s.users.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "account", Value: 1}},
Options: options.Index().SetUnique(true),
}); err != nil {
// E11000 here means pre-existing duplicate account values (populated env
// pre-rollout) — point operators at the one-time dedupe preflight.
if mongo.IsDuplicateKeyError(err) {
return fmt.Errorf("ensure users (account) unique index: duplicate account values exist in the users "+
"collection — run the one-time dedupe preflight (group users by account, resolve n>1) before "+
"starting this service: %w", err)
}
// A pre-existing non-unique account_1 conflicts (85 IndexOptionsConflict /
// 86 IndexKeySpecsConflict); Mongo won't upgrade it — the operator must drop it.
if se := mongo.ServerError(nil); errors.As(err, &se) && (se.HasErrorCode(85) || se.HasErrorCode(86)) {
return fmt.Errorf("ensure users (account) unique index: a non-unique account_1 index already exists on "+
"the users collection — drop the old non-unique account_1 index (db.users.dropIndex(\"account_1\")) "+
"before starting this service so it can be recreated as unique: %w", err)
}
return fmt.Errorf("ensure users (account) unique index: %w", err)
}
if _, err := s.users.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "sectId", Value: 1}, {Key: "account", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure users (sectId,account) index: %w", err)
}
if _, err := s.users.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "deptId", Value: 1}, {Key: "account", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure users (deptId,account) index: %w", err)
}
// Lookup index for botDM creation: GetApp filters by assistant.name.
appsIndex := mongo.IndexModel{
Keys: bson.D{{Key: "assistant.name", Value: 1}},
Options: options.Index().SetName("assistant_name_idx"),
}
if _, err := s.apps.Indexes().CreateOne(ctx, appsIndex); err != nil {
return fmt.Errorf("ensure apps index: %w", err)
}
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "lastSeenAt", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,lastSeenAt) index: %w", err)
}
// Backs room-worker's ReconcileMemberCounts, which counts bot vs non-bot
// subs per room off u.isBot — keeps both CountDocuments index-only instead
// of scanning every subscription in the room.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "u.isBot", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,u.isBot) index: %w", err)
}
// Lookup index for FindDMSubscription (filters on u.account+name).
// Without this index, FindDMSubscription falls back to a collection scan.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "u.account", Value: 1}, {Key: "name", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure subscriptions (u.account,name) index: %w", err)
}
// Backs getRoomSubscriptions: filter roomId, sort {joinedAt, _id} with
// skip/limit pagination. Including the sort keys lets Mongo return ordered
// pages from the index instead of an in-memory sort that risks the 32MB
// sort limit on large rooms.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "joinedAt", Value: 1}, {Key: "_id", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,joinedAt,_id) index: %w", err)
}
// Backs CountOwners (filters on roomId+roles) so owner counts stay
// index-only instead of scanning every subscription in the room.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "roles", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,roles) index: %w", err)
}
// Mirrors the unique index created by message-worker / history-service so per-service test DBs also enforce it.
if _, err := s.threadSubscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "threadRoomId", Value: 1}, {Key: "userAccount", Value: 1}},
Options: options.Index().SetUnique(true),
}); err != nil {
return fmt.Errorf("ensure thread_subscriptions (threadRoomId,userAccount) unique index: %w", err)
}
if _, err := s.threadSubscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "parentMessageId", Value: 1}, {Key: "userAccount", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure thread_subscriptions (parentMessageId,userAccount) index: %w", err)
}
// Backs MinThreadSubscriptionLastSeenByThreadRoomID: covered index seek on
// (threadRoomId, lastSeenAt ASC) returns the subscriber with the smallest
// lastSeenAt in one seek — same algorithm as the (roomId, lastSeenAt) index
// on subscriptions that backs MinSubscriptionLastSeenByRoomID.
if _, err := s.threadSubscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "threadRoomId", Value: 1}, {Key: "lastSeenAt", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure thread_subscriptions (threadRoomId,lastSeenAt) index: %w", err)
}
// Backs GetThreadUnreadSummary's $match {userAccount, siteId}. No existing
// thread_subscriptions index has userAccount as a prefix.
if _, err := s.threadSubscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "userAccount", Value: 1}, {Key: "siteId", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure thread_subscriptions (userAccount,siteId) index: %w", err)
}
if _, err := s.apps.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{
{Key: "channelTab.default", Value: 1},
{Key: "channelTab.enabled", Value: 1},
{Key: "channelTab.name", Value: 1},
},
}); err != nil {
return fmt.Errorf("ensure apps (channelTab.default,enabled,name) index: %w", err)
}
if _, err := s.botCmdMenus.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "activeStatus", Value: 1}, {Key: "name", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure bot_cmd_menu (activeStatus,name) index: %w", err)
}
// Unique logical key for teams_meetings — the per-room idempotency record for
// the meetings RPC. A concurrent second create hits this constraint and the
// loser reads back the winner's record instead of inserting a duplicate (and
// thus publishing a second teams_meet_started system message). Same retry-safe
// rationale as the room_members / subscriptions unique indexes above.
if _, err := s.teamsMeetings.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "siteId", Value: 1}},
Options: options.Index().SetUnique(true),
}); err != nil {
return fmt.Errorf("ensure teams_meetings (roomId,siteId) unique index: %w", err)
}
return nil
}
// GetTeamsMeeting fast-path reads the room's existing Teams meeting record.
// found=false with err=nil means the room has no meeting yet.
func (s *MongoStore) GetTeamsMeeting(ctx context.Context, roomID, siteID string) (*model.TeamsMeetingRecord, bool, error) {
var rec model.TeamsMeetingRecord
err := s.teamsMeetings.FindOne(ctx, bson.M{"roomId": roomID, "siteId": siteID}).Decode(&rec)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, false, nil
}
if err != nil {
return nil, false, fmt.Errorf("get teams meeting for room %q: %w", roomID, err)
}
return &rec, true, nil
}
// InsertTeamsMeeting inserts the meeting record. The (roomId, siteId) unique
// index makes this the idempotency gate: a concurrent second insert returns a
// duplicate-key error the handler detects via mongo.IsDuplicateKeyError (which
// unwraps with errors.As) and reads back the winner's record.
func (s *MongoStore) InsertTeamsMeeting(ctx context.Context, record model.TeamsMeetingRecord) error {
if _, err := s.teamsMeetings.InsertOne(ctx, record); err != nil {
return fmt.Errorf("insert teams meeting record: %w", err)
}
return nil
}
// roomReadProjection is the field set GetRoom returns — the union of every
// Room field read by a handler call site. The full Room doc is never needed on
// these read paths, and projecting trims the BSON decode (a top CPU consumer in
// profiling) plus the wire payload. Keep in sync with the Room field reads in
// handler.go; the projection-field integration test guards drift.
var roomReadProjection = bson.D{
{Key: "_id", Value: 1}, {Key: "type", Value: 1}, {Key: "name", Value: 1},
{Key: "userCount", Value: 1}, {Key: "appCount", Value: 1},
{Key: "restricted", Value: 1}, {Key: "externalAccess", Value: 1},
{Key: "lastMsgAt", Value: 1}, {Key: "minUserLastSeenAt", Value: 1},
}
// subscriptionReadProjection is the field set GetSubscription returns — the
// union of every Subscription field read by a handler call site. The fat
// Subscription doc (~30 fields incl. byte arrays and time pointers) is never
// needed here; projecting trims the reflection-heavy BSON decode. Keep in sync
// with the Subscription field reads in handler.go; the projection-field
// integration test guards drift.
var subscriptionReadProjection = bson.D{
{Key: "_id", Value: 1}, {Key: "u", Value: 1}, {Key: "roomId", Value: 1},
{Key: "siteId", Value: 1}, {Key: "roles", Value: 1}, {Key: "alert", Value: 1},
{Key: "threadUnread", Value: 1}, {Key: "lastSeenAt", Value: 1},
}
func (s *MongoStore) GetRoom(ctx context.Context, id string) (*model.Room, error) {
var room model.Room
opts := options.FindOne().SetProjection(roomReadProjection)
if err := s.rooms.FindOne(ctx, bson.M{"_id": id}, opts).Decode(&room); err != nil {
return nil, fmt.Errorf("room %q not found: %w", id, err)
}
return &room, nil
}
func (s *MongoStore) GetSubscription(ctx context.Context, account, roomID string) (*model.Subscription, error) {
var sub model.Subscription
filter := bson.M{"u.account": account, "roomId": roomID}
opts := options.FindOne().SetProjection(subscriptionReadProjection)
if err := s.subscriptions.FindOne(ctx, filter, opts).Decode(&sub); err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, fmt.Errorf("%q in room %q: %w", account, roomID, model.ErrSubscriptionNotFound)
}
return nil, fmt.Errorf("get subscription for %q in room %q: %w", account, roomID, err)
}
return &sub, nil
}
// membershipExistsProjection returns only _id so the existence check decodes
// essentially nothing — the cheapest form of GetSubscription for call sites
// that use the result solely as a membership gate.
var membershipExistsProjection = bson.D{{Key: "_id", Value: 1}}
func (s *MongoStore) CheckMembership(ctx context.Context, account, roomID string) error {
filter := bson.M{"u.account": account, "roomId": roomID}
opts := options.FindOne().SetProjection(membershipExistsProjection)
if err := s.subscriptions.FindOne(ctx, filter, opts).Err(); err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("%q in room %q: %w", account, roomID, model.ErrSubscriptionNotFound)
}
return fmt.Errorf("check membership for %q in room %q: %w", account, roomID, err)
}
return nil
}
// GetSubscriptionWithMembership loads the target subscription joined with their
// individual and org membership sources. Used by the remove-member validation
// flow to decide whether a user can leave or be removed individually.
func (s *MongoStore) GetSubscriptionWithMembership(ctx context.Context, roomID, account string) (*SubscriptionWithMembership, error) {
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{"roomId": roomID, "u.account": account}}},
{{Key: "$lookup", Value: bson.M{
"from": "room_members",
"let": bson.M{"acct": "$u.account"},
"pipeline": bson.A{
bson.M{"$match": bson.M{"$expr": bson.M{"$and": bson.A{
bson.M{"$eq": bson.A{"$rid", roomID}},
bson.M{"$eq": bson.A{"$member.type", "individual"}},
bson.M{"$eq": bson.A{"$member.account", "$$acct"}},
}}}},
bson.M{"$limit": 1},
},
"as": "individualMembership",
}}},
{{Key: "$lookup", Value: bson.M{
"from": "users",
"let": bson.M{"acct": "$u.account"},
"pipeline": bson.A{
bson.M{"$match": bson.M{"$expr": bson.M{"$eq": bson.A{"$account", "$$acct"}}}},
bson.M{"$limit": 1},
bson.M{"$project": bson.M{"sectId": 1, "deptId": 1}},
},
"as": "userDoc",
}}},
// Dept-aware org-membership lookup: a user added via Orgs:["X"] may
// match the org by deptId only (no sectId), so the room_members row
// has member.id = deptId. Checking only sectId would miss that case
// and report HasOrgMembership=false, leading the remove flow to drop
// the user's subscription even though they are still org-attached.
{{Key: "$lookup", Value: bson.M{
"from": "room_members",
"let": bson.M{
"sectId": bson.M{"$arrayElemAt": bson.A{"$userDoc.sectId", 0}},
"deptId": bson.M{"$arrayElemAt": bson.A{"$userDoc.deptId", 0}},
},
"pipeline": bson.A{
bson.M{"$match": bson.M{"$expr": bson.M{"$and": bson.A{
bson.M{"$eq": bson.A{"$rid", roomID}},
bson.M{"$eq": bson.A{"$member.type", "org"}},
bson.M{"$or": bson.A{
bson.M{"$eq": bson.A{"$member.id", "$$sectId"}},
bson.M{"$eq": bson.A{"$member.id", "$$deptId"}},
}},
}}}},
bson.M{"$limit": 1},
},
"as": "orgMembership",
}}},
{{Key: "$addFields", Value: bson.M{
"hasIndividualMembership": bson.M{"$gt": bson.A{bson.M{"$size": "$individualMembership"}, 0}},
"hasOrgMembership": bson.M{"$gt": bson.A{bson.M{"$size": "$orgMembership"}, 0}},
}}},
{{Key: "$project", Value: bson.M{"individualMembership": 0, "orgMembership": 0, "userDoc": 0}}},
}
cursor, err := s.subscriptions.Aggregate(ctx, pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate subscription with membership: %w", err)
}
defer cursor.Close(ctx)
var result struct {
model.Subscription `bson:",inline"`
HasIndividualMembership bool `bson:"hasIndividualMembership"`
HasOrgMembership bool `bson:"hasOrgMembership"`
}
if !cursor.Next(ctx) {
if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("iterate subscription with membership: %w", err)
}
return nil, fmt.Errorf("subscription not found for account %q in room %q: %w", account, roomID, mongo.ErrNoDocuments)
}
if err := cursor.Decode(&result); err != nil {
return nil, fmt.Errorf("decode subscription with membership: %w", err)
}
sub := result.Subscription
return &SubscriptionWithMembership{
Subscription: &sub,
HasIndividualMembership: result.HasIndividualMembership,
HasOrgMembership: result.HasOrgMembership,
}, nil
}
// CountMembersAndOwners returns the total and owner-role subscription counts
// for a room in a single aggregation, driving the last-owner and last-member
// guards in remove-member validation.
func (s *MongoStore) CountMembersAndOwners(ctx context.Context, roomID string) (*RoomCounts, error) {
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{"roomId": roomID}}},
{{Key: "$facet", Value: bson.M{
"members": bson.A{bson.M{"$count": "count"}},
"owners": bson.A{
bson.M{"$match": bson.M{"roles": model.RoleOwner}},
bson.M{"$count": "count"},
},
}}},
}
cursor, err := s.subscriptions.Aggregate(ctx, pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate room counts: %w", err)
}
defer cursor.Close(ctx)
var result struct {
Members []struct {
Count int `bson:"count"`
} `bson:"members"`
Owners []struct {
Count int `bson:"count"`
} `bson:"owners"`
}
if !cursor.Next(ctx) {
if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("iterate room counts: %w", err)
}
return &RoomCounts{}, nil
}
if err := cursor.Decode(&result); err != nil {
return nil, fmt.Errorf("decode room counts: %w", err)
}
counts := &RoomCounts{}
if len(result.Members) > 0 {
counts.MemberCount = result.Members[0].Count
}
if len(result.Owners) > 0 {
counts.OwnerCount = result.Owners[0].Count
}
return counts, nil
}
func (s *MongoStore) ListRoomsByIDs(ctx context.Context, ids []string) ([]model.Room, error) {
if len(ids) == 0 {
return nil, nil
}
cursor, err := s.rooms.Find(ctx, bson.M{"_id": bson.M{"$in": ids}})
if err != nil {
return nil, fmt.Errorf("list rooms by ids: %w", err)
}
var rooms []model.Room
if err := cursor.All(ctx, &rooms); err != nil {
return nil, fmt.Errorf("list rooms by ids: decode: %w", err)
}
return rooms, nil
}
func (s *MongoStore) CountOwners(ctx context.Context, roomID string) (int, error) {
count, err := s.subscriptions.CountDocuments(ctx, bson.M{"roomId": roomID, "roles": model.RoleOwner})
if err != nil {
return 0, fmt.Errorf("count owners for room %q: %w", roomID, err)
}
return int(count), nil
}
func (s *MongoStore) CountNewMembers(ctx context.Context, orgIDs, directAccounts []string, roomID, excludeAccount string) (int, error) {
if len(orgIDs) == 0 && len(directAccounts) == 0 {
return 0, nil
}
pipeline := pipelines.GetNewMembersPipeline(orgIDs, directAccounts, roomID, excludeAccount)
pipeline = append(pipeline, bson.M{"$count": "n"})
cursor, err := s.users.Aggregate(ctx, pipeline)
if err != nil {
return 0, fmt.Errorf("count new members: %w", err)
}
var results []struct {
Count int `bson:"n"`
}
if err := cursor.All(ctx, &results); err != nil {
return 0, fmt.Errorf("decode count new members: %w", err)
}
if len(results) == 0 {
return 0, nil
}
return results[0].Count, nil
}
// ListRoomMembers returns the members of a room. It prefers the room_members
// collection. When no room_members document exists for roomID, it falls back
// to synthesizing RoomMember entries from the subscriptions collection so
// callers always see the same response shape. Sort: orgs first, then
// individuals, each group by ts ascending with _id tiebreaker.
func (s *MongoStore) ListRoomMembers(ctx context.Context, roomID string, limit, offset *int, enrich bool) ([]model.RoomMember, error) {
// Lightweight existence probe — project only _id to minimize payload.
err := s.roomMembers.FindOne(ctx, bson.M{"rid": roomID},
options.FindOne().SetProjection(bson.M{"_id": 1})).Err()
switch {
case err == nil:
return s.getRoomMembers(ctx, roomID, limit, offset, enrich)
case errors.Is(err, mongo.ErrNoDocuments):
return s.getRoomSubscriptions(ctx, roomID, limit, offset, enrich)
default:
return nil, fmt.Errorf("probe room_members for %q: %w", roomID, err)
}
}
func (s *MongoStore) getRoomMembers(ctx context.Context, roomID string, limit, offset *int, enrich bool) ([]model.RoomMember, error) {
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: bson.M{"rid": roomID}}},
bson.D{{Key: "$addFields", Value: bson.M{
"typeOrder": bson.M{"$cond": bson.A{
bson.M{"$eq": bson.A{"$member.type", "org"}}, 0, 1,
}},
}}},
bson.D{{Key: "$sort", Value: bson.D{
{Key: "typeOrder", Value: 1},
{Key: "ts", Value: 1},
{Key: "_id", Value: 1},
}}},
}
if offset != nil && *offset > 0 {
pipeline = append(pipeline, bson.D{{Key: "$skip", Value: int64(*offset)}})
}
// Mongo rejects {$limit: 0}; the handler guards against <=0 but we
// defend here too so the store is robust to direct internal callers.
if limit != nil && *limit > 0 {
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: int64(*limit)}})
}
if enrich {
pipeline = append(pipeline, enrichRoomMembersStages(roomID)...)
}
// Drop the helper typeOrder field last so it never leaks into the result.
pipeline = append(pipeline, bson.D{{Key: "$project", Value: bson.M{"typeOrder": 0}}})
cursor, err := s.roomMembers.Aggregate(ctx, pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate room_members for %q: %w", roomID, err)
}
defer cursor.Close(ctx)
if !enrich {
members := []model.RoomMember{}
if err := cursor.All(ctx, &members); err != nil {
return nil, fmt.Errorf("decode room_members for %q: %w", roomID, err)
}
return members, nil
}
// Enriched path: decode into a hybrid row type that carries a parallel
// `display` sub-document (the aggregation writes individual-member values
// there to sidestep the bson:"-" tags on RoomMemberEntry's display fields).
// Org-member display (sectName, memberCount) is resolved separately in a
// single index-backed batch below — see attachOrgDisplay — rather than via a
// per-row correlated $lookup that would force a users collection scan per row.
var rows []roomMemberEnrichedRow
if err := cursor.All(ctx, &rows); err != nil {
return nil, fmt.Errorf("decode enriched room_members for %q: %w", roomID, err)
}
members := make([]model.RoomMember, len(rows))
var orgIDs []string
for i := range rows {
rm := rows[i].RoomMember
d := rows[i].Display
rm.Member.EngName = d.EngName
rm.Member.ChineseName = d.ChineseName
rm.Member.IsOwner = d.IsOwner
if rm.Member.Type == model.RoomMemberOrg {
orgIDs = append(orgIDs, rm.Member.ID)
} else {
rm.Member.SectName = d.SectName
rm.Member.EmployeeID = d.EmployeeID
}
members[i] = rm
}
if len(orgIDs) > 0 {
if err := s.attachOrgDisplay(ctx, roomID, members, orgIDs); err != nil {
return nil, err
}
}
return members, nil
}
// attachOrgDisplay resolves org-member display names and member counts for the
// org rows in members, then fills SectName (dept-first tiebreak) and MemberCount
// in place. It mirrors attachUserDisplayNames but for the org dimension: a
// single index-backed batch query feeds a Go-side rollup, replacing the prior
// per-row correlated $lookup whose $expr $or could not use an index.
func (s *MongoStore) attachOrgDisplay(ctx context.Context, roomID string, members []model.RoomMember, orgIDs []string) error {
users, err := s.fetchOrgDisplayUsers(ctx, orgIDs)
if err != nil {
return fmt.Errorf("attach org display for %q: %w", roomID, err)
}
agg := buildOrgDisplay(orgIDs, users)
for i := range members {
if members[i].Member.Type != model.RoomMemberOrg {
continue
}
id := members[i].Member.ID
if a := agg[id]; a != nil {
members[i].Member.MemberCount = a.memberCount
}
members[i].Member.OrgName = orgDisplaySectName(agg[id], id)
members[i].Member.OrgDescription = orgDisplayDescription(agg[id])
}
return nil
}
// fetchOrgDisplayUsers returns the dept/sect identity and name fields for every
// user whose deptId or sectId is one of orgIDs. The top-level $or with $in is
// index-backed by the (deptId, account) and (sectId, account) indexes — unlike
// the prior $expr-based correlated $lookup, which forced a users collection
// scan for each org row.
func (s *MongoStore) fetchOrgDisplayUsers(ctx context.Context, orgIDs []string) ([]orgDisplayUser, error) {
cursor, err := s.users.Find(ctx,
bson.M{"$or": []bson.M{
{"deptId": bson.M{"$in": orgIDs}},
{"sectId": bson.M{"$in": orgIDs}},
}},
options.Find().SetProjection(bson.M{
"_id": 0,
"deptId": 1,
"sectId": 1,
"deptName": 1,
"deptTCName": 1,
"sectName": 1,
"sectTCName": 1,
"deptDescription": 1,
"sectDescription": 1,
}),
)
if err != nil {
return nil, fmt.Errorf("find org display users: %w", err)
}
defer cursor.Close(ctx)
var users []orgDisplayUser
if err := cursor.All(ctx, &users); err != nil {
return nil, fmt.Errorf("decode org display users: %w", err)
}
return users, nil
}
// roomMemberEnrichedRow is the decode target for the enriched aggregation
// pipeline. It carries the standard RoomMember plus a parallel `display`
// sub-document populated by enrichment stages. This exists because
// RoomMemberEntry's display fields are tagged bson:"-" for persistence
// safety — the pipeline therefore writes enrichment values to a separate
// field that has normal bson tags, and Go-side post-processing copies
// them onto RoomMemberEntry.
type roomMemberEnrichedRow struct {
model.RoomMember `bson:",inline"`
Display roomMemberEnrichedDisplay `bson:"display"`
}
type roomMemberEnrichedDisplay struct {
EngName string `bson:"engName,omitempty"`
ChineseName string `bson:"chineseName,omitempty"`
IsOwner bool `bson:"isOwner,omitempty"`
SectName string `bson:"sectName,omitempty"`
EmployeeID string `bson:"employeeId,omitempty"`
}
// enrichRoomMembersStages returns the $lookup + $set stages appended to the
// room_members aggregation when enrich=true. These enrich INDIVIDUAL members
// only (engName/chineseName/isOwner) via account-keyed, index-backed lookups.
// Org-member display is resolved separately by attachOrgDisplay — see there for
// why it is not a pipeline $lookup. Enrichment output is written into a
// `display` sub-document so it survives the RoomMemberEntry bson:"-" tags.
func enrichRoomMembersStages(roomID string) []bson.D {
return []bson.D{
// Individuals: join users on account → pull engName / chineseName.
{{Key: "$lookup", Value: bson.M{
"from": "users",
"let": bson.M{
"acct": "$member.account",
"mtyp": "$member.type",
},
"pipeline": bson.A{
bson.M{"$match": bson.M{"$expr": bson.M{"$and": bson.A{
bson.M{"$eq": bson.A{"$$mtyp", "individual"}},
bson.M{"$eq": bson.A{"$account", "$$acct"}},
}}}},
bson.M{"$limit": 1},
bson.M{"$project": bson.M{"engName": 1, "chineseName": 1, "sectName": 1, "employeeId": 1, "_id": 0}},
},
"as": "_userMatch",
}}},
// Individuals: join subscriptions on (roomId, u.account) → pull roles.
{{Key: "$lookup", Value: bson.M{
"from": "subscriptions",
"let": bson.M{
"acct": "$member.account",
"mtyp": "$member.type",
},
"pipeline": bson.A{
bson.M{"$match": bson.M{"$expr": bson.M{"$and": bson.A{
bson.M{"$eq": bson.A{"$$mtyp", "individual"}},
bson.M{"$eq": bson.A{"$roomId", roomID}},
bson.M{"$eq": bson.A{"$u.account", "$$acct"}},
}}}},
bson.M{"$limit": 1},
bson.M{"$project": bson.M{"roles": 1, "_id": 0}},
},
"as": "_subMatch",
}}},
// Fold the individual matches into a single `display` sub-document.
{{Key: "$set", Value: bson.M{
"display": bson.M{
"engName": bson.M{"$arrayElemAt": bson.A{"$_userMatch.engName", 0}},
"chineseName": bson.M{"$arrayElemAt": bson.A{"$_userMatch.chineseName", 0}},
"sectName": bson.M{"$arrayElemAt": bson.A{"$_userMatch.sectName", 0}},
"employeeId": bson.M{"$arrayElemAt": bson.A{"$_userMatch.employeeId", 0}},
"isOwner": bson.M{"$in": bson.A{
"owner",
bson.M{"$ifNull": bson.A{
bson.M{"$arrayElemAt": bson.A{"$_subMatch.roles", 0}},
bson.A{},
}},
}},
},
}}},
// Drop the temporary join arrays.
{{Key: "$project", Value: bson.M{"_userMatch": 0, "_subMatch": 0}}},
}
}
func (s *MongoStore) getRoomSubscriptions(ctx context.Context, roomID string, limit, offset *int, enrich bool) ([]model.RoomMember, error) {
opts := options.Find().SetSort(bson.D{
{Key: "joinedAt", Value: 1},
{Key: "_id", Value: 1},
})
if offset != nil && *offset > 0 {
opts.SetSkip(int64(*offset))
}
// SetLimit(0) means "no limit" in the driver, which would silently return
// unbounded results. Only set when >0 so it matches the aggregation path.
if limit != nil && *limit > 0 {
opts.SetLimit(int64(*limit))
}
cursor, err := s.subscriptions.Find(ctx, bson.M{"roomId": roomID}, opts)
if err != nil {
return nil, fmt.Errorf("find subscriptions for %q: %w", roomID, err)
}
defer cursor.Close(ctx)
var subs []model.Subscription
if err := cursor.All(ctx, &subs); err != nil {
return nil, fmt.Errorf("decode subscriptions for %q: %w", roomID, err)
}
members := make([]model.RoomMember, 0, len(subs))
for i := range subs {
sub := &subs[i]
entry := model.RoomMemberEntry{
ID: sub.User.ID,
Type: model.RoomMemberIndividual,
Account: sub.User.Account,
}
if enrich {
entry.IsOwner = hasRole(sub.Roles, model.RoleOwner)
}
members = append(members, model.RoomMember{
ID: sub.ID,
RoomID: roomID,
Ts: sub.JoinedAt,
Member: entry,
})
}
if enrich && len(members) > 0 {
if err := s.attachUserDisplayNames(ctx, roomID, members); err != nil {
return nil, fmt.Errorf("attach user display names for %q: %w", roomID, err)
}
}
return members, nil
}
// attachUserDisplayNames batch-loads display fields for all individual
// members in the slice and copies them onto each member entry in place.
// Used on the subscriptions-fallback + enrichment path. Accounts are
// partitioned by the ".bot$" pattern: human accounts are looked up in
// users for EngName/ChineseName; bot accounts are looked up in apps
// for Name. Each partition is queried only when non-empty.
func (s *MongoStore) attachUserDisplayNames(ctx context.Context, roomID string, members []model.RoomMember) error {
var humanAccounts, botAccounts []string
for i := range members {
if members[i].Member.Type != model.RoomMemberIndividual || members[i].Member.Account == "" {
continue
}
if botAccountPattern.MatchString(members[i].Member.Account) {
botAccounts = append(botAccounts, members[i].Member.Account)
} else {
humanAccounts = append(humanAccounts, members[i].Member.Account)
}
}
var (
userByAccount map[string]*model.User
appByAssistant map[string]string // assistant.name → app.name
)
if len(humanAccounts) > 0 {
u, err := s.findUsersForDisplay(ctx, humanAccounts)
if err != nil {
return fmt.Errorf("find users for room %q: %w", roomID, err)
}
userByAccount = u
}
if len(botAccounts) > 0 {
a, err := s.findAppsForDisplay(ctx, botAccounts)
if err != nil {
return fmt.Errorf("find apps for room %q: %w", roomID, err)
}
appByAssistant = a
}
for i := range members {
if members[i].Member.Type != model.RoomMemberIndividual {
continue
}
acct := members[i].Member.Account
if u, ok := userByAccount[acct]; ok {
members[i].Member.EngName = u.EngName
members[i].Member.ChineseName = u.ChineseName
members[i].Member.SectName = u.SectName
members[i].Member.EmployeeID = u.EmployeeID
continue
}
if name, ok := appByAssistant[acct]; ok {
members[i].Member.Name = name
}
}
return nil
}
// findUsersForDisplay returns engName/chineseName indexed by account
// for every users document matching one of accounts. The existing
// users.account index covers the $in filter.
func (s *MongoStore) findUsersForDisplay(ctx context.Context, accounts []string) (map[string]*model.User, error) {
cursor, err := s.users.Find(ctx,
bson.M{"account": bson.M{"$in": accounts}},
options.Find().SetProjection(bson.M{"_id": 0, "account": 1, "engName": 1, "chineseName": 1, "sectName": 1, "employeeId": 1}),
)
if err != nil {
return nil, fmt.Errorf("find users for display: %w", err)
}
defer cursor.Close(ctx)
var users []model.User
if err := cursor.All(ctx, &users); err != nil {
return nil, fmt.Errorf("decode users for display: %w", err)
}
out := make(map[string]*model.User, len(users))
for i := range users {
out[users[i].Account] = &users[i]
}
return out, nil
}
// findAppsForDisplay returns app.name indexed by assistant.name for
// every apps document whose assistant.name matches one of botAccounts.
// The existing apps (assistant.name) index covers the $in filter.
func (s *MongoStore) findAppsForDisplay(ctx context.Context, botAccounts []string) (map[string]string, error) {
cursor, err := s.apps.Find(ctx,
bson.M{"assistant.name": bson.M{"$in": botAccounts}},
options.Find().SetProjection(bson.M{"_id": 0, "name": 1, "assistant.name": 1}),
)
if err != nil {
return nil, fmt.Errorf("find apps for display: %w", err)
}
defer cursor.Close(ctx)
type row struct {
Name string `bson:"name"`
Assistant struct {
Name string `bson:"name"`
} `bson:"assistant"`
}
var rows []row
if err := cursor.All(ctx, &rows); err != nil {
return nil, fmt.Errorf("decode apps for display: %w", err)
}
out := make(map[string]string, len(rows))
for _, r := range rows {
out[r.Assistant.Name] = r.Name
}
return out, nil
}
func (s *MongoStore) GetUser(ctx context.Context, account string) (*model.User, error) {
var u model.User
err := s.users.FindOne(ctx, bson.M{"account": account}).Decode(&u)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, ErrUserNotFound
}
if err != nil {
return nil, fmt.Errorf("get user %q: %w", account, err)
}
return &u, nil
}
func (s *MongoStore) GetApp(ctx context.Context, botAccount string) (*model.App, error) {
var a model.App
err := s.apps.FindOne(ctx, bson.M{"assistant.name": botAccount}).Decode(&a)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, ErrAppNotFound
}
if err != nil {
return nil, fmt.Errorf("get app for bot %q: %w", botAccount, err)
}
return &a, nil
}
func (s *MongoStore) FindDMSubscription(ctx context.Context, account, targetName string) (*model.Subscription, error) {
var sub model.Subscription
err := s.subscriptions.FindOne(ctx, bson.M{
"u.account": account,
"name": targetName,
"roomType": bson.M{"$in": []model.RoomType{model.RoomTypeDM, model.RoomTypeBotDM}},
}).Decode(&sub)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, model.ErrSubscriptionNotFound
}
if err != nil {
return nil, fmt.Errorf("find dm subscription: %w", err)
}
return &sub, nil
}
// ListOrgMembers returns all users whose sectId OR deptId equals orgID,
// projected as OrgMember rows sorted by account ascending. The dept branch
// is symmetric to the membership-lookup pipelines (GetSubscriptionWithMembership,
// GetUserWithMembership): an org added by a dept-only match stores
// member.id = deptId in room_members, so the expansion RPC must look up
// users by deptId too. Both (sectId, account) and (deptId, account) indexes
// exist (see ensureIndexes) so the $or stays index-backed. Returns a
// RoomInvalidOrg-reason errcode when neither branch matches any users.
func (s *MongoStore) ListOrgMembers(ctx context.Context, orgID string) ([]model.OrgMember, error) {
opts := options.Find().
SetSort(bson.D{{Key: "account", Value: 1}}).
SetProjection(bson.M{
"_id": 1,
"account": 1,
"engName": 1,
"chineseName": 1,
"siteId": 1,
})
cursor, err := s.users.Find(ctx, bson.M{"$or": []bson.M{
{"sectId": orgID},
{"deptId": orgID},
}}, opts)
if err != nil {
return nil, fmt.Errorf("find users for org %q: %w", orgID, err)
}
defer cursor.Close(ctx)
var members []model.OrgMember
if err := cursor.All(ctx, &members); err != nil {
return nil, fmt.Errorf("decode users for org %q: %w", orgID, err)
}
if len(members) == 0 {
return nil, errcode.BadRequest(fmt.Sprintf("list org members for %q", orgID), errcode.WithReason(errcode.RoomInvalidOrg))
}
return members, nil
}
// FindExistingOrgIDs returns the subset of orgIDs that match at least one
// user via sectId or deptId. Two parallel distinct calls — one on each
// indexed field — keep the query covered by the (sectId, account) and
// (deptId, account) compound indexes; the result of each distinct is
// bounded by len(orgIDs) since the filter is an $in on the same field.
//
// A single $unionWith aggregation was tried (one round-trip instead of
// two) and benchmarked ~8.5% faster end-to-end with the same index
// coverage, but the aggregation form is more complex, ships ~55% more
// Go-side allocations per call, and shifts behavior onto Mongo's
// aggregation framework (slightly different optimizations across
// versions, more surface area in a sharded future). The two-Distinct
// form is simpler, version-agnostic from at least Mongo 4.4 onward, and
// the perf delta is not material at this call rate. Keep it simple.
func (s *MongoStore) FindExistingOrgIDs(ctx context.Context, orgIDs []string) ([]string, error) {
if len(orgIDs) == 0 {
return nil, nil
}
var sectIDs []string
if err := s.users.Distinct(ctx, "sectId", bson.M{"sectId": bson.M{"$in": orgIDs}}).Decode(§IDs); err != nil {
return nil, fmt.Errorf("distinct sectIds for org validation: %w", err)
}
var deptIDs []string
if err := s.users.Distinct(ctx, "deptId", bson.M{"deptId": bson.M{"$in": orgIDs}}).Decode(&deptIDs); err != nil {
return nil, fmt.Errorf("distinct deptIds for org validation: %w", err)
}
out := make([]string, 0, len(sectIDs)+len(deptIDs))
seen := make(map[string]struct{}, len(sectIDs)+len(deptIDs))
for _, id := range sectIDs {
if _, ok := seen[id]; !ok {
seen[id] = struct{}{}
out = append(out, id)
}
}
for _, id := range deptIDs {
if _, ok := seen[id]; !ok {
seen[id] = struct{}{}
out = append(out, id)