fix(telegram): keep polling alive during transient bootstrap outages
A transient Bot API network error during gateway bootstrap (deleteWebhook or the initial start_polling) currently raises out of connect() and marks the Telegram adapter fatal, restart-looping the whole gateway. The right behavior is to degrade only the Telegram channel and let the existing reconnect ladder recover in the background.

- _delete_webhook_best_effort(): swallow only transient network errors and continue to polling; non-network errors (e.g. auth failures) still raise.
- _start_polling_resilient(): on a transient conflict/network error at bootstrap, schedule background recovery and return degraded instead of raising; non-transient errors still propagate.
- Track the polling error-callback recovery tasks in _background_tasks so they can't be garbage-collected mid-flight.
- Add a second Telegram Bot API seed fallback IP (149.154.166.110).

Reconnect keeps its existing 10-retry -> supervisor-restart semantics; this change only fixes the bootstrap raise and does not alter the retry ladder.
This commit is contained in:
parent
9dd6451c80
commit
7a2369718a
3 changed files with 196 additions and 7 deletions
|
|
@ -1738,6 +1738,91 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self.name, exc_info=True,
|
||||
)
|
||||
|
||||
def _schedule_polling_recovery(self, error: Exception, *, reason: str) -> None:
    """Hand a transient polling failure to the background reconnect task.

    Used when a bootstrap step (deleteWebhook / initial start_polling) fails
    with a transient network error: instead of failing gateway startup, only
    the Telegram adapter is flagged as degraded and
    ``_handle_polling_network_error`` is run as a background task.

    The call is a no-op when the adapter already has a fatal error, or when
    a previously scheduled recovery task has not finished yet (only a debug
    line is logged in that case).

    Args:
        error: The exception that triggered the recovery; it is passed
            through to ``_handle_polling_network_error``.
        reason: Short label describing where the failure happened; used in
            log messages only.

    Raises:
        RuntimeError: From ``asyncio.get_running_loop()`` if called without
            a running event loop.
    """
    if self.has_fatal_error:
        return

    in_flight = self._polling_error_task
    if in_flight and not in_flight.done():
        # A recovery is already running; a second one would only compete.
        logger.debug(
            "[%s] Telegram polling recovery already scheduled; ignoring %s: %s",
            self.name, reason, error,
        )
        return

    self._send_path_degraded = True
    logger.warning(
        "[%s] Telegram polling degraded (%s); gateway stays alive and will retry. Error: %s",
        self.name, reason, error,
    )

    recovery = asyncio.get_running_loop().create_task(
        self._handle_polling_network_error(error)
    )
    self._polling_error_task = recovery
    # Hold a strong reference until completion so the task is not
    # garbage-collected mid-flight; drop it again once the task is done.
    self._background_tasks.add(recovery)
    recovery.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
async def _delete_webhook_best_effort(self) -> bool:
    """Try to clear a stale webhook without letting a network blip abort startup.

    Returns:
        True when ``delete_webhook`` succeeded, or when the bot object has
        no callable ``delete_webhook`` (nothing to do).
        False when no bot is set, or when the call failed with an error that
        ``_looks_like_network_error`` classifies as a network error; in the
        latter case the send path is also flagged as degraded and the caller
        is expected to continue to polling.

    Raises:
        Exception: Any failure from ``delete_webhook`` that is *not*
            classified as a network error is re-raised unchanged.
    """
    bot = self._bot
    if not bot:
        return False

    clear_webhook = getattr(bot, "delete_webhook", None)
    if not callable(clear_webhook):
        return True

    try:
        # Pending updates are kept; only the webhook registration is removed.
        await clear_webhook(drop_pending_updates=False)
    except Exception as err:
        if not self._looks_like_network_error(err):
            # Non-network failures are not recoverable here; let them propagate.
            raise
        logger.warning(
            "[%s] deleteWebhook failed with a recoverable network error; "
            "continuing to polling so getUpdates/retry can recover: %s",
            self.name, err,
        )
        self._send_path_degraded = True
        return False
    return True
