From 5f7deeba84a0120ac94a519bb37acd19091fd5f0 Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 1 Jul 2026 13:45:38 +1000 Subject: [PATCH] fix(gateway): suppress NO_REPLY/[SILENT] markers on the streaming path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent emits a bare control marker (NO_REPLY / [SILENT] / …) when it intentionally chooses not to reply. The gateway's whole-response filter (is_intentional_silence_agent_result) suppresses this on the non-streaming delivery path, but the streaming path (GatewayStreamConsumer) had no silence awareness: it edited the raw marker onto the screen delta-by-delta and finalized it BEFORE the whole-response filter could run. On any streaming-capable adapter (Slack, Telegram, Discord, …) users saw a literal 'NO_REPLY' message leak into chat. Fix (contained in the stream consumer + a shared predicate; no new config, no platform-specific code): - gateway/response_filters.py: add is_partial_silence_marker() — the streaming counterpart to is_intentional_silence_response(), sharing the same marker set and canonicalization so the two never drift. - gateway/stream_consumer.py: - Mid-stream hold-back: defer edits while the accumulated buffer is still a prefix of a silence marker, so a partial marker never flashes on an interval tick. - On stream end (got_done): if the final buffer is exactly a marker, retract any preview already shown (best-effort delete_message, reusing the _try_fresh_final cleanup path) and leave the delivery flags False so the gateway's own filter turns the marker into '' and no fallback send fires. Substantive prose that merely mentions a marker is still delivered normally. Tests: tests/gateway/test_stream_consumer_silence.py — predicate truth table + end-to-end run() suppression (single-shot + token-by-token), preview retraction, no-delete-support best-effort, [SILENT] parity, and prose-passthrough. Prove-fail verified by reverting only the consumer change (the 4 behavioral tests fail: 'NO_REPLY'/'[SILENT]' leaks). --- gateway/response_filters.py | 27 ++ gateway/stream_consumer.py | 81 ++++++ tests/gateway/test_stream_consumer_silence.py | 239 ++++++++++++++++++ 3 files changed, 347 insertions(+) create mode 100644 tests/gateway/test_stream_consumer_silence.py diff --git a/gateway/response_filters.py b/gateway/response_filters.py index cc4b5c4f5..a5e09d309 100644 --- a/gateway/response_filters.py +++ b/gateway/response_filters.py @@ -51,3 +51,30 @@ def is_intentional_silence_agent_result(agent_result: dict | None, response: Any if agent_result.get("failed"): return False return is_intentional_silence_response(response) + + +def is_partial_silence_marker(text: Any) -> bool: + """Return True while ``text`` could still resolve to a silence marker. + + The streaming path accumulates the reply delta-by-delta and must decide, + before the whole response is known, whether to show what it has so far. + A buffer whose canonical form is a non-empty *prefix* of a silence marker + (e.g. ``"NO"`` on the way to ``"NO_REPLY"``, or an exact marker that has + not yet been terminated by stream-end) is held back so a raw marker is + never edited onto the screen and then belatedly retracted. + + Anything that has already diverged from every marker (ordinary prose) — + and anything longer than the marker cap — returns False so normal + streaming resumes immediately. This is the streaming counterpart to + :func:`is_intentional_silence_response`, sharing the same marker set and + canonicalization so the two never drift. + """ + if not isinstance(text, str): + return False + stripped = text.strip() + if not stripped or len(stripped) > 64: + return False + candidate = _canonical_silence_candidate(stripped) + if not candidate: + return False + return any(marker.startswith(candidate) for marker in LIVE_GATEWAY_SILENT_MARKERS) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 9c6d12808..66084e2d4 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -32,6 +32,10 @@ from gateway.config import ( DEFAULT_STREAMING_BUFFER_THRESHOLD as _DEFAULT_STREAMING_BUFFER_THRESHOLD, DEFAULT_STREAMING_CURSOR as _DEFAULT_STREAMING_CURSOR, ) +from gateway.response_filters import ( + is_intentional_silence_response as _is_intentional_silence_response, + is_partial_silence_marker as _is_partial_silence_marker, +) logger = logging.getLogger("gateway.stream_consumer") @@ -542,6 +546,22 @@ class GatewayStreamConsumer: if got_done: self._flush_think_buffer() + # Intentional-silence suppression. When the agent chose + # not to reply it emits a bare control marker (NO_REPLY / + # [SILENT] / …). The gateway's whole-response filter + # (gateway/run.py) suppresses this on the non-streaming + # path, but by the time it runs the stream consumer has + # already edited the raw marker onto the screen. Detect + # the exact-marker final buffer here and retract any + # preview instead of finalizing it, so the marker never + # reaches the chat. Substantive prose that merely mentions + # a marker is NOT suppressed (see is_intentional_silence_response). + if _is_intentional_silence_response( + self._clean_for_display(self._accumulated) + ): + await self._suppress_silence_marker() + return + # Decide whether to flush an edit now = time.monotonic() elapsed = now - self._last_edit_time @@ -562,6 +582,24 @@ class GatewayStreamConsumer: ) current_update_visible = False + # Hold back mid-stream edits while the buffer so far could + # still resolve to an intentional-silence marker. Without + # this, a partial marker (e.g. "NO_REPLY" streamed as + # "NO"→"NO_REPLY") would flash onto the screen on an interval + # tick before got_done can suppress it. Only defers display — + # got_done above always resolves the buffer (suppress if it's + # an exact marker, otherwise fall through and flush normally), + # so genuine prose that merely starts marker-like is never lost. + if ( + should_edit + and not got_done + and not got_segment_break + and commentary_text is None + and _is_partial_silence_marker( + self._clean_for_display(self._accumulated) + ) + ): + should_edit = False if should_edit and self._accumulated: # Split overflow: if accumulated text exceeds the platform # limit, split into properly sized chunks. @@ -1359,6 +1397,49 @@ class GatewayStreamConsumer: self._final_response_sent = True return True + async def _suppress_silence_marker(self) -> None: + """Retract any streamed preview when the final reply is a silence marker. + + The agent chose not to respond and emitted a bare control marker. Any + preview message the consumer already put on screen (a partial marker + flushed on an interval tick, or a preamble before a tool boundary) must + be removed so the raw marker is never left visible. Deletion reuses the + same best-effort ``delete_message`` path as :meth:`_try_fresh_final`. + + Crucially, the delivery flags (``_final_response_sent`` / + ``_final_content_delivered``) are left **False**: nothing was delivered. + The gateway then does not mistake the marker for a delivered reply, and + its own whole-response filter turns the marker into "" so no fallback + send happens either. ``_already_sent`` is likewise cleared so the + gateway's ``already_sent`` short-circuits do not fire. + """ + stale_ids = set(self._preview_message_ids) + if self._message_id and self._message_id != "__no_edit__": + stale_ids.add(self._message_id) + delete_fn = getattr(self.adapter, "delete_message", None) + if delete_fn is not None: + for stale_id in stale_ids: + if not stale_id or stale_id == "__no_edit__": + continue + try: + await delete_fn(self.chat_id, stale_id) + except Exception as e: + logger.debug( + "Silence-marker preview cleanup failed (%s): %s", + stale_id, e, + ) + self._preview_message_ids = set() + self._message_id = None + self._accumulated = "" + self._last_sent_text = "" + self._already_sent = False + self._final_response_sent = False + self._final_content_delivered = False + logger.info( + "Suppressed streamed intentional-silence marker (chat=%s)", + self.chat_id, + ) + async def _send_or_edit( self, text: str, *, finalize: bool = False, is_turn_final: bool = True, ) -> bool: diff --git a/tests/gateway/test_stream_consumer_silence.py b/tests/gateway/test_stream_consumer_silence.py new file mode 100644 index 000000000..fc6dcf67e --- /dev/null +++ b/tests/gateway/test_stream_consumer_silence.py @@ -0,0 +1,239 @@ +"""Streaming intentional-silence suppression. + +When the agent chooses not to reply it emits a bare control marker +(``NO_REPLY`` / ``[SILENT]`` / …). The gateway's whole-response filter +(``gateway/response_filters.is_intentional_silence_agent_result``) suppresses +this on the non-streaming delivery path, but the *streaming* path +(``GatewayStreamConsumer``) previously had no silence awareness: it edited the +raw marker onto the screen delta-by-delta and finalized it *before* the +whole-response filter could run. On any streaming-capable adapter (Slack, +Telegram, Discord, …) users saw a literal ``NO_REPLY`` bubble. + +These tests pin the two halves of the fix: + +* ``is_partial_silence_marker`` — the mid-stream hold-back predicate. +* ``GatewayStreamConsumer`` — an exact-marker final buffer is suppressed and + any already-shown preview is retracted, while substantive prose that merely + mentions a marker is delivered normally. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.response_filters import ( + is_intentional_silence_response, + is_partial_silence_marker, +) +from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig + + +# -------------------------------------------------------------------------- +# is_partial_silence_marker — mid-stream hold-back predicate +# -------------------------------------------------------------------------- + +# Buffers that could still resolve to a marker → held back while streaming. +PARTIAL_POSITIVE = [ + "N", + "NO", + "NO_", + "NO_REP", + "NO_REPLY", # exact marker, not yet terminated by stream-end + "NO REPLY", + "no reply", # canonicalized (case/space-insensitive) + " no_reply ", # surrounding whitespace stripped + "[", + "[SIL", + "[SILENT]", + "SILENT", + "sil", +] + +# Buffers that have already diverged from every marker → stream normally. +PARTIAL_NEGATIVE = [ + "", + " ", + "No reply needed — here is the plan", # diverged past the marker + "NO_REPLYING", # superset, not a prefix + "Nope", + "Hello there", + "The NO_REPLY token means silence", # marker mentioned mid-prose + "x" * 65, # over the 64-char cap + "silence is golden", # 'SILENCE...' is not a marker prefix +] + + +@pytest.mark.parametrize("text", PARTIAL_POSITIVE) +def test_partial_silence_marker_positive(text): + assert is_partial_silence_marker(text) is True + + +@pytest.mark.parametrize("text", PARTIAL_NEGATIVE) +def test_partial_silence_marker_negative(text): + assert is_partial_silence_marker(text) is False + + +def test_partial_silence_marker_none_safe(): + assert is_partial_silence_marker(None) is False + + +def test_partial_predicate_agrees_with_exact_on_full_markers(): + """Every exact silence marker is also a (trivial) partial of itself.""" + from gateway.response_filters import LIVE_GATEWAY_SILENT_MARKERS + + for marker in LIVE_GATEWAY_SILENT_MARKERS: + assert is_partial_silence_marker(marker) is True + assert is_intentional_silence_response(marker) is True + + +# -------------------------------------------------------------------------- +# GatewayStreamConsumer — end-to-end suppression through run() +# -------------------------------------------------------------------------- + +def _make_adapter(*, supports_delete: bool = True) -> MagicMock: + """Minimal MagicMock adapter wired for send/edit/delete.""" + adapter = MagicMock() + adapter.REQUIRES_EDIT_FINALIZE = False + adapter.MAX_MESSAGE_LENGTH = 4096 + adapter.send = AsyncMock(return_value=SimpleNamespace( + success=True, message_id="preview_1", + )) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace( + success=True, message_id="preview_1", + )) + if supports_delete: + adapter.delete_message = AsyncMock(return_value=True) + else: + del adapter.delete_message # type: ignore[attr-defined] + return adapter + + +def _sent_and_edited(adapter): + texts = [] + for call in adapter.send.call_args_list: + texts.append(call.kwargs.get("content", "")) + if getattr(adapter, "edit_message", None) is not None: + for call in adapter.edit_message.call_args_list: + texts.append(call.kwargs.get("content", "")) + return texts + + +class TestStreamedSilenceSuppression: + @pytest.mark.asyncio + async def test_no_reply_only_stream_is_fully_suppressed(self): + """A stream whose entire content is NO_REPLY sends nothing visible.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, "chat_1", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1), + ) + consumer.on_delta("NO_REPLY") + consumer.finish() + await consumer.run() + + # No marker text ever reached the platform. + for text in _sent_and_edited(adapter): + assert "NO_REPLY" not in text, f"marker leaked: {text!r}" + + # Delivery flags stay False so the gateway does not treat the marker + # as a delivered reply (its whole-response filter then drops it too). + assert consumer.final_response_sent is False + assert consumer.final_content_delivered is False + assert consumer.already_sent is False + + @pytest.mark.asyncio + async def test_partial_marker_preview_is_retracted(self): + """A marker flushed mid-stream as a preview is deleted on completion.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, "chat_1", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1), + ) + # Force a mid-stream preview: pretend "NO_REPLY" was already put on + # screen (the pre-fix behaviour) before got_done runs. + consumer._message_id = "preview_1" + consumer._preview_message_ids = {"preview_1"} + consumer._already_sent = True + + consumer.on_delta("NO_REPLY") + consumer.finish() + await consumer.run() + + # The stale preview was best-effort deleted. + adapter.delete_message.assert_awaited_once_with("chat_1", "preview_1") + assert consumer.final_content_delivered is False + assert consumer.already_sent is False + + @pytest.mark.asyncio + async def test_suppression_without_delete_support_is_best_effort(self): + """Adapter lacking delete_message still suppresses (leaves no new send).""" + adapter = _make_adapter(supports_delete=False) + consumer = GatewayStreamConsumer( + adapter, "chat_1", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1), + ) + consumer.on_delta("NO_REPLY") + consumer.finish() + await consumer.run() + + for text in _sent_and_edited(adapter): + assert "NO_REPLY" not in text + assert consumer.final_content_delivered is False + + @pytest.mark.asyncio + async def test_bracket_silent_marker_suppressed(self): + """The [SILENT] marker is suppressed just like NO_REPLY.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, "chat_1", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1), + ) + consumer.on_delta("[SILENT]") + consumer.finish() + await consumer.run() + + for text in _sent_and_edited(adapter): + assert "[SILENT]" not in text + assert consumer.final_content_delivered is False + + @pytest.mark.asyncio + async def test_prose_mentioning_marker_is_delivered(self): + """Substantive prose that merely mentions NO_REPLY is NOT suppressed.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, "chat_1", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + body = "The NO_REPLY token tells the gateway to stay silent." + consumer.on_delta(body) + consumer.finish() + await consumer.run() + + delivered = "".join(_sent_and_edited(adapter)) + assert "NO_REPLY" in delivered + assert consumer.final_content_delivered is True + + @pytest.mark.asyncio + async def test_marker_prefix_then_prose_is_delivered(self): + """A reply that starts marker-like but continues is delivered whole. + + "NO REPLY needed …" passes through the mid-stream hold-back while the + buffer is still a marker prefix, then flushes normally once it diverges. + The final text is NOT an exact marker, so got_done does not suppress it. + """ + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, "chat_1", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1), + ) + consumer.on_delta("NO REPLY") + consumer.on_delta(" needed — the build is already green.") + consumer.finish() + await consumer.run() + + delivered = "".join(_sent_and_edited(adapter)) + assert "the build is already green" in delivered + assert consumer.final_content_delivered is True