From e55e9fad2c2e2de6181e12c65d3a12baf718d759 Mon Sep 17 00:00:00 2001 From: PRATHAMESH75 Date: Tue, 30 Jun 2026 23:02:35 +0530 Subject: [PATCH] fix(telegram): recover when polling updater stops while process stays alive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The polling heartbeat's pending-update probe treated a stopped updater (running=False) as "someone else's job" and silently reset its counter, so a long-poll task that disappears with no reconnect in flight was never recovered. get_me() on the general request path stays healthy, so neither PTB's error_callback nor the connectivity probe ever fires — the gateway keeps running but stops receiving messages indefinitely (#55769). Detect the stopped-updater case directly in _probe_pending_updates and feed it into the existing _handle_polling_network_error ladder, debounced over two consecutive probes so a just-starting updater or the brief stop()->start_polling() window of an in-flight reconnect never trips it. --- plugins/platforms/telegram/adapter.py | 60 +++++++++++++++++-- .../test_telegram_pending_update_probe.py | 58 +++++++++++++++++- 2 files changed, 109 insertions(+), 9 deletions(-) diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py index ae7116040..e4d0aba2d 100644 --- a/plugins/platforms/telegram/adapter.py +++ b/plugins/platforms/telegram/adapter.py @@ -423,6 +423,13 @@ class TelegramAdapter(BasePlatformAdapter): # also probes get_webhook_info().pending_update_count and escalates to # recovery after two consecutive stuck probes (#42909). self._polling_pending_stuck_count: int = 0 + # Consecutive heartbeat probes that found the updater stopped entirely + # (running=False) while we are in polling mode with no reconnect in + # flight. Distinct from the wedged-but-running case above: the long-poll + # task is simply gone, so neither the connectivity probe nor PTB's + # error_callback ever fires and the gateway silently stops receiving + # messages with the process still alive (#55769). + self._polling_not_running_count: int = 0 # After sustained reconnect storms the PTB httpx pool can return # SendResult(success=True) for sends that never actually transmit. # _handle_polling_network_error sets this; _verify_polling_after_reconnect @@ -1879,21 +1886,62 @@ class TelegramAdapter(BasePlatformAdapter): not trip a needless recovery. Recovery reuses ``_handle_polling_network_error`` — the same ladder PTB's own ``error_callback`` feeds — so no new restart machinery is introduced. + + This also covers the harsher case where the updater has stopped + entirely (``running=False``) with no reconnect in flight: the long-poll + task is gone rather than wedged, so even ``get_webhook_info`` can't + report a queue against a live consumer. We detect the stopped updater + directly and feed the same ladder (#55769). """ - # Only meaningful in polling mode with a running updater; in webhook - # mode Telegram pushes updates and holds no server-side queue. + # Only meaningful in polling mode; in webhook mode Telegram pushes + # updates and holds no server-side queue. if self._webhook_mode: return + # A reconnect already in flight owns recovery — don't double-trigger, + # and don't misread its brief stop()->start_polling() window (where + # updater.running is transiently False) as a dead updater below. + if self._polling_error_task and not self._polling_error_task.done(): + self._polling_not_running_count = 0 + return updater = getattr(self._app, "updater", None) if self._app else None - if updater is None or not getattr(updater, "running", False): + if updater is None: self._polling_pending_stuck_count = 0 return + if not getattr(updater, "running", False): + # We are in polling mode with no reconnect in flight, yet PTB's + # updater has stopped entirely. This is distinct from the + # wedged-but-running consumer handled below: the long-poll task is + # gone, get_me()/get_webhook_info() on the general request path + # still succeed, so no error_callback or connectivity probe ever + # fires and the gateway silently stops receiving messages while the + # process stays alive (#55769). Escalate through the same reconnect + # ladder as a wedged consumer, debounced over two consecutive probes + # so a just-starting updater never trips it. + self._polling_pending_stuck_count = 0 + self._polling_not_running_count += 1 + logger.warning( + "[%s] Telegram polling heartbeat: updater stopped while in " + "polling mode (stuck probe %d/2)", + self.name, self._polling_not_running_count, + ) + if self._polling_not_running_count >= 2: + self._polling_not_running_count = 0 + logger.warning( + "[%s] Telegram updater is not running (long-poll task " + "gone); triggering polling restart", + self.name, + ) + loop = asyncio.get_running_loop() + self._polling_error_task = loop.create_task( + self._handle_polling_network_error( + RuntimeError("Telegram updater stopped while in polling mode") + ) + ) + return + self._polling_not_running_count = 0 get_webhook_info = getattr(bot, "get_webhook_info", None) if not callable(get_webhook_info): return - # A reconnect already in flight owns recovery — don't double-trigger. - if self._polling_error_task and not self._polling_error_task.done(): - return try: info = await asyncio.wait_for(get_webhook_info(), probe_timeout) # type: ignore[arg-type] except (asyncio.TimeoutError, OSError): diff --git a/tests/gateway/test_telegram_pending_update_probe.py b/tests/gateway/test_telegram_pending_update_probe.py index f02778b1d..6ba3d1961 100644 --- a/tests/gateway/test_telegram_pending_update_probe.py +++ b/tests/gateway/test_telegram_pending_update_probe.py @@ -6,6 +6,11 @@ handlers (#42909). ``get_me()`` stays healthy (general request path), so the CLOSE-WAIT heartbeat is blind to it. ``_probe_pending_updates`` watches ``get_webhook_info().pending_update_count`` and escalates to the existing network-error recovery ladder after two consecutive stuck probes. + +The same probe also covers the harsher case where the updater has stopped +entirely (``running=False``) with no reconnect in flight — the long-poll task +is gone, so the gateway silently stops receiving messages while the process +stays alive (#55769) — and feeds it into the same recovery ladder. """ import sys from unittest.mock import AsyncMock, MagicMock, patch @@ -96,14 +101,44 @@ async def test_webhook_mode_is_noop(): @pytest.mark.asyncio -async def test_no_probe_when_updater_not_running(): - """If the updater isn't running, recovery is already someone else's job.""" +async def test_single_stopped_updater_probe_does_not_escalate(): + """One probe finding a stopped updater only increments the counter (#55769).""" adapter = _make_adapter(pending=9) adapter._app.updater.running = False adapter._polling_pending_stuck_count = 1 - await adapter._probe_pending_updates(adapter._app.bot, 5) + with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec: + await adapter._probe_pending_updates(adapter._app.bot, 5) + # Stopped updater means no live consumer to query for a queue. adapter._app.bot.get_webhook_info.assert_not_called() assert adapter._polling_pending_stuck_count == 0 + assert adapter._polling_not_running_count == 1 + rec.assert_not_called() + + +@pytest.mark.asyncio +async def test_two_stopped_updater_probes_trigger_recovery(): + """A stopped updater that stays stopped routes into recovery (#55769).""" + adapter = _make_adapter(pending=9) + adapter._app.updater.running = False + recovery = AsyncMock() + with patch.object(adapter, "_handle_polling_network_error", new=recovery): + await adapter._probe_pending_updates(adapter._app.bot, 5) + assert adapter._polling_not_running_count == 1 + await adapter._probe_pending_updates(adapter._app.bot, 5) + task = adapter._polling_error_task + assert task is not None + await task + recovery.assert_awaited_once() + assert adapter._polling_not_running_count == 0 + + +@pytest.mark.asyncio +async def test_running_updater_resets_stopped_counter(): + """A recovered (running) updater clears any prior stopped-probe count.""" + adapter = _make_adapter(pending=0) + adapter._polling_not_running_count = 1 + await adapter._probe_pending_updates(adapter._app.bot, 5) + assert adapter._polling_not_running_count == 0 @pytest.mark.asyncio @@ -115,3 +150,20 @@ async def test_reconnect_in_flight_skips_probe(): adapter._polling_error_task = inflight await adapter._probe_pending_updates(adapter._app.bot, 5) adapter._app.bot.get_webhook_info.assert_not_called() + + +@pytest.mark.asyncio +async def test_reconnect_in_flight_skips_stopped_updater_escalation(): + """A stopped updater during an in-flight reconnect must not re-escalate.""" + adapter = _make_adapter(pending=9) + adapter._app.updater.running = False + adapter._polling_not_running_count = 1 + inflight = MagicMock() + inflight.done.return_value = False + adapter._polling_error_task = inflight + with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec: + await adapter._probe_pending_updates(adapter._app.bot, 5) + # The in-flight reconnect owns recovery; the stopped-updater counter resets + # so the transient stop()->start_polling() window never trips a re-trigger. + assert adapter._polling_not_running_count == 0 + rec.assert_not_called()