|
| 1 | +"""Regression tests for #31501 — prune stale Telegram DM topic bindings. |
| 2 | +
|
| 3 | +When a Telegram user deletes a DM topic in the client, the Bot API |
| 4 | +responds to the gateway's next send with ``Thread not found``. The |
| 5 | +adapter falls back to a plain send (no ``message_thread_id``), but |
| 6 | +prior to this fix it left the corresponding row in |
| 7 | +``telegram_dm_topic_bindings`` untouched. |
| 8 | +``gateway.run._recover_telegram_topic_thread_id`` then walked the |
| 9 | +user's bindings newest-first on every later inbound message and |
| 10 | +cheerfully redirected them back to the deleted topic — tool |
| 11 | +progress, approvals and replies all silently landed in the wrong |
| 12 | +place until the operator manually ran ``DELETE`` on ``state.db``. |
| 13 | +
|
| 14 | +The fix has three pieces — these tests pin all three: |
| 15 | +
|
| 16 | +1. ``SessionDB.delete_telegram_topic_binding`` — the targeted |
| 17 | + prune helper (new public API). |
| 18 | +2. ``TelegramAdapter._prune_stale_dm_topic_binding`` — the |
| 19 | + adapter glue that calls the helper from a send-fallback hot |
| 20 | + path without raising on cleanup failure. |
| 21 | +3. The two "Thread not found" call sites in the streaming send |
| 22 | + loop and the control-message helper now invoke (2) — we pin |
| 23 | + this with a source-level guard rather than spinning the full |
| 24 | + send pipeline. |
| 25 | +""" |
| 26 | + |
| 27 | +from __future__ import annotations |
| 28 | + |
| 29 | +import inspect |
| 30 | +from types import SimpleNamespace |
| 31 | + |
| 32 | +import pytest |
| 33 | + |
| 34 | +from hermes_state import SessionDB |
| 35 | + |
| 36 | + |
| 37 | +# --------------------------------------------------------------------------- |
| 38 | +# SessionDB.delete_telegram_topic_binding |
| 39 | +# --------------------------------------------------------------------------- |
| 40 | + |
| 41 | + |
| 42 | +def _seed_binding( |
| 43 | + db: SessionDB, |
| 44 | + *, |
| 45 | + chat_id: str = "5595856929", |
| 46 | + thread_id: str = "15287", |
| 47 | + user_id: str = "5595856929", |
| 48 | + session_id: str = "sess-target", |
| 49 | +) -> None: |
| 50 | + db.create_session( |
| 51 | + session_id=session_id, |
| 52 | + source="telegram", |
| 53 | + user_id=user_id, |
| 54 | + ) |
| 55 | + db.bind_telegram_topic( |
| 56 | + chat_id=chat_id, |
| 57 | + thread_id=thread_id, |
| 58 | + user_id=user_id, |
| 59 | + session_key=f"agent:main:telegram:dm:{chat_id}:{thread_id}", |
| 60 | + session_id=session_id, |
| 61 | + ) |
| 62 | + |
| 63 | + |
| 64 | +class TestDeleteTelegramTopicBinding: |
| 65 | + def test_removes_matching_row_and_returns_count(self, tmp_path): |
| 66 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 67 | + _seed_binding(db, thread_id="15287") |
| 68 | + # Sanity check — binding present before prune. |
| 69 | + assert db.get_telegram_topic_binding( |
| 70 | + chat_id="5595856929", thread_id="15287", |
| 71 | + ) is not None |
| 72 | + |
| 73 | + removed = db.delete_telegram_topic_binding( |
| 74 | + chat_id="5595856929", thread_id="15287", |
| 75 | + ) |
| 76 | + |
| 77 | + assert removed == 1 |
| 78 | + assert db.get_telegram_topic_binding( |
| 79 | + chat_id="5595856929", thread_id="15287", |
| 80 | + ) is None |
| 81 | + db.close() |
| 82 | + |
| 83 | + def test_does_not_touch_unrelated_bindings(self, tmp_path): |
| 84 | + # Critical for the fix: a chat with multiple topics must |
| 85 | + # only lose the one Telegram confirmed deleted, never the |
| 86 | + # rest. Otherwise the user's healthy topics also vanish |
| 87 | + # from recovery's view. |
| 88 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 89 | + _seed_binding(db, thread_id="15287", session_id="sess-stale") |
| 90 | + _seed_binding(db, thread_id="15418", session_id="sess-fresh") |
| 91 | + |
| 92 | + removed = db.delete_telegram_topic_binding( |
| 93 | + chat_id="5595856929", thread_id="15287", |
| 94 | + ) |
| 95 | + assert removed == 1 |
| 96 | + |
| 97 | + # Stale binding is gone; the fresh one survives. |
| 98 | + assert db.get_telegram_topic_binding( |
| 99 | + chat_id="5595856929", thread_id="15287", |
| 100 | + ) is None |
| 101 | + assert db.get_telegram_topic_binding( |
| 102 | + chat_id="5595856929", thread_id="15418", |
| 103 | + ) is not None |
| 104 | + db.close() |
| 105 | + |
| 106 | + def test_missing_row_returns_zero_silently(self, tmp_path): |
| 107 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 108 | + _seed_binding(db, thread_id="15287") |
| 109 | + |
| 110 | + # Different thread_id — must not raise, just report 0. |
| 111 | + removed = db.delete_telegram_topic_binding( |
| 112 | + chat_id="5595856929", thread_id="99999", |
| 113 | + ) |
| 114 | + assert removed == 0 |
| 115 | + # Original binding still intact. |
| 116 | + assert db.get_telegram_topic_binding( |
| 117 | + chat_id="5595856929", thread_id="15287", |
| 118 | + ) is not None |
| 119 | + db.close() |
| 120 | + |
| 121 | + def test_pristine_database_with_no_topic_tables_is_silent_noop(self, tmp_path): |
| 122 | + # Fresh profile that has never run /topic — the topic-mode |
| 123 | + # tables don't exist yet. The send-fallback hot path can |
| 124 | + # still hit this code, so we must not crash. |
| 125 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 126 | + # Confirm precondition: tables really aren't there. |
| 127 | + tables = { |
| 128 | + row[0] |
| 129 | + for row in db._conn.execute( |
| 130 | + "SELECT name FROM sqlite_master WHERE type='table' " |
| 131 | + "AND name LIKE 'telegram_dm%'" |
| 132 | + ).fetchall() |
| 133 | + } |
| 134 | + assert "telegram_dm_topic_bindings" not in tables |
| 135 | + |
| 136 | + removed = db.delete_telegram_topic_binding( |
| 137 | + chat_id="any", thread_id="any", |
| 138 | + ) |
| 139 | + assert removed == 0 |
| 140 | + db.close() |
| 141 | + |
| 142 | + def test_idempotent_under_repeated_calls(self, tmp_path): |
| 143 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 144 | + _seed_binding(db, thread_id="15287") |
| 145 | + |
| 146 | + first = db.delete_telegram_topic_binding( |
| 147 | + chat_id="5595856929", thread_id="15287", |
| 148 | + ) |
| 149 | + second = db.delete_telegram_topic_binding( |
| 150 | + chat_id="5595856929", thread_id="15287", |
| 151 | + ) |
| 152 | + |
| 153 | + assert first == 1 |
| 154 | + assert second == 0 # already gone, no spurious "1" |
| 155 | + db.close() |
| 156 | + |
| 157 | + |
| 158 | +# --------------------------------------------------------------------------- |
| 159 | +# Adapter glue — _prune_stale_dm_topic_binding |
| 160 | +# --------------------------------------------------------------------------- |
| 161 | + |
| 162 | + |
| 163 | +def _bare_adapter(db: SessionDB | None = None): |
| 164 | + # The adapter accesses the SessionDB via |
| 165 | + # ``self._session_store._db`` (set by GatewayRunner via |
| 166 | + # ``set_session_store``). Build a minimal stand-in with just |
| 167 | + # the surface the prune helper touches; we don't need the |
| 168 | + # python-telegram-bot import-graph here. ``name`` is a |
| 169 | + # property that delegates to ``platform.value.title()``, so |
| 170 | + # we set ``platform`` rather than poking ``name`` directly. |
| 171 | + from gateway.config import Platform |
| 172 | + from plugins.platforms.telegram.adapter import TelegramAdapter |
| 173 | + |
| 174 | + adapter = object.__new__(TelegramAdapter) |
| 175 | + adapter.platform = Platform.TELEGRAM |
| 176 | + if db is not None: |
| 177 | + adapter._session_store = SimpleNamespace(_db=db) |
| 178 | + return adapter |
| 179 | + |
| 180 | + |
| 181 | +class TestPruneStaleDmTopicBindingHelper: |
| 182 | + def test_drops_binding_when_session_store_db_is_present(self, tmp_path): |
| 183 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 184 | + _seed_binding(db, thread_id="15287") |
| 185 | + |
| 186 | + adapter = _bare_adapter(db) |
| 187 | + adapter._prune_stale_dm_topic_binding("5595856929", 15287) |
| 188 | + |
| 189 | + assert db.get_telegram_topic_binding( |
| 190 | + chat_id="5595856929", thread_id="15287", |
| 191 | + ) is None |
| 192 | + db.close() |
| 193 | + |
| 194 | + def test_silent_when_session_store_unavailable(self): |
| 195 | + # No ``_session_store`` attribute — the helper must not |
| 196 | + # explode (the streaming send path hits this in tests |
| 197 | + # that bypass the gateway runner). |
| 198 | + adapter = _bare_adapter() |
| 199 | + adapter._prune_stale_dm_topic_binding("123", "456") |
| 200 | + |
| 201 | + def test_silent_when_db_lacks_helper(self): |
| 202 | + # Old SessionDB without the new method (e.g. running |
| 203 | + # against an older state.db schema). Must be a no-op |
| 204 | + # rather than AttributeError. |
| 205 | + adapter = _bare_adapter() |
| 206 | + adapter._session_store = SimpleNamespace( |
| 207 | + _db=SimpleNamespace(), # no methods at all |
| 208 | + ) |
| 209 | + adapter._prune_stale_dm_topic_binding("123", "456") |
| 210 | + |
| 211 | + def test_swallows_db_exceptions_so_send_continues(self): |
| 212 | + class ExplodingDb: |
| 213 | + def delete_telegram_topic_binding(self, **_): |
| 214 | + raise RuntimeError("disk full or whatever") |
| 215 | + |
| 216 | + adapter = _bare_adapter() |
| 217 | + adapter._session_store = SimpleNamespace(_db=ExplodingDb()) |
| 218 | + |
| 219 | + # The point of the helper is that a failed cleanup must |
| 220 | + # NEVER turn into a failed user-facing send. No exception |
| 221 | + # should escape. |
| 222 | + adapter._prune_stale_dm_topic_binding("123", "456") |
| 223 | + |
| 224 | + def test_skips_when_chat_or_thread_missing(self, tmp_path): |
| 225 | + # Defensive — control-message paths sometimes call us |
| 226 | + # with chat_id=None when kwargs lack the key. We must |
| 227 | + # not produce a spurious DELETE that matches every row |
| 228 | + # with a NULL chat_id. |
| 229 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 230 | + _seed_binding(db, thread_id="15287") |
| 231 | + |
| 232 | + adapter = _bare_adapter(db) |
| 233 | + |
| 234 | + adapter._prune_stale_dm_topic_binding(None, "15287") |
| 235 | + adapter._prune_stale_dm_topic_binding("5595856929", None) |
| 236 | + |
| 237 | + # Still there — neither call generated a DELETE. |
| 238 | + assert db.get_telegram_topic_binding( |
| 239 | + chat_id="5595856929", thread_id="15287", |
| 240 | + ) is not None |
| 241 | + db.close() |
| 242 | + |
| 243 | + |
| 244 | +# --------------------------------------------------------------------------- |
| 245 | +# Source-level wiring guards — both fallback sites must call the helper |
| 246 | +# --------------------------------------------------------------------------- |
| 247 | + |
| 248 | + |
| 249 | +class TestThreadNotFoundFallbackSitesPruneBinding: |
| 250 | + """Pin that the two ``Thread not found`` warning sites in the |
| 251 | + Telegram adapter actually invoke ``_prune_stale_dm_topic_binding``. |
| 252 | + These guards stop a future refactor from quietly losing the |
| 253 | + cleanup wire — re-opening #31501. |
| 254 | + """ |
| 255 | + |
| 256 | + def test_streaming_send_fallback_calls_prune(self): |
| 257 | + from plugins.platforms.telegram import adapter as telegram_mod |
| 258 | + |
| 259 | + src = inspect.getsource(telegram_mod.TelegramAdapter.send) |
| 260 | + # Locate the second-failure branch (the one that flips |
| 261 | + # ``used_thread_fallback``). It must invoke the prune |
| 262 | + # helper before flipping the flag. |
| 263 | + marker = "retrying without message_thread_id" |
| 264 | + idx = src.find(marker) |
| 265 | + assert idx != -1, ( |
| 266 | + "Streaming send must keep its 'thread not found' " |
| 267 | + "fallback log line — the prune wiring is anchored " |
| 268 | + "next to it." |
| 269 | + ) |
| 270 | + # 600 char window is enough to cover the warning, the |
| 271 | + # prune call, and the ``used_thread_fallback = True`` |
| 272 | + # assignment that follows. |
| 273 | + window = src[idx:idx + 600] |
| 274 | + assert "_prune_stale_dm_topic_binding" in window, ( |
| 275 | + "Streaming send 'Thread not found' fallback must call " |
| 276 | + "_prune_stale_dm_topic_binding so the stale row in " |
| 277 | + "telegram_dm_topic_bindings doesn't keep redirecting " |
| 278 | + "future inbound messages to the deleted topic (#31501)." |
| 279 | + ) |
| 280 | + |
| 281 | + def test_control_message_helper_calls_prune(self): |
| 282 | + from plugins.platforms.telegram import adapter as telegram_mod |
| 283 | + |
| 284 | + src = inspect.getsource( |
| 285 | + telegram_mod.TelegramAdapter._send_message_with_thread_fallback |
| 286 | + ) |
| 287 | + # The helper has a single retry path; the prune call |
| 288 | + # must sit inside it, not in dead code outside the |
| 289 | + # ``if message_thread_id is not None and …`` guard. |
| 290 | + assert "_prune_stale_dm_topic_binding" in src, ( |
| 291 | + "_send_message_with_thread_fallback must call " |
| 292 | + "_prune_stale_dm_topic_binding when Telegram returns " |
| 293 | + "BadRequest('Thread not found') for a control message " |
| 294 | + "(#31501)." |
| 295 | + ) |
| 296 | + # Belt-and-braces: the call must precede the retry |
| 297 | + # ``send_message`` so the prune happens whether or not |
| 298 | + # the retry itself succeeds. |
| 299 | + prune_idx = src.find("_prune_stale_dm_topic_binding") |
| 300 | + retry_idx = src.find("send_message(**retry_kwargs)") |
| 301 | + assert 0 <= prune_idx < retry_idx, ( |
| 302 | + "_prune_stale_dm_topic_binding must run before the " |
| 303 | + "fallback send_message retry." |
| 304 | + ) |
| 305 | + |
| 306 | + |
| 307 | +# --------------------------------------------------------------------------- |
| 308 | +# End-to-end semantic — prune + recovery returns None for deleted topic |
| 309 | +# --------------------------------------------------------------------------- |
| 310 | + |
| 311 | + |
| 312 | +class TestRecoveryAfterPrune: |
| 313 | + """The whole point of the fix: once a topic is pruned, the |
| 314 | + GatewayRunner's ``_recover_telegram_topic_thread_id`` must no |
| 315 | + longer steer future inbound messages to it. |
| 316 | + """ |
| 317 | + |
| 318 | + def test_recovery_no_longer_returns_pruned_topic(self, tmp_path): |
| 319 | + # Build the same fixture used elsewhere: two topic bindings |
| 320 | + # for the same user, then prune the most-recent one. |
| 321 | + # ``_recover_telegram_topic_thread_id`` walks bindings |
| 322 | + # newest-first, so without the prune it would pick the |
| 323 | + # one we just removed. |
| 324 | + from gateway.config import GatewayConfig, Platform, PlatformConfig |
| 325 | + from gateway.run import GatewayRunner |
| 326 | + from gateway.session import SessionSource, build_session_key |
| 327 | + |
| 328 | + db = SessionDB(db_path=tmp_path / "state.db") |
| 329 | + db.enable_telegram_topic_mode( |
| 330 | + chat_id="5595856929", user_id="5595856929", |
| 331 | + ) |
| 332 | + |
| 333 | + for sid, thread in (("sess-A", "111"), ("sess-B", "222")): |
| 334 | + db.create_session( |
| 335 | + session_id=sid, source="telegram", |
| 336 | + user_id="5595856929", |
| 337 | + ) |
| 338 | + db.bind_telegram_topic( |
| 339 | + chat_id="5595856929", |
| 340 | + thread_id=thread, |
| 341 | + user_id="5595856929", |
| 342 | + session_key=build_session_key(SessionSource( |
| 343 | + platform=Platform.TELEGRAM, |
| 344 | + user_id="5595856929", |
| 345 | + chat_id="5595856929", |
| 346 | + user_name="tester", |
| 347 | + chat_type="dm", |
| 348 | + thread_id=thread, |
| 349 | + )), |
| 350 | + session_id=sid, |
| 351 | + ) |
| 352 | + |
| 353 | + runner = object.__new__(GatewayRunner) |
| 354 | + runner.config = GatewayConfig( |
| 355 | + platforms={ |
| 356 | + Platform.TELEGRAM: PlatformConfig(enabled=True, token="***"), |
| 357 | + } |
| 358 | + ) |
| 359 | + runner._session_db = db |
| 360 | + runner._telegram_topic_mode_enabled = lambda _src: True |
| 361 | + |
| 362 | + # Sanity: before the prune, recovery picks "222" (newest). |
| 363 | + # Recovery only fires for a lobby-shaped inbound (omitted |
| 364 | + # message_thread_id or General topic "1"); a non-lobby |
| 365 | + # unknown thread is preserved as a brand-new topic. Use the |
| 366 | + # General topic id so the recovery walk actually runs. |
| 367 | + before = runner._recover_telegram_topic_thread_id(SessionSource( |
| 368 | + platform=Platform.TELEGRAM, |
| 369 | + user_id="5595856929", |
| 370 | + chat_id="5595856929", |
| 371 | + user_name="tester", |
| 372 | + chat_type="dm", |
| 373 | + thread_id="1", # General/stripped reply — triggers recovery |
| 374 | + )) |
| 375 | + assert before == "222" |
| 376 | + |
| 377 | + # User deletes topic 222 in Telegram → adapter prunes. |
| 378 | + db.delete_telegram_topic_binding( |
| 379 | + chat_id="5595856929", thread_id="222", |
| 380 | + ) |
| 381 | + |
| 382 | + # Now recovery falls back to topic 111 (the surviving |
| 383 | + # binding) instead of the dead one. This is the exact |
| 384 | + # behaviour change the bug report asks for. |
| 385 | + after = runner._recover_telegram_topic_thread_id(SessionSource( |
| 386 | + platform=Platform.TELEGRAM, |
| 387 | + user_id="5595856929", |
| 388 | + chat_id="5595856929", |
| 389 | + user_name="tester", |
| 390 | + chat_type="dm", |
| 391 | + thread_id="1", |
| 392 | + )) |
| 393 | + assert after == "111" |
| 394 | + db.close() |
0 commit comments