fix(gateway): queue interrupts during in-flight context compression

With the default busy_input_mode=interrupt, a burst of rapid gateway
messages arriving while context compression is in flight could interrupt
the current turn and start a fresh turn against the pre-rotation parent
session. Because compression is interrupt-immune (#23975), the still-
running compression later rotates the id out from under that new turn,
and if the new turn also grew past the compression threshold it started
its own uncancellable compression on the same stale parent — forking
multiple orphaned one-shot sibling continuations (#56391).

While a state.db compression lock is held for the session, demote
'interrupt' busy-input mode to 'queue' semantics (mirroring the subagent
protection in #30170), so the follow-up message waits for the in-flight
compression + its id rotation to land instead of racing a new turn
against the stale parent. Ack copy explains the compression demotion.

Fixes #56391.
This commit is contained in:
nankingjing 2026-07-01 06:26:52 -07:00 committed by Teknium
parent 1641441837
commit 5eaccf5802
3 changed files with 242 additions and 0 deletions

View file

@ -4794,6 +4794,36 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
return False
def _session_has_compression_in_flight(self, session_key: str) -> bool:
"""Return True when a compression lock is held for this session's id.
Context compression is interrupt-protected (#23975) but gateway
``interrupt`` busy-input mode can still start a follow-up turn against
the pre-rotation parent while compression is mid-flight, producing
orphaned compression siblings (#56391). Callers demote interrupt to
queue when this returns True.
"""
session_store = getattr(self, "session_store", None)
if not session_key or session_store is None:
return False
try:
with session_store._lock: # noqa: SLF001 — snapshot entry under lock
session_store._ensure_loaded_locked() # noqa: SLF001
entry = session_store._entries.get(session_key) # noqa: SLF001
session_id = getattr(entry, "session_id", None) if entry is not None else None
if not session_id:
return False
except Exception:
return False
session_db = getattr(self, "_session_db", None)
if session_db is None:
return False
db = getattr(session_db, "_db", session_db)
try:
return bool(db.get_compression_lock_holder(str(session_id)))
except Exception:
return False
# Hard cap on per-session pending follow-ups for busy_input_mode=queue
# (and the draining/steer-fallback/subagent-demotion paths that share
# this entry point). Without a cap, a stuck agent + a rapid-fire user
@ -5014,6 +5044,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
session_key,
)
effective_mode = "queue"
demoted_for_compression = (
effective_mode == "interrupt"
and self._session_has_compression_in_flight(session_key)
)
if demoted_for_compression:
logger.info(
"Demoting busy_input_mode 'interrupt' to 'queue' for session %s "
"because context compression is in flight (#56391)",
session_key,
)
effective_mode = "queue"
steered = False
if effective_mode == "steer":
steer_text = (event.text or "").strip()
@ -5127,6 +5168,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
f"⏳ Subagent working{status_detail} — your message is queued for "
f"when it finishes (use /stop to cancel everything)."
)
elif is_queue_mode and demoted_for_compression:
message = (
f"⏳ Compressing context{status_detail} — your message is queued for "
f"when it finishes (use /stop to cancel everything)."
)
elif is_queue_mode:
message = (
f"⏳ Queued for the next turn{status_detail}. "

View file

@ -45,6 +45,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json"
# Auto-extracted from noreply emails + manual overrides
AUTHOR_MAP = {
"1079826437@qq.com": "nankingjing", # PR #56404 salvage (gateway: while a state.db compression lock is held for the session, demote busy_input_mode 'interrupt' to 'queue' so a rapid message burst can't interrupt and fork orphaned compression siblings off a stale parent; #56391)
"ud@arubangles.com": "udatny", # PR #29433 salvage (subdirectory_hints: catch RuntimeError from Path.expanduser()/Path.home() so a literal ~ in tool-call args — e.g. LLM "~500-700" or ~unknownuser — can't escape the hint walker and crash the conversation loop)
"brett@personalfinancelab.com": "brett539", # PR #49369 salvage (cap Telegram initialize() with asyncio.wait_for(HERMES_TELEGRAM_INIT_TIMEOUT, default 30s) per attempt so an unreachable fallback-IP connect chain can't block gateway startup indefinitely; add WARNING progress logs before DoH discovery and each connect attempt)
"randomuser2026x@proton.me": "randomuser2026x", # PR #50204 salvage (gateway /restart under systemd: probe both system + --user scope for MainPID instead of hardcoding --user; always exit 75 so RestartForceExitStatus=75 revives the unit under Restart=on-failure too, not just Restart=always)

View file

@ -0,0 +1,195 @@
"""Regression tests for #56391.
When context compression is in flight (state.db compression lock held),
gateway ``busy_input_mode='interrupt'`` must demote to queue semantics so a
rapid message burst cannot start a follow-up turn against the pre-rotation
parent and fork orphaned compression siblings.
"""
from __future__ import annotations
import sys
import threading
import time
import types
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
_tg = types.ModuleType("telegram")
_tg.constants = types.ModuleType("telegram.constants")
_ct = MagicMock()
_ct.SUPERGROUP = "supergroup"
_ct.GROUP = "group"
_ct.PRIVATE = "private"
_tg.constants.ChatType = _ct
sys.modules.setdefault("telegram", _tg)
sys.modules.setdefault("telegram.constants", _tg.constants)
sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext"))
from gateway.platforms.base import ( # noqa: E402
MessageEvent,
MessageType,
SessionSource,
build_session_key,
)
from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL # noqa: E402
def _make_event(text: str = "hello", chat_id: str = "123") -> MessageEvent:
source = SessionSource(
platform=MagicMock(value="telegram"),
chat_id=chat_id,
chat_type="private",
user_id="user1",
)
return MessageEvent(
text=text,
message_type=MessageType.TEXT,
source=source,
message_id="msg1",
)
def _make_runner(*, session_id: str = "parent-session") -> GatewayRunner:
runner = object.__new__(GatewayRunner)
runner._running_agents = {}
runner._running_agents_ts = {}
runner._pending_messages = {}
runner._busy_ack_ts = {}
runner._draining = False
runner.adapters = {}
runner.config = MagicMock()
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
runner.pairing_store = MagicMock()
runner.pairing_store.is_approved.return_value = True
runner._is_user_authorized = lambda _source: True
runner._busy_input_mode = "interrupt"
session_key = build_session_key(_make_event().source)
entry = SimpleNamespace(session_key=session_key, session_id=session_id)
session_store = SimpleNamespace(
_lock=threading.Lock(),
_entries={session_key: entry},
switch_session=MagicMock(),
)
session_store._ensure_loaded_locked = lambda: None
runner.session_store = session_store
runner._session_db = MagicMock()
runner._session_db._db = MagicMock()
runner._session_db._db.get_compression_lock_holder.return_value = None
return runner
def _make_adapter() -> MagicMock:
adapter = MagicMock()
adapter._pending_messages = {}
adapter._send_with_retry = AsyncMock()
adapter.config = MagicMock()
adapter.config.extra = {}
adapter.platform = MagicMock(value="telegram")
return adapter
def _make_parent_no_subagents() -> MagicMock:
parent = MagicMock()
parent._active_children = []
parent._active_children_lock = threading.Lock()
parent.get_activity_summary.return_value = {
"api_call_count": 3,
"max_iterations": 60,
"current_tool": "terminal",
}
return parent
class TestSessionHasCompressionInFlight:
def test_returns_false_without_session_store(self) -> None:
runner = _make_runner()
runner.session_store = None
assert runner._session_has_compression_in_flight("sk") is False
def test_returns_true_when_lock_held(self) -> None:
runner = _make_runner()
sk = build_session_key(_make_event().source)
runner._session_db._db.get_compression_lock_holder.return_value = "holder-1"
assert runner._session_has_compression_in_flight(sk) is True
def test_returns_false_when_lock_free(self) -> None:
runner = _make_runner()
sk = build_session_key(_make_event().source)
runner._session_db._db.get_compression_lock_holder.return_value = None
assert runner._session_has_compression_in_flight(sk) is False
class TestBusyHandlerDemotesInterruptForCompression:
@pytest.mark.asyncio
async def test_does_not_interrupt_when_compression_in_flight(self) -> None:
runner = _make_runner()
adapter = _make_adapter()
event = _make_event(text="follow up during compression")
sk = build_session_key(event.source)
parent = _make_parent_no_subagents()
runner._running_agents[sk] = parent
runner.adapters[event.source.platform] = adapter
runner._session_db._db.get_compression_lock_holder.return_value = "compressing"
handled = await runner._handle_active_session_busy_message(event, sk)
assert handled is True
parent.interrupt.assert_not_called()
assert adapter._pending_messages.get(sk) is event
@pytest.mark.asyncio
async def test_ack_explains_compression_demotion(self) -> None:
runner = _make_runner()
adapter = _make_adapter()
event = _make_event(text="hi mid-compress")
sk = build_session_key(event.source)
parent = _make_parent_no_subagents()
runner._running_agents[sk] = parent
runner._running_agents_ts[sk] = time.time() - 120
runner.adapters[event.source.platform] = adapter
runner._session_db._db.get_compression_lock_holder.return_value = "compressing"
with patch("gateway.run.merge_pending_message_event"):
await runner._handle_active_session_busy_message(event, sk)
adapter._send_with_retry.assert_called_once()
content = adapter._send_with_retry.call_args.kwargs.get("content", "")
assert "Compressing context" in content
assert "queued" in content.lower()
assert "/stop" in content
assert "Interrupting" not in content
@pytest.mark.asyncio
async def test_interrupt_still_fires_without_compression_lock(self) -> None:
runner = _make_runner()
adapter = _make_adapter()
event = _make_event(text="please stop")
sk = build_session_key(event.source)
parent = _make_parent_no_subagents()
runner._running_agents[sk] = parent
runner.adapters[event.source.platform] = adapter
runner._session_db._db.get_compression_lock_holder.return_value = None
with patch("gateway.run.merge_pending_message_event"):
await runner._handle_active_session_busy_message(event, sk)
parent.interrupt.assert_called_once_with("please stop")
@pytest.mark.asyncio
async def test_pending_sentinel_does_not_trigger_false_positive(self) -> None:
runner = _make_runner()
adapter = _make_adapter()
event = _make_event(text="hello")
sk = build_session_key(event.source)
runner._running_agents[sk] = _AGENT_PENDING_SENTINEL
runner.adapters[event.source.platform] = adapter
runner._session_db._db.get_compression_lock_holder.return_value = "compressing"
with patch("gateway.run.merge_pending_message_event"):
handled = await runner._handle_active_session_busy_message(event, sk)
assert handled is True