Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions docs/client-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2325,14 +2325,60 @@ Used by every history-service method that returns messages. Mirrors the Cassandr
| `reactions` | map<emoji, [ReactionUser](#reactionuser)[]> | Optional. Omitted when absent; `{}` when present but empty. |
| `deleted` | boolean | Optional. `true` for tombstoned messages. |
| `type` | string | Optional. System-message type when set; regular messages omit it. Known values: `"room_created"`, `"members_added"`, `"member_removed"`, `"member_left"`, `"room_renamed"`, `"room_restricted"`. For all six, `msg` is populated with a server-rendered human-readable body and `sender.account` is the responsible actor (the requester for adds/removes-by-other / room-creates / renames / restricted changes, the leaving user for self-leave). |
| `sysMsgData` | string | Optional. Base64-encoded raw JSON payload for system messages. |
| `sysMsgData` | string | Optional. Base64-encoded JSON payload for system messages; shape depends on `type` (see [System-message `sysMsgData` payloads](#system-message-sysmsgdata-payloads)). |
| `siteId` | string | Optional. The site that owns the message. |
| `editedAt` | string | Optional. RFC 3339. Set after an edit. |
| `updatedAt` | string | Optional. RFC 3339. Mirrors `editedAt` for edits, set on delete to record the deletion time. |
| `threadRoomId` | string | Optional. The thread room ID when this is a thread message. |
| `pinnedAt` | string | Optional. RFC 3339. With the `messages_by_room` `pinned_at` mirror, room-timeline history loads now return this on pinned rows too (previously only `pin.list` and point lookups carried it). |
| `pinnedBy` | [MessageParticipant](#messageparticipant) | Optional. |

##### System-message `sysMsgData` payloads

`sysMsgData` is base64-encoded JSON whose shape depends on `type`.

`members_added` (also emitted on room creation):

| field | type | description |
|-------|------|-------------|
| `individuals` | string[] | Accounts of the individuals in the request (direct + channel-expanded, deduped; excludes organization members and the requester). May include accounts already in the room. Empty `[]`, never `null`. The "n people" count is `individuals.length`; clients may render it as a clickable list. |
| `orgs` | string[] | Organization IDs in the request (direct + channel-expanded, deduped). Empty `[]`, never `null`. The "m organizations" count is `orgs.length`; clients may render it as a clickable list. |
| `channels` | [ChannelRef](#channelref)[] | Source channels whose members were copied in (provenance). Empty `[]`, never `null`. |
| `addedUsersCount` | number | New subscriptions created by the operation, including organization-expanded members; may differ from `individuals.length`. |

```json
{ "individuals": ["alice", "bob"], "orgs": ["eng"], "channels": [], "addedUsersCount": 12 }
```

`member_removed`:

| field | type | description |
|-------|------|-------------|
| `user` | [SysMsgUser](#sysmsguser) | Set when an individual was removed. |
| `orgId` | string | Set when an organization was removed. |
| `sectName` | string | Display name of the removed organization (set with `orgId`). |
| `removedUsersCount` | number | Number of underlying accounts whose subscription was removed. |

```json
{ "user": { "account": "bob", "engName": "Bob", "chineseName": "鮑勃" }, "removedUsersCount": 1 }
```

`member_left` — `{ "user": SysMsgUser }` for the user who left:

```json
{ "user": { "account": "bob", "engName": "Bob", "chineseName": "鮑勃" } }
```

##### SysMsgUser

A user referenced by a system-message payload.

| field | type | description |
|-------|------|-------------|
| `account` | string | The user's account. |
| `engName` | string | English display name; may be empty. |
| `chineseName` | string | Chinese display name; may be empty. |

Comment thread
coderabbitai[bot] marked this conversation as resolved.
##### MessageParticipant

The author/mention/pinner embedded in a message. Distinct from the event-actor
Expand Down Expand Up @@ -4706,7 +4752,7 @@ The canonical broadcast message (distinct from the history [Message schema](#mes
| `threadParentMessageId` | string | Optional. Set for a thread reply. |
| `tshow` | boolean | Optional. Whether a thread reply is also shown in the parent room. |
| `type` | string | Optional. System-message type when set. |
| `sysMsgData` | string | Optional. Base64-encoded raw JSON payload for system messages. |
| `sysMsgData` | string | Optional. Base64-encoded JSON payload for system messages; shape depends on `type` (see [System-message `sysMsgData` payloads](#system-message-sysmsgdata-payloads)). |
| `quotedParentMessage` | [QuotedParentMessage](#quotedparentmessage) | Optional. |
| `pinnedAt` | string | Optional. RFC 3339. |
| `pinnedBy` | [Participant](#participant) | Optional. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cassrepo

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/hmchangw/chat/history-service/internal/models"
"github.com/hmchangw/chat/pkg/atrest"
"github.com/hmchangw/chat/pkg/model"
cassmodel "github.com/hmchangw/chat/pkg/model/cassandra"
"github.com/hmchangw/chat/pkg/msgbucket"
"github.com/hmchangw/chat/pkg/testutil"
Expand Down Expand Up @@ -335,3 +337,38 @@ func TestGetMessages_DecryptErrorHaltsWalk(t *testing.T) {
assert.ErrorIs(t, walkErr, atrest.ErrPayloadMalformed,
"walk must halt on the day-1 malformed-nonce error; surfacing ErrAuthFailed means the walk continued past day1 and let day2 overwrite scanErr")
}

func TestGetMessagesBefore_SysMsgDataRoundTrips(t *testing.T) {
session := setupCassandra(t)
sizer := msgbucket.New(24 * time.Hour)
repo := NewRepository(session, sizer, 365, nil)
ctx := context.Background()

roomID := "room-sysmsg"
createdAt := time.Now().UTC().Truncate(time.Millisecond)
sender := models.Participant{ID: "u_a", Account: "alice"}

payload, err := json.Marshal(model.MembersAdded{Individuals: []string{"u1", "u2"}, Orgs: []string{"o1"}, AddedUsersCount: 3})
require.NoError(t, err)

require.NoError(t, session.Query(
`INSERT INTO messages_by_room (room_id, bucket, created_at, message_id, sender, msg, type, sys_msg_data) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
roomID, sizer.Of(createdAt), createdAt, "m-sys", sender,
`"alice" added 2 people and 1 organization to the chatroom`,
model.MessageTypeMembersAdded, payload,
).Exec())

page, err := repo.GetMessagesBefore(ctx, roomID, createdAt.Add(time.Second), time.Time{}, PageRequest{PageSize: 10})
require.NoError(t, err)
require.Len(t, page.Data, 1)

got := page.Data[0]
assert.Equal(t, model.MessageTypeMembersAdded, got.Type)
require.NotEmpty(t, got.SysMsgData, "sysMsgData must survive the load-history round-trip")

var decoded model.MembersAdded
require.NoError(t, json.Unmarshal(got.SysMsgData, &decoded))
assert.Equal(t, []string{"u1", "u2"}, decoded.Individuals)
assert.Equal(t, []string{"o1"}, decoded.Orgs)
assert.Equal(t, 3, decoded.AddedUsersCount, "whole payload (not just the slices) must survive")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
5 changes: 3 additions & 2 deletions pkg/model/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ type MentionableSubscriptionsResponse struct {

// CreateRoomRequest is the canonical event payload (X-Request-ID rides on the NATS header).
// Users/Orgs/Channels are the literal client request; ResolvedUsers/ResolvedOrgs carry the
// post-expansion (channel-ref-merged, requester-stripped, dedup'd) sets the worker uses for
// member materialization. Sys-message payloads use the literal lists.
// post-expansion (channel-ref-merged, requester-stripped, dedup'd) sets. The worker uses the
// resolved sets for member materialization and the members_added sys-message; room_created
// uses the literal lists.
type CreateRoomRequest struct {
Name string `json:"name" bson:"name"`
Users []string `json:"users" bson:"users"`
Expand Down
4 changes: 2 additions & 2 deletions room-service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ func (h *Handler) handleCreateRoomChannel(ctx context.Context, req *model.Create
)
}

// Preserve req.Users / req.Orgs as the literal client request for sys-message payloads.
// The worker uses ResolvedUsers / ResolvedOrgs for capacity and member materialization.
// Preserve req.Users / req.Orgs as the literal request for room_created; the worker
// uses ResolvedUsers / ResolvedOrgs for materialization and the members_added sys-msg.
req.ResolvedUsers = allUsers
req.ResolvedOrgs = allOrgs
req.RoomID = idgen.GenerateID()
Expand Down
51 changes: 32 additions & 19 deletions room-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove
if isSelfLeave {
content = formatLeft(&user.User)
} else {
content = formatRemovedUser(&user.User)
content = formatRemovedUser(requester, &user.User)
}
sysMsg := model.Message{
ID: idgen.MessageIDFromRequestID(seed, "rmindiv"),
Expand Down Expand Up @@ -656,7 +656,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR
UserID: requester.ID,
UserAccount: requester.Account,
Type: model.MessageTypeMemberRemoved,
Content: formatRemovedOrg(name, tcName, req.OrgID),
Content: formatRemovedOrg(requester, name, tcName, req.OrgID),
SysMsgData: sysMsgPayload,
CreatedAt: now,
}
Expand Down Expand Up @@ -1063,8 +1063,8 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error
// (actualAccounts) or new org rows (req.Orgs). The org→individual upgrade
// path (only needIRM populated) writes the missing individual room_members
// row silently — no membership state changed for the room itself, so
// emitting an empty MemberAddEvent and a "added members to the channel"
// sys-msg with no actual members listed would mislead end users.
// emitting an empty MemberAddEvent and a members_added sys-msg with no
// actual members listed would mislead end users.
historySharedSince := historySharedSincePtr(req.History, req.Timestamp, req.RoomID)
if len(actualAccounts) > 0 || len(req.Orgs) > 0 {
memberAddEvt := model.MemberAddEvent{
Expand Down Expand Up @@ -1107,21 +1107,24 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error
}
}

// Individuals = req.Users (direct + channel individuals, merged by
// room-service) minus the requester — mirrors create's creator strip.
sysIndividuals := withoutAccount(req.Users, req.RequesterAccount)
membersAdded := model.MembersAdded{
Individuals: actualAccounts,
Orgs: req.Orgs,
Channels: req.Channels,
Individuals: sysIndividuals,
Orgs: nonNil(req.Orgs),
Channels: nonNil(req.Channels),
AddedUsersCount: len(subs),
}
sysMsgData, _ := json.Marshal(membersAdded)
seed := messageDedupSeed(ctx, "processAddMembers", req.RoomID,
fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp))
// Single form only for direct 1-user adds; org-bearing adds always use multi.
content := formatAddedMulti(requester)
if len(subs) == 1 && len(req.Orgs) == 0 {
onlyUser := userMap[subs[0].User.Account]
content = formatAddedSingle(requester, &onlyUser)
}
content := addedContent(requester, sysIndividuals, req.Orgs, func(a string) *model.User {
if u, ok := userMap[a]; ok {
return &u
}
return nil
})
sysMsg := model.Message{
ID: idgen.MessageIDFromRequestID(seed, "addmembers"),
RoomID: req.RoomID,
Expand Down Expand Up @@ -1524,7 +1527,7 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq

// Task 36: channel-only sys-messages
if room.Type == model.RoomTypeChannel {
if err := h.publishChannelSysMessages(ctx, req, room, requester, len(subs)-1, requestID, now); err != nil {
if err := h.publishChannelSysMessages(ctx, req, room, requester, userByAccount, len(subs)-1, requestID, now); err != nil {
return fmt.Errorf("publish sys messages: %w", err)
}
}
Expand Down Expand Up @@ -1610,7 +1613,7 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq
return nil
}

func (h *Handler) publishChannelSysMessages(ctx context.Context, req *model.CreateRoomRequest, room *model.Room, requester *model.User, addedUsersCount int, requestID string, now time.Time) error {
func (h *Handler) publishChannelSysMessages(ctx context.Context, req *model.CreateRoomRequest, room *model.Room, requester *model.User, userByAccount map[string]*model.User, addedUsersCount int, requestID string, now time.Time) error {
acceptedAt := time.UnixMilli(req.Timestamp).UTC()

sysData1, err := json.Marshal(model.RoomCreated{
Expand All @@ -1637,22 +1640,32 @@ func (h *Handler) publishChannelSysMessages(ctx context.Context, req *model.Crea
return fmt.Errorf("publish room_created: %w", err)
}

// ResolvedUsers = resolved individuals, already creator-stripped and
// org-member-free by room-service; Orgs counted as orgs, not expanded.
if len(req.ResolvedUsers) == 0 && len(req.ResolvedOrgs) == 0 {
// Nothing added beyond the creator (e.g. an empty channel); room_created
// already fired, so do not emit a degenerate "added 0 people" message.
return nil
}
sysData2, err := json.Marshal(model.MembersAdded{
Individuals: req.Users,
Orgs: req.Orgs,
Channels: req.Channels,
Individuals: nonNil(req.ResolvedUsers),
Orgs: nonNil(req.ResolvedOrgs),
Channels: nonNil(req.Channels),
AddedUsersCount: addedUsersCount,
})
if err != nil {
return fmt.Errorf("marshal members_added sys data: %w", err)
}
content := addedContent(requester, req.ResolvedUsers, req.ResolvedOrgs, func(a string) *model.User {
return userByAccount[a]
})
msg2 := model.Message{
ID: idgen.MessageIDFromRequestID(requestID, "members_added"),
RoomID: room.ID,
UserID: requester.ID,
UserAccount: requester.Account,
Type: model.MessageTypeMembersAdded,
Content: formatAddedMulti(requester),
Content: content,
SysMsgData: sysData2,
CreatedAt: acceptedAt.Add(time.Millisecond),
}
Expand Down
Loading
Loading