From 5eaccf5802be4e46e271b3e7ca8de5302b9741f2 Mon Sep 17 00:00:00 2001 From: nankingjing <1079826437@qq.com> Date: Wed, 1 Jul 2026 06:26:52 -0700 Subject: [PATCH] fix(gateway): queue interrupts during in-flight context compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With the default busy_input_mode=interrupt, a burst of rapid gateway messages arriving while context compression is in flight could interrupt the current turn and start a fresh turn against the pre-rotation parent session. Because compression is interrupt-immune (#23975), the still- running compression later rotates the id out from under that new turn, and if the new turn also grew past the compression threshold it started its own uncancellable compression on the same stale parent — forking multiple orphaned one-shot sibling continuations (#56391). While a state.db compression lock is held for the session, demote 'interrupt' busy-input mode to 'queue' semantics (mirroring the subagent protection in #30170), so the follow-up message waits for the in-flight compression + its id rotation to land instead of racing a new turn against the stale parent. Ack copy explains the compression demotion. Fixes #56391. --- gateway/run.py | 46 +++++ scripts/release.py | 1 + ...st_compression_interrupt_demotion_56391.py | 195 ++++++++++++++++++ 3 files changed, 242 insertions(+) create mode 100644 tests/gateway/test_compression_interrupt_demotion_56391.py diff --git a/gateway/run.py b/gateway/run.py index a42bd7c2b..e87739c14 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4794,6 +4794,36 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: return False + def _session_has_compression_in_flight(self, session_key: str) -> bool: + """Return True when a compression lock is held for this session's id. + + Context compression is interrupt-protected (#23975) but gateway + ``interrupt`` busy-input mode can still start a follow-up turn against + the pre-rotation parent while compression is mid-flight, producing + orphaned compression siblings (#56391). Callers demote interrupt to + queue when this returns True. + """ + session_store = getattr(self, "session_store", None) + if not session_key or session_store is None: + return False + try: + with session_store._lock: # noqa: SLF001 — snapshot entry under lock + session_store._ensure_loaded_locked() # noqa: SLF001 + entry = session_store._entries.get(session_key) # noqa: SLF001 + session_id = getattr(entry, "session_id", None) if entry is not None else None + if not session_id: + return False + except Exception: + return False + session_db = getattr(self, "_session_db", None) + if session_db is None: + return False + db = getattr(session_db, "_db", session_db) + try: + return bool(db.get_compression_lock_holder(str(session_id))) + except Exception: + return False + # Hard cap on per-session pending follow-ups for busy_input_mode=queue # (and the draining/steer-fallback/subagent-demotion paths that share # this entry point). Without a cap, a stuck agent + a rapid-fire user @@ -5014,6 +5044,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew session_key, ) effective_mode = "queue" + demoted_for_compression = ( + effective_mode == "interrupt" + and self._session_has_compression_in_flight(session_key) + ) + if demoted_for_compression: + logger.info( + "Demoting busy_input_mode 'interrupt' to 'queue' for session %s " + "because context compression is in flight (#56391)", + session_key, + ) + effective_mode = "queue" steered = False if effective_mode == "steer": steer_text = (event.text or "").strip() @@ -5127,6 +5168,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew f"⏳ Subagent working{status_detail} — your message is queued for " f"when it finishes (use /stop to cancel everything)." ) + elif is_queue_mode and demoted_for_compression: + message = ( + f"⏳ Compressing context{status_detail} — your message is queued for " + f"when it finishes (use /stop to cancel everything)." + ) elif is_queue_mode: message = ( f"⏳ Queued for the next turn{status_detail}. " diff --git a/scripts/release.py b/scripts/release.py index bdb8ff72f..93045098c 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -45,6 +45,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json" # Auto-extracted from noreply emails + manual overrides AUTHOR_MAP = { + "1079826437@qq.com": "nankingjing", # PR #56404 salvage (gateway: while a state.db compression lock is held for the session, demote busy_input_mode 'interrupt' to 'queue' so a rapid message burst can't interrupt and fork orphaned compression siblings off a stale parent; #56391) "ud@arubangles.com": "udatny", # PR #29433 salvage (subdirectory_hints: catch RuntimeError from Path.expanduser()/Path.home() so a literal ~ in tool-call args — e.g. LLM "~500-700" or ~unknownuser — can't escape the hint walker and crash the conversation loop) "brett@personalfinancelab.com": "brett539", # PR #49369 salvage (cap Telegram initialize() with asyncio.wait_for(HERMES_TELEGRAM_INIT_TIMEOUT, default 30s) per attempt so an unreachable fallback-IP connect chain can't block gateway startup indefinitely; add WARNING progress logs before DoH discovery and each connect attempt) "randomuser2026x@proton.me": "randomuser2026x", # PR #50204 salvage (gateway /restart under systemd: probe both system + --user scope for MainPID instead of hardcoding --user; always exit 75 so RestartForceExitStatus=75 revives the unit under Restart=on-failure too, not just Restart=always) diff --git a/tests/gateway/test_compression_interrupt_demotion_56391.py b/tests/gateway/test_compression_interrupt_demotion_56391.py new file mode 100644 index 000000000..53b430d5f --- /dev/null +++ b/tests/gateway/test_compression_interrupt_demotion_56391.py @@ -0,0 +1,195 @@ +"""Regression tests for #56391. + +When context compression is in flight (state.db compression lock held), +gateway ``busy_input_mode='interrupt'`` must demote to queue semantics so a +rapid message burst cannot start a follow-up turn against the pre-rotation +parent and fork orphaned compression siblings. +""" + +from __future__ import annotations + +import sys +import threading +import time +import types +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +_tg = types.ModuleType("telegram") +_tg.constants = types.ModuleType("telegram.constants") +_ct = MagicMock() +_ct.SUPERGROUP = "supergroup" +_ct.GROUP = "group" +_ct.PRIVATE = "private" +_tg.constants.ChatType = _ct +sys.modules.setdefault("telegram", _tg) +sys.modules.setdefault("telegram.constants", _tg.constants) +sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext")) + +from gateway.platforms.base import ( # noqa: E402 + MessageEvent, + MessageType, + SessionSource, + build_session_key, +) +from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL # noqa: E402 + + +def _make_event(text: str = "hello", chat_id: str = "123") -> MessageEvent: + source = SessionSource( + platform=MagicMock(value="telegram"), + chat_id=chat_id, + chat_type="private", + user_id="user1", + ) + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id="msg1", + ) + + +def _make_runner(*, session_id: str = "parent-session") -> GatewayRunner: + runner = object.__new__(GatewayRunner) + runner._running_agents = {} + runner._running_agents_ts = {} + runner._pending_messages = {} + runner._busy_ack_ts = {} + runner._draining = False + runner.adapters = {} + runner.config = MagicMock() + runner.hooks = MagicMock() + runner.hooks.emit = AsyncMock() + runner.pairing_store = MagicMock() + runner.pairing_store.is_approved.return_value = True + runner._is_user_authorized = lambda _source: True + runner._busy_input_mode = "interrupt" + session_key = build_session_key(_make_event().source) + entry = SimpleNamespace(session_key=session_key, session_id=session_id) + session_store = SimpleNamespace( + _lock=threading.Lock(), + _entries={session_key: entry}, + switch_session=MagicMock(), + ) + session_store._ensure_loaded_locked = lambda: None + runner.session_store = session_store + runner._session_db = MagicMock() + runner._session_db._db = MagicMock() + runner._session_db._db.get_compression_lock_holder.return_value = None + return runner + + +def _make_adapter() -> MagicMock: + adapter = MagicMock() + adapter._pending_messages = {} + adapter._send_with_retry = AsyncMock() + adapter.config = MagicMock() + adapter.config.extra = {} + adapter.platform = MagicMock(value="telegram") + return adapter + + +def _make_parent_no_subagents() -> MagicMock: + parent = MagicMock() + parent._active_children = [] + parent._active_children_lock = threading.Lock() + parent.get_activity_summary.return_value = { + "api_call_count": 3, + "max_iterations": 60, + "current_tool": "terminal", + } + return parent + + +class TestSessionHasCompressionInFlight: + def test_returns_false_without_session_store(self) -> None: + runner = _make_runner() + runner.session_store = None + assert runner._session_has_compression_in_flight("sk") is False + + def test_returns_true_when_lock_held(self) -> None: + runner = _make_runner() + sk = build_session_key(_make_event().source) + runner._session_db._db.get_compression_lock_holder.return_value = "holder-1" + assert runner._session_has_compression_in_flight(sk) is True + + def test_returns_false_when_lock_free(self) -> None: + runner = _make_runner() + sk = build_session_key(_make_event().source) + runner._session_db._db.get_compression_lock_holder.return_value = None + assert runner._session_has_compression_in_flight(sk) is False + + +class TestBusyHandlerDemotesInterruptForCompression: + @pytest.mark.asyncio + async def test_does_not_interrupt_when_compression_in_flight(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="follow up during compression") + sk = build_session_key(event.source) + parent = _make_parent_no_subagents() + runner._running_agents[sk] = parent + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = "compressing" + + handled = await runner._handle_active_session_busy_message(event, sk) + + assert handled is True + parent.interrupt.assert_not_called() + assert adapter._pending_messages.get(sk) is event + + @pytest.mark.asyncio + async def test_ack_explains_compression_demotion(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="hi mid-compress") + sk = build_session_key(event.source) + parent = _make_parent_no_subagents() + runner._running_agents[sk] = parent + runner._running_agents_ts[sk] = time.time() - 120 + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = "compressing" + + with patch("gateway.run.merge_pending_message_event"): + await runner._handle_active_session_busy_message(event, sk) + + adapter._send_with_retry.assert_called_once() + content = adapter._send_with_retry.call_args.kwargs.get("content", "") + assert "Compressing context" in content + assert "queued" in content.lower() + assert "/stop" in content + assert "Interrupting" not in content + + @pytest.mark.asyncio + async def test_interrupt_still_fires_without_compression_lock(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="please stop") + sk = build_session_key(event.source) + parent = _make_parent_no_subagents() + runner._running_agents[sk] = parent + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = None + + with patch("gateway.run.merge_pending_message_event"): + await runner._handle_active_session_busy_message(event, sk) + + parent.interrupt.assert_called_once_with("please stop") + + @pytest.mark.asyncio + async def test_pending_sentinel_does_not_trigger_false_positive(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="hello") + sk = build_session_key(event.source) + runner._running_agents[sk] = _AGENT_PENDING_SENTINEL + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = "compressing" + + with patch("gateway.run.merge_pending_message_event"): + handled = await runner._handle_active_session_busy_message(event, sk) + + assert handled is True