fix(gateway): suppress NO_REPLY/[SILENT] markers on the streaming path

The agent emits a bare control marker (NO_REPLY / [SILENT] / …) when it
intentionally chooses not to reply.  The gateway's whole-response filter
(is_intentional_silence_agent_result) suppresses this on the non-streaming
delivery path, but the streaming path (GatewayStreamConsumer) had no silence
awareness: it edited the raw marker onto the screen delta-by-delta and
finalized it BEFORE the whole-response filter could run.  On any
streaming-capable adapter (Slack, Telegram, Discord, …) users saw a literal
'NO_REPLY' message leak into chat.

Fix (contained in the stream consumer + a shared predicate; no new config,
no platform-specific code):

- gateway/response_filters.py: add is_partial_silence_marker() — the
  streaming counterpart to is_intentional_silence_response(), sharing the
  same marker set and canonicalization so the two never drift.
- gateway/stream_consumer.py:
  - Mid-stream hold-back: defer edits while the accumulated buffer is still a
    prefix of a silence marker, so a partial marker never flashes on an
    interval tick.
  - On stream end (got_done): if the final buffer is exactly a marker, retract
    any preview already shown (best-effort delete_message, reusing the
    _try_fresh_final cleanup path) and leave the delivery flags False so the
    gateway's own filter turns the marker into '' and no fallback send fires.

Substantive prose that merely mentions a marker is still delivered normally.

Tests: tests/gateway/test_stream_consumer_silence.py — predicate truth table
+ end-to-end run() suppression (single-shot + token-by-token), preview
retraction, no-delete-support best-effort, [SILENT] parity, and
prose-passthrough. Prove-fail verified by reverting only the consumer change
(the 4 behavioral tests fail: 'NO_REPLY'/'[SILENT]' leaks).
This commit is contained in:
Ben 2026-07-01 13:45:38 +10:00 committed by Teknium
parent 3bdb23de10
commit 5f7deeba84
3 changed files with 347 additions and 0 deletions

View file

@ -51,3 +51,30 @@ def is_intentional_silence_agent_result(agent_result: dict | None, response: Any
if agent_result.get("failed"):
return False
return is_intentional_silence_response(response)
def is_partial_silence_marker(text: Any) -> bool:
"""Return True while ``text`` could still resolve to a silence marker.
The streaming path accumulates the reply delta-by-delta and must decide,
before the whole response is known, whether to show what it has so far.
A buffer whose canonical form is a non-empty *prefix* of a silence marker
(e.g. ``"NO"`` on the way to ``"NO_REPLY"``, or an exact marker that has
not yet been terminated by stream-end) is held back so a raw marker is
never edited onto the screen and then belatedly retracted.
Anything that has already diverged from every marker (ordinary prose)
and anything longer than the marker cap returns False so normal
streaming resumes immediately. This is the streaming counterpart to
:func:`is_intentional_silence_response`, sharing the same marker set and
canonicalization so the two never drift.
"""
if not isinstance(text, str):
return False
stripped = text.strip()
if not stripped or len(stripped) > 64:
return False
candidate = _canonical_silence_candidate(stripped)
if not candidate:
return False
return any(marker.startswith(candidate) for marker in LIVE_GATEWAY_SILENT_MARKERS)

View file

@ -32,6 +32,10 @@ from gateway.config import (
DEFAULT_STREAMING_BUFFER_THRESHOLD as _DEFAULT_STREAMING_BUFFER_THRESHOLD,
DEFAULT_STREAMING_CURSOR as _DEFAULT_STREAMING_CURSOR,
)
from gateway.response_filters import (
is_intentional_silence_response as _is_intentional_silence_response,
is_partial_silence_marker as _is_partial_silence_marker,
)
logger = logging.getLogger("gateway.stream_consumer")
@ -542,6 +546,22 @@ class GatewayStreamConsumer:
if got_done:
self._flush_think_buffer()
# Intentional-silence suppression. When the agent chose
# not to reply it emits a bare control marker (NO_REPLY /
# [SILENT] / …). The gateway's whole-response filter
# (gateway/run.py) suppresses this on the non-streaming
# path, but by the time it runs the stream consumer has
# already edited the raw marker onto the screen. Detect
# the exact-marker final buffer here and retract any
# preview instead of finalizing it, so the marker never
# reaches the chat. Substantive prose that merely mentions
# a marker is NOT suppressed (see is_intentional_silence_response).
if _is_intentional_silence_response(
self._clean_for_display(self._accumulated)
):
await self._suppress_silence_marker()
return
# Decide whether to flush an edit
now = time.monotonic()
elapsed = now - self._last_edit_time
@ -562,6 +582,24 @@ class GatewayStreamConsumer:
)
current_update_visible = False
# Hold back mid-stream edits while the buffer so far could
# still resolve to an intentional-silence marker. Without
# this, a partial marker (e.g. "NO_REPLY" streamed as
# "NO"→"NO_REPLY") would flash onto the screen on an interval
# tick before got_done can suppress it. Only defers display —
# got_done above always resolves the buffer (suppress if it's
# an exact marker, otherwise fall through and flush normally),
# so genuine prose that merely starts marker-like is never lost.
if (
should_edit
and not got_done
and not got_segment_break
and commentary_text is None
and _is_partial_silence_marker(
self._clean_for_display(self._accumulated)
)
):
should_edit = False
if should_edit and self._accumulated:
# Split overflow: if accumulated text exceeds the platform
# limit, split into properly sized chunks.
@ -1359,6 +1397,49 @@ class GatewayStreamConsumer:
self._final_response_sent = True
return True
async def _suppress_silence_marker(self) -> None:
"""Retract any streamed preview when the final reply is a silence marker.
The agent chose not to respond and emitted a bare control marker. Any
preview message the consumer already put on screen (a partial marker
flushed on an interval tick, or a preamble before a tool boundary) must
be removed so the raw marker is never left visible. Deletion reuses the
same best-effort ``delete_message`` path as :meth:`_try_fresh_final`.
Crucially, the delivery flags (``_final_response_sent`` /
``_final_content_delivered``) are left **False**: nothing was delivered.
The gateway then does not mistake the marker for a delivered reply, and
its own whole-response filter turns the marker into "" so no fallback
send happens either. ``_already_sent`` is likewise cleared so the
gateway's ``already_sent`` short-circuits do not fire.
"""
stale_ids = set(self._preview_message_ids)
if self._message_id and self._message_id != "__no_edit__":
stale_ids.add(self._message_id)
delete_fn = getattr(self.adapter, "delete_message", None)
if delete_fn is not None:
for stale_id in stale_ids:
if not stale_id or stale_id == "__no_edit__":
continue
try:
await delete_fn(self.chat_id, stale_id)
except Exception as e:
logger.debug(
"Silence-marker preview cleanup failed (%s): %s",
stale_id, e,
)
self._preview_message_ids = set()
self._message_id = None
self._accumulated = ""
self._last_sent_text = ""
self._already_sent = False
self._final_response_sent = False
self._final_content_delivered = False
logger.info(
"Suppressed streamed intentional-silence marker (chat=%s)",
self.chat_id,
)
async def _send_or_edit(
self, text: str, *, finalize: bool = False, is_turn_final: bool = True,
) -> bool:

