From 7a2369718a126f624357430a3defce1a1aec668d Mon Sep 17 00:00:00 2001 From: kangsoo-bit <27672904+kangsoo-bit@users.noreply.github.com> Date: Wed, 1 Jul 2026 03:24:35 -0700 Subject: [PATCH] fix(telegram): keep polling alive during transient bootstrap outages A transient Bot API network error during gateway bootstrap (deleteWebhook or the initial start_polling) currently raises out of connect() and marks the Telegram adapter fatal, restart-looping the whole gateway even though the right behavior is to degrade the Telegram channel and let the existing reconnect ladder recover in the background. - _delete_webhook_best_effort(): swallow only transient network errors and continue to polling; non-network errors (e.g. auth failures) still raise. - _start_polling_resilient(): on a transient conflict/network error at bootstrap, schedule background recovery and return degraded instead of raising; non-transient errors still propagate. - Track the polling error-callback recovery tasks in _background_tasks so they can't be garbage-collected mid-flight. - Add a second Telegram Bot API seed fallback IP (149.154.166.110). Reconnect keeps its existing 10-retry -> supervisor-restart semantics; this change only fixes the bootstrap raise, it does not alter the retry ladder. --- plugins/platforms/telegram/adapter.py | 107 +++++++++++++++++- .../platforms/telegram/telegram_network.py | 2 +- .../test_telegram_network_reconnect.py | 94 +++++++++++++++ 3 files changed, 196 insertions(+), 7 deletions(-) diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py index 1b69a1a39..91ec2a40b 100644 --- a/plugins/platforms/telegram/adapter.py +++ b/plugins/platforms/telegram/adapter.py @@ -1738,6 +1738,91 @@ class TelegramAdapter(BasePlatformAdapter): self.name, exc_info=True, ) + def _schedule_polling_recovery(self, error: Exception, *, reason: str) -> None: + """Schedule polling recovery without failing gateway startup. + + A Telegram bootstrap failure (deleteWebhook / initial start_polling) + caused by a transient network error should degrade only the Telegram + adapter: the gateway process stays alive and the existing reconnect + ladder (``_handle_polling_network_error``) recovers in the background. + """ + if self.has_fatal_error: + return + if self._polling_error_task and not self._polling_error_task.done(): + logger.debug( + "[%s] Telegram polling recovery already scheduled; ignoring %s: %s", + self.name, reason, error, + ) + return + self._send_path_degraded = True + logger.warning( + "[%s] Telegram polling degraded (%s); gateway stays alive and will retry. Error: %s", + self.name, reason, error, + ) + loop = asyncio.get_running_loop() + self._polling_error_task = loop.create_task(self._handle_polling_network_error(error)) + self._background_tasks.add(self._polling_error_task) + self._polling_error_task.add_done_callback(self._background_tasks.discard) + + async def _delete_webhook_best_effort(self) -> bool: + """Clear any stale webhook, but never fail polling on a network error. + + Returns True when the webhook was cleared (or there was nothing to do) + and False when a transient network error was swallowed so bootstrap can + continue to polling; the reconnect ladder recovers from there. + """ + if not self._bot: + return False + delete_webhook = getattr(self._bot, "delete_webhook", None) + if not callable(delete_webhook): + return True + try: + await delete_webhook(drop_pending_updates=False) + return True + except Exception as err: + if self._looks_like_network_error(err): + logger.warning( + "[%s] deleteWebhook failed with a recoverable network error; " + "continuing to polling so getUpdates/retry can recover: %s", + self.name, err, + ) + self._send_path_degraded = True + return False + raise + + async def _start_polling_resilient(self, *, drop_pending_updates: bool, error_callback) -> bool: + """Start PTB polling; on a transient bootstrap failure, recover in background. + + Returns True when polling started, False when a transient conflict or + network error was scheduled for background recovery instead of raising + (keeping the gateway process alive). + """ + if not (self._app and self._app.updater): + raise RuntimeError("Telegram application/updater not initialized") + try: + await self._app.updater.start_polling( + allowed_updates=Update.ALL_TYPES, + drop_pending_updates=drop_pending_updates, + error_callback=error_callback, + ) + return True + except Exception as err: + if self._looks_like_polling_conflict(err): + logger.warning( + "[%s] Telegram polling bootstrap conflict; gateway stays alive " + "while conflict retry runs: %s", + self.name, err, + ) + loop = asyncio.get_running_loop() + self._polling_error_task = loop.create_task(self._handle_polling_conflict(err)) + self._background_tasks.add(self._polling_error_task) + self._polling_error_task.add_done_callback(self._background_tasks.discard) + return False + if self._looks_like_network_error(err): + self._schedule_polling_recovery(err, reason="polling bootstrap") + return False + raise + async def _handle_polling_network_error(self, error: Exception) -> None: """Reconnect polling after a transient network interruption. @@ -2900,10 +2985,11 @@ class TelegramAdapter(BasePlatformAdapter): else: # ── Polling mode (default) ─────────────────────────── # Clear any stale webhook first so polling doesn't inherit a - # previous webhook registration and silently stop receiving updates. - delete_webhook = getattr(self._bot, "delete_webhook", None) - if callable(delete_webhook): - await delete_webhook(drop_pending_updates=False) + # previous webhook registration and silently stop receiving + # updates. Best-effort: a transient Bot API network error here + # must not fail gateway startup — degrade to background polling + # recovery instead. + await self._delete_webhook_best_effort() loop = asyncio.get_running_loop() @@ -2920,23 +3006,32 @@ class TelegramAdapter(BasePlatformAdapter): # exit on its next tick so recovery owns polling alone. self._disarm_ptb_retry_loop() self._polling_error_task = loop.create_task(self._handle_polling_conflict(error)) + self._background_tasks.add(self._polling_error_task) + self._polling_error_task.add_done_callback(self._background_tasks.discard) elif self._looks_like_network_error(error): logger.warning("[%s] Telegram network error, scheduling reconnect: %s", self.name, error) self._polling_error_task = loop.create_task(self._handle_polling_network_error(error)) + self._background_tasks.add(self._polling_error_task) + self._polling_error_task.add_done_callback(self._background_tasks.discard) else: logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True) # Store reference for retry use in _handle_polling_conflict self._polling_error_callback_ref = _polling_error_callback - await self._app.updater.start_polling( - allowed_updates=Update.ALL_TYPES, + polling_started = await self._start_polling_resilient( # On a cold first boot drop the stale Bot API queue; on a # watcher reconnect after an outage preserve it so messages # sent while the bot was offline are delivered (#46621). drop_pending_updates=not is_reconnect, error_callback=_polling_error_callback, ) + if not polling_started: + logger.warning( + "[%s] Connected in degraded Telegram mode: gateway is alive, " + "polling will be retried in the background", + self.name, + ) self._mark_connected() mode = "webhook" if self._webhook_mode else "polling" diff --git a/plugins/platforms/telegram/telegram_network.py b/plugins/platforms/telegram/telegram_network.py index 49b5be912..319f4d2ac 100644 --- a/plugins/platforms/telegram/telegram_network.py +++ b/plugins/platforms/telegram/telegram_network.py @@ -40,7 +40,7 @@ _DOH_PROVIDERS: list[dict] = [ # Last-resort IPs when DoH is also blocked. These are stable Telegram Bot API # endpoints in the 149.154.160.0/20 block (same seed used by OpenClaw). -_SEED_FALLBACK_IPS: list[str] = ["149.154.167.220"] +_SEED_FALLBACK_IPS: list[str] = ["149.154.166.110", "149.154.167.220"] def _resolve_proxy_url(target_hosts=None) -> str | None: diff --git a/tests/gateway/test_telegram_network_reconnect.py b/tests/gateway/test_telegram_network_reconnect.py index c970adc8f..4a0634044 100644 --- a/tests/gateway/test_telegram_network_reconnect.py +++ b/tests/gateway/test_telegram_network_reconnect.py @@ -718,3 +718,97 @@ async def test_disconnect_cancels_heartbeat_task(): assert heartbeat_task.cancelled(), "Heartbeat task must be cancelled by disconnect()" assert adapter._polling_heartbeat_task is None + + +# ── Bootstrap degradation: keep polling alive during outages (#47508) ──── + + +@pytest.mark.asyncio +async def test_delete_webhook_network_error_is_recoverable(): + """deleteWebhook timeouts must not fail gateway startup. + + A transient Bot API outage during bootstrap should be treated as + recoverable and continue toward polling, so it never becomes a systemd + service failure. + """ + adapter = _make_adapter() + mock_bot = MagicMock() + mock_bot.delete_webhook = AsyncMock(side_effect=ConnectionError("api.telegram.org timeout")) + adapter._bot = mock_bot + + result = await adapter._delete_webhook_best_effort() + + assert result is False + assert adapter._send_path_degraded is True + mock_bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False) + assert not adapter.has_fatal_error + + +@pytest.mark.asyncio +async def test_polling_bootstrap_network_error_schedules_background_recovery(): + """Initial start_polling() network failure should degrade, not raise.""" + adapter = _make_adapter() + mock_updater = MagicMock() + mock_updater.start_polling = AsyncMock(side_effect=ConnectionError("bootstrap timeout")) + mock_app = MagicMock() + mock_app.updater = mock_updater + adapter._app = mock_app + adapter._schedule_polling_recovery = MagicMock() + + result = await adapter._start_polling_resilient( + drop_pending_updates=True, + error_callback=lambda error: None, + ) + + assert result is False + adapter._schedule_polling_recovery.assert_called_once() + err = adapter._schedule_polling_recovery.call_args.args[0] + assert isinstance(err, ConnectionError) + assert adapter._schedule_polling_recovery.call_args.kwargs["reason"] == "polling bootstrap" + assert not adapter.has_fatal_error + + +@pytest.mark.asyncio +async def test_polling_bootstrap_conflict_schedules_conflict_recovery_task(): + """Initial 409 polling conflict should also be recovered in background.""" + adapter = _make_adapter() + mock_updater = MagicMock() + mock_updater.start_polling = AsyncMock( + side_effect=Exception("Conflict: terminated by other getUpdates request") + ) + mock_app = MagicMock() + mock_app.updater = mock_updater + adapter._app = mock_app + adapter._handle_polling_conflict = AsyncMock() + + result = await adapter._start_polling_resilient( + drop_pending_updates=True, + error_callback=lambda error: None, + ) + + assert result is False + pending = [t for t in adapter._background_tasks if not t.done()] + assert pending, "expected background conflict recovery task" + for task in pending: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + assert not adapter.has_fatal_error + + +@pytest.mark.asyncio +async def test_schedule_polling_recovery_tracks_background_task(): + """Background recovery task is registered so it isn't GC'd mid-flight.""" + adapter = _make_adapter() + adapter._handle_polling_network_error = AsyncMock() + + adapter._schedule_polling_recovery(ConnectionError("boom"), reason="unit test") + + assert adapter._send_path_degraded is True + assert adapter._polling_error_task is not None + assert adapter._polling_error_task in adapter._background_tasks + await adapter._polling_error_task + adapter._handle_polling_network_error.assert_awaited_once() +