fix(telegram): defer post-connect housekeeping off the connect path
Command-menu registration (set_my_commands), the status indicator, and DM-topic setup make Bot API calls that can stall for certain bot tokens. They ran inside connect() before/after _mark_connected(), but still within the coroutine the gateway wraps in a connect timeout, so one slow call blew the whole connect and the adapter never came up — even though polling/webhook was already live (getMe works via curl).

Fixes #46298.

- mark connected as soon as polling/webhook startup succeeds
- move command-menu, status-indicator, and DM-topic setup into a cancellable background housekeeping task (_run_post_connect_housekeeping)
- cancel that task during disconnect so it can't fire into a torn-down client
- harden scope-name lookup with getattr fallback

Salvaged onto the relocated plugin adapter (plugins/platforms/telegram/adapter.py) since the original PR #46404 targeted the pre-migration gateway/platforms/telegram.py path.

Co-authored-by: Hermes Agent <teknium@nousresearch.com>
This commit is contained in:
parent
122e5bc037
commit
3362bdb4e5
2 changed files with 183 additions and 63 deletions
|
|
@ -499,6 +499,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
# Tracks status bubbles owned by this adapter so subsequent calls with the
|
||||
# same key edit the same message instead of appending new ones (#30045).
|
||||
self._status_message_ids: Dict[tuple, str] = {}
|
||||
# Background task that runs post-connect housekeeping (command-menu
|
||||
# registration, status indicator + DM-topic setup) off the connect path so a slow Bot
|
||||
# API call (e.g. a set_my_commands stall for certain tokens) cannot
|
||||
# blow the gateway's connect timeout (#46298).
|
||||
self._post_connect_task: Optional[asyncio.Task] = None
|
||||
|
||||
def _mark_connected(self) -> None:
|
||||
self._drop_delayed_deliveries = False
|
||||
|
|
@ -2540,6 +2545,95 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self.name, topic_name, seed_err,
|
||||
)
|
||||
|
||||
def _start_post_connect_housekeeping(self) -> None:
|
||||
"""Kick off deferred post-connect housekeeping in the background.
|
||||
|
||||
Idempotent: if a previous housekeeping task is still running (e.g. a
|
||||
rapid reconnect), it is left in place rather than double-scheduled.
|
||||
"""
|
||||
task = self._post_connect_task
|
||||
if task and not task.done():
|
||||
return
|
||||
self._post_connect_task = asyncio.ensure_future(
|
||||
self._run_post_connect_housekeeping()
|
||||
)
|
||||
|
||||
async def _run_post_connect_housekeeping(self) -> None:
|
||||
"""Register the command menu, surface the status indicator, and set up
|
||||
DM topics — all off the connect path so a slow Bot API call cannot blow
|
||||
the gateway connect timeout (#46298). Every step is non-fatal."""
|
||||
try:
|
||||
# Register bot commands so Telegram shows a hint menu when users type /
|
||||
# List is derived from the central COMMAND_REGISTRY — adding a new
|
||||
# gateway command there automatically adds it to the Telegram menu.
|
||||
try:
|
||||
from telegram import (
|
||||
BotCommand,
|
||||
BotCommandScopeAllPrivateChats,
|
||||
BotCommandScopeAllGroupChats,
|
||||
BotCommandScopeDefault,
|
||||
)
|
||||
from hermes_cli.commands import telegram_menu_commands, telegram_menu_max_commands
|
||||
if not self._bot:
|
||||
return
|
||||
# Telegram allows up to 100 commands but has an undocumented
|
||||
# payload size limit (~4KB total). Hermes defaults to 60 to
|
||||
# keep built-ins plus common skill commands visible while
|
||||
# staying under the threshold; users can tune the cap via
|
||||
# platforms.telegram.extra.command_menu.
|
||||
max_commands = telegram_menu_max_commands()
|
||||
menu_commands, hidden_count = telegram_menu_commands(max_commands=max_commands)
|
||||
bot_commands = [BotCommand(name, desc) for name, desc in menu_commands]
|
||||
# Register for all scopes independently — Telegram picks the
|
||||
# narrowest matching scope per chat type (forum topics fall
|
||||
# through to AllGroupChats or Default).
|
||||
for scope_cls in (BotCommandScopeDefault, BotCommandScopeAllPrivateChats, BotCommandScopeAllGroupChats):
|
||||
scope_name = getattr(scope_cls, "__name__", str(scope_cls))
|
||||
try:
|
||||
await self._bot.set_my_commands(bot_commands, scope=scope_cls())
|
||||
logger.info("[%s] set_my_commands OK for scope %s (%d cmds)", self.name, scope_name, len(bot_commands))
|
||||
except Exception as scope_err:
|
||||
logger.warning("[%s] set_my_commands FAILED for scope %s: %s", self.name, scope_name, scope_err)
|
||||
# Forum topics don't inherit AllGroupChats — Telegram resolves
|
||||
# commands via BotCommandScopeChat(chat_id) for forum groups.
|
||||
# Lazy registration happens in _ensure_forum_commands on first
|
||||
# message from a forum topic (see _handle_text_message).
|
||||
if hidden_count:
|
||||
logger.info(
|
||||
"[%s] Telegram menu: %d commands registered, %d hidden (over %d limit). Use /commands for full list.",
|
||||
self.name, len(menu_commands), hidden_count, max_commands,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"[%s] Could not register Telegram command menu: %s",
|
||||
self.name,
|
||||
e,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Surface the gateway as "Online" in the bot's short description
|
||||
# (opt-in via extra.status_indicator). Non-fatal.
|
||||
try:
|
||||
await self._set_status_indicator(online=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Set up DM topics (Bot API 9.4 — Private Chat Topics)
|
||||
# Runs after connection is established so the bot can call createForumTopic.
|
||||
# Failures here are non-fatal — the bot works fine without topics.
|
||||
try:
|
||||
await self._setup_dm_topics()
|
||||
except Exception as topics_err:
|
||||
logger.warning(
|
||||
"[%s] DM topics setup failed (non-fatal): %s",
|
||||
self.name, topics_err, exc_info=True,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
finally:
|
||||
if self._post_connect_task is asyncio.current_task():
|
||||
self._post_connect_task = None
|
||||
|
||||
async def connect(self, *, is_reconnect: bool = False) -> bool:
|
||||
"""Connect to Telegram via polling or webhook.
|
||||
|
||||
|
|
@ -2844,52 +2938,6 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
error_callback=_polling_error_callback,
|
||||
)
|
||||
|
||||
# Register bot commands so Telegram shows a hint menu when users type /
|
||||
# List is derived from the central COMMAND_REGISTRY — adding a new
|
||||
# gateway command there automatically adds it to the Telegram menu.
|
||||
try:
|
||||
from telegram import (
|
||||
BotCommand,
|
||||
BotCommandScopeAllPrivateChats,
|
||||
BotCommandScopeAllGroupChats,
|
||||
BotCommandScopeDefault,
|
||||
)
|
||||
from hermes_cli.commands import telegram_menu_commands, telegram_menu_max_commands
|
||||
# Telegram allows up to 100 commands but has an undocumented
|
||||
# payload size limit (~4KB total). Hermes defaults to 60 to
|
||||
# keep built-ins plus common skill commands visible while
|
||||
# staying under the threshold; users can tune the cap via
|
||||
# platforms.telegram.extra.command_menu.
|
||||
max_commands = telegram_menu_max_commands()
|
||||
menu_commands, hidden_count = telegram_menu_commands(max_commands=max_commands)
|
||||
bot_commands = [BotCommand(name, desc) for name, desc in menu_commands]
|
||||
# Register for all scopes independently — Telegram picks the
|
||||
# narrowest matching scope per chat type (forum topics fall
|
||||
# through to AllGroupChats or Default).
|
||||
for scope_cls in (BotCommandScopeDefault, BotCommandScopeAllPrivateChats, BotCommandScopeAllGroupChats):
|
||||
scope_name = scope_cls.__name__
|
||||
try:
|
||||
await self._bot.set_my_commands(bot_commands, scope=scope_cls())
|
||||
logger.info("[%s] set_my_commands OK for scope %s (%d cmds)", self.name, scope_name, len(bot_commands))
|
||||
except Exception as scope_err:
|
||||
logger.warning("[%s] set_my_commands FAILED for scope %s: %s", self.name, scope_name, scope_err)
|
||||
# Forum topics don't inherit AllGroupChats — Telegram resolves
|
||||
# commands via BotCommandScopeChat(chat_id) for forum groups.
|
||||
# Lazy registration happens in _ensure_forum_commands on first
|
||||
# message from a forum topic (see _handle_text_message).
|
||||
if hidden_count:
|
||||
logger.info(
|
||||
"[%s] Telegram menu: %d commands registered, %d hidden (over %d limit). Use /commands for full list.",
|
||||
self.name, len(menu_commands), hidden_count, max_commands,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"[%s] Could not register Telegram command menu: %s",
|
||||
self.name,
|
||||
e,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
self._mark_connected()
|
||||
mode = "webhook" if self._webhook_mode else "polling"
|
||||
logger.info("[%s] Connected to Telegram (%s mode)", self.name, mode)
|
||||
|
|
@ -2904,23 +2952,15 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self._polling_heartbeat_loop()
|
||||
)
|
||||
|
||||
# Surface the gateway as "Online" in the bot's short description
|
||||
# (opt-in via extra.status_indicator). Non-fatal.
|
||||
try:
|
||||
await self._set_status_indicator(online=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Set up DM topics (Bot API 9.4 — Private Chat Topics)
|
||||
# Runs after connection is established so the bot can call createForumTopic.
|
||||
# Failures here are non-fatal — the bot works fine without topics.
|
||||
try:
|
||||
await self._setup_dm_topics()
|
||||
except Exception as topics_err:
|
||||
logger.warning(
|
||||
"[%s] DM topics setup failed (non-fatal): %s",
|
||||
self.name, topics_err, exc_info=True,
|
||||
)
|
||||
# Command-menu registration, DM-topic setup, and the status
|
||||
# indicator each make Bot API calls that can stall for certain
|
||||
# tokens. Running them here — inside the connect() coroutine that
|
||||
# the gateway wraps in a connect timeout — means one slow call
|
||||
# blows the whole connect and the adapter never comes up, even
|
||||
# though polling/webhook is already live (#46298). Defer them to a
|
||||
# cancellable background task so connect() returns as soon as the
|
||||
# transport is up.
|
||||
self._start_post_connect_housekeeping()
|
||||
|
||||
return True
|
||||
|
||||
|
|
@ -3014,6 +3054,15 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
# from being scheduled by late update handlers.
|
||||
self._mark_disconnected()
|
||||
|
||||
# Cancel deferred post-connect housekeeping (command-menu / DM-topic /
|
||||
# status-indicator Bot API calls) so it cannot fire into a half-torn-down
|
||||
# bot client (#46298).
|
||||
post_connect_task = self._post_connect_task
|
||||
if post_connect_task and not post_connect_task.done():
|
||||
post_connect_task.cancel()
|
||||
await asyncio.gather(post_connect_task, return_exceptions=True)
|
||||
self._post_connect_task = None
|
||||
|
||||
# Cancel the heartbeat before tearing down the app so the probe task
|
||||
# cannot fire get_me() into a half-shutdown bot client.
|
||||
if self._polling_heartbeat_task and not self._polling_heartbeat_task.done():
|
||||
|
|
|
|||
|
|
@ -313,6 +313,77 @@ async def test_connect_clears_webhook_before_polling(monkeypatch):
|
|||
await _cancel_heartbeat(adapter)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_does_not_block_on_post_connect_housekeeping(monkeypatch):
|
||||
"""Regression for #46298.
|
||||
|
||||
Command-menu registration and DM-topic setup make Bot API calls that can
|
||||
stall for certain tokens. If they run inside connect() (which the gateway
|
||||
wraps in a connect timeout), one slow call blows the whole connect and the
|
||||
adapter never comes up. connect() must return as soon as polling/webhook is
|
||||
live and defer that housekeeping to a cancellable background task.
|
||||
"""
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
|
||||
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.acquire_scoped_lock",
|
||||
lambda scope, identity, metadata=None: (True, None),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.release_scoped_lock",
|
||||
lambda scope, identity: None,
|
||||
)
|
||||
|
||||
async def _hang_forever(*args, **kwargs):
|
||||
await asyncio.Future()
|
||||
|
||||
# Make the entire housekeeping coroutine hang. connect() must still return
|
||||
# promptly and expose the still-running task; disconnect() must cancel it.
|
||||
monkeypatch.setattr(adapter, "_run_post_connect_housekeeping", _hang_forever)
|
||||
|
||||
updater = SimpleNamespace(
|
||||
start_polling=AsyncMock(),
|
||||
stop=AsyncMock(),
|
||||
running=True,
|
||||
)
|
||||
bot = SimpleNamespace(
|
||||
delete_webhook=AsyncMock(),
|
||||
set_my_commands=AsyncMock(),
|
||||
)
|
||||
app = SimpleNamespace(
|
||||
bot=bot,
|
||||
updater=updater,
|
||||
add_handler=MagicMock(),
|
||||
initialize=AsyncMock(),
|
||||
start=AsyncMock(),
|
||||
running=True,
|
||||
stop=AsyncMock(),
|
||||
shutdown=AsyncMock(),
|
||||
)
|
||||
builder = MagicMock()
|
||||
builder.token.return_value = builder
|
||||
builder.request.return_value = builder
|
||||
builder.get_updates_request.return_value = builder
|
||||
builder.build.return_value = app
|
||||
monkeypatch.setattr(
|
||||
"plugins.platforms.telegram.adapter.Application",
|
||||
SimpleNamespace(builder=MagicMock(return_value=builder)),
|
||||
)
|
||||
|
||||
# A tight timeout: if connect() awaited the hanging housekeeping coroutine this
|
||||
# would raise TimeoutError instead of returning.
|
||||
ok = await asyncio.wait_for(adapter.connect(), timeout=0.5)
|
||||
|
||||
assert ok is True
|
||||
assert adapter._post_connect_task is not None
|
||||
assert not adapter._post_connect_task.done()
|
||||
|
||||
# disconnect() must cancel the still-hanging housekeeping task cleanly.
|
||||
await adapter.disconnect()
|
||||
assert adapter._post_connect_task is None
|
||||
await _cancel_heartbeat(adapter)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnect_skips_inactive_updater_and_app(monkeypatch):
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue