From e26393ffc21cdd315b59355687400e561753bcd0 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:39:45 -0700 Subject: [PATCH] fix: Signal duplicate replies with streaming + per-platform tool_progress (#6348) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #4647 — Signal replies duplicated when gateway streaming is enabled. Root cause: stream_consumer.py did not handle the case where send() returns success=True but no message_id (Signal behavior). Every stream delta produced a separate send() call (7+ messages instead of 2), plus the gateway sent another full duplicate since already_sent was never set. Changes: - stream_consumer.py: Add elif branch for success-without-message_id — enters fallback mode (sets already_sent, disables editing, sends only continuation) - signal.py send(): Extract timestamp from signal-cli RPC result as message_id so stream consumer follows normal edit→fallback path - signal.py: Add public stop_typing() delegating to _stop_typing_indicator() so base adapter's _keep_typing finally block can clean up typing tasks - gateway/run.py: Per-platform tool_progress_overrides (#6164) — lets users set e.g. signal: off while keeping telegram: all - hermes_cli/config.py: Add tool_progress_overrides to DEFAULT_CONFIG Refs: #4647, #6164 --- gateway/platforms/signal.py | 11 ++++- gateway/run.py | 10 ++++- gateway/stream_consumer.py | 11 +++++ hermes_cli/config.py | 1 + tests/gateway/test_signal.py | 63 +++++++++++++++++++++++++++ tests/gateway/test_stream_consumer.py | 54 +++++++++++++++++++++++ 6 files changed, 148 insertions(+), 2 deletions(-) diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 66d455cca..08b62f2a6 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -647,7 +647,11 @@ class SignalAdapter(BasePlatformAdapter): if result is not None: self._track_sent_timestamp(result) - return SendResult(success=True) + # Use the timestamp from the RPC result as a pseudo message_id. + # Signal doesn't have real message IDs, but the stream consumer + # needs a truthy value to follow its edit→fallback path correctly. + _msg_id = str(result.get("timestamp", "")) if isinstance(result, dict) else None + return SendResult(success=True, message_id=_msg_id or None) return SendResult(success=False, error="RPC send failed") def _track_sent_timestamp(self, rpc_result) -> None: @@ -837,6 +841,11 @@ class SignalAdapter(BasePlatformAdapter): except asyncio.CancelledError: pass + async def stop_typing(self, chat_id: str) -> None: + """Public interface for stopping typing — called by base adapter's + _keep_typing finally block to clean up platform-level typing tasks.""" + await self._stop_typing_indicator(chat_id) + # ------------------------------------------------------------------ # Chat Info # ------------------------------------------------------------------ diff --git a/gateway/run.py b/gateway/run.py index 7a551be16..e705597ef 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -6308,7 +6308,15 @@ class GatewayRunner: # Falls back to env vars for backward compatibility. # YAML 1.1 parses bare `off` as boolean False — normalise before # the `or` chain so it doesn't silently fall through to "all". - _raw_tp = user_config.get("display", {}).get("tool_progress") + # + # Per-platform overrides (display.tool_progress_overrides) take + # priority over the global setting — e.g. Signal users can set + # tool_progress to "off" while keeping Telegram on "all". + _display_cfg = user_config.get("display", {}) + _overrides = _display_cfg.get("tool_progress_overrides", {}) + _raw_tp = _overrides.get(platform_key) + if _raw_tp is None: + _raw_tp = _display_cfg.get("tool_progress") if _raw_tp is False: _raw_tp = "off" progress_mode = ( diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 5522c631d..cc3d64d13 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -353,6 +353,17 @@ class GatewayStreamConsumer: self._message_id = result.message_id self._already_sent = True self._last_sent_text = text + elif result.success: + # Platform accepted the message but returned no message_id + # (e.g. Signal). Can't edit without an ID — switch to + # fallback mode: suppress intermediate deltas, send only + # the missing tail once the final response is ready. + self._already_sent = True + self._edit_supported = False + self._fallback_prefix = self._clean_for_display(text) + self._fallback_final_send = True + # Sentinel prevents re-entering this branch on every delta + self._message_id = "__no_edit__" else: # Initial send failed — disable streaming for this session self._edit_supported = False diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 7c860f159..0c39902ae 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -392,6 +392,7 @@ DEFAULT_CONFIG = { "show_cost": False, # Show $ cost in the status bar (off by default) "skin": "default", "tool_progress_command": False, # Enable /verbose command in messaging gateway + "tool_progress_overrides": {}, # Per-platform overrides: {"signal": "off", "telegram": "all"} "tool_preview_length": 0, # Max chars for tool call previews (0 = no limit, show full paths/commands) }, diff --git a/tests/gateway/test_signal.py b/tests/gateway/test_signal.py index b2830e1fc..ae985300d 100644 --- a/tests/gateway/test_signal.py +++ b/tests/gateway/test_signal.py @@ -707,3 +707,66 @@ class TestSignalSendDocumentViaHelper: assert result.success is False assert "/nonexistent.pdf" in result.error + + +# --------------------------------------------------------------------------- +# send() returns message_id from timestamp (#4647) +# --------------------------------------------------------------------------- + +class TestSignalSendReturnsMessageId: + """Signal send() must return a timestamp-based message_id so the stream + consumer can follow its edit→fallback path correctly.""" + + @pytest.mark.asyncio + async def test_send_returns_timestamp_as_message_id(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, _ = _stub_rpc({"timestamp": 1712345678000}) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + result = await adapter.send(chat_id="+155****4567", content="hello") + + assert result.success is True + assert result.message_id == "1712345678000" + + @pytest.mark.asyncio + async def test_send_returns_none_message_id_when_no_timestamp(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, _ = _stub_rpc({}) # No timestamp key + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + result = await adapter.send(chat_id="+155****4567", content="hello") + + assert result.success is True + assert result.message_id is None + + @pytest.mark.asyncio + async def test_send_returns_none_message_id_for_non_dict(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, _ = _stub_rpc("ok") # Non-dict result + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + result = await adapter.send(chat_id="+155****4567", content="hello") + + assert result.success is True + assert result.message_id is None + + +# --------------------------------------------------------------------------- +# stop_typing() delegates to _stop_typing_indicator (#4647) +# --------------------------------------------------------------------------- + +class TestSignalStopTyping: + """Signal must expose a public stop_typing() so base adapter's + _keep_typing finally block can clean up platform-level typing tasks.""" + + @pytest.mark.asyncio + async def test_stop_typing_calls_private_method(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + adapter._stop_typing_indicator = AsyncMock() + + await adapter.stop_typing("+155****4567") + + adapter._stop_typing_indicator.assert_awaited_once_with("+155****4567") diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index ddc88fc2f..d5a20331b 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -383,6 +383,60 @@ class TestSegmentBreakOnToolBoundary: sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] assert sent_texts == ["Hello ▉", "Next segment"] + @pytest.mark.asyncio + async def test_no_message_id_enters_fallback_mode(self): + """Platform returns success but no message_id (Signal) — must not + re-send on every delta. Should enter fallback mode and send only + the continuation at finish.""" + adapter = MagicMock() + # First send succeeds but returns no message_id (Signal behavior) + send_result_no_id = SimpleNamespace(success=True, message_id=None) + # Fallback final send succeeds + send_result_final = SimpleNamespace(success=True, message_id="msg_final") + adapter.send = AsyncMock(side_effect=[send_result_no_id, send_result_final]) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + consumer.on_delta("Hello") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + consumer.on_delta(" world, this is a longer response.") + await asyncio.sleep(0.08) + consumer.finish() + await task + + # Should send exactly 2 messages: initial chunk + fallback continuation + # NOT one message per delta + assert adapter.send.call_count == 2 + assert consumer.already_sent + # edit_message should NOT have been called (no valid message_id to edit) + adapter.edit_message.assert_not_called() + + @pytest.mark.asyncio + async def test_no_message_id_single_delta_marks_already_sent(self): + """When the entire response fits in one delta and platform returns no + message_id, already_sent must still be True to prevent the gateway + from re-sending the full response.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id=None) + adapter.send = AsyncMock(return_value=send_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + consumer.on_delta("Short response.") + consumer.finish() + + await consumer.run() + + assert consumer.already_sent + # Only one send call (the initial message) + assert adapter.send.call_count == 1 + @pytest.mark.asyncio async def test_fallback_final_splits_long_continuation_without_dropping_text(self): """Long continuation tails should be chunked when fallback final-send runs."""