|
||||
|
||||
async def _start_polling_resilient(self, *, drop_pending_updates: bool, error_callback) -> bool:
    """Start PTB polling; on a transient bootstrap failure, recover in background.

    Args:
        drop_pending_updates: Forwarded to ``updater.start_polling``.
        error_callback: Forwarded to ``updater.start_polling`` as its
            polling error callback.

    Returns:
        True when polling started. False when ``start_polling`` failed with
        a polling conflict or a network error and recovery was left to a
        background task instead of raising (keeping the gateway process
        alive).

    Raises:
        RuntimeError: If the application or its updater is not initialized.
        Exception: Any ``start_polling`` failure that is neither a polling
            conflict nor a network error is re-raised unchanged.
    """
    if not (self._app and self._app.updater):
        raise RuntimeError("Telegram application/updater not initialized")
    try:
        await self._app.updater.start_polling(
            allowed_updates=Update.ALL_TYPES,
            drop_pending_updates=drop_pending_updates,
            error_callback=error_callback,
        )
        return True
    except Exception as err:
        if self._looks_like_polling_conflict(err):
            logger.warning(
                "[%s] Telegram polling bootstrap conflict; gateway stays alive "
                "while conflict retry runs: %s",
                self.name, err,
            )
            in_flight = self._polling_error_task
            if in_flight and not in_flight.done():
                # A recovery task is already running. Replacing the handle
                # would leave two recoveries competing and lose track of the
                # first one, so keep the existing task (same guard the
                # network-error path applies via _schedule_polling_recovery).
                logger.debug(
                    "[%s] Telegram polling recovery already scheduled; "
                    "not starting a second task for bootstrap conflict: %s",
                    self.name, err,
                )
                return False
            loop = asyncio.get_running_loop()
            self._polling_error_task = loop.create_task(self._handle_polling_conflict(err))
            # Keep a strong reference until the task finishes so it cannot be
            # garbage-collected mid-flight.
            self._background_tasks.add(self._polling_error_task)
            self._polling_error_task.add_done_callback(self._background_tasks.discard)
            return False
        if self._looks_like_network_error(err):
            self._schedule_polling_recovery(err, reason="polling bootstrap")
            return False
        # Not a conflict and not a network error: let the failure propagate.
        raise
