fix(telegram): recover when polling updater stops while process stays alive

The polling heartbeat's pending-update probe treated a stopped updater
(running=False) as "someone else's job" and silently reset its counter,
so a long-poll task that disappears with no reconnect in flight was never
recovered. get_me() on the general request path stays healthy, so neither
PTB's error_callback nor the connectivity probe ever fires — the gateway
keeps running but stops receiving messages indefinitely (#55769).

Detect the stopped-updater case directly in _probe_pending_updates and feed
it into the existing _handle_polling_network_error ladder, debounced over two
consecutive probes so a just-starting updater or the brief stop()->start_polling()
window of an in-flight reconnect never trips it.
This commit is contained in:
PRATHAMESH75 2026-06-30 23:02:35 +05:30 committed by Teknium
parent 437dcacbbf
commit e55e9fad2c
2 changed files with 109 additions and 9 deletions

View file

@ -423,6 +423,13 @@ class TelegramAdapter(BasePlatformAdapter):
# also probes get_webhook_info().pending_update_count and escalates to
# recovery after two consecutive stuck probes (#42909).
self._polling_pending_stuck_count: int = 0
# Consecutive heartbeat probes that found the updater stopped entirely
# (running=False) while we are in polling mode with no reconnect in
# flight. Distinct from the wedged-but-running case above: the long-poll
# task is simply gone, so neither the connectivity probe nor PTB's
# error_callback ever fires and the gateway silently stops receiving
# messages with the process still alive (#55769).
self._polling_not_running_count: int = 0
# After sustained reconnect storms the PTB httpx pool can return
# SendResult(success=True) for sends that never actually transmit.
# _handle_polling_network_error sets this; _verify_polling_after_reconnect
@ -1879,21 +1886,62 @@ class TelegramAdapter(BasePlatformAdapter):
not trip a needless recovery. Recovery reuses
``_handle_polling_network_error`` — the same ladder PTB's own
``error_callback`` feeds — so no new restart machinery is introduced.
This also covers the harsher case where the updater has stopped
entirely (``running=False``) with no reconnect in flight: the long-poll
task is gone rather than wedged, so even ``get_webhook_info`` can't
report a queue against a live consumer. We detect the stopped updater
directly and feed the same ladder (#55769).
"""
# Only meaningful in polling mode with a running updater; in webhook
# mode Telegram pushes updates and holds no server-side queue.
# Only meaningful in polling mode; in webhook mode Telegram pushes
# updates and holds no server-side queue.
if self._webhook_mode:
return
# A reconnect already in flight owns recovery — don't double-trigger,
# and don't misread its brief stop()->start_polling() window (where
# updater.running is transiently False) as a dead updater below.
if self._polling_error_task and not self._polling_error_task.done():
self._polling_not_running_count = 0
return
updater = getattr(self._app, "updater", None) if self._app else None
if updater is None or not getattr(updater, "running", False):
if updater is None:
self._polling_pending_stuck_count = 0
return
if not getattr(updater, "running", False):
# We are in polling mode with no reconnect in flight, yet PTB's
# updater has stopped entirely. This is distinct from the
# wedged-but-running consumer handled below: the long-poll task is
# gone, get_me()/get_webhook_info() on the general request path
# still succeed, so no error_callback or connectivity probe ever
# fires and the gateway silently stops receiving messages while the
# process stays alive (#55769). Escalate through the same reconnect
# ladder as a wedged consumer, debounced over two consecutive probes
# so a just-starting updater never trips it.
self._polling_pending_stuck_count = 0
self._polling_not_running_count += 1
logger.warning(
"[%s] Telegram polling heartbeat: updater stopped while in "
"polling mode (stuck probe %d/2)",
self.name, self._polling_not_running_count,
)
if self._polling_not_running_count >= 2:
self._polling_not_running_count = 0
logger.warning(
"[%s] Telegram updater is not running (long-poll task "
"gone); triggering polling restart",
self.name,
)
loop = asyncio.get_running_loop()
self._polling_error_task = loop.create_task(
self._handle_polling_network_error(
RuntimeError("Telegram updater stopped while in polling mode")
)
)
return
self._polling_not_running_count = 0
get_webhook_info = getattr(bot, "get_webhook_info", None)
if not callable(get_webhook_info):
return
# A reconnect already in flight owns recovery — don't double-trigger.
if self._polling_error_task and not self._polling_error_task.done():
return
try:
info = await asyncio.wait_for(get_webhook_info(), probe_timeout) # type: ignore[arg-type]
except (asyncio.TimeoutError, OSError):

View file

@ -6,6 +6,11 @@ handlers (#42909). ``get_me()`` stays healthy (general request path), so the
CLOSE-WAIT heartbeat is blind to it. ``_probe_pending_updates`` watches
``get_webhook_info().pending_update_count`` and escalates to the existing
network-error recovery ladder after two consecutive stuck probes.
The same probe also covers the harsher case where the updater has stopped
entirely (``running=False``) with no reconnect in flight — the long-poll task
is gone, so the gateway silently stops receiving messages while the process
stays alive (#55769) — and feeds it into the same recovery ladder.
"""
import sys
from unittest.mock import AsyncMock, MagicMock, patch
@ -96,14 +101,44 @@ async def test_webhook_mode_is_noop():
@pytest.mark.asyncio
async def test_no_probe_when_updater_not_running():
"""If the updater isn't running, recovery is already someone else's job."""
async def test_single_stopped_updater_probe_does_not_escalate():
"""One probe finding a stopped updater only increments the counter (#55769)."""
adapter = _make_adapter(pending=9)
adapter._app.updater.running = False
adapter._polling_pending_stuck_count = 1
await adapter._probe_pending_updates(adapter._app.bot, 5)
with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec:
await adapter._probe_pending_updates(adapter._app.bot, 5)
# Stopped updater means no live consumer to query for a queue.
adapter._app.bot.get_webhook_info.assert_not_called()
assert adapter._polling_pending_stuck_count == 0
assert adapter._polling_not_running_count == 1
rec.assert_not_called()
@pytest.mark.asyncio
async def test_two_stopped_updater_probes_trigger_recovery():
"""A stopped updater that stays stopped routes into recovery (#55769)."""
adapter = _make_adapter(pending=9)
adapter._app.updater.running = False
recovery = AsyncMock()
with patch.object(adapter, "_handle_polling_network_error", new=recovery):
await adapter._probe_pending_updates(adapter._app.bot, 5)
assert adapter._polling_not_running_count == 1
await adapter._probe_pending_updates(adapter._app.bot, 5)
task = adapter._polling_error_task
assert task is not None
await task
recovery.assert_awaited_once()
assert adapter._polling_not_running_count == 0
@pytest.mark.asyncio
async def test_running_updater_resets_stopped_counter():
"""A recovered (running) updater clears any prior stopped-probe count."""
adapter = _make_adapter(pending=0)
adapter._polling_not_running_count = 1
await adapter._probe_pending_updates(adapter._app.bot, 5)
assert adapter._polling_not_running_count == 0
@pytest.mark.asyncio
@ -115,3 +150,20 @@ async def test_reconnect_in_flight_skips_probe():
adapter._polling_error_task = inflight
await adapter._probe_pending_updates(adapter._app.bot, 5)
adapter._app.bot.get_webhook_info.assert_not_called()
@pytest.mark.asyncio
async def test_reconnect_in_flight_skips_stopped_updater_escalation():
"""A stopped updater during an in-flight reconnect must not re-escalate."""
adapter = _make_adapter(pending=9)
adapter._app.updater.running = False
adapter._polling_not_running_count = 1
inflight = MagicMock()
inflight.done.return_value = False
adapter._polling_error_task = inflight
with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec:
await adapter._probe_pending_updates(adapter._app.bot, 5)
# The in-flight reconnect owns recovery; the stopped-updater counter resets
# so the transient stop()->start_polling() window never trips a re-trigger.
assert adapter._polling_not_running_count == 0
rec.assert_not_called()