1 change: 1 addition & 0 deletions .gitignore
@@ -70,6 +70,7 @@ tmp/
# stray local builds of the data-migration services
/oplog-connector
/oplog-transformer
/oplog-collections-transformer

# Captured pprof profiles from `make profile`.
profiles/
75 changes: 75 additions & 0 deletions data-migration/CDC_COVERAGE.md
@@ -0,0 +1,75 @@
# Collections CDC — coverage matrix

> Companion to `README.md` (component overview) and `SOURCE_DATA.md` (source schema).
> This doc pins **exactly which source change events the collections migration covers, and which it does not** — the reference for the team building the `oplog-collections-transformer`.
> Design: `docs/superpowers/specs/2026-06-16-oplog-transformer-collections-design.md`.
>
> Scope: the **live CDC tail** of the operational collections (rooms, subscriptions, thread_subscriptions, users). The bulk/initial state sync ≤ checkpoint is a separate owner's job; we tail from the handed-off checkpoint.

## CDC payload facts (all collections)

The connector forwards raw change-stream events with **no `updateLookup`** and **no `fullDocumentBeforeChange`**:

| Op | Payload carried | Source lookup by `_id` |
|---|---|---|
| `insert` | full `fullDocument` | in payload |
| `replace` | full `fullDocument` | in payload (lookup not needed) |
| `update` | only `updateDescription` (changed fields, no post-image) | **full current doc** (doc still exists) |
| `delete` | only `documentKey._id` | **nothing** — doc already gone |

→ A source lookup resolves the full doc for any op **except `delete`**.
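
The table above can be read as a small decision function. The following is a minimal sketch only: the names `docSource` and `resolveSource` are hypothetical and are not taken from the transformer's code.

```go
package main

import "fmt"

// docSource says where the full document for an event can come from.
// Hypothetical type, introduced for illustration only.
type docSource int

const (
	fromPayload docSource = iota // fullDocument is carried in the event
	fromLookup                   // re-read the current doc from the source by _id
	unavailable                  // doc is gone; only documentKey._id is known
)

// resolveSource maps a change-stream operation type to a docSource.
// The second return value is false for op types the payload table does not list.
func resolveSource(op string) (docSource, bool) {
	switch op {
	case "insert", "replace":
		return fromPayload, true
	case "update":
		return fromLookup, true
	case "delete":
		return unavailable, true
	default:
		return unavailable, false
	}
}

func main() {
	for _, op := range []string{"insert", "replace", "update", "delete", "drop"} {
		src, known := resolveSource(op)
		fmt.Println(op, src, known)
	}
}
```

Returning a separate `known` flag keeps collection-level events such as `drop` distinguishable from a plain `delete`, which matters because the two are handled differently in the coverage matrix.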

## Event coverage matrix

**Legend:** ✅ migrated · ❌ intentionally not migrated · ⚠️ deferred / later work.

| # | Source event | Op + payload | Source lookup (by `_id`) | Current-system facts | Handling / impact |
|---|---|---|---|---|---|
| **Rooms** |
| 1 | Room create | `insert` — full doc | in payload | `t` ∈ `c,p,d,l,v`; `prid`⇒discussion; `teamId`/`teamMain`; `d` can have >2 users | ✅ map → `room_sync` (skip `l`,`v`,group-DM) |
| 2 | Room replace | `replace` — full doc | not needed | whole-doc rewrite; can cross type/exclusion boundary | ✅ re-classify → `room_sync` |
| 3 | Room change | `update` — changed fields only | full current doc | — | ✅ re-read doc → `room_renamed` / `room_restricted` / `room_sync` |
| 4 | Room delete | `delete` — `_id` only | nothing — doc gone | app has no room-delete operation | ❌ skip (no app deletion; un-actionable) |
| **Subscriptions** |
| 5 | Sub create | `insert` — full doc | in payload | `u`, `rid`, `roles[]`, `open`, `f`, `disableNotifications`, `ls`/`lr`, `alert` | ✅ `member_added` + state events |
| 6 | Sub replace | `replace` — full doc | not needed | whole-doc rewrite | ✅ re-classify → `member_added` + state |
| 7 | Sub change (incl. leave/rejoin) | `update` — changed fields only | full current doc | leaving sets `open:false` (not a row delete) | ✅ re-read doc → `open`-toggle → `member_added`/`member_removed`; mute/fav/role/read → matching event |
| 8 | Sub delete (true row removal) | `delete` — `_id` only | nothing — doc gone | destination subs key by generated `UUIDv7`, not source `_id`; removal needs `(roomID, account)` | ❌ skip (un-actionable; rare — leave is `open:false`) |
| **Thread subscriptions** |
| 9 | Follow / first reply | `insert` — full doc | in payload | keyed `(u._id, parentMessage._id)`; carries `rid`, `lastSeenAt`, `unreadMention` | ✅ resolve thread-room+user → `thread_subscription_upserted` |
| 10 | Thread-sub replace | `replace` — full doc | not needed | whole-doc rewrite | ✅ re-resolve → upsert |
| 11 | Thread read / mention change | `update` — changed fields only | full current doc | — | ✅ re-read doc → re-upsert |
| 12 | Thread unfollow | `delete` — `_id` only | nothing — doc gone | destination thread-subs key by `(threadRoomId, userId)`; inbox-worker has no thread-sub removal handler; live stack emits no thread-unfollow federation event | ❌ skip (un-actionable **and** no handler) → stale follow lingers |
| **Users** |
| 13 | User create | `insert` — full doc | in payload | `_id`, `username` (mutable), `type`, `customFields.*`, `roles[]`, `federation.origin` | ✅ insert-if-absent by account |
| 14 | User replace | `replace` — full doc | not needed | whole-doc rewrite | ✅ insert-if-absent (re-classify) |
| 15 | User **HR-field** change (engName, tsmcName, dept/sect, roles, …) after first seed | `update` — changed fields only | full current doc | company-wide user sync owns these; insert-if-absent leaves existing untouched | ❌ not propagated (other sync keeps it current) |
| 15a | User **`statusText`** change | `update` — changed fields only | full current doc | chat-originated (set by the user inside legacy chat), **not** in the HR dataset — no other sync carries it | ✅ fan `user_status_updated` to all sites (global-visibility) |
| 16 | User deactivate / delete | `update` (`active:false`) or `delete` | `update`: full doc · `delete`: nothing | source sets `active:false` (no row deletion); no destination apply-path wired | ⚠️ deferred (out of scope) |
| **All collections** |
| 17 | Collection drop / rename | collection-level (`drop`/`rename`/`invalidate`) | n/a | terminates/invalidates the per-collection change stream | ⚠️ out of scope, deferred — connector re-point, not migration logic |

## inbox-worker handler coverage

Every apply-handler the inbox-worker exposes is either emitted by the migration or intentionally left unemitted:

| Inbox handler | Emitted? | From |
|---|---|---|
| `member_added` | ✅ | sub `insert`/`replace`; `open` false→true |
| `member_removed` | ✅ | sub `open` true→false |
| `room_sync` | ✅ | room `insert`/`replace`/other-field `update` |
| `role_updated` | ✅ | sub `roles[]` |
| `subscription_read` | ✅ | sub `max(ls,lr)` + `alert` |
| `subscription_mute_toggled` | ✅ | sub `disableNotifications` |
| `subscription_favorite_toggled` | ✅ | sub `f` |
| `thread_subscription_upserted` | ✅ | thread-sub `insert`/`replace`/`update` |
| `room_renamed` | ✅ | room `name`/`fname` change |
| `room_restricted` | ✅ | room `restricted`/`externalAccess` change |
| `user_status_updated` | ✅ | user `statusText` change (chat-owned; fanned to all sites) |
| `thread_read` | ⚠️ not emitted | redundant — thread-sub `lastSeenAt` rides `thread_subscription_upserted`; `Subscription.ThreadUnread` is message-pipeline-owned |
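
For the subscription rows, the mapping from changed fields to handlers could be sketched as below. This is an illustration under assumptions: `subEvents` is a hypothetical helper, it assumes `updateDescription` reports top-level field names (not dotted paths such as `roles.0`), and it takes the direction of the `open` toggle from the re-read document because no pre-image is available.

```go
package main

import "fmt"

// subEvents returns the inbox handler names a subscription update maps to,
// given the changed field names and the `open` value of the re-read doc.
// Hypothetical helper; the field-to-handler pairs follow the handler table.
func subEvents(changed []string, openNow bool) []string {
	seen := map[string]bool{}
	var out []string
	add := func(ev string) {
		if !seen[ev] {
			seen[ev] = true
			out = append(out, ev)
		}
	}
	for _, f := range changed {
		switch f {
		case "open":
			if openNow {
				add("member_added")
			} else {
				add("member_removed")
			}
		case "roles":
			add("role_updated")
		case "ls", "lr", "alert":
			add("subscription_read")
		case "disableNotifications":
			add("subscription_mute_toggled")
		case "f":
			add("subscription_favorite_toggled")
		}
	}
	return out
}

func main() {
	fmt.Println(subEvents([]string{"ls", "alert"}, true))
	fmt.Println(subEvents([]string{"open"}, false))
	fmt.Println(subEvents([]string{"f", "roles"}, true))
}
```

Deduplicating by handler name means an update that touches both `ls` and `alert` yields a single `subscription_read`, which matches the table's one-row-per-handler view.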

## Open confirmations (source engineers)

- Which room field(s) back **`Restricted`** (read-only) and **`ExternalAccess`** — see `SOURCE_DATA.md`.
- Does the source emit whole-doc **`replace`** for these collections, or only field-level `update`? (If never, rows 2/6/10/14 are moot.)
- Where does a user's **employee id** live (if at all)?
107 changes: 107 additions & 0 deletions data-migration/SOURCE_DATA.md
@@ -82,3 +82,110 @@ Example document (values rotated/sanitized):
- `updateDescription` — absent (no field-level diff).
- `fullDocument` — the new version (with the `fullDocument` option).
- `previousDocument` — the old version (with `showExpandedEvents:true`).

---

# Collections migration — source schema (assumptions, for source-engineer cross-check)

> This section is the migration team's **current understanding** of the operational
> source collections read by the collections path (`oplog-collections-transformer`,
> design: `docs/superpowers/specs/2026-06-16-oplog-transformer-collections-design.md`).
> **Every "Assumed" row drives a write into the new system — please correct anything wrong.**
> Legend: ✅ confirmed by source team · ❓ assumption awaiting confirmation · ⛔ deliberately ignored.

## Conventions assumed across these collections
- ✅ **`federation.origin`** is authoritative. **Absent** ⇒ record is **local**. **Present** ⇒ a federated peer domain whose **first dotted label is the site code** (`0030204.tchat-test.test.company.com` ⇒ `0030204`).
- ❓ `federation.origin` is never the literal `"local"` (we treat absent ⇒ local).
- ✅ Each site's source DB already holds its **federated copies**; we migrate the full local source with **no** drop-filter.
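
The site-code convention above can be sketched in a few lines. The function name `siteCode` and the `localSite` parameter are hypothetical; the sketch assumes an absent `federation.origin` is represented as an empty string.

```go
package main

import (
	"fmt"
	"strings"
)

// siteCode returns the site code for a record's federation.origin value.
// An empty origin (field absent) means the record is local, so localSite is returned.
// Otherwise the site code is the first dotted label of the peer domain.
func siteCode(origin, localSite string) string {
	if origin == "" {
		return localSite
	}
	label, _, _ := strings.Cut(origin, ".")
	return label
}

func main() {
	// "0010001" is a made-up local site code, used for illustration only.
	fmt.Println(siteCode("0030204.tchat-test.test.company.com", "0010001"))
	fmt.Println(siteCode("", "0010001"))
}
```

If the open assumption about the literal `"local"` turns out to be wrong, this sketch would return `"local"` as a site code, so that case would need an explicit branch.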

## 3. `rocketchat_rooms`

| Source field | Type | Interpretation | Status |
|---|---|---|---|
| `_id` | string | Room id | ✅ |
| `t` | string | Room type — **only** `c`,`p`,`d`,`l`,`v` exist | ✅ |
| `prid` | string (opt) | Parent room id — **present ⇒ discussion** (`t` is `p`) | ✅ |
| `teamId` | string (opt) | Room belongs to a Team | ✅ |
| `teamMain` | bool (opt) | True only on a team's **primary** room | ✅ |
| `name` | string | Machine/handle name | ❓ |
| `fname` | string | Friendly display name | ❓ |
| `uids` / `usernames` | array | Members; for `t:d` length **can exceed 2** (group DM) | ✅ |
| `u` | object | Creator (`u._id`, `u.username`) | ❓ |
| `ts` / `_updatedAt` | date | Created / last-updated | ❓ |
| **restricted / read-only** | ? | **Which field is authoritative for "restricted"?** | ❓ |
| **external/federation access** | ? | **Which field is authoritative for "external access allowed"?** | ❓ |
| `federation.origin` | string (opt) | Origin site | ✅ |
| `federation.domains[]` | array | Member domains, service-synced, may be stale | ✅ ⛔ |

Type mapping logic to sanity-check: `c`/`p` (no `prid`) → one channel type (no public/private split); `p`+`prid` → discussion; `d` (2 participants) → dm (botDM if a participant is a bot); `d` (>2) → **skip** (no group DM); `l`/`v` → **skip**; team rooms → plain channel (`teamId`/`teamMain` dropped).

## 4. `rocketchat_subscriptions`

One row per (user, room). ✅ Unique index `{ rid:1, 'u._id':1 }`.

| Source field | Type | Interpretation | Status |
|---|---|---|---|
| `u._id`, `u.username` | string | Member identity | ✅ |
| `rid` | string | Room id | ✅ |
| `open` | bool | **Membership active.** Leave ⇒ `open:false` (no delete); re-join ⇒ true | ✅ |
| `ts` | date | Join time (set once, stable across re-joins) | ✅ |
| `roles[]` | string[] | `owner`/`moderator`/`leader`/`user` (role-based ownership) | ✅ |
| `ls` | date | Last **seen** (scrolled cursor) | ✅ |
| `lr` | date | Last **read** (explicit mark) | ✅ |
| `alert` | bool | True if **any** unread content (not just mentions) | ✅ |
| `userMentions` / `groupMentions` | int | Unread `@user` / `@all`,`@here` counts | ✅ |
| `tunread[]` | string[] | Parent-message ids (`tmid`) of threads with any unread | ✅ |
| `tunreadGroup[]` / `tunreadUser[]` | string[] | …group-mention / direct-mention variants | ✅ |
| `disableNotifications` | bool | **TSMC custom — authoritative mute (all-off)** | ✅ |
| `muteGroupMentions` | bool | `@all`/`@here` only (**not** our mute flag) | ✅ |
| `f` | bool (opt) | Favorited (absent ⇒ false) | ✅ |
| `name` / `fname` | string | Machine name / friendly display name | ✅ |
| `federation.origin` | string (opt) | Origin site (assumed consistent with room) | ✅ ❓ |

Derived: "has mention" = `userMentions>0 || groupMentions>0`; "muted" = `disableNotifications`; **read timestamp (`lastSeenAt`) = `max(ls, lr)`** (resolved per design D1 — the furthest point consumed by either the scrolled cursor or the explicit mark-read).

## 5. `tsmc_thread_subscriptions`

One row per (user, thread). ✅ Unique index `{ 'u._id':1, 'parentMessage._id':1 }`.

| Source field | Type | Interpretation | Status |
|---|---|---|---|
| `_id` | string | Row id | ✅ |
| `u._id`, `u.username` | string | Follower identity | ✅ |
| `rid` | string | Room id (matches parent room) | ✅ |
| `parentMessage._id` | string | Thread root message id (`tmid`) — the thread key | ✅ |
| `lastMessage._id` / `._updatedAt` | string/date | Last message in thread | ✅ |
| `createdAt` | date | Row creation (lazy — on follow/first reply) | ✅ |
| `lastSeenAt` | date | Last-read timestamp for the thread | ✅ |
| `unreadMention` | int | Thread mention/unread count | ✅ |

Lifecycle: created lazily; **unfollow deletes the row** (no soft-delete); no `federation.origin` (site inherited from room/user). **Open:** please share a redacted sample doc to confirm nothing is missed.

## 6. `users`

| Source field | Type | Interpretation | Status |
|---|---|---|---|
| `_id` | string (17-char base62) | Stable user id | ✅ |
| `username` | string | **Account id — unique but mutable** | ✅ |
| `type` | string | `user` or `bot` (bot has `appId`); no other non-human types | ✅ |
| `appId` | string (opt) | Present on bot/app accounts | ✅ |
| `name` | string | Display name | ✅ |
| `customFields.engName` / `tsmcName` | string | English / Chinese name | ✅ |
| `customFields.deptId` / `deptName` | string | Department id / name | ✅ |
| `customFields.sectId` / `sectName` | string | Section id / name | ✅ |
| `customFields.appId` / `appName` | string | App id / name | ✅ |
| `hrInfo` | `ITsmcUser[]` | HR directory records | ❓ (not consumed yet) |
| `statusText` / `status` | string | Status message / presence | ✅ |
| `roles[]` | string[] | Global roles (`admin` marker) | ✅ |
| `active` | bool | Deactivation ⇒ `active:false` (no deletion) | ✅ |
| `isRemote` | bool | True on local docs of **federated** users | ✅ |
| `federation.origin` | string (opt) | Origin site (absent ⇒ local) | ✅ |
| **employee id** | ? | **Where does an employee id live — is it `username`?** | ❓ |
| **Traditional-Chinese dept/sect names** | ? | Is there a TC variant of `deptName`/`sectName`? | ❓ |

Seeded (insert-if-absent, keyed by account): `username`, `engName`, `tsmcName`, dept/sect ids+names, `roles`, `statusText`, site, bot flag. Everything else is owned by the company-wide user sync.

Post-seed **updates**: HR fields are **not** re-propagated (the company-wide sync keeps them current). The **one exception is `statusText`** — it is chat-originated (not in the HR dataset), so a live `statusText` change fans a `user_status_updated` event to all sites (design §4.1a); without it, legacy status changes during the migration window would be lost.
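
The seed and post-seed rules above could be sketched as follows, using an in-memory map in place of the destination store. All names here (`user`, `userStore`, `seed`, `statusChanged`) are hypothetical, and the sketch assumes changed fields arrive as top-level field names.

```go
package main

import "fmt"

// user holds a few of the seeded fields; the struct shape is hypothetical.
type user struct {
	Account    string
	EngName    string
	StatusText string
}

// userStore is an in-memory stand-in for the destination user table.
type userStore map[string]user

// seed inserts u only when no row exists for the account and reports whether it inserted.
func (s userStore) seed(u user) bool {
	if _, exists := s[u.Account]; exists {
		return false
	}
	s[u.Account] = u
	return true
}

// statusChanged reports whether a post-seed update should fan out user_status_updated.
// Every other changed field is left to the company-wide user sync.
func statusChanged(changedFields []string) bool {
	for _, f := range changedFields {
		if f == "statusText" {
			return true
		}
	}
	return false
}

func main() {
	s := userStore{}
	fmt.Println(s.seed(user{Account: "alice", EngName: "Alice"}))
	fmt.Println(s.seed(user{Account: "alice", EngName: "Changed"}))
	fmt.Println(s["alice"].EngName)
	fmt.Println(statusChanged([]string{"statusText"}))
}
```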

## Explicitly **not** migrated
`federation.domains[]`; livechat (`l`) / voip (`v`) rooms; group DMs (`d`>2); team grouping (`teamId`/`teamMain`); user deactivation/deletion; thread-sub unfollows during cutover. Flag any of these you'd expect to matter.
33 changes: 33 additions & 0 deletions data-migration/oplog-collections-transformer/bootstrap.go
@@ -0,0 +1,33 @@
package main

import (
	"context"
	"fmt"

	"github.com/nats-io/nats.go/jetstream"

	"github.com/Marz32onE/instrumentation-go/otel-nats/oteljetstream"

	"github.com/hmchangw/chat/pkg/stream"
)

// streamManager is the minimal JetStream surface bootstrapStreams needs, service-local so tests can fake it without mockgen.
type streamManager interface {
	CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (oteljetstream.Stream, error)
}

// bootstrapStreams is a no-op in production (this service owns no streams). When Enabled
// (dev/integration) it creates only the MIGRATION_OPLOG_{siteID} schema; inbox-worker owns INBOX.
func bootstrapStreams(ctx context.Context, js streamManager, siteID string, enabled bool) error {
	if !enabled {
		return nil
	}
	cfg := stream.MigrationOplog(siteID)
	if _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:     cfg.Name,
		Subjects: cfg.Subjects,
	}); err != nil {
		return fmt.Errorf("create MIGRATION_OPLOG stream: %w", err)
	}
	return nil
}
39 changes: 39 additions & 0 deletions data-migration/oplog-collections-transformer/classify.go
@@ -0,0 +1,39 @@
package main

import "github.com/hmchangw/chat/pkg/model"

// roomClass is the result of classifying a source room.
type roomClass struct {
	Type     model.RoomType // valid only when !Excluded
	Excluded bool
	Reason   string // exclusion reason (for metrics), set only when Excluded
}

// classifyRoom maps a source room type t (+ prid/teamId/participant/bot signals) to a destination
// RoomType or an exclusion (group_dm/livechat/voip/unknown_type). p+prid→discussion, team→channel. §4.2.
func classifyRoom(t string, hasPrid, hasTeamID bool, hasBot bool, participantCount int) roomClass {
	// hasTeamID is accepted for caller clarity and future use; c/p branch already returns channel
	// regardless of teamId, so no separate branch is needed here.
	_ = hasTeamID
	switch t {
	case "c", "p":
		if t == "p" && hasPrid {
			return roomClass{Type: model.RoomTypeDiscussion}
		}
		return roomClass{Type: model.RoomTypeChannel}
	case "d":
		if participantCount > 2 {
			return roomClass{Excluded: true, Reason: "group_dm"}
		}
		if hasBot {
			return roomClass{Type: model.RoomTypeBotDM}
		}
		return roomClass{Type: model.RoomTypeDM}
	case "l":
		return roomClass{Excluded: true, Reason: "livechat"}
	case "v":
		return roomClass{Excluded: true, Reason: "voip"}
	default:
		return roomClass{Excluded: true, Reason: "unknown_type"}
	}
}