|
||||
|
||||
async def _handle_polling_network_error(self, error: Exception) -> None:
|
||||
"""Reconnect polling after a transient network interruption.
|
||||
|
||||
|
|
@ -2900,10 +2985,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
else:
|
||||
# ── Polling mode (default) ───────────────────────────
|
||||
# Clear any stale webhook first so polling doesn't inherit a
|
||||
# previous webhook registration and silently stop receiving updates.
|
||||
delete_webhook = getattr(self._bot, "delete_webhook", None)
|
||||
if callable(delete_webhook):
|
||||
await delete_webhook(drop_pending_updates=False)
|
||||
# previous webhook registration and silently stop receiving
|
||||
# updates. Best-effort: a transient Bot API network error here
|
||||
# must not fail gateway startup — degrade to background polling
|
||||
# recovery instead.
|
||||
await self._delete_webhook_best_effort()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
|
|
@ -2920,23 +3006,32 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
# exit on its next tick so recovery owns polling alone.
|
||||
self._disarm_ptb_retry_loop()
|
||||
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
|
||||
self._background_tasks.add(self._polling_error_task)
|
||||
self._polling_error_task.add_done_callback(self._background_tasks.discard)
|
||||
elif self._looks_like_network_error(error):
|
||||
logger.warning("[%s] Telegram network error, scheduling reconnect: %s", self.name, error)
|
||||
self._polling_error_task = loop.create_task(self._handle_polling_network_error(error))
|
||||
self._background_tasks.add(self._polling_error_task)
|
||||
self._polling_error_task.add_done_callback(self._background_tasks.discard)
|
||||
else:
|
||||
logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True)
|
||||
|
||||
# Store reference for retry use in _handle_polling_conflict
|
||||
self._polling_error_callback_ref = _polling_error_callback
|
||||
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
polling_started = await self._start_polling_resilient(
|
||||
# On a cold first boot drop the stale Bot API queue; on a
|
||||
# watcher reconnect after an outage preserve it so messages
|
||||
# sent while the bot was offline are delivered (#46621).
|
||||
drop_pending_updates=not is_reconnect,
|
||||
error_callback=_polling_error_callback,
|
||||
)
|
||||
if not polling_started:
|
||||
logger.warning(
|
||||
"[%s] Connected in degraded Telegram mode: gateway is alive, "
|
||||
"polling will be retried in the background",
|
||||
self.name,
|
||||
)
|
||||
|
||||
self._mark_connected()
|
||||
mode = "webhook" if self._webhook_mode else "polling"
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ _DOH_PROVIDERS: list[dict] = [
|
|||
|
||||
# Last-resort IPs when DoH is also blocked. These are stable Telegram Bot API
|
||||
# endpoints in the 149.154.160.0/20 block (same seed used by OpenClaw).
|
||||
_SEED_FALLBACK_IPS: list[str] = ["149.154.167.220"]
|
||||
_SEED_FALLBACK_IPS: list[str] = ["149.154.166.110", "149.154.167.220"]
|
||||
|
||||
|
||||
def _resolve_proxy_url(target_hosts=None) -> str | None:
|
||||
|
|
|
|||
|
|
@ -718,3 +718,97 @@ async def test_disconnect_cancels_heartbeat_task():
|
|||
|
||||
assert heartbeat_task.cancelled(), "Heartbeat task must be cancelled by disconnect()"
|
||||
assert adapter._polling_heartbeat_task is None
|
||||
|
||||
|
||||
# ── Bootstrap degradation: keep polling alive during outages (#47508) ────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_delete_webhook_network_error_is_recoverable():
    """A network failure in deleteWebhook is swallowed, not raised.

    With ``delete_webhook`` raising ``ConnectionError``, the best-effort
    helper must return False, flag the send path as degraded, and leave the
    adapter without a fatal error so bootstrap can continue toward polling.
    """
    failing_delete = AsyncMock(side_effect=ConnectionError("api.telegram.org timeout"))
    bot = MagicMock()
    bot.delete_webhook = failing_delete

    adapter = _make_adapter()
    adapter._bot = bot

    cleared = await adapter._delete_webhook_best_effort()

    assert cleared is False
    assert adapter._send_path_degraded is True
    failing_delete.assert_awaited_once_with(drop_pending_updates=False)
    assert not adapter.has_fatal_error
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_polling_bootstrap_network_error_schedules_background_recovery():
    """A network failure in the first start_polling() degrades instead of raising.

    The resilient starter must return False and delegate to
    ``_schedule_polling_recovery`` exactly once, passing the original
    exception and the ``"polling bootstrap"`` reason.
    """
    adapter = _make_adapter()

    updater = MagicMock()
    updater.start_polling = AsyncMock(side_effect=ConnectionError("bootstrap timeout"))
    adapter._app = MagicMock(updater=updater)

    # Replace the scheduler with a spy so no real background task is created.
    schedule_spy = MagicMock()
    adapter._schedule_polling_recovery = schedule_spy

    started = await adapter._start_polling_resilient(
        drop_pending_updates=True,
        error_callback=lambda error: None,
    )

    assert started is False
    schedule_spy.assert_called_once()
    recorded = schedule_spy.call_args
    assert isinstance(recorded.args[0], ConnectionError)
    assert recorded.kwargs["reason"] == "polling bootstrap"
    assert not adapter.has_fatal_error
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_polling_bootstrap_conflict_schedules_conflict_recovery_task():
    """A polling conflict on the first start_polling() is recovered in background.

    The resilient starter must return False and leave an unfinished task in
    ``_background_tasks``; the adapter must not end up with a fatal error.
    """
    adapter = _make_adapter()

    conflict = Exception("Conflict: terminated by other getUpdates request")
    updater = MagicMock()
    updater.start_polling = AsyncMock(side_effect=conflict)
    adapter._app = MagicMock(updater=updater)
    adapter._handle_polling_conflict = AsyncMock()

    started = await adapter._start_polling_resilient(
        drop_pending_updates=True,
        error_callback=lambda error: None,
    )

    assert started is False
    unfinished = [task for task in adapter._background_tasks if not task.done()]
    assert unfinished, "expected background conflict recovery task"

    # Clean up: cancel each scheduled task and drain it so nothing is left
    # pending when the test ends.
    for task in unfinished:
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, Exception):
            pass
    assert not adapter.has_fatal_error
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_schedule_polling_recovery_tracks_background_task():
    """The scheduled recovery task is held in ``_background_tasks``.

    Scheduling must flag the send path as degraded, store the task on the
    adapter, register it in the background-task set, and the task must await
    ``_handle_polling_network_error`` exactly once.
    """
    adapter = _make_adapter()
    reconnect = AsyncMock()
    adapter._handle_polling_network_error = reconnect

    adapter._schedule_polling_recovery(ConnectionError("boom"), reason="unit test")

    assert adapter._send_path_degraded is True
    scheduled = adapter._polling_error_task
    assert scheduled is not None
    assert scheduled in adapter._background_tasks

    await scheduled
    reconnect.assert_awaited_once()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue