diff --git a/gateway/run.py b/gateway/run.py index a42bd7c2b..e87739c14 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4794,6 +4794,36 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: return False + def _session_has_compression_in_flight(self, session_key: str) -> bool: + """Return True when a compression lock is held for this session's id. + + Context compression is interrupt-protected (#23975) but gateway + ``interrupt`` busy-input mode can still start a follow-up turn against + the pre-rotation parent while compression is mid-flight, producing + orphaned compression siblings (#56391). Callers demote interrupt to + queue when this returns True. + """ + session_store = getattr(self, "session_store", None) + if not session_key or session_store is None: + return False + try: + with session_store._lock: # noqa: SLF001 — snapshot entry under lock + session_store._ensure_loaded_locked() # noqa: SLF001 + entry = session_store._entries.get(session_key) # noqa: SLF001 + session_id = getattr(entry, "session_id", None) if entry is not None else None + if not session_id: + return False + except Exception: + return False + session_db = getattr(self, "_session_db", None) + if session_db is None: + return False + db = getattr(session_db, "_db", session_db) + try: + return bool(db.get_compression_lock_holder(str(session_id))) + except Exception: + return False + # Hard cap on per-session pending follow-ups for busy_input_mode=queue # (and the draining/steer-fallback/subagent-demotion paths that share # this entry point). Without a cap, a stuck agent + a rapid-fire user @@ -5014,6 +5044,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew session_key, ) effective_mode = "queue" + demoted_for_compression = ( + effective_mode == "interrupt" + and self._session_has_compression_in_flight(session_key) + ) + if demoted_for_compression: + logger.info( + "Demoting busy_input_mode 'interrupt' to 'queue' for session %s " + "because context compression is in flight (#56391)", + session_key, + ) + effective_mode = "queue" steered = False if effective_mode == "steer": steer_text = (event.text or "").strip() @@ -5127,6 +5168,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew f"⏳ Subagent working{status_detail} — your message is queued for " f"when it finishes (use /stop to cancel everything)." ) + elif is_queue_mode and demoted_for_compression: + message = ( + f"⏳ Compressing context{status_detail} — your message is queued for " + f"when it finishes (use /stop to cancel everything)." + ) elif is_queue_mode: message = ( f"⏳ Queued for the next turn{status_detail}. " diff --git a/scripts/release.py b/scripts/release.py index bdb8ff72f..93045098c 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -45,6 +45,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json" # Auto-extracted from noreply emails + manual overrides AUTHOR_MAP = { + "1079826437@qq.com": "nankingjing", # PR #56404 salvage (gateway: while a state.db compression lock is held for the session, demote busy_input_mode 'interrupt' to 'queue' so a rapid message burst can't interrupt and fork orphaned compression siblings off a stale parent; #56391) "ud@arubangles.com": "udatny", # PR #29433 salvage (subdirectory_hints: catch RuntimeError from Path.expanduser()/Path.home() so a literal ~ in tool-call args — e.g. LLM "~500-700" or ~unknownuser — can't escape the hint walker and crash the conversation loop) "brett@personalfinancelab.com": "brett539", # PR #49369 salvage (cap Telegram initialize() with asyncio.wait_for(HERMES_TELEGRAM_INIT_TIMEOUT, default 30s) per attempt so an unreachable fallback-IP connect chain can't block gateway startup indefinitely; add WARNING progress logs before DoH discovery and each connect attempt) "randomuser2026x@proton.me": "randomuser2026x", # PR #50204 salvage (gateway /restart under systemd: probe both system + --user scope for MainPID instead of hardcoding --user; always exit 75 so RestartForceExitStatus=75 revives the unit under Restart=on-failure too, not just Restart=always) diff --git a/tests/gateway/test_compression_interrupt_demotion_56391.py b/tests/gateway/test_compression_interrupt_demotion_56391.py new file mode 100644 index 000000000..53b430d5f --- /dev/null +++ b/tests/gateway/test_compression_interrupt_demotion_56391.py @@ -0,0 +1,195 @@ +"""Regression tests for #56391. + +When context compression is in flight (state.db compression lock held), +gateway ``busy_input_mode='interrupt'`` must demote to queue semantics so a +rapid message burst cannot start a follow-up turn against the pre-rotation +parent and fork orphaned compression siblings. +""" + +from __future__ import annotations + +import sys +import threading +import time +import types +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +_tg = types.ModuleType("telegram") +_tg.constants = types.ModuleType("telegram.constants") +_ct = MagicMock() +_ct.SUPERGROUP = "supergroup" +_ct.GROUP = "group" +_ct.PRIVATE = "private" +_tg.constants.ChatType = _ct +sys.modules.setdefault("telegram", _tg) +sys.modules.setdefault("telegram.constants", _tg.constants) +sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext")) + +from gateway.platforms.base import ( # noqa: E402 + MessageEvent, + MessageType, + SessionSource, + build_session_key, +) +from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL # noqa: E402 + + +def _make_event(text: str = "hello", chat_id: str = "123") -> MessageEvent: + source = SessionSource( + platform=MagicMock(value="telegram"), + chat_id=chat_id, + chat_type="private", + user_id="user1", + ) + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id="msg1", + ) + + +def _make_runner(*, session_id: str = "parent-session") -> GatewayRunner: + runner = object.__new__(GatewayRunner) + runner._running_agents = {} + runner._running_agents_ts = {} + runner._pending_messages = {} + runner._busy_ack_ts = {} + runner._draining = False + runner.adapters = {} + runner.config = MagicMock() + runner.hooks = MagicMock() + runner.hooks.emit = AsyncMock() + runner.pairing_store = MagicMock() + runner.pairing_store.is_approved.return_value = True + runner._is_user_authorized = lambda _source: True + runner._busy_input_mode = "interrupt" + session_key = build_session_key(_make_event().source) + entry = SimpleNamespace(session_key=session_key, session_id=session_id) + session_store = SimpleNamespace( + _lock=threading.Lock(), + _entries={session_key: entry}, + switch_session=MagicMock(), + ) + session_store._ensure_loaded_locked = lambda: None + runner.session_store = session_store + runner._session_db = MagicMock() + runner._session_db._db = MagicMock() + runner._session_db._db.get_compression_lock_holder.return_value = None + return runner + + +def _make_adapter() -> MagicMock: + adapter = MagicMock() + adapter._pending_messages = {} + adapter._send_with_retry = AsyncMock() + adapter.config = MagicMock() + adapter.config.extra = {} + adapter.platform = MagicMock(value="telegram") + return adapter + + +def _make_parent_no_subagents() -> MagicMock: + parent = MagicMock() + parent._active_children = [] + parent._active_children_lock = threading.Lock() + parent.get_activity_summary.return_value = { + "api_call_count": 3, + "max_iterations": 60, + "current_tool": "terminal", + } + return parent + + +class TestSessionHasCompressionInFlight: + def test_returns_false_without_session_store(self) -> None: + runner = _make_runner() + runner.session_store = None + assert runner._session_has_compression_in_flight("sk") is False + + def test_returns_true_when_lock_held(self) -> None: + runner = _make_runner() + sk = build_session_key(_make_event().source) + runner._session_db._db.get_compression_lock_holder.return_value = "holder-1" + assert runner._session_has_compression_in_flight(sk) is True + + def test_returns_false_when_lock_free(self) -> None: + runner = _make_runner() + sk = build_session_key(_make_event().source) + runner._session_db._db.get_compression_lock_holder.return_value = None + assert runner._session_has_compression_in_flight(sk) is False + + +class TestBusyHandlerDemotesInterruptForCompression: + @pytest.mark.asyncio + async def test_does_not_interrupt_when_compression_in_flight(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="follow up during compression") + sk = build_session_key(event.source) + parent = _make_parent_no_subagents() + runner._running_agents[sk] = parent + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = "compressing" + + handled = await runner._handle_active_session_busy_message(event, sk) + + assert handled is True + parent.interrupt.assert_not_called() + assert adapter._pending_messages.get(sk) is event + + @pytest.mark.asyncio + async def test_ack_explains_compression_demotion(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="hi mid-compress") + sk = build_session_key(event.source) + parent = _make_parent_no_subagents() + runner._running_agents[sk] = parent + runner._running_agents_ts[sk] = time.time() - 120 + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = "compressing" + + with patch("gateway.run.merge_pending_message_event"): + await runner._handle_active_session_busy_message(event, sk) + + adapter._send_with_retry.assert_called_once() + content = adapter._send_with_retry.call_args.kwargs.get("content", "") + assert "Compressing context" in content + assert "queued" in content.lower() + assert "/stop" in content + assert "Interrupting" not in content + + @pytest.mark.asyncio + async def test_interrupt_still_fires_without_compression_lock(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="please stop") + sk = build_session_key(event.source) + parent = _make_parent_no_subagents() + runner._running_agents[sk] = parent + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = None + + with patch("gateway.run.merge_pending_message_event"): + await runner._handle_active_session_busy_message(event, sk) + + parent.interrupt.assert_called_once_with("please stop") + + @pytest.mark.asyncio + async def test_pending_sentinel_does_not_trigger_false_positive(self) -> None: + runner = _make_runner() + adapter = _make_adapter() + event = _make_event(text="hello") + sk = build_session_key(event.source) + runner._running_agents[sk] = _AGENT_PENDING_SENTINEL + runner.adapters[event.source.platform] = adapter + runner._session_db._db.get_compression_lock_holder.return_value = "compressing" + + with patch("gateway.run.merge_pending_message_event"): + handled = await runner._handle_active_session_busy_message(event, sk) + + assert handled is True