chore(ops): one-off thread subscription leak reconciler#342
Conversation
历史上被踢/退群(is_deleted=1)或被拉黑(status=blacklist)的成员,入群时挂进 WuKongIM 的子区频道订阅可能从未被对称摘除,事件驱动修复只覆盖未来移除、不处理 存量泄漏。新增一次性对账工具扫这两类成员,复用 queryThreadShortIDsForCleanup 枚举该群所有非 deleted 子区,逐个 IMRemoveSubscriber 摘订阅。 - 只摘订阅、不删 DB 数据(区别于被踢/退群路径的 removeUserFromGroupThreadsCleanup) - 默认 dry-run(只统计),加 --apply 才执行;支持 --batch-size / --interval 限速 - 幂等(IMRemoveSubscriber 对不存在订阅 no-op),失败只记录不中断 - 作为主二进制子命令 reconcile-thread-subs 接入 main.go - 单测覆盖扫描 / dry-run 不写 / 幂等 / 分批 / 失败不中断 - 附运维手册 docs/thread-subscription-reconcile-runbook.md Part of #YUJ-4186
mochashanyao
left a comment
There was a problem hiding this comment.
[Octo-Q · automated review]
Verdict: Approve — no blocking findings; notes below (data-flow traced).
PR#342 Review Report — chore(ops): one-off thread subscription leak reconciler
Reviewer: Octo-Q (automated review)
PR: #342
Head SHA: 7c6feec485c16d31e5d8c5b85eb9aee350ae0ca8
Files: 4 changed (+588 / −1) — main.go, modules/group/thread_subscription_reconcile.go, modules/group/thread_subscription_reconcile_test.go, docs/thread-subscription-reconcile-runbook.md
1. Verification Summary
| Item | Status | Evidence |
|---|---|---|
| Dry-run default safety | ✅ | Apply defaults false; Run() returns before any removeFn call when !opts.Apply (thread_subscription_reconcile.go:207-209) |
| Idempotency | ✅ | IMRemoveSubscriber is no-op for non-existent subs; TestReconcile_Idempotent verifies re-run stability |
| ChannelID format parity | ✅ | groupNo + "____" + shortID matches thread.BuildChannelID (thread/service.go:800-801, separator "____" at thread/const.go:17) and removeUserFromGroupThreadsCleanup (thread_cleanup.go:76) |
| ChannelType consistency | ✅ | common.ChannelTypeCommunityTopic.Uint8() — same as thread_cleanup.go:80, event.go:629, service.go:1658 |
| SQL leak criteria correctness | ✅ | is_deleted=1 OR status=? with int(common.GroupMemberStatusBlacklist) — matches criteria used in db.go:421 and api.go:3133 |
queryThreadShortIDsForCleanup reuse |
✅ | Same function used by removeUserFromGroupThreadsCleanup (thread_cleanup.go:63); filters status!=3 (excludes deleted threads, includes active+archived) |
| main.go subcommand routing | ✅ | flag.Args() tried first, os.Args[1] fallback preserved; dash-stripping normalizes reconcile-thread-subs → reconcilethreadsubs |
| Backward compat (api/config startup) | ✅ | When no subcommand, flag.Args() empty → falls to os.Args[1] path, identical to pre-PR behavior |
| Failure isolation | ✅ | Per-batch continue on error (thread_subscription_reconcile.go:232-240); TestReconcile_FailureRecordedNotAborted verifies |
| Rate limiting | ✅ | time.Sleep(opts.Interval) before each IM call; --batch-size splits large UID sets |
| Exit codes | ✅ | 0=clean, 1=scan error (err != nil), 2=partial failures (len(report.Failures) > 0) — matches runbook |
| Test coverage | ✅ | 7 tests: scan, dry-run, idempotent, dedup, failure-isolation, empty, batching |
2. Findings
F1 — P2: Full-table scan on group_member without pagination
File: thread_subscription_reconcile.go:134-142
Diff-scope: new (this PR introduces the query)
_, err := r.ctx.DB().Select("group_no", "uid").
From("group_member").
Where("is_deleted=1 OR status=?", int(common.GroupMemberStatusBlacklist)).
OrderAsc("group_no").
Load(&rows)This loads all leaked member rows across all groups into memory in a single query. On a production deployment with millions of group_member rows, this could cause a slow query and memory spike.
Mitigation already present: This is a one-off maintenance tool, not a hot path. The OrderAsc("group_no") helps if an index exists. The runbook recommends dry-run first.
Suggestion: Consider adding a note in the runbook about expected DB load for large deployments, or add an optional --limit flag for staged execution. Not blocking.
F2 — P2: Non-deterministic processing order (Go map iteration)
File: thread_subscription_reconcile.go:175 (for groupNo, uids := range byGroup)
Diff-scope: new
buildPlan iterates over map[string][]string returned by scanLeakedMembers. Go map iteration order is randomized, so the order of groups in the report and in IM-call execution varies between runs.
Impact: Cosmetic for the report; functionally irrelevant since groups are independent. Not blocking.
F3 — P2/nit: Inline channelID separator
File: thread_subscription_reconcile.go:215 — channelID := p.groupNo + "____" + shortID
Diff-scope: new
The "____" literal is hardcoded rather than referencing thread.ChannelIDSeparator. However, this follows the established pattern in thread_cleanup.go:76 and api.go:3074 (both in the group package, both inline). Consistent with local convention. Not blocking.
3. Suggestions
- Runbook addition (F1): Add a line in the runbook's "推荐执行顺序" section about expected query duration / row count for large deployments, so ops can plan accordingly.
- Optional: Consider
--group-nofilter flag to scope the reconciler to a specific group for targeted runs (useful when dry-run reveals one problematic group).
4. Additional Findings
None. The PR is narrowly scoped to its stated purpose. No unintended side effects on the API server path, no new HTTP endpoints, no changes to existing event-driven cleanup paths.
5. Data-Flow Trace
| Consumed Data | Upstream Source | Reaches Consumer? |
|---|---|---|
byGroup map (leaked members) |
SQL: SELECT group_no, uid FROM group_member WHERE is_deleted=1 OR status=? |
✅ Rows loaded → Go-level dedup → map returned to buildPlan |
shortIDs (threads per group) |
queryThreadShortIDsForCleanup → SQL: SELECT short_id FROM thread WHERE group_no=? AND status!=3 |
✅ Returns non-deleted thread short_ids; empty slice → group skipped |
channelID |
groupNo + "____" + shortID |
✅ Matches thread.BuildChannelID format; consumed by removeFn |
SubscriberRemoveReq |
Constructed with ChannelID, ChannelType=ChannelTypeCommunityTopic, Subscribers=chunk |
✅ Same struct shape as thread_cleanup.go:78-82, event.go:629, service.go:1658 |
removeFn default |
ctx.IMRemoveSubscriber(...) |
✅ Called only when opts.Apply=true; dry-run short-circuits at Run():207-209 |
chunk (batched UIDs) |
p.uids[start:end] slicing |
✅ Correctly bounded; end capped at len(p.uids) |
No data-flow gaps found. Every consumed value traces back to a verified upstream source.
6. Blind-Spot Checklist (R5)
C1 — Double-path parity: N/A. This is a one-off reconciler, not a paired add/remove operation. No symmetric path to maintain.
C2 — Control-flow ordering / nesting reuse: Clear. removeFn has a single call-site (the nested loop in Run). No risk of double-application or ordering issues. No regex/sanitize to test with non-canonical input.
C3 — Authorization boundary ≠ capability boundary: Clear. This is a CLI subcommand requiring server/container access. No HTTP endpoint exposed. --apply flag gates writes (default dry-run). No privilege escalation vector.
[Octo-Q] verdict: APPROVE — No P0/P1 findings. Three P2/nit observations (full-table scan, map iteration order, inline separator) are all acceptable for a one-off maintenance tool with dry-run safety, idempotency, rate limiting, and comprehensive test coverage. Well-engineered PR.
Jerry-Xin
left a comment
There was a problem hiding this comment.
The PR is relevant to octo-server and implements a scoped one-off maintenance command for existing group/thread IM subscription state.
💬 Non-blocking
- 🟡 Warning:
modules/group/thread_subscription_reconcile.go:63-64and:241reportPairsRemovedas “actual removed” pairs, butIMRemoveSubscriberis idempotent/no-op for missing subscribers, so this is really “successfully requested pairs.” Consider renaming/reporting it that way to avoid overclaiming in ops output. - 🔵 Suggestion:
modules/group/thread_subscription_reconcile.go:225-226sleeps before the first IM call, although the flag is documented as an interval between calls. This is safe, but moving the sleep after each call, or gating it onreport.IMCalls > 0, would match operator expectations. - 🔵 Suggestion:
modules/group/thread_subscription_reconcile.go:171-190builds plans by ranging over a map, so group processing and failure ordering are nondeterministic. Sorting group keys would make dry-run/apply reports easier to compare. - 🟡 Warning: I could not verify the new group tests in this local checkout because
testutil.NewTestServerpanics on existing shared test DB migration state (unknown migration in database, e.g.20210926000001_robot_legacy01.sql). This appears environmental, not caused by this PR.
✅ Highlights
- Dry-run default is correctly enforced before any
removeFncall (modules/group/thread_subscription_reconcile.go:203-205). - The cleanup scope intentionally reuses
queryThreadShortIDsForCleanup, preserving the existing active+archived, non-deleted thread behavior (modules/group/thread_subscription_reconcile.go:176). - Failure isolation is implemented as described: individual IM removal failures are logged, recorded, and do not abort later batches (
modules/group/thread_subscription_reconcile.go:229-239). - Tests cover the key behaviors: scanning, dry-run no-write, idempotent rerun, batching, and failure isolation.
yujiawei
left a comment
There was a problem hiding this comment.
Code Review — PR #342 (octo-server)
Verdict: APPROVED
A one-off, idempotent, dry-run-first maintenance command (reconcile-thread-subs) that backfills the historical thread-subscription leak. The core logic is correct and well-tested, build/vet/all 7 new unit tests pass, and the design choices (only removing IM subscriptions, never deleting DB rows; dry-run by default; failure-isolation) are sound and appropriately conservative. No correctness, security, data-loss, or build-breaking issues — nothing blocks merge. The notes below are P2 robustness/observability and documentation-accuracy items plus a few nits; none requires a change before merge.
What was verified ✅
- Scan predicate is correct.
scanLeakedMembersusesWHERE is_deleted=1 OR status=2(thread_subscription_reconcile.go:137-141).GroupMemberStatusBlacklist == 2/Normal == 1are the only two statuses in octo-libcommon/constant.go, so the predicate exactly and exclusively covers the two documented leak classes (kicked/left + blacklisted). The OR string is the sole argument to a singleWhere(), so there is no SQL precedence hazard. - Reuse is correct. It correctly reuses
queryThreadShortIDsForCleanup(returnsstatus!=3, i.e. active + archived, excludes deleted) and builds the channel ID as{groupNo}____{shortID}, matching the existing kick/leave path (thread_cleanup.go:77,service.go:1887). - Dry-run never writes.
Runreturns before the apply loop whenApply=false(thread_subscription_reconcile.go:203-206);IMCalls/PairsRemovedstay 0. Covered byTestReconcile_DryRunDoesNotWrite. - Batching & failure-isolation are correct.
batch-size <= 0falls back to 100 (prevents an infinitestart += 0loop); a failed removal is logged + recorded +continued without aborting the run; the failure record defensively copies the chunk (append([]string(nil), chunk...)). main.goflag refactor is backward-compatible. All existing invocations still route correctly: bare./appand./app -config x.yamlfall throughflag.Args()(empty) to theos.Argsbranch →runAPI;./app -config x.yaml reconcile-thread-subs --apply→flag.Args()=["reconcile-thread-subs", ...]→ the new subcommand.go build ./...andgo vet ./modules/group/are clean.
Findings (all non-blocking)
P2 — Report can undercount when a per-group thread query fails (observability)
thread_subscription_reconcile.go:172-191 — GroupsAffected and LeakedMembers are incremented before the per-group queryThreadShortIDsForCleanup call. If that query fails (continue at :183) or returns zero threads (continue at :186), those leaked members still count toward LeakedMembers/GroupsAffected but contribute nothing to ThreadsScanned/PairsPlanned. So a run with thread-query failures can show e.g. LeakedMembers=5000, PairsPlanned=200, which an operator could misread as "almost nothing to do." This is recoverable (failures appear in report.Failures, the tool is re-runnable), but the aggregate counters can mislead.
Suggestion: track a GroupsSkippedThreadQueryFailed counter and surface it in String(), or add a one-line note (in the field comment or runbook) telling operators to reconcile LeakedMembers vs PairsPlanned against the Failures list.
P2 — Runbook exit-code table is imprecise; dry-run can exit 2
docs/thread-subscription-reconcile-runbook.md:56-58 — The table presents exit 1 as "扫描阶段出错(如 DB 查询失败)" and exit 2 as "摘除阶段有失败项". In the actual code:
- Only the top-level
scanLeakedMembersfailure propagates as an error → exit 1 (main.goexit-1 branch). - A per-group thread-query DB failure is recorded into
report.Failuresand yields exit 2 (main.goexit-2 branch) — even in dry-run, becausebuildPlanruns before the dry-run early return.
So "a DB query failed" maps to exit 1 or exit 2 depending on which query failed, and exit 2 is not purely an apply-stage outcome. The binary behavior is fine; only the doc is ambiguous for ops automation that branches on exit codes.
Suggestion: exit 1 = the initial member scan failed (aborts before planning); exit 2 = one or more per-group operations failed (thread query OR IMRemoveSubscriber), run completed — and note dry-run can also exit 2. (The full report.String() already disambiguates: thread-query failures render with an empty channel= and an err=query threads: ... prefix.)
P2 — Unbounded in-memory scan
thread_subscription_reconcile.go:136-161 — scanLeakedMembers loads every (group_no, uid) matching is_deleted=1 OR status=blacklist across all groups into memory at once (rows slice + dedup maps + plans), with no LIMIT/pagination. Since this tool exists precisely because leakage accumulated over the whole bug lifetime, on a large deployment this set can be very large, and the recommended invocation is inside a memory-limited container (docker exec). Per-record footprint is small (two short strings), so this is a scalability/robustness concern, not a guaranteed OOM.
Suggestion: iterate group-by-group (SELECT DISTINCT group_no, then scan each group's leaked members) so only one group's working set is resident; at minimum, note an expected-rows ceiling / "run off-peak" in the runbook.
P2 — Idempotency promise rests on an unasserted WuKongIM behavior
thread_subscription_reconcile.go:31 + runbook lines 20/64 — The "safe to re-run / safe to run early" guidance depends on IMRemoveSubscriber being a server-side no-op for a non-subscribed uid. The idempotency unit test uses a mock that always returns nil, so it proves the tool re-issues identical calls, not that WuKongIM tolerates the redundant remove. If WuKongIM ever errored on a missing subscriber, re-runs would populate report.Failures and exit 2, contradicting the "harmless re-run" claim. (Mitigation: the identical call shape is already relied on by the shipped kick/exit path — thread_cleanup.go:78, event.go:629/800, service.go:1658 — so this is established codebase behavior, just undocumented here.)
Suggestion: cite the WuKongIM no-op-on-missing-subscriber guarantee in the comment/runbook, or soften the wording to "failures on already-removed subscribers are recorded but non-fatal."
P2 — Test gaps (build-passing, but coverage holes)
thread_subscription_reconcile_test.go:141-153—TestReconcile_DedupesMemberAcrossDeletedAndBlacklistinserts a single row, so theseen-map dedup branch (scanLeakedMembers:155-158) never fires; the test would pass even if the dedup code were deleted. To actually exercise it, insert two rows for the same(groupNo, uid).--interval(the production rate-limit knob the runbook recommends) has no test — a regression that drops/misplaces the sleep would silently hammer WuKongIM and go uncaught. A robust check: dry-run withIntervalset still makes zeroremoveFncalls (avoid wall-clock assertions, which are flaky in CI).- No multi-group test: cross-group counter summation and per-group plan isolation (group A's uids not leaking into group B's channels) are unverified (map iteration order is nondeterministic). Logic looks correct by inspection, but it is untested.
- The per-group thread-query-failure path (Failures + exit 2) and the top-level scan-failure path (exit 1) — the two exit codes the runbook documents — are both untested. Consider a
queryFnseam analogous toremoveFn, or a drop-table test asserting theerr=query threads:shape with emptyChannelID.
Nits
thread_subscription_reconcile.go:146-159— the cross-row dedup map is effectively dead code given thegroup_no+uidunique index ongroup_member(defensive, harmless).thread_subscription_reconcile.go:218-228— the rate-limittime.Sleepfires before the first IM call, wasting exactly one interval globally; conventional shape is sleep-between (skip whenIMCalls==0).main.go:90-107—./app -config=x.yaml(equals-form, no subcommand) puts-config=x.yamlintoflag.Args(), soserverTypebecomes a non-matching value and the process exits 0 doing nothing. This is a pre-existing quirk of the bare-arg dispatch (the space-form-config x.yamlworks), not introduced by this PR, but worth being aware of.main.go:105-108— an unknown subcommand / stray arg exits 0 silently (pre-existing). A smalldefault:branch logging "unknown subcommand" would improve operability.- Apply-mode report prints two extra lines (
实际摘除订阅对,IM 调用次数) that the runbook's sample (DRY-RUN only) never illustrates; an apply-mode sample block would make the runbook complete.
Summary
Solid, conservative, well-scoped one-off tool. The reconciliation logic, reuse of the existing cleanup helper, dry-run gating, and failure-isolation are all correct and verified. Remaining items are observability/doc-accuracy refinements and test-coverage strengthening — none blocks merge. Recommend addressing the report-undercount note and the runbook exit-code clarification as a small follow-up, since this is an ops tool whose primary interfaces are its report output and exit codes.
QA Verdict: PASS-WITH-NITSScope reviewed: 4 files, +588/-1 (one-off reconciler subcommand + runbook + unit tests). Coverage assessmentStrong coverage on the acceptance-critical invariants:
CI: Gaps (nits, not blockers)
Verify completeness
VerdictPASS-WITH-NITS. Safety-critical paths (dry-run default, idempotency, failure isolation, deleted-thread exclusion) are covered. Gaps above are quality-of-life, not blockers — recommend addressing 1 (multi-group test) in a follow-up if this code ever grows beyond a one-off. |
lml2468
left a comment
There was a problem hiding this comment.
QA review complete — see verdict comment above (PASS-WITH-NITS).
Summary: Safety-critical invariants (dry-run default, idempotency, batching, failure isolation, deleted-thread exclusion, OR-clause dedup) all covered. CI green (Build/Test/Vet/Lint/code-review).
Non-blocking nits: missing multi-group scan test, Interval not exercised, buildPlan query-error path uncovered, ReconcileReport.String() has no golden test, exit-code paths in runReconcileThreadSubs not unit-tested.
Operator ask: Please attach a staging dry-run report to this PR (or in the linked issue) before --apply in prod.
Security Verdict: CLEAREDScope reviewed: 4 files, +588/-1 (one-off ops CLI subcommand; only IM-side mutation via STRIDE / OWASP pass
SQL injection
Crypto / secretsNo crypto, no secret handling, no new auth tokens. No changes to SBOM / dependency
Behavioural safety
Risks (informational, not blocking)
VerdictCLEARED — no security blockers. Adversarial path coverage is good: SQL is parameterized, no new auth/secret surface, the only side-effect is an explicitly-narrow IM unsubscribe, dry-run-default is test-enforced, rate limiting is operator-controlled. |
lml2468
left a comment
There was a problem hiding this comment.
Security review complete — see verdict comment above (CLEARED).
Summary: No new auth/secret surface, no SQL injection (status is parameterized), no new dependencies (osv-scanner SUCCESS), dry-run default is test-enforced, only side-effect is IM unsubscribe (non-destructive, idempotent).
Informational risks: (R1) error-log volume on large --apply failures, mitigable via --interval. (R2) non-deterministic UID ordering in batches — functionally fine, just slightly harder forensics.
No blockers. Approve for security gate.
Code Verdict: APPROVE-WITH-NITSScope reviewed: 4 files, +588/-1. Correctness
Design / fit
Readability
Nits (non-blocking)
VerdictAPPROVE-WITH-NITS. Code is correct, well-tested, well-documented, and appropriately narrow for a one-off ops tool. Nothing blocks merge; nits are cleanup opportunities for a follow-up if any. |
lml2468
left a comment
There was a problem hiding this comment.
Code review complete — see verdict comment above (APPROVE-WITH-NITS).
Summary: Correct (predicate, batching, dry-run early-return, channel ID format all check out), well-designed (clean removeFn test seam, good encapsulation), readable (counter names self-explain, file-header explains the why), and CI green.
Non-blocking nits: (1) unreachable panic(err) after flag.ExitOnError; (2) duplicated "____" channel separator — author's own comment flags the coupling to thread.BuildChannelID; (3) GroupsAffected increments on query-failed groups (cosmetic); (4) dead OrderAsc("group_no") (overwritten by map); (5) runbook should note --interval applies per (channel, batch).
Recommend addressing nits 1 & 2 in a follow-up; not blocking this merge.
Aggregate Verdict: RISKED — needs-human-reviewThree reviewer verdicts are in. QA carries a non-blocking risk flag (
Next step
Posted by |
What
Adds a one-off, idempotent, dry-run-first maintenance command that removes
stale community-topic (thread) subscriptions left in the IM layer for members
who are no longer eligible.
Event-driven fixes only handle removals that happen going forward; they don't
reconcile subscriptions that leaked before the fix shipped. This command closes
that gap as a one-time backfill.
Scope
group_memberfor members withis_deleted=1OR blacklist status.(group_no, uid), reuses the existingqueryThreadShortIDsForCleanuphelper to enumerate all non-deleted threadsof that group and removes the user's subscription on each thread channel.
thread_member/thread_settingrows or touch pins / conversation extensions. (This isdeliberately narrower than the kick/leave cleanup path.)
Safety
--applyit only counts the(uid, thread)pairs that would be removed; it never calls the IM API.subscriptions, so re-runs are safe.
--batch-sizeand--intervalbound the load on the IMservice for large groups.
into the report; the run continues and exits non-zero if any failures
occurred.
Usage
See
docs/thread-subscription-reconcile-runbook.mdfor the full runbook(dry-run vs apply, parameters, exit codes, rollback/impact notes).
Tests
New unit tests cover scanning, dry-run-does-not-write, idempotency, batching,
and failure-isolation.
go build ./...,go vet ./..., and the group packagetests all pass.
Rollback
No rollback needed — the tool only removes IM subscriptions and deletes no
data. Affected users are already kicked/left/blacklisted and should not be
receiving those thread messages anyway.
Part of #YUJ-4186