View file

@ -0,0 +1,239 @@
"""Streaming intentional-silence suppression.
When the agent chooses not to reply it emits a bare control marker
(``NO_REPLY`` / ``[SILENT]`` / ). The gateway's whole-response filter
(``gateway/response_filters.is_intentional_silence_agent_result``) suppresses
this on the non-streaming delivery path, but the *streaming* path
(``GatewayStreamConsumer``) previously had no silence awareness: it edited the
raw marker onto the screen delta-by-delta and finalized it *before* the
whole-response filter could run. On any streaming-capable adapter (Slack,
Telegram, Discord, ) users saw a literal ``NO_REPLY`` bubble.
These tests pin the two halves of the fix:
* ``is_partial_silence_marker`` the mid-stream hold-back predicate.
* ``GatewayStreamConsumer`` an exact-marker final buffer is suppressed and
any already-shown preview is retracted, while substantive prose that merely
mentions a marker is delivered normally.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.response_filters import (
is_intentional_silence_response,
is_partial_silence_marker,
)
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
# --------------------------------------------------------------------------
# is_partial_silence_marker — mid-stream hold-back predicate
# --------------------------------------------------------------------------
# Buffers that could still resolve to a marker → held back while streaming.
PARTIAL_POSITIVE = [
"N",
"NO",
"NO_",
"NO_REP",
"NO_REPLY", # exact marker, not yet terminated by stream-end
"NO REPLY",
"no reply", # canonicalized (case/space-insensitive)
" no_reply ", # surrounding whitespace stripped
"[",
"[SIL",
"[SILENT]",
"SILENT",
"sil",
]
# Buffers that have already diverged from every marker → stream normally.
PARTIAL_NEGATIVE = [
"",
" ",
"No reply needed — here is the plan", # diverged past the marker
"NO_REPLYING", # superset, not a prefix
"Nope",
"Hello there",
"The NO_REPLY token means silence", # marker mentioned mid-prose
"x" * 65, # over the 64-char cap
"silence is golden", # 'SILENCE...' is not a marker prefix
]
@pytest.mark.parametrize("text", PARTIAL_POSITIVE)
def test_partial_silence_marker_positive(text):
assert is_partial_silence_marker(text) is True
@pytest.mark.parametrize("text", PARTIAL_NEGATIVE)
def test_partial_silence_marker_negative(text):
assert is_partial_silence_marker(text) is False
def test_partial_silence_marker_none_safe():
assert is_partial_silence_marker(None) is False
def test_partial_predicate_agrees_with_exact_on_full_markers():
"""Every exact silence marker is also a (trivial) partial of itself."""
from gateway.response_filters import LIVE_GATEWAY_SILENT_MARKERS
for marker in LIVE_GATEWAY_SILENT_MARKERS:
assert is_partial_silence_marker(marker) is True
assert is_intentional_silence_response(marker) is True
# --------------------------------------------------------------------------
# GatewayStreamConsumer — end-to-end suppression through run()
# --------------------------------------------------------------------------
def _make_adapter(*, supports_delete: bool = True) -> MagicMock:
"""Minimal MagicMock adapter wired for send/edit/delete."""
adapter = MagicMock()
adapter.REQUIRES_EDIT_FINALIZE = False
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(
success=True, message_id="preview_1",
))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(
success=True, message_id="preview_1",
))
if supports_delete:
adapter.delete_message = AsyncMock(return_value=True)
else:
del adapter.delete_message # type: ignore[attr-defined]
return adapter
def _sent_and_edited(adapter):
texts = []
for call in adapter.send.call_args_list:
texts.append(call.kwargs.get("content", ""))
if getattr(adapter, "edit_message", None) is not None:
for call in adapter.edit_message.call_args_list:
texts.append(call.kwargs.get("content", ""))
return texts
class TestStreamedSilenceSuppression:
@pytest.mark.asyncio
async def test_no_reply_only_stream_is_fully_suppressed(self):
"""A stream whose entire content is NO_REPLY sends nothing visible."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("NO_REPLY")
consumer.finish()
await consumer.run()
# No marker text ever reached the platform.
for text in _sent_and_edited(adapter):
assert "NO_REPLY" not in text, f"marker leaked: {text!r}"
# Delivery flags stay False so the gateway does not treat the marker
# as a delivered reply (its whole-response filter then drops it too).
assert consumer.final_response_sent is False
assert consumer.final_content_delivered is False
assert consumer.already_sent is False
@pytest.mark.asyncio
async def test_partial_marker_preview_is_retracted(self):
"""A marker flushed mid-stream as a preview is deleted on completion."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
# Force a mid-stream preview: pretend "NO_REPLY" was already put on
# screen (the pre-fix behaviour) before got_done runs.
consumer._message_id = "preview_1"
consumer._preview_message_ids = {"preview_1"}
consumer._already_sent = True
consumer.on_delta("NO_REPLY")
consumer.finish()
await consumer.run()
# The stale preview was best-effort deleted.
adapter.delete_message.assert_awaited_once_with("chat_1", "preview_1")
assert consumer.final_content_delivered is False
assert consumer.already_sent is False
@pytest.mark.asyncio
async def test_suppression_without_delete_support_is_best_effort(self):
"""Adapter lacking delete_message still suppresses (leaves no new send)."""
adapter = _make_adapter(supports_delete=False)
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("NO_REPLY")
consumer.finish()
await consumer.run()
for text in _sent_and_edited(adapter):
assert "NO_REPLY" not in text
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_bracket_silent_marker_suppressed(self):
"""The [SILENT] marker is suppressed just like NO_REPLY."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("[SILENT]")
consumer.finish()
await consumer.run()
for text in _sent_and_edited(adapter):
assert "[SILENT]" not in text
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_prose_mentioning_marker_is_delivered(self):
"""Substantive prose that merely mentions NO_REPLY is NOT suppressed."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
)
body = "The NO_REPLY token tells the gateway to stay silent."
consumer.on_delta(body)
consumer.finish()
await consumer.run()
delivered = "".join(_sent_and_edited(adapter))
assert "NO_REPLY" in delivered
assert consumer.final_content_delivered is True
@pytest.mark.asyncio
async def test_marker_prefix_then_prose_is_delivered(self):
"""A reply that starts marker-like but continues is delivered whole.
"NO REPLY needed …" passes through the mid-stream hold-back while the
buffer is still a marker prefix, then flushes normally once it diverges.
The final text is NOT an exact marker, so got_done does not suppress it.
"""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("NO REPLY")
consumer.on_delta(" needed — the build is already green.")
consumer.finish()
await consumer.run()
delivered = "".join(_sent_and_edited(adapter))
assert "the build is already green" in delivered
assert consumer.final_content_delivered is True