diff --git a/gateway/authz_mixin.py b/gateway/authz_mixin.py index 64fb05d3b..c46efd900 100644 --- a/gateway/authz_mixin.py +++ b/gateway/authz_mixin.py @@ -31,7 +31,41 @@ from gateway.whatsapp_identity import ( class GatewayAuthorizationMixin: """User/chat authorization methods for ``GatewayRunner``.""" - def _adapter_authorization_is_upstream(self, platform: Optional[Platform]) -> bool: + def _authorization_adapter( + self, + platform: Optional[Platform], + profile: Optional[str] = None, + ): + """Resolve the live adapter whose intake policy should gate authorization. + + In multiplex mode, secondary-profile adapters live in + ``_profile_adapters[profile]`` while the default/active profile uses + ``self.adapters``. ``SessionSource.profile`` selects which map to consult. + When a stamped profile has its own adapter registry entry, the default + profile's same-platform adapter must not be consulted as a fallback. + """ + if not platform: + return None + profile_name = (profile or "").strip() or None + if profile_name: + profile_adapters = getattr(self, "_profile_adapters", None) or {} + if profile_name in profile_adapters: + return profile_adapters[profile_name].get(platform) + adapters = getattr(self, "adapters", None) or {} + return adapters.get(platform) + + def _adapter_for_source(self, source: Optional[SessionSource]): + """Resolve the live adapter for an inbound ``SessionSource``.""" + if source is None: + return None + return self._authorization_adapter(source.platform, source.profile) + + def _adapter_authorization_is_upstream( + self, + platform: Optional[Platform], + *, + profile: Optional[str] = None, + ) -> bool: """Whether the adapter for *platform* delegates authz to a trusted upstream. Mirrors ``BasePlatformAdapter.authorization_is_upstream``. The relay @@ -45,15 +79,17 @@ class GatewayAuthorizationMixin: """ if not platform: return False - adapters = getattr(self, "adapters", None) - if not adapters: - return False - adapter = adapters.get(platform) + adapter = self._authorization_adapter(platform, profile) if adapter is None: return False return bool(getattr(adapter, "authorization_is_upstream", False)) - def _adapter_enforces_own_access_policy(self, platform: Optional[Platform]) -> bool: + def _adapter_enforces_own_access_policy( + self, + platform: Optional[Platform], + *, + profile: Optional[str] = None, + ) -> bool: """Whether the adapter for *platform* gates access at intake itself. Mirrors ``BasePlatformAdapter.enforces_own_access_policy``. Adapters @@ -71,15 +107,17 @@ class GatewayAuthorizationMixin: # Some test helpers build a bare GatewayRunner via object.__new__ and # never set ``adapters``; treat a missing/empty map as "no adapter" # rather than raising (see pitfalls.md #17). - adapters = getattr(self, "adapters", None) - if not adapters: - return False - adapter = adapters.get(platform) + adapter = self._authorization_adapter(platform, profile) if adapter is None: return False return bool(getattr(adapter, "enforces_own_access_policy", False)) - def _adapter_dm_policy(self, platform: Optional[Platform]) -> str: + def _adapter_dm_policy( + self, + platform: Optional[Platform], + *, + profile: Optional[str] = None, + ) -> str: """Best-effort read of an own-policy adapter's effective DM policy. Returns the lowercased ``dm_policy`` (``"open"`` / ``"allowlist"`` / @@ -97,8 +135,7 @@ class GatewayAuthorizationMixin: """ if not platform: return "" - adapters = getattr(self, "adapters", None) or {} - adapter = adapters.get(platform) + adapter = self._authorization_adapter(platform, profile) policy = getattr(adapter, "_dm_policy", None) if adapter is not None else None if policy is None: config = getattr(self, "config", None) @@ -112,7 +149,12 @@ class GatewayAuthorizationMixin: policy = extra.get("dm_policy") return str(policy or "").strip().lower() - def _adapter_group_policy(self, platform: Optional[Platform]) -> str: + def _adapter_group_policy( + self, + platform: Optional[Platform], + *, + profile: Optional[str] = None, + ) -> str: """Best-effort read of an own-policy adapter's effective group policy. Mirror of ``_adapter_dm_policy`` for group / forum / channel traffic: @@ -128,8 +170,7 @@ class GatewayAuthorizationMixin: """ if not platform: return "" - adapters = getattr(self, "adapters", None) or {} - adapter = adapters.get(platform) + adapter = self._authorization_adapter(platform, profile) policy = getattr(adapter, "_group_policy", None) if adapter is not None else None if policy is None: config = getattr(self, "config", None) @@ -147,6 +188,8 @@ class GatewayAuthorizationMixin: self, platform: Optional[Platform], chat_id: Optional[str], + *, + profile: Optional[str] = None, ) -> bool: """Whether a per-group sender allowlist gated this group message. @@ -159,8 +202,7 @@ class GatewayAuthorizationMixin: """ if not platform or not chat_id: return False - adapters = getattr(self, "adapters", None) or {} - adapter = adapters.get(platform) + adapter = self._authorization_adapter(platform, profile) groups = getattr(adapter, "_groups", None) if adapter is not None else None if groups is None: config = getattr(self, "config", None) @@ -243,7 +285,8 @@ class GatewayAuthorizationMixin: # non-bool stand-in (e.g. a MagicMock attribute auto-vivifies truthy in # tests) — defensive against accidental fail-open. if source.delivered_via_upstream_relay is True or self._adapter_authorization_is_upstream( - source.platform + source.platform, + profile=source.profile, ): return True @@ -401,16 +444,26 @@ class GatewayAuthorizationMixin: # flag (checked above), and the pairing flow remain the explicit # opt-ins to broader access. (#34515 follow-up: trusting "open" was a # fail-open.) - if self._adapter_enforces_own_access_policy(source.platform): + if self._adapter_enforces_own_access_policy( + source.platform, + profile=source.profile, + ): if source.chat_type in {"group", "forum", "channel"}: - effective_policy = self._adapter_group_policy(source.platform) + effective_policy = self._adapter_group_policy( + source.platform, + profile=source.profile, + ) if self._adapter_group_has_sender_allowlist( source.platform, source.chat_id, + profile=source.profile, ): return True else: - effective_policy = self._adapter_dm_policy(source.platform) + effective_policy = self._adapter_dm_policy( + source.platform, + profile=source.profile, + ) if effective_policy == "allowlist": return True # No allowlists configured -- check global allow-all flag @@ -507,7 +560,12 @@ class GatewayAuthorizationMixin: return bool(check_ids & allowed_ids) - def _get_unauthorized_dm_behavior(self, platform: Optional[Platform]) -> str: + def _get_unauthorized_dm_behavior( + self, + platform: Optional[Platform], + *, + profile: Optional[str] = None, + ) -> str: """Return how unauthorized DMs should be handled for a platform. Resolution order: @@ -552,15 +610,19 @@ class GatewayAuthorizationMixin: # allowlist or disabled DM policy means the operator restricted access, # so unauthorized DMs should be dropped silently rather than answered # with a pairing code. An explicit pairing policy opts back into codes. - if platform and config and hasattr(config, "platforms"): - platform_cfg = config.platforms.get(platform) - extra = getattr(platform_cfg, "extra", None) if platform_cfg else None - if isinstance(extra, dict): - dm_policy = str(extra.get("dm_policy") or "").strip().lower() - if dm_policy == "pairing": - return "pair" - if dm_policy in {"allowlist", "disabled"}: - return "ignore" + # Prefer the profile-scoped live adapter's resolved policy in multiplex + # mode; fall back to the default profile's config.extra. + if platform: + dm_policy = self._adapter_dm_policy(platform, profile=profile) + if not dm_policy and config and hasattr(config, "platforms"): + platform_cfg = config.platforms.get(platform) + extra = getattr(platform_cfg, "extra", None) if platform_cfg else None + if isinstance(extra, dict): + dm_policy = str(extra.get("dm_policy") or "").strip().lower() + if dm_policy == "pairing": + return "pair" + if dm_policy in {"allowlist", "disabled"}: + return "ignore" # No explicit override. Fall back to allowlist-aware default: # if any allowlist is configured for this platform, silently drop diff --git a/gateway/platforms/qqbot/adapter.py b/gateway/platforms/qqbot/adapter.py index 953266213..300a93556 100644 --- a/gateway/platforms/qqbot/adapter.py +++ b/gateway/platforms/qqbot/adapter.py @@ -12,9 +12,9 @@ Configuration in config.yaml: app_id: "your-app-id" # or QQ_APP_ID env var client_secret: "your-secret" # or QQ_CLIENT_SECRET env var markdown_support: true # enable QQ markdown (msg_type 2) - dm_policy: "open" # open | allowlist | disabled + dm_policy: "pairing" # open | allowlist | disabled | pairing allow_from: ["openid_1"] - group_policy: "open" # open | allowlist | disabled + group_policy: "pairing" # open | allowlist | disabled | pairing group_allow_from: ["group_openid_1"] stt: # Voice-to-text config (optional) provider: "zai" # zai (GLM-ASR), openai (Whisper), etc. @@ -208,11 +208,11 @@ class QQAdapter(BasePlatformAdapter): self._markdown_support = bool(extra.get("markdown_support", True)) # Auth/ACL policies - self._dm_policy = str(extra.get("dm_policy", "open")).strip().lower() + self._dm_policy = str(extra.get("dm_policy", "pairing")).strip().lower() self._allow_from = _coerce_list( extra.get("allow_from") or extra.get("allowFrom") ) - self._group_policy = str(extra.get("group_policy", "open")).strip().lower() + self._group_policy = str(extra.get("group_policy", "pairing")).strip().lower() self._group_allow_from = _coerce_list( extra.get("group_allow_from") or extra.get("groupAllowFrom") ) @@ -1214,7 +1214,7 @@ class QQAdapter(BasePlatformAdapter): user_openid = str(author.get("user_openid", "")) if not user_openid: return - if not self._is_dm_allowed(user_openid): + if not self._is_dm_intake_allowed(user_openid): return text = content @@ -1454,7 +1454,7 @@ class QQAdapter(BasePlatformAdapter): # Without this check any member of any guild the bot is in could # bypass the configured allowlist via direct messages. author_id = str(author.get("id", "")) - if not self._is_dm_allowed(author_id): + if not self._is_dm_intake_allowed(author_id): logger.debug( "[%s] Guild DM blocked by ACL: guild=%s user=%s", self._log_tag, guild_id, author_id, @@ -3142,19 +3142,44 @@ class QQAdapter(BasePlatformAdapter): stripped = re.sub(r"^@\S+\s*", "", content.strip()) return stripped + def _open_dm_opted_in(self) -> bool: + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"}: + return True + return os.getenv("QQ_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"} + def _is_dm_allowed(self, user_id: str) -> bool: if self._dm_policy == "disabled": return False if self._dm_policy == "allowlist": return self._entry_matches(self._allow_from, user_id) - return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False + + def _is_dm_intake_allowed(self, user_id: str) -> bool: + principal = str(user_id or "").strip() + if not principal: + return False + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return self._entry_matches(self._allow_from, principal) + if self._dm_policy == "pairing": + return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False def _is_group_allowed(self, group_id: str, user_id: str) -> bool: if self._group_policy == "disabled": return False if self._group_policy == "allowlist": return self._entry_matches(self._group_allow_from, group_id) - return True + if self._group_policy == "pairing": + return False + if self._group_policy == "open": + return True + return False @staticmethod def _entry_matches(entries: List[str], target: str) -> bool: diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 4e86dd0bf..d78eb4aad 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1193,7 +1193,7 @@ class WeixinAdapter(BasePlatformAdapter): ) self._rate_limit_circuit_until = 0.0 self._rate_limit_events: List[float] = [] - self._dm_policy = str(extra.get("dm_policy") or os.getenv("WEIXIN_DM_POLICY", "open")).strip().lower() + self._dm_policy = str(extra.get("dm_policy") or os.getenv("WEIXIN_DM_POLICY", "pairing")).strip().lower() self._group_policy = str(extra.get("group_policy") or os.getenv("WEIXIN_GROUP_POLICY", "disabled")).strip().lower() allow_from = extra.get("allow_from") if allow_from is None: @@ -1427,7 +1427,9 @@ class WeixinAdapter(BasePlatformAdapter): return if self._group_policy == "allowlist" and effective_chat_id not in self._group_allow_from: return - elif not self._is_dm_allowed(sender_id): + if self._group_policy == "pairing": + return + elif not self._is_dm_intake_allowed(sender_id): return context_token = str(message.get("context_token") or "").strip() @@ -1470,12 +1472,30 @@ class WeixinAdapter(BasePlatformAdapter): else: await self.handle_message(event) + def _open_dm_opted_in(self) -> bool: + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"}: + return True + return os.getenv("WEIXIN_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"} + def _is_dm_allowed(self, sender_id: str) -> bool: if self._dm_policy == "disabled": return False if self._dm_policy == "allowlist": return sender_id in self._allow_from - return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False + + def _is_dm_intake_allowed(self, sender_id: str) -> bool: + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return sender_id in self._allow_from + if self._dm_policy == "pairing": + return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False @property def enforces_own_access_policy(self) -> bool: diff --git a/gateway/platforms/whatsapp_common.py b/gateway/platforms/whatsapp_common.py index 54a919091..c5bf50556 100644 --- a/gateway/platforms/whatsapp_common.py +++ b/gateway/platforms/whatsapp_common.py @@ -147,6 +147,11 @@ class WhatsAppBehaviorMixin: return False # ------------------------------------------------------------------ gating + def _open_dm_opted_in(self) -> bool: + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"}: + return True + return os.getenv("WHATSAPP_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"} + @staticmethod def _matches_whatsapp_allowlist(candidate: str, allow_from) -> bool: """Match a WhatsApp identifier against an allowlist across phone/LID forms. @@ -187,13 +192,29 @@ class WhatsAppBehaviorMixin: return False def _is_dm_allowed(self, sender_id: str) -> bool: - """Check whether a DM from the given sender should be processed.""" + """Strict DM authorization — pairing does not imply access.""" if self._dm_policy == "disabled": return False if self._dm_policy == "allowlist": return self._matches_whatsapp_allowlist(sender_id, self._allow_from) - # "open" — all DMs allowed - return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False + + def _is_dm_intake_allowed(self, sender_id: str) -> bool: + """Whether a DM may reach the gateway intake (pairing handshake path).""" + principal = str(sender_id or "").strip() + if not principal: + return False + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return self._matches_whatsapp_allowlist(principal, self._allow_from) + if self._dm_policy == "pairing": + return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False def _is_group_allowed(self, chat_id: str) -> bool: """Check whether a group chat should be processed.""" @@ -201,8 +222,11 @@ class WhatsAppBehaviorMixin: return False if self._group_policy == "allowlist": return self._matches_whatsapp_allowlist(chat_id, self._group_allow_from) - # "open" — all groups allowed - return True + if self._group_policy == "pairing": + return False + if self._group_policy == "open": + return True + return False def _compile_mention_patterns(self): patterns = self.config.extra.get("mention_patterns") @@ -318,7 +342,7 @@ class WhatsAppBehaviorMixin: return False else: sender_id = str(data.get("senderId") or data.get("from") or "") - if not self._is_dm_allowed(sender_id): + if not self._is_dm_intake_allowed(sender_id): return False # DMs that pass the policy gate are always processed return True diff --git a/gateway/platforms/yuanbao.py b/gateway/platforms/yuanbao.py index 7f3b1f34e..a26f3a43b 100644 --- a/gateway/platforms/yuanbao.py +++ b/gateway/platforms/yuanbao.py @@ -1545,13 +1545,35 @@ class AccessPolicy: self._group_policy = group_policy self._group_allow_from = group_allow_from + def _open_dm_opted_in(self) -> bool: + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"}: + return True + return os.getenv("YUANBAO_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"} + def is_dm_allowed(self, sender_id: str) -> bool: - """Platform-level DM inbound filter (open / allowlist / disabled).""" + """Strict DM authorization — pairing does not imply access.""" if self._dm_policy == "disabled": return False if self._dm_policy == "allowlist": return sender_id.strip() in self._dm_allow_from - return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False + + def is_dm_intake_allowed(self, sender_id: str) -> bool: + """Whether a DM may reach gateway intake (pairing handshake path).""" + principal = str(sender_id or "").strip() + if not principal: + return False + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return principal in self._dm_allow_from + if self._dm_policy == "pairing": + return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False def is_group_allowed(self, group_code: str) -> bool: """Platform-level group chat inbound filter (open / allowlist / disabled).""" @@ -1559,7 +1581,11 @@ class AccessPolicy: return False if self._group_policy == "allowlist": return group_code.strip() in self._group_allow_from - return True + if self._group_policy == "pairing": + return False + if self._group_policy == "open": + return self._open_dm_opted_in() + return False @property def dm_policy(self) -> str: @@ -1579,7 +1605,7 @@ class AccessGuardMiddleware(InboundMiddleware): adapter = ctx.adapter policy: AccessPolicy = adapter._access_policy if ctx.chat_type == "dm": - if not policy.is_dm_allowed(ctx.from_account): + if not policy.is_dm_intake_allowed(ctx.from_account): logger.debug( "[%s] DM from %s blocked by dm_policy=%s", adapter.name, ctx.from_account, policy.dm_policy, @@ -1601,13 +1627,19 @@ class AutoSetHomeMiddleware(InboundMiddleware): Triggers when no home channel is configured, or when an existing group-chat home is superseded by the first DM (direct > group upgrade). Silent: writes config.yaml and env, no user-facing message. + + Runs after :class:`BuildSourceMiddleware` and :class:`GroupAtGuardMiddleware` + so unaddressed group traffic is dropped before home-channel persistence. + Only senders that pass strict authorization (allowlist / explicit open + opt-in / pairing-store approval) may claim ``YUANBAO_HOME_CHANNEL``. + Intake-only pairing forwards must not claim ``YUANBAO_HOME_CHANNEL``. """ name = "auto-sethome" async def handle(self, ctx: InboundContext, next_fn) -> None: adapter = ctx.adapter - if not adapter._auto_sethome_done: + if not adapter._auto_sethome_done and adapter._sender_may_designate_home(ctx): _cur_home = os.getenv("YUANBAO_HOME_CHANNEL", "") _should_set = ( not _cur_home @@ -3180,12 +3212,12 @@ class InboundPipelineBuilder: SkipSelfMiddleware, ChatRoutingMiddleware, AccessGuardMiddleware, - AutoSetHomeMiddleware, ExtractContentMiddleware, PlaceholderFilterMiddleware, OwnerCommandMiddleware, BuildSourceMiddleware, GroupAtGuardMiddleware, + AutoSetHomeMiddleware, GroupAttributionMiddleware, ClassifyMessageTypeMiddleware, QuoteContextMiddleware, @@ -5050,7 +5082,7 @@ class YuanbaoAdapter(BasePlatformAdapter): # ------------------------------------------------------------------ dm_policy: str = ( _extra.get("dm_policy") - or os.getenv("YUANBAO_DM_POLICY", "open") + or os.getenv("YUANBAO_DM_POLICY", "pairing") ).strip().lower() _dm_allow_from_raw: str = ( @@ -5061,7 +5093,7 @@ class YuanbaoAdapter(BasePlatformAdapter): group_policy: str = ( _extra.get("group_policy") - or os.getenv("YUANBAO_GROUP_POLICY", "open") + or os.getenv("YUANBAO_GROUP_POLICY", "pairing") ).strip().lower() _group_allow_from_raw: str = ( @@ -5114,6 +5146,36 @@ class YuanbaoAdapter(BasePlatformAdapter): """Yuanbao gates DM/group access at intake via dm_policy/group_policy.""" return True + def _sender_may_designate_home(self, ctx: InboundContext) -> bool: + """True when the sender may persist ``YUANBAO_HOME_CHANNEL``. + + Intake-only pairing forwards are excluded until the sender is on the + strict allowlist, has explicit open-world opt-in, or is approved in the + pairing store. + """ + policy: AccessPolicy = self._access_policy + sender = str(ctx.from_account or "").strip() + if not sender: + return False + if ctx.chat_type == "dm": + if policy.is_dm_allowed(sender): + return True + if policy.dm_policy == "pairing": + from gateway.pairing import PairingStore + + return PairingStore().is_approved(Platform.YUANBAO.value, sender) + return False + if ctx.chat_type == "group": + group_code = str(ctx.group_code or "").strip() + if not group_code: + return False + if policy.group_policy == "allowlist": + return policy.is_group_allowed(group_code) + if policy.group_policy == "open": + return policy._open_dm_opted_in() + return False + return False + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Yuanbao WS gateway and authenticate. diff --git a/gateway/run.py b/gateway/run.py index d9d31acf8..48a6ba21f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1706,6 +1706,48 @@ from gateway.whatsapp_identity import ( logger = logging.getLogger(__name__) +_OWN_POLICY_OPEN_ENV = { + Platform.WECOM: ("WECOM_DM_POLICY", "WECOM_GROUP_POLICY", "WECOM_ALLOW_ALL_USERS"), + Platform.WEIXIN: ("WEIXIN_DM_POLICY", "WEIXIN_GROUP_POLICY", "WEIXIN_ALLOW_ALL_USERS"), + Platform.YUANBAO: ("YUANBAO_DM_POLICY", "YUANBAO_GROUP_POLICY", "YUANBAO_ALLOW_ALL_USERS"), + Platform.QQBOT: (None, None, "QQ_ALLOW_ALL_USERS"), + Platform.WHATSAPP: ("WHATSAPP_DM_POLICY", "WHATSAPP_GROUP_POLICY", "WHATSAPP_ALLOW_ALL_USERS"), +} + + +def _own_policy_open_startup_violation(config) -> Optional[str]: + """Return a startup-abort reason when open policy lacks allow-all opt-in.""" + for platform, platform_config in getattr(config, "platforms", {}).items(): + if not getattr(platform_config, "enabled", False): + continue + open_env = _OWN_POLICY_OPEN_ENV.get(platform) + if not open_env: + continue + dm_env, group_env, allow_all_env = open_env + extra = getattr(platform_config, "extra", None) or {} + dm_policy = str( + extra.get("dm_policy") + or (os.getenv(dm_env, "pairing") if dm_env else "pairing") + ).strip().lower() + group_policy = str( + extra.get("group_policy") + or (os.getenv(group_env, "pairing") if group_env else "pairing") + ).strip().lower() + if dm_policy != "open" and group_policy != "open": + continue + gateway_allow_all = os.getenv( + "GATEWAY_ALLOW_ALL_USERS", "" + ).lower() in {"true", "1", "yes"} + platform_opted_in = gateway_allow_all or ( + allow_all_env + and os.getenv(allow_all_env, "").lower() in {"true", "1", "yes"} + ) + if platform_opted_in: + continue + return f"{platform.value}: open policy without allow-all opt-in" + return None + + # Sentinel placed into _running_agents immediately when a session starts # processing, *before* any await. Prevents a second message for the same # session from bypassing the "already running" guard during the async gap @@ -4713,7 +4755,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _BUSY_QUEUE_MAX_PENDING = 32 def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None: - adapter = self.adapters.get(event.source.platform) + adapter = self._adapter_for_source(event.source) if not adapter: return # #28503 — Previously this called ``merge_pending_message_event`` @@ -4770,7 +4812,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # --- Draining case (gateway restarting/stopping) --- if self._draining: - adapter = self.adapters.get(event.source.platform) + adapter = self._adapter_for_source(event.source) if not adapter: return True @@ -4872,7 +4914,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) # Normal busy case (agent actively running a task) - adapter = self.adapters.get(event.source.platform) + adapter = self._adapter_for_source(event.source) if not adapter: return False # let default path handle it @@ -6265,10 +6307,34 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) if not _any_allowlist and not _allow_all: logger.warning( - "No user allowlists configured. All unauthorized users will be denied. " - "Set GATEWAY_ALLOW_ALL_USERS=true in ~/.hermes/.env to allow open access, " - "or configure platform allowlists (e.g., TELEGRAM_ALLOWED_USERS=your_id)." + "No env user allowlists configured. Messaging platforms default to " + "pairing/allowlist policies and will deny unknown senders unless you " + "configure platform allowlists (e.g., TELEGRAM_ALLOWED_USERS=your_id) " + "or explicitly opt in with GATEWAY_ALLOW_ALL_USERS=true plus " + "dm_policy/group_policy: open on the platform." ) + + reason = _own_policy_open_startup_violation(self.config) + if reason: + platform_value = reason.split(":", 1)[0] + allow_all_env = None + for platform, open_env in _OWN_POLICY_OPEN_ENV.items(): + if platform.value == platform_value: + allow_all_env = open_env[2] + break + logger.error( + "Refusing to start: %s has dm_policy/group_policy set to 'open' " + "but neither GATEWAY_ALLOW_ALL_USERS nor %s is enabled.", + platform_value, + allow_all_env or "a platform allow-all flag", + ) + try: + from gateway.status import write_runtime_status + write_runtime_status(gateway_state="startup_failed", exit_reason=reason) + except Exception: + pass + self._request_clean_exit(reason) + return True # Discover Python plugins before shell hooks so plugin block # decisions take precedence in tie cases. The CLI startup path @@ -7859,6 +7925,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew with _profile_runtime_scope(profile_home): profile_cfg = load_gateway_config() + violation = _own_policy_open_startup_violation(profile_cfg) + if violation: + raise MultiplexConfigError( + f"Profile '{profile_name}' enables {violation}. " + "Enable GATEWAY_ALLOW_ALL_USERS or the platform allow-all flag " + "for that profile, or change dm_policy/group_policy away from " + "'open'." + ) profile_map = self._profile_adapters.setdefault(profile_name, {}) connected = 0 @@ -8255,7 +8329,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew elif not self._is_user_authorized(source): logger.warning("Unauthorized user: %s (%s) on %s", source.user_id, source.user_name, source.platform.value) # In DMs: offer pairing code. In groups: silently ignore. - if source.chat_type == "dm" and self._get_unauthorized_dm_behavior(source.platform) == "pair": + if ( + source.chat_type == "dm" + and self._get_unauthorized_dm_behavior( + source.platform, + profile=source.profile, + ) + == "pair" + ): platform_name = source.platform.value if source.platform else "unknown" # Rate-limit ALL pairing responses (code or rejection) to # prevent spamming the user with repeated messages when @@ -8266,7 +8347,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew platform_name, source.user_id, source.user_name or "" ) if code: - adapter = self.adapters.get(source.platform) + adapter = self._adapter_for_source(source) if adapter: await adapter.send( source.chat_id, @@ -8276,7 +8357,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew f"`hermes pairing approve {platform_name} {code}`" ) else: - adapter = self.adapters.get(source.platform) + adapter = self._adapter_for_source(source) if adapter: await adapter.send( source.chat_id, diff --git a/plugins/platforms/discord/adapter.py b/plugins/platforms/discord/adapter.py index e07d00a71..afbc1e95f 100644 --- a/plugins/platforms/discord/adapter.py +++ b/plugins/platforms/discord/adapter.py @@ -1075,11 +1075,18 @@ class DiscordAdapter(BasePlatformAdapter): # _is_allowed_user docstring). _msg_guild = getattr(message, "guild", None) _is_dm = isinstance(message.channel, discord.DMChannel) or _msg_guild is None + _msg_channel_ids = None + if not _is_dm: + _msg_channel_ids = {str(message.channel.id)} + _parent_id = adapter_self._get_parent_channel_id(message.channel) + if _parent_id: + _msg_channel_ids.add(_parent_id) if not self._is_allowed_user( str(message.author.id), message.author, guild=_msg_guild, is_dm=_is_dm, + channel_ids=_msg_channel_ids, ): return _role_authorized = bool(getattr(self, "_allowed_role_ids", set())) @@ -3085,6 +3092,18 @@ class DiscordAdapter(BasePlatformAdapter): except OSError: pass + def _discord_channel_ids_allowed(self, channel_ids: set[str]) -> bool: + """True when *channel_ids* intersect ``DISCORD_ALLOWED_CHANNELS``.""" + if not channel_ids: + return False + allowed_raw = os.getenv("DISCORD_ALLOWED_CHANNELS", "").strip() + if not allowed_raw: + return False + allowed = {c.strip() for c in allowed_raw.split(",") if c.strip()} + if "*" in allowed: + return True + return bool(channel_ids & allowed) + def _is_allowed_user( self, user_id: str, @@ -3092,11 +3111,15 @@ class DiscordAdapter(BasePlatformAdapter): *, guild=None, is_dm: bool = False, + channel_ids: Optional[set[str]] = None, ) -> bool: """Check if user is allowed via DISCORD_ALLOWED_USERS or DISCORD_ALLOWED_ROLES. Uses OR semantics: if the user matches EITHER allowlist, they're allowed. - If both allowlists are empty, everyone is allowed (backwards compatible). + With no user/role allowlists configured, guild traffic may still pass when + ``channel_ids`` matches ``DISCORD_ALLOWED_CHANNELS`` — but only when the + caller supplies the validated channel context (on_message, slash). Calls + without channel context (e.g. voice utterances) do not get this bypass. Role checks are **scoped to the guild the message originated from**. For DMs (no guild context), role-based auth is disabled by default and @@ -3111,6 +3134,8 @@ class DiscordAdapter(BasePlatformAdapter): author: Optional Member/User object for in-guild role lookup. guild: The guild the message arrived in (None for DMs). is_dm: True if the message came from a DM channel. + channel_ids: Resolved text-channel ids for guild traffic when an + upstream gate has already scoped the message to a channel. """ # ``getattr`` fallbacks here guard against test fixtures that build # an adapter via ``object.__new__(DiscordAdapter)`` and skip __init__ @@ -3120,7 +3145,20 @@ class DiscordAdapter(BasePlatformAdapter): has_users = bool(allowed_users) has_roles = bool(allowed_roles) if not has_users and not has_roles: - return True + if os.getenv("DISCORD_ALLOW_ALL_USERS", "").strip().lower() in {"true", "1", "yes"}: + return True + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").strip().lower() in {"true", "1", "yes"}: + return True + # Channel-scoped guild access requires validated channel context. + # Do not treat DISCORD_ALLOWED_CHANNELS alone as a user-wide bypass + # (voice loops and other guild-scoped callers may lack channel ids). + if ( + not is_dm + and channel_ids is not None + and self._discord_channel_ids_allowed(channel_ids) + ): + return True + return False # Check user ID allowlist (works for both DMs and guild messages). # ``"*"`` is honored as an open-mode wildcard, mirroring # ``SIGNAL_ALLOWED_USERS`` and the existing ``DISCORD_ALLOWED_CHANNELS`` / @@ -3184,11 +3222,11 @@ class DiscordAdapter(BasePlatformAdapter): # operator. ``_check_slash_authorization`` mirrors the on_message gates # one-for-one so the slash surface honors the same trust boundary. # - # By design, this is a no-op for deployments with no allowlist env vars - # set — ``_is_allowed_user`` returns True and the channel checks early-out - # — preserving the existing "single-tenant, all guild members trusted" - # default. Deployments that DO set any DISCORD_ALLOWED_* var get slash - # parity with on_message. + # Deployments with no allowlist env vars fail closed unless an explicit + # allow-all opt-in is set. When only ``DISCORD_ALLOWED_CHANNELS`` is + # configured, guild traffic is authorized per validated channel context + # (not as a user-wide bypass). Slash and on_message both pass the + # resolved channel ids into ``_is_allowed_user`` after the channel gate. def _evaluate_slash_authorization( self, interaction: "discord.Interaction", @@ -3215,6 +3253,8 @@ class DiscordAdapter(BasePlatformAdapter): chan_obj = getattr(interaction, "channel", None) in_dm = isinstance(chan_obj, discord.DMChannel) if chan_obj is not None else False + channel_ids: set = set() + channel_keys: set = set() # ── Channel scope (mirrors on_message lines 3374-3388) ── # DMs aren't channel-gated — DMs follow on_message's DM lockdown # path which has its own user-allowlist enforcement. @@ -3222,7 +3262,6 @@ class DiscordAdapter(BasePlatformAdapter): chan_id_raw = getattr(interaction, "channel_id", None) or getattr( chan_obj, "id", None, ) - channel_ids: set = set() if chan_id_raw is not None: channel_ids.add(str(chan_id_raw)) # Mirror on_message: also test the parent channel for threads @@ -3270,13 +3309,12 @@ class DiscordAdapter(BasePlatformAdapter): allowed_users = getattr(self, "_allowed_user_ids", set()) or set() allowed_roles = getattr(self, "_allowed_role_ids", set()) or set() if user is None or getattr(user, "id", None) is None: - # No identifiable user. With any user/role allowlist - # configured, fail closed rather than raise AttributeError - # on ``interaction.user.id`` below. With no allowlist this - # is the existing "no allowlist = everyone" backwards-compat. + # No identifiable user — fail closed even with allow-all opt-in. + # Downstream slash handlers (_build_slash_event, etc.) require + # interaction.user.id and do not synthesize a safe identity. if allowed_users or allowed_roles: return (False, "missing interaction.user with allowlist configured") - return (True, None) + return (False, "missing interaction.user") user_id = str(user.id) # Pass guild + is_dm so role check is scoped to the originating @@ -3288,6 +3326,7 @@ class DiscordAdapter(BasePlatformAdapter): author=user, guild=interaction_guild, is_dm=in_dm, + channel_ids=channel_keys if not in_dm else None, ): return ( False, @@ -6255,6 +6294,10 @@ def _component_check_auth( - user is approved in the pairing store -> allow - otherwise -> reject """ + user = getattr(interaction, "user", None) + if user is None or getattr(user, "id", None) is None: + return False + if os.getenv("DISCORD_ALLOW_ALL_USERS", "").strip().lower() in {"true", "1", "yes"}: return True if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").strip().lower() in {"true", "1", "yes"}: @@ -6270,9 +6313,6 @@ def _component_check_auth( role_set = set(allowed_role_ids or set()) has_users = bool(user_set) has_roles = bool(role_set) - user = getattr(interaction, "user", None) - if user is None: - return False # Resolve user ID once for both allowlist and pairing checks. try: diff --git a/plugins/platforms/email/adapter.py b/plugins/platforms/email/adapter.py index 9521d586c..c9d1cb499 100644 --- a/plugins/platforms/email/adapter.py +++ b/plugins/platforms/email/adapter.py @@ -776,7 +776,17 @@ class EmailAdapter(BasePlatformAdapter): # a race between dispatch and authorization can result in the adapter # sending a reply even though the handler returned None. allowed_raw = os.getenv("EMAIL_ALLOWED_USERS", "").strip() - if allowed_raw: + if not allowed_raw: + if os.getenv("EMAIL_ALLOW_ALL_USERS", "").strip().lower() not in {"true", "1", "yes"} and ( + os.getenv("GATEWAY_ALLOW_ALL_USERS", "").strip().lower() not in {"true", "1", "yes"} + ): + logger.debug( + "[Email] Dropping sender at dispatch — EMAIL_ALLOWED_USERS is unset " + "and open access is not opted in: %s", + sender_addr, + ) + return + else: allowed = {addr.strip().lower() for addr in allowed_raw.split(",") if addr.strip()} if sender_addr.lower() not in allowed: logger.debug("[Email] Dropping non-allowlisted sender at dispatch: %s", sender_addr) diff --git a/plugins/platforms/feishu/adapter.py b/plugins/platforms/feishu/adapter.py index 12b6e3e43..ba18f2229 100644 --- a/plugins/platforms/feishu/adapter.py +++ b/plugins/platforms/feishu/adapter.py @@ -4200,6 +4200,17 @@ class FeishuAdapter(BasePlatformAdapter): return "bot_not_mentioned" if not is_group: + if os.getenv("FEISHU_ALLOW_ALL_USERS", "").strip().lower() in {"true", "1", "yes"}: + return None + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").strip().lower() in {"true", "1", "yes"}: + return None + # Empty FEISHU_ALLOWED_USERS is the pairing-mode default from setup: + # forward DMs to gateway intake so the pairing handshake can run. + # Gateway auth fail-closes agent access until approval. + if not self._allowed_group_users: + return None + if not (sender_ids and (sender_ids & self._allowed_group_users)): + return "dm_policy_rejected" return None if not self._allow_group_message( diff --git a/plugins/platforms/wecom/adapter.py b/plugins/platforms/wecom/adapter.py index 7d809c19a..1a1421b9d 100644 --- a/plugins/platforms/wecom/adapter.py +++ b/plugins/platforms/wecom/adapter.py @@ -18,9 +18,9 @@ Configuration in config.yaml: bot_id: "your-bot-id" # or WECOM_BOT_ID env var secret: "your-secret" # or WECOM_SECRET env var websocket_url: "wss://openws.work.weixin.qq.com" - dm_policy: "open" # open | allowlist | disabled | pairing + dm_policy: "pairing" # open | allowlist | disabled | pairing allow_from: ["user_id_1"] - group_policy: "open" # open | allowlist | disabled + group_policy: "pairing" # open | allowlist | disabled | pairing group_allow_from: ["group_id_1"] groups: group_id_1: @@ -161,7 +161,7 @@ class WeComAdapter(BasePlatformAdapter): or os.getenv("WECOM_WEBSOCKET_URL", DEFAULT_WS_URL) ).strip() or DEFAULT_WS_URL - self._dm_policy = str(extra.get("dm_policy") or os.getenv("WECOM_DM_POLICY", "open")).strip().lower() + self._dm_policy = str(extra.get("dm_policy") or os.getenv("WECOM_DM_POLICY", "pairing")).strip().lower() # dm_policy already honors WECOM_DM_POLICY, so the allowlist must honor # WECOM_ALLOWED_USERS too. Without the env fallback an env-only setup # (dm_policy=allowlist via env, no config extra) runs with an empty @@ -172,7 +172,7 @@ class WeComAdapter(BasePlatformAdapter): or os.getenv("WECOM_ALLOWED_USERS", "") ) - self._group_policy = str(extra.get("group_policy") or os.getenv("WECOM_GROUP_POLICY", "open")).strip().lower() + self._group_policy = str(extra.get("group_policy") or os.getenv("WECOM_GROUP_POLICY", "pairing")).strip().lower() self._group_allow_from = _coerce_list(extra.get("group_allow_from") or extra.get("groupAllowFrom")) self._groups = extra.get("groups") if isinstance(extra.get("groups"), dict) else {} @@ -514,7 +514,7 @@ class WeComAdapter(BasePlatformAdapter): if not self._is_group_allowed(chat_id, sender_id): logger.debug("[%s] Group %s / sender %s blocked by policy", self.name, chat_id, sender_id) return - elif not self._is_dm_allowed(sender_id): + elif not self._is_dm_intake_allowed(sender_id): logger.debug("[%s] DM sender %s blocked by policy", self.name, sender_id) return @@ -861,16 +861,39 @@ class WeComAdapter(BasePlatformAdapter): """WeCom gates DM/group access at intake via dm_policy/group_policy.""" return True + def _open_dm_opted_in(self) -> bool: + if os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"}: + return True + return os.getenv("WECOM_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"} + def _is_dm_allowed(self, sender_id: str) -> bool: if self._dm_policy == "disabled": return False if self._dm_policy == "allowlist": return _entry_matches(self._allow_from, sender_id) - return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False + + def _is_dm_intake_allowed(self, sender_id: str) -> bool: + principal = str(sender_id or "").strip() + if not principal: + return False + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return _entry_matches(self._allow_from, principal) + if self._dm_policy == "pairing": + return True + if self._dm_policy == "open": + return self._open_dm_opted_in() + return False def _is_group_allowed(self, chat_id: str, sender_id: str) -> bool: if self._group_policy == "disabled": return False + if self._group_policy == "pairing": + return False if self._group_policy == "allowlist" and not _entry_matches(self._group_allow_from, chat_id): return False diff --git a/plugins/platforms/whatsapp/adapter.py b/plugins/platforms/whatsapp/adapter.py index c3c996b73..8a51030e2 100644 --- a/plugins/platforms/whatsapp/adapter.py +++ b/plugins/platforms/whatsapp/adapter.py @@ -374,9 +374,9 @@ class WhatsAppAdapter(WhatsAppBehaviorMixin, BasePlatformAdapter): - bridge_script: Path to the Node.js bridge script - bridge_port: Port for HTTP communication (default: 3000) - session_path: Path to store WhatsApp session data - - dm_policy: "open" | "allowlist" | "disabled" — how DMs are handled (default: "open") + - dm_policy: "open" | "allowlist" | "disabled" | "pairing" — how DMs are handled (default: "pairing") - allow_from: List of sender IDs allowed in DMs (when dm_policy="allowlist") - - group_policy: "open" | "allowlist" | "disabled" — which groups are processed (default: "open") + - group_policy: "open" | "allowlist" | "disabled" | "pairing" — which groups are processed (default: "pairing") - group_allow_from: List of group JIDs allowed (when group_policy="allowlist") Behavior (gating, mention parsing, markdown conversion, chunking) is @@ -405,9 +405,9 @@ class WhatsAppAdapter(WhatsAppBehaviorMixin, BasePlatformAdapter): get_hermes_dir("platforms/whatsapp/session", "whatsapp/session") )) self._reply_prefix: Optional[str] = config.extra.get("reply_prefix") - self._dm_policy = str(config.extra.get("dm_policy") or os.getenv("WHATSAPP_DM_POLICY", "open")).strip().lower() + self._dm_policy = str(config.extra.get("dm_policy") or os.getenv("WHATSAPP_DM_POLICY", "pairing")).strip().lower() self._allow_from = self._coerce_allow_list(config.extra.get("allow_from") or config.extra.get("allowFrom")) - self._group_policy = str(config.extra.get("group_policy") or os.getenv("WHATSAPP_GROUP_POLICY", "open")).strip().lower() + self._group_policy = str(config.extra.get("group_policy") or os.getenv("WHATSAPP_GROUP_POLICY", "pairing")).strip().lower() self._group_allow_from = self._coerce_allow_list(config.extra.get("group_allow_from") or config.extra.get("groupAllowFrom")) self._mention_patterns = self._compile_mention_patterns() self._message_queue: asyncio.Queue = asyncio.Queue() diff --git a/tests/gateway/test_config_driven_access_policy.py b/tests/gateway/test_config_driven_access_policy.py index 4bfbdf59c..4f97d941f 100644 --- a/tests/gateway/test_config_driven_access_policy.py +++ b/tests/gateway/test_config_driven_access_policy.py @@ -149,6 +149,19 @@ def test_own_policy_allowlist_authorized_without_env_allowlist(monkeypatch, plat assert runner._is_user_authorized(_source(platform)) is True +@pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS) +def test_own_policy_open_dm_authorized_with_gateway_allow_all(monkeypatch, platform): + """Explicit ``GATEWAY_ALLOW_ALL_USERS`` unlocks ``dm_policy: open``.""" + _clear_auth_env(monkeypatch) + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + config = GatewayConfig( + platforms={platform: PlatformConfig(enabled=True, extra={"dm_policy": "open"})} + ) + runner, _adapter = _make_runner(platform, config, enforces=True) + + assert runner._is_user_authorized(_source(platform)) is True + + @pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS) def test_own_policy_open_dm_not_authorized_without_allowlist(monkeypatch, platform): """``dm_policy: open`` forwards everyone → NOT authorization (SECURITY.md §2.6). @@ -207,6 +220,82 @@ def test_own_policy_open_group_not_authorized_without_allowlist(monkeypatch, pla assert runner._is_user_authorized(_source(platform, chat_type="group")) is False +@pytest.mark.parametrize( + "module_path, class_name, dm_helper", + [ + ("plugins.platforms.whatsapp.adapter", "WhatsAppAdapter", "_is_dm_allowed"), + ("plugins.platforms.wecom.adapter", "WeComAdapter", "_is_dm_allowed"), + ("gateway.platforms.weixin", "WeixinAdapter", "_is_dm_allowed"), + ("gateway.platforms.qqbot.adapter", "QQAdapter", "_is_dm_allowed"), + ], +) +def test_pairing_dm_policy_strict_intake_auth_denies_unknown( + monkeypatch, module_path, class_name, dm_helper, +): + """Default ``dm_policy: pairing`` must not admit senders via strict auth.""" + _clear_auth_env(monkeypatch) + import importlib + + from gateway.config import PlatformConfig + + module = importlib.import_module(module_path) + adapter_cls = getattr(module, class_name) + adapter = adapter_cls(PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})) + assert getattr(adapter, dm_helper)("unknown-user") is False + + +@pytest.mark.parametrize( + "module_path, class_name, intake_helper", + [ + ("gateway.platforms.qqbot.adapter", "QQAdapter", "_is_dm_intake_allowed"), + ("plugins.platforms.wecom.adapter", "WeComAdapter", "_is_dm_intake_allowed"), + ("plugins.platforms.whatsapp.adapter", "WhatsAppAdapter", "_is_dm_intake_allowed"), + ], +) +@pytest.mark.parametrize("blank_sender", ["", " ", None]) +def test_pairing_dm_intake_denies_blank_principal( + monkeypatch, module_path, class_name, intake_helper, blank_sender, +): + """Pairing intake must not forward senderless DM callbacks to the gateway.""" + _clear_auth_env(monkeypatch) + import importlib + + from gateway.config import PlatformConfig + + module = importlib.import_module(module_path) + adapter_cls = getattr(module, class_name) + adapter = adapter_cls(PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})) + assert getattr(adapter, intake_helper)(blank_sender) is False + + +@pytest.mark.parametrize("blank_sender", ["", " ", None]) +def test_yuanbao_pairing_dm_intake_denies_blank_principal(monkeypatch, blank_sender): + """Yuanbao pairing intake must not forward senderless C2C callbacks.""" + _clear_auth_env(monkeypatch) + from gateway.platforms.yuanbao import AccessPolicy + + policy = AccessPolicy( + dm_policy="pairing", + dm_allow_from=[], + group_policy="pairing", + group_allow_from=[], + ) + assert policy.is_dm_intake_allowed(blank_sender) is False + assert policy.is_dm_intake_allowed("user-1") is True + + +@pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS) +def test_pairing_group_policy_not_blanket_authorized(monkeypatch, platform): + """Default ``group_policy: pairing`` must not authorize unknown group senders.""" + _clear_auth_env(monkeypatch) + config = GatewayConfig( + platforms={platform: PlatformConfig(enabled=True, extra={"group_policy": "pairing"})} + ) + runner, _adapter = _make_runner(platform, config, enforces=True) + + assert runner._is_user_authorized(_source(platform, chat_type="group")) is False + + def test_wecom_open_group_with_per_group_sender_allowlist_is_authorized(monkeypatch): """WeCom ``groups..allow_from`` is an adapter-enforced restriction. diff --git a/tests/gateway/test_discord_component_auth.py b/tests/gateway/test_discord_component_auth.py index b82378dcc..5c77c8e13 100644 --- a/tests/gateway/test_discord_component_auth.py +++ b/tests/gateway/test_discord_component_auth.py @@ -95,6 +95,17 @@ def test_component_check_explicit_allow_all_passes(monkeypatch, env_name, env_va assert _component_check_auth(interaction, set(), set()) is True +@pytest.mark.parametrize( + "env_name", + ["DISCORD_ALLOW_ALL_USERS", "GATEWAY_ALLOW_ALL_USERS"], +) +def test_component_check_missing_user_rejected_even_with_allow_all(monkeypatch, env_name): + """Component clicks without interaction.user stay fail-closed with allow-all.""" + monkeypatch.setenv(env_name, "true") + interaction = _interaction(11111, drop_user=True) + assert _component_check_auth(interaction, set(), set()) is False + + # ── user allowlist ───────────────────────────────────────────────────────── diff --git a/tests/gateway/test_discord_roles_dm_scope.py b/tests/gateway/test_discord_roles_dm_scope.py index 19d65a599..b2fb09d0c 100644 --- a/tests/gateway/test_discord_roles_dm_scope.py +++ b/tests/gateway/test_discord_roles_dm_scope.py @@ -256,14 +256,62 @@ def test_user_id_allowlist_works_in_guild(): ) -def test_empty_allowlists_allow_everyone(): +def test_empty_allowlists_deny_without_opt_in(): adapter = _make_adapter() assert ( adapter._is_allowed_user("42", author=None, guild=None, is_dm=True) + is False + ) + + +def test_channel_allowlist_requires_channel_context(monkeypatch): + """DISCORD_ALLOWED_CHANNELS must not authorize guild traffic without + validated channel ids — e.g. voice utterances call _is_allowed_user + with guild/is_dm only.""" + monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "999") + guild = SimpleNamespace(id=111111, get_member=lambda uid: None) + adapter = _make_adapter(guilds=[guild]) + + assert ( + adapter._is_allowed_user("42", author=None, guild=guild, is_dm=False) + is False + ) + + +def test_channel_allowlist_authorizes_with_matching_channel_context(monkeypatch): + monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "999") + guild = SimpleNamespace(id=111111, get_member=lambda uid: None) + adapter = _make_adapter(guilds=[guild]) + + assert ( + adapter._is_allowed_user( + "42", + author=None, + guild=guild, + is_dm=False, + channel_ids={"999"}, + ) is True ) +def test_channel_allowlist_rejects_non_matching_channel_context(monkeypatch): + monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "999") + guild = SimpleNamespace(id=111111, get_member=lambda uid: None) + adapter = _make_adapter(guilds=[guild]) + + assert ( + adapter._is_allowed_user( + "42", + author=None, + guild=guild, + is_dm=False, + channel_ids={"1111"}, + ) + is False + ) + + # --------------------------------------------------------------------------- # Slash-surface sibling site: _evaluate_slash_authorization must pass # guild/is_dm through so the cross-guild bypass can't land via slash either. diff --git a/tests/gateway/test_discord_slash_auth.py b/tests/gateway/test_discord_slash_auth.py index f353dbd13..f9bb7f40c 100644 --- a/tests/gateway/test_discord_slash_auth.py +++ b/tests/gateway/test_discord_slash_auth.py @@ -97,6 +97,8 @@ def _isolate_discord_env(monkeypatch): "DISCORD_IGNORED_CHANNELS", "DISCORD_HIDE_SLASH_COMMANDS", "DISCORD_ALLOW_BOTS", + "DISCORD_ALLOW_ALL_USERS", + "GATEWAY_ALLOW_ALL_USERS", ): monkeypatch.delenv(var, raising=False) @@ -182,21 +184,28 @@ def _make_interaction( @pytest.mark.asyncio -async def test_no_allowlist_allows_everyone(adapter): - """SECURITY-CRITICAL backwards-compat: deployments without any allowlist - env vars set must see ZERO behavior change. on_message lets everyone - through in this case (returns True at line 1890); slash must do the same. - """ +async def test_no_allowlist_denies_without_opt_in(adapter): + """Without allowlists or allow-all flags, Discord traffic is denied.""" interaction = _make_interaction("999999999") - assert await adapter._check_slash_authorization(interaction, "/help") is True - interaction.response.send_message.assert_not_awaited() + assert await adapter._check_slash_authorization(interaction, "/help") is False + interaction.response.send_message.assert_awaited() @pytest.mark.asyncio -async def test_no_allowlist_dm_also_allowed(adapter): - """Same for DMs — no allowlist means no restriction, matching on_message.""" +async def test_no_allowlist_dm_denied_without_opt_in(adapter): + """DM slash commands follow the same fail-closed default.""" interaction = _make_interaction("999999999", in_dm=True) + assert await adapter._check_slash_authorization(interaction, "/help") is False + interaction.response.send_message.assert_awaited() + + +@pytest.mark.asyncio +async def test_no_allowlist_allows_with_gateway_allow_all(adapter, monkeypatch): + """Explicit ``GATEWAY_ALLOW_ALL_USERS`` restores open Discord access.""" + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + interaction = _make_interaction("999999999") assert await adapter._check_slash_authorization(interaction, "/help") is True + interaction.response.send_message.assert_not_awaited() # --------------------------------------------------------------------------- @@ -303,10 +312,10 @@ async def test_channel_allowlist_matches_by_hash_name(adapter, monkeypatch): @pytest.mark.asyncio async def test_channel_allowlist_does_not_apply_to_dms(adapter, monkeypatch): - """DMs aren't channel-gated — they go through on_message's DM lockdown.""" + """DMs ignore channel allowlists and still require user allowlist or opt-in.""" monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "1111") interaction = _make_interaction("100200300", in_dm=True) - assert await adapter._check_slash_authorization(interaction, "/help") is True + assert await adapter._check_slash_authorization(interaction, "/help") is False # --------------------------------------------------------------------------- @@ -466,11 +475,10 @@ async def test_missing_channel_id_rejected_when_channel_policy_configured( @pytest.mark.asyncio -async def test_missing_channel_id_allowed_when_no_channel_policy(adapter): - """No DISCORD_ALLOWED_CHANNELS configured + missing channel id: still - pass through the channel block (matches no-allowlist default).""" +async def test_missing_channel_id_denied_without_allowlists(adapter): + """No channel or user policy configured: fail closed by default.""" interaction = _make_interaction("100200300", channel_id=None) - assert await adapter._check_slash_authorization(interaction, "/help") is True + assert await adapter._check_slash_authorization(interaction, "/help") is False @pytest.mark.asyncio @@ -485,12 +493,44 @@ async def test_missing_user_rejected_when_allowlist_configured(adapter): @pytest.mark.asyncio -async def test_missing_user_allowed_when_no_allowlist_configured(adapter): - """interaction.user is None but no allowlist configured: allow - (preserves no-allowlist back-compat -- anyone is allowed when no - policy is in effect).""" +async def test_missing_user_denied_when_no_allowlist_configured(adapter): + """interaction.user is None without allow-all opt-in: fail closed.""" interaction = _make_interaction("100200300", user=None) - assert await adapter._check_slash_authorization(interaction, "/help") is True + assert await adapter._check_slash_authorization(interaction, "/help") is False + + +@pytest.mark.parametrize( + "env_name", + ["GATEWAY_ALLOW_ALL_USERS", "DISCORD_ALLOW_ALL_USERS"], +) +@pytest.mark.asyncio +async def test_missing_user_denied_even_with_allow_all(adapter, monkeypatch, env_name): + """Malformed slash payloads missing user stay fail-closed with allow-all.""" + monkeypatch.setenv(env_name, "true") + interaction = _make_interaction("100200300", user=None) + allowed, reason = adapter._evaluate_slash_authorization(interaction) + assert allowed is False + assert reason == "missing interaction.user" + assert await adapter._check_slash_authorization(interaction, "/help") is False + interaction.response.send_message.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_run_simple_slash_missing_user_does_not_crash(adapter, monkeypatch): + """_run_simple_slash must reject missing-user payloads before _build_slash_event.""" + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + interaction = _make_interaction("100200300", user=None) + interaction.response.defer = AsyncMock() + interaction.edit_original_response = AsyncMock() + interaction.delete_original_response = AsyncMock() + adapter.handle_message = AsyncMock() + adapter._build_slash_event = MagicMock() + + await adapter._run_simple_slash(interaction, "/help") + + adapter._build_slash_event.assert_not_called() + adapter.handle_message.assert_not_awaited() + interaction.response.defer.assert_not_awaited() # --------------------------------------------------------------------------- diff --git a/tests/gateway/test_feishu_bot_admission.py b/tests/gateway/test_feishu_bot_admission.py index 61628f933..04705bf19 100644 --- a/tests/gateway/test_feishu_bot_admission.py +++ b/tests/gateway/test_feishu_bot_admission.py @@ -388,6 +388,37 @@ def test_admit_pipeline(case): # --- Mention call-count semantics ------------------------------------------ +def test_dm_pairing_mode_forwards_unknown_sender_to_gateway_intake(monkeypatch): + """Empty FEISHU_ALLOWED_USERS must not block pairing handshake intake.""" + monkeypatch.delenv("FEISHU_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter_skeleton() + adapter._allowed_group_users = frozenset() + sender = make_sender(open_id="ou_unknown") + message = make_message(chat_type="p2p") + assert adapter._admit(sender, message) is None + + +def test_dm_allowlist_rejects_unknown_sender(monkeypatch): + monkeypatch.delenv("FEISHU_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter_skeleton() + adapter._allowed_group_users = frozenset({"ou_owner"}) + sender = make_sender(open_id="ou_unknown") + message = make_message(chat_type="p2p") + assert adapter._admit(sender, message) == "dm_policy_rejected" + + +def test_dm_allowlist_admits_configured_sender(monkeypatch): + monkeypatch.delenv("FEISHU_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter_skeleton() + adapter._allowed_group_users = frozenset({"ou_owner"}) + sender = make_sender(open_id="ou_owner") + message = make_message(chat_type="p2p") + assert adapter._admit(sender, message) is None + + def test_admit_skips_mention_check_under_all_mode(): # Tripwire: under allow_bots=all the mention path must not be probed. adapter = make_adapter_skeleton(bot_open_id="ou_self", allow_bots="all") diff --git a/tests/gateway/test_multiplex_profile_authz.py b/tests/gateway/test_multiplex_profile_authz.py new file mode 100644 index 000000000..514bda7cc --- /dev/null +++ b/tests/gateway/test_multiplex_profile_authz.py @@ -0,0 +1,159 @@ +"""Regression tests for multiplex profile-aware own-policy authorization.""" + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.session import SessionSource + + +def _clear_auth_env(monkeypatch) -> None: + for key in ( + "WECOM_ALLOWED_USERS", + "GATEWAY_ALLOWED_USERS", + "GATEWAY_ALLOW_ALL_USERS", + "WECOM_ALLOW_ALL_USERS", + ): + monkeypatch.delenv(key, raising=False) + + +def _make_multiplex_runner(monkeypatch): + """Runner with default allowlist WeCom and secondary open-policy WeCom.""" + from gateway.run import GatewayRunner + + _clear_auth_env(monkeypatch) + + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig(multiplex_profiles=True) + + default_adapter = SimpleNamespace( + send=AsyncMock(), + enforces_own_access_policy=True, + _dm_policy="allowlist", + _group_policy="pairing", + ) + secondary_adapter = SimpleNamespace( + send=AsyncMock(), + enforces_own_access_policy=True, + _dm_policy="open", + _group_policy="open", + ) + + runner.adapters = {Platform.WECOM: default_adapter} + runner._profile_adapters = { + "coder": {Platform.WECOM: secondary_adapter}, + } + runner.pairing_store = MagicMock() + runner.pairing_store.is_approved.return_value = False + return runner, default_adapter, secondary_adapter + + +def test_secondary_open_policy_not_authorized_by_default_allowlist(monkeypatch): + """Secondary-profile open intake must not inherit default allowlist trust.""" + runner, _default_adapter, _secondary_adapter = _make_multiplex_runner(monkeypatch) + + source = SessionSource( + platform=Platform.WECOM, + user_id="attacker", + chat_id="dm-chat", + user_name="attacker", + chat_type="dm", + profile="coder", + ) + + assert runner._adapter_dm_policy(Platform.WECOM, profile="coder") == "open" + assert runner._adapter_dm_policy(Platform.WECOM) == "allowlist" + assert runner._is_user_authorized(source) is False + + +def test_default_profile_still_trusts_own_allowlist(monkeypatch): + """Default-profile allowlist trust is unchanged when profile is unstamped.""" + runner, _default_adapter, _secondary_adapter = _make_multiplex_runner(monkeypatch) + + source = SessionSource( + platform=Platform.WECOM, + user_id="allowed-user", + chat_id="dm-chat", + user_name="allowed-user", + chat_type="dm", + profile=None, + ) + + assert runner._is_user_authorized(source) is True + + +def test_secondary_allowlist_still_authorized(monkeypatch): + """Secondary profile with allowlist policy is trusted on its own adapter.""" + runner, _default_adapter, secondary_adapter = _make_multiplex_runner(monkeypatch) + secondary_adapter._dm_policy = "allowlist" + + source = SessionSource( + platform=Platform.WECOM, + user_id="allowed-user", + chat_id="dm-chat", + user_name="allowed-user", + chat_type="dm", + profile="coder", + ) + + assert runner._is_user_authorized(source) is True + + +def test_adapter_for_source_resolves_secondary_profile_adapter(monkeypatch): + """Ingress adapter lookup must use the stamped profile's adapter map.""" + runner, default_adapter, secondary_adapter = _make_multiplex_runner(monkeypatch) + + source = SessionSource( + platform=Platform.WECOM, + user_id="attacker", + chat_id="dm-chat", + user_name="attacker", + chat_type="dm", + profile="coder", + ) + + assert runner._adapter_for_source(source) is secondary_adapter + assert runner._adapter_for_source( + SessionSource( + platform=Platform.WECOM, + user_id="allowed-user", + chat_id="dm-chat", + user_name="allowed-user", + chat_type="dm", + profile=None, + ) + ) is default_adapter + + +def test_secondary_allowlist_dm_behavior_ignores_unauthorized(monkeypatch): + """Unauthorized-DM behavior must read the secondary adapter's dm_policy.""" + runner, _default_adapter, secondary_adapter = _make_multiplex_runner(monkeypatch) + secondary_adapter._dm_policy = "allowlist" + + assert runner._get_unauthorized_dm_behavior( + Platform.WECOM, + profile="coder", + ) == "ignore" + assert runner._get_unauthorized_dm_behavior(Platform.WECOM) == "ignore" + + +def test_secondary_open_policy_fails_startup_guard(monkeypatch): + """Secondary profiles must pass the same open-policy startup guard.""" + from gateway.run import _own_policy_open_startup_violation + + _clear_auth_env(monkeypatch) + + secondary_cfg = GatewayConfig(multiplex_profiles=True) + secondary_cfg.platforms = { + Platform.WECOM: PlatformConfig( + enabled=True, + extra={"dm_policy": "open"}, + ), + } + + violation = _own_policy_open_startup_violation(secondary_cfg) + assert violation is not None + assert "wecom" in violation + assert "open policy" in violation \ No newline at end of file diff --git a/tests/gateway/test_own_policy_startup_gate.py b/tests/gateway/test_own_policy_startup_gate.py new file mode 100644 index 000000000..37bb04043 --- /dev/null +++ b/tests/gateway/test_own_policy_startup_gate.py @@ -0,0 +1,60 @@ +"""Regression tests for own-policy open startup gate in gateway/run.py.""" + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.run import GatewayRunner + + +@pytest.mark.asyncio +async def test_unrelated_allow_all_does_not_bypass_yuanbao_open_gate( + monkeypatch, tmp_path, +): + """TELEGRAM_ALLOW_ALL_USERS must not satisfy Yuanbao's open-policy opt-in.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + monkeypatch.setenv("TELEGRAM_ALLOW_ALL_USERS", "true") + + config = GatewayConfig( + platforms={ + Platform.YUANBAO: PlatformConfig( + enabled=True, + extra={"dm_policy": "open"}, + ), + }, + sessions_dir=tmp_path / "sessions", + ) + runner = GatewayRunner(config) + + ok = await runner.start() + + assert ok is True + assert runner.should_exit_cleanly is True + assert "yuanbao" in (runner.exit_reason or "").lower() + + +@pytest.mark.asyncio +async def test_gateway_allow_all_satisfies_yuanbao_open_gate(monkeypatch, tmp_path): + """GATEWAY_ALLOW_ALL_USERS is the intended global open-policy opt-in.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("TELEGRAM_ALLOW_ALL_USERS", raising=False) + + config = GatewayConfig( + platforms={ + Platform.YUANBAO: PlatformConfig( + enabled=True, + extra={"dm_policy": "open"}, + ), + }, + sessions_dir=tmp_path / "sessions", + ) + runner = GatewayRunner(config) + monkeypatch.setattr(runner, "_create_adapter", lambda platform, cfg: None) + + ok = await runner.start() + + assert ok is True + assert runner.should_exit_cleanly is False \ No newline at end of file diff --git a/tests/gateway/test_qqbot.py b/tests/gateway/test_qqbot.py index 816bb5f16..d250a119e 100644 --- a/tests/gateway/test_qqbot.py +++ b/tests/gateway/test_qqbot.py @@ -57,7 +57,7 @@ class TestQQAdapterInit: def test_dm_policy_default(self): adapter = self._make(app_id="a", client_secret="b") - assert adapter._dm_policy == "open" + assert adapter._dm_policy == "pairing" def test_dm_policy_explicit(self): adapter = self._make(app_id="a", client_secret="b", dm_policy="allowlist") @@ -65,7 +65,7 @@ class TestQQAdapterInit: def test_group_policy_default(self): adapter = self._make(app_id="a", client_secret="b") - assert adapter._group_policy == "open" + assert adapter._group_policy == "pairing" def test_allow_from_parsing_string(self): adapter = self._make(app_id="a", client_secret="b", allow_from="x, y , z") @@ -267,9 +267,15 @@ class TestDmAllowed: from gateway.platforms.qqbot import QQAdapter return QQAdapter(_make_config(**extra)) - def test_open_policy(self): + def test_open_policy_requires_opt_in(self): + adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="open") + assert adapter._is_dm_allowed("any_user") is False + + def test_open_policy_with_opt_in(self, monkeypatch): + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="open") assert adapter._is_dm_allowed("any_user") is True + assert adapter._is_dm_intake_allowed("any_user") is True def test_disabled_policy(self): adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="disabled") @@ -309,6 +315,19 @@ class TestGroupAllowed: adapter = self._make_adapter(app_id="a", client_secret="b", group_policy="allowlist", group_allow_from="grp1") assert adapter._is_group_allowed("grp2", "user1") is False + def test_pairing_default_blocks_groups(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + assert adapter._group_policy == "pairing" + assert adapter._is_group_allowed("grp1", "user1") is False + + def test_pairing_default_strict_dm_auth_denies_unknown(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + assert adapter._dm_policy == "pairing" + assert adapter._is_dm_allowed("any_user") is False + + def test_pairing_default_forwards_dm_to_gateway_intake(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + assert adapter._is_dm_intake_allowed("any_user") is True # --------------------------------------------------------------------------- # _resolve_stt_config diff --git a/tests/gateway/test_voice_command.py b/tests/gateway/test_voice_command.py index 6c408cb05..539648886 100644 --- a/tests/gateway/test_voice_command.py +++ b/tests/gateway/test_voice_command.py @@ -1211,7 +1211,7 @@ class TestDiscordVoiceChannelMethods: def test_is_allowed_user_empty_list(self): adapter = self._make_adapter() - assert adapter._is_allowed_user("42") is True + assert adapter._is_allowed_user("42") is False def test_is_allowed_user_in_list(self): adapter = self._make_adapter() diff --git a/tests/gateway/test_wecom.py b/tests/gateway/test_wecom.py index 1202ec3f0..949851e4d 100644 --- a/tests/gateway/test_wecom.py +++ b/tests/gateway/test_wecom.py @@ -336,6 +336,21 @@ class TestPolicyHelpers: assert adapter._is_group_allowed("group-1", "user-2") is False assert adapter._is_group_allowed("group-2", "user-1") is False + def test_pairing_group_policy_blocks_without_explicit_group_allow_from(self): + from plugins.platforms.wecom.adapter import WeComAdapter + + adapter = WeComAdapter( + PlatformConfig(enabled=True, extra={"group_policy": "pairing"}) + ) + + assert adapter._is_group_allowed("group-1", "user-1") is False + + def test_pairing_dm_policy_strict_auth_denies_unknown(self): + from plugins.platforms.wecom.adapter import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})) + assert adapter._is_dm_allowed("user-1") is False + assert adapter._is_dm_intake_allowed("user-1") is True class TestMediaHelpers: def test_detect_wecom_media_type(self): @@ -589,7 +604,12 @@ class TestInboundMessages: async def test_on_message_builds_event(self): from plugins.platforms.wecom.adapter import WeComAdapter - adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter = WeComAdapter( + PlatformConfig( + enabled=True, + extra={"group_policy": "allowlist", "group_allow_from": ["group-1"]}, + ) + ) adapter._text_batch_delay_seconds = 0 # disable batching for tests adapter.handle_message = AsyncMock() adapter._extract_media = AsyncMock(return_value=(["/tmp/test.png"], ["image/png"])) @@ -621,7 +641,12 @@ class TestInboundMessages: async def test_on_message_preserves_quote_context(self): from plugins.platforms.wecom.adapter import WeComAdapter - adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter = WeComAdapter( + PlatformConfig( + enabled=True, + extra={"group_policy": "allowlist", "group_allow_from": ["group-1"]}, + ) + ) adapter._text_batch_delay_seconds = 0 # disable batching for tests adapter.handle_message = AsyncMock() adapter._extract_media = AsyncMock(return_value=([], [])) @@ -749,7 +774,12 @@ class TestWeComZombieSessionFix: async def test_on_message_caches_last_req_id_per_chat(self): from plugins.platforms.wecom.adapter import WeComAdapter - adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter = WeComAdapter( + PlatformConfig( + enabled=True, + extra={"group_policy": "allowlist", "group_allow_from": ["group-1"]}, + ) + ) adapter._text_batch_delay_seconds = 0 adapter.handle_message = AsyncMock() adapter._extract_media = AsyncMock(return_value=([], [])) diff --git a/tests/gateway/test_whatsapp_allowlist_lid_resolution.py b/tests/gateway/test_whatsapp_allowlist_lid_resolution.py index 52c1f9d3e..e0cf8a359 100644 --- a/tests/gateway/test_whatsapp_allowlist_lid_resolution.py +++ b/tests/gateway/test_whatsapp_allowlist_lid_resolution.py @@ -119,12 +119,19 @@ def test_dm_disabled_policy_blocks_even_allowlisted(): assert adapter._is_dm_allowed(f"{LID}@lid") is False -def test_dm_open_policy_allows_anyone(): +def test_dm_open_policy_allows_anyone_with_opt_in(monkeypatch): + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") adapter = _make_adapter(dm_policy="open") assert adapter._is_dm_allowed("anyone@lid") is True +def test_dm_open_policy_blocked_without_opt_in(): + adapter = _make_adapter(dm_policy="open") + + assert adapter._is_dm_allowed("anyone@lid") is False + + # ------------------------------------------------------------------ group gate def test_group_jid_exact_match_still_works(): diff --git a/tests/gateway/test_whatsapp_group_gating.py b/tests/gateway/test_whatsapp_group_gating.py index cee3894d6..eba4a6848 100644 --- a/tests/gateway/test_whatsapp_group_gating.py +++ b/tests/gateway/test_whatsapp_group_gating.py @@ -28,9 +28,9 @@ def _make_adapter(require_mention=None, mention_patterns=None, free_response_cha adapter.platform = Platform.WHATSAPP adapter.config = PlatformConfig(enabled=True, extra=extra) adapter._message_handler = AsyncMock() - adapter._dm_policy = str(extra.get("dm_policy", "open")).strip().lower() + adapter._dm_policy = str(extra.get("dm_policy", "pairing")).strip().lower() adapter._allow_from = WhatsAppAdapter._coerce_allow_list(extra.get("allow_from")) - adapter._group_policy = str(extra.get("group_policy", "open")).strip().lower() + adapter._group_policy = str(extra.get("group_policy", "pairing")).strip().lower() adapter._group_allow_from = WhatsAppAdapter._coerce_allow_list(extra.get("group_allow_from")) adapter._mention_patterns = adapter._compile_mention_patterns() adapter._free_response_chats = adapter._whatsapp_free_response_chats() @@ -66,13 +66,13 @@ def _dm_message(body="hello", **overrides): # --- Existing tests (unchanged logic, updated helper) --- def test_group_messages_can_be_opened_via_config(): - adapter = _make_adapter(require_mention=False) + adapter = _make_adapter(require_mention=False, group_policy="open") assert adapter._should_process_message(_group_message("hello everyone")) is True def test_group_messages_can_require_direct_trigger_via_config(): - adapter = _make_adapter(require_mention=True) + adapter = _make_adapter(require_mention=True, group_policy="open") assert adapter._should_process_message(_group_message("hello everyone")) is False assert adapter._should_process_message( @@ -91,7 +91,11 @@ def test_group_messages_can_require_direct_trigger_via_config(): def test_regex_mention_patterns_allow_custom_wake_words(): - adapter = _make_adapter(require_mention=True, mention_patterns=[r"^\s*chompy\b"]) + adapter = _make_adapter( + require_mention=True, + mention_patterns=[r"^\s*chompy\b"], + group_policy="open", + ) assert adapter._should_process_message(_group_message("chompy status")) is True assert adapter._should_process_message(_group_message(" chompy help")) is True @@ -99,7 +103,11 @@ def test_regex_mention_patterns_allow_custom_wake_words(): def test_invalid_regex_patterns_are_ignored(): - adapter = _make_adapter(require_mention=True, mention_patterns=[r"(", r"^\s*chompy\b"]) + adapter = _make_adapter( + require_mention=True, + mention_patterns=[r"(", r"^\s*chompy\b"], + group_policy="open", + ) assert adapter._should_process_message(_group_message("chompy status")) is True assert adapter._should_process_message(_group_message("hello everyone")) is False @@ -133,6 +141,7 @@ def test_free_response_chats_bypass_mention_gating(): adapter = _make_adapter( require_mention=True, free_response_chats=["120363001234567890@g.us"], + group_policy="open", ) assert adapter._should_process_message(_group_message("hello everyone")) is True @@ -142,12 +151,13 @@ def test_free_response_chats_does_not_bypass_other_groups(): adapter = _make_adapter( require_mention=True, free_response_chats=["999999999999@g.us"], + group_policy="open", ) assert adapter._should_process_message(_group_message("hello everyone")) is False -def test_dm_passes_with_default_open_policy(): +def test_dm_passes_with_default_pairing_policy(): adapter = _make_adapter(require_mention=True) dm = _dm_message("hello") @@ -180,7 +190,11 @@ def test_dm_policy_disabled_blocks_all_dms(): def test_dm_policy_disabled_still_allows_groups(): - adapter = _make_adapter(dm_policy="disabled", require_mention=False) + adapter = _make_adapter( + dm_policy="disabled", + require_mention=False, + group_policy="open", + ) assert adapter._should_process_message(_group_message("hello")) is True @@ -197,12 +211,34 @@ def test_dm_policy_allowlist_allows_listed_sender(): assert adapter._should_process_message(_dm_message("hello")) is True -def test_dm_policy_open_allows_all_dms(): +def test_dm_policy_open_allows_all_dms_with_opt_in(monkeypatch): + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") adapter = _make_adapter(dm_policy="open") assert adapter._should_process_message(_dm_message("hello")) is True +def test_dm_policy_open_blocked_without_opt_in(): + adapter = _make_adapter(dm_policy="open") + + assert adapter._is_dm_allowed("6281234567890@s.whatsapp.net") is False + assert adapter._should_process_message(_dm_message("hello")) is False + + +def test_dm_policy_pairing_strict_auth_denies_unknown(): + adapter = _make_adapter() + + assert adapter._dm_policy == "pairing" + assert adapter._is_dm_allowed("6281234567890@s.whatsapp.net") is False + + +def test_dm_policy_pairing_still_forwards_to_gateway_intake(): + adapter = _make_adapter() + + assert adapter._is_dm_intake_allowed("6281234567890@s.whatsapp.net") is True + assert adapter._should_process_message(_dm_message("hello")) is True + + # --- New group_policy tests --- def test_group_policy_disabled_blocks_all_groups(): @@ -244,6 +280,14 @@ def test_group_policy_open_allows_all_groups(): assert adapter._should_process_message(_group_message("/status")) is True +def test_group_policy_pairing_default_blocks_groups(): + adapter = _make_adapter() + + assert adapter._group_policy == "pairing" + assert adapter._is_group_allowed("120363001234567890@g.us") is False + assert adapter._should_process_message(_group_message("hello")) is False + + # --- Config bridging tests --- def test_config_bridges_whatsapp_dm_and_group_policy(monkeypatch, tmp_path): @@ -347,7 +391,7 @@ def test_broadcast_filter_runs_before_allowlist(): def test_real_dm_still_processed_after_broadcast_filter(): """Sanity check: the broadcast filter doesn't accidentally drop real DMs.""" - adapter = _make_adapter(dm_policy="open") + adapter = _make_adapter(dm_policy="pairing") msg = _dm_message( body="hello", diff --git a/tests/test_yuanbao_pipeline.py b/tests/test_yuanbao_pipeline.py index ac35f4964..53613abd1 100644 --- a/tests/test_yuanbao_pipeline.py +++ b/tests/test_yuanbao_pipeline.py @@ -33,6 +33,7 @@ from gateway.platforms.yuanbao import ( ChatRoutingMiddleware, AccessPolicy, AccessGuardMiddleware, + AutoSetHomeMiddleware, ExtractContentMiddleware, PlaceholderFilterMiddleware, OwnerCommandMiddleware, @@ -483,8 +484,9 @@ class TestChatRoutingMiddleware: class TestAccessGuardMiddleware: @pytest.mark.asyncio - async def test_open_policy_passes(self): - """AccessGuardMiddleware passes with open policy.""" + async def test_open_policy_passes_with_opt_in(self, monkeypatch): + """AccessGuardMiddleware passes open policy only with explicit opt-in.""" + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") adapter = make_adapter() adapter._access_policy = AccessPolicy(dm_policy="open", dm_allow_from=[], group_policy="open", group_allow_from=[]) ctx = make_ctx(adapter=adapter, chat_type="dm", from_account="alice") @@ -493,6 +495,19 @@ class TestAccessGuardMiddleware: await AccessGuardMiddleware()(ctx, next_fn) next_fn.assert_awaited_once() + @pytest.mark.asyncio + async def test_open_policy_blocked_without_opt_in(self, monkeypatch): + """AccessGuardMiddleware blocks open policy without explicit opt-in.""" + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter() + adapter._access_policy = AccessPolicy(dm_policy="open", dm_allow_from=[], group_policy="open", group_allow_from=[]) + ctx = make_ctx(adapter=adapter, chat_type="dm", from_account="alice") + next_fn = AsyncMock() + + await AccessGuardMiddleware()(ctx, next_fn) + next_fn.assert_not_awaited() + @pytest.mark.asyncio async def test_disabled_dm_stops(self): """AccessGuardMiddleware stops DM when dm_policy=disabled.""" @@ -548,6 +563,279 @@ class TestAccessGuardMiddleware: await AccessGuardMiddleware()(ctx, next_fn) next_fn.assert_awaited_once() + @pytest.mark.asyncio + async def test_open_group_blocked_without_opt_in(self, monkeypatch): + """AccessGuardMiddleware blocks open group policy without explicit opt-in.""" + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="open", group_allow_from=[], + ) + ctx = make_ctx(adapter=adapter, chat_type="group", group_code="grp-1") + next_fn = AsyncMock() + + await AccessGuardMiddleware()(ctx, next_fn) + next_fn.assert_not_awaited() + + @pytest.mark.asyncio + async def test_open_group_passes_with_opt_in(self, monkeypatch): + """AccessGuardMiddleware passes open group policy with explicit opt-in.""" + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="open", group_allow_from=[], + ) + ctx = make_ctx(adapter=adapter, chat_type="group", group_code="grp-1") + next_fn = AsyncMock() + + await AccessGuardMiddleware()(ctx, next_fn) + next_fn.assert_awaited_once() + + @pytest.mark.asyncio + async def test_unknown_group_policy_blocked(self, monkeypatch): + """AccessGuardMiddleware blocks unrecognized group_policy values.""" + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="typo", group_allow_from=[], + ) + ctx = make_ctx(adapter=adapter, chat_type="group", group_code="grp-1") + next_fn = AsyncMock() + + await AccessGuardMiddleware()(ctx, next_fn) + next_fn.assert_not_awaited() + + @pytest.mark.asyncio + @pytest.mark.parametrize("blank_sender", ["", " ", None]) + async def test_pairing_blank_dm_blocked(self, monkeypatch, blank_sender): + """AccessGuardMiddleware blocks pairing DMs with blank sender principals.""" + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="pairing", group_allow_from=[], + ) + ctx = make_ctx(adapter=adapter, chat_type="dm", from_account=blank_sender) + next_fn = AsyncMock() + + await AccessGuardMiddleware()(ctx, next_fn) + next_fn.assert_not_awaited() + + +class TestAccessPolicy: + def test_open_group_requires_opt_in(self, monkeypatch): + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="open", group_allow_from=[], + ) + assert policy.is_group_allowed("unknown-group") is False + + def test_open_group_with_gateway_opt_in(self, monkeypatch): + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="open", group_allow_from=[], + ) + assert policy.is_group_allowed("unknown-group") is True + + def test_open_group_with_platform_opt_in(self, monkeypatch): + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.setenv("YUANBAO_ALLOW_ALL_USERS", "true") + policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="open", group_allow_from=[], + ) + assert policy.is_group_allowed("unknown-group") is True + + def test_unknown_group_policy_denies(self, monkeypatch): + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="typo", group_allow_from=[], + ) + assert policy.is_group_allowed("unknown-group") is False + + @pytest.mark.parametrize("blank_sender", ["", " ", None]) + def test_pairing_dm_intake_denies_blank_principal(self, monkeypatch, blank_sender): + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="pairing", group_allow_from=[], + ) + assert policy.is_dm_intake_allowed(blank_sender) is False + + def test_pairing_dm_intake_allows_non_blank_principal(self, monkeypatch): + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="pairing", group_allow_from=[], + ) + assert policy.is_dm_intake_allowed("user-1") is True + + +class TestAutoSetHomeMiddleware: + @pytest.mark.asyncio + async def test_pairing_unapproved_dm_does_not_set_home(self, monkeypatch, tmp_path): + """Intake-only pairing DMs must not claim YUANBAO_HOME_CHANNEL.""" + monkeypatch.delenv("YUANBAO_HOME_CHANNEL", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + + adapter = make_adapter() + adapter._auto_sethome_done = False + adapter._access_policy = AccessPolicy( + dm_policy="pairing", + dm_allow_from=[], + group_policy="pairing", + group_allow_from=[], + ) + ctx = make_ctx( + adapter=adapter, + chat_type="dm", + chat_id="direct:unapproved-sender", + from_account="unapproved-sender", + ) + next_fn = AsyncMock() + + with patch("gateway.pairing.PairingStore") as mock_store_cls: + mock_store_cls.return_value.is_approved.return_value = False + await AutoSetHomeMiddleware()(ctx, next_fn) + + assert "YUANBAO_HOME_CHANNEL" not in os.environ + assert not (tmp_path / "config.yaml").exists() + next_fn.assert_awaited_once() + + @pytest.mark.asyncio + async def test_pairing_approved_dm_sets_home(self, monkeypatch, tmp_path): + """Pairing-approved senders may auto-designate the home channel.""" + monkeypatch.delenv("YUANBAO_HOME_CHANNEL", raising=False) + monkeypatch.setattr( + "hermes_constants.get_hermes_home", + lambda: tmp_path, + ) + + adapter = make_adapter() + adapter._auto_sethome_done = False + adapter._access_policy = AccessPolicy( + dm_policy="pairing", + dm_allow_from=[], + group_policy="pairing", + group_allow_from=[], + ) + ctx = make_ctx( + adapter=adapter, + chat_type="dm", + chat_id="direct:approved-sender", + from_account="approved-sender", + chat_name="Approved", + ) + next_fn = AsyncMock() + + with patch("gateway.pairing.PairingStore") as mock_store_cls: + mock_store_cls.return_value.is_approved.return_value = True + await AutoSetHomeMiddleware()(ctx, next_fn) + + assert os.environ.get("YUANBAO_HOME_CHANNEL") == "direct:approved-sender" + next_fn.assert_awaited_once() + + @pytest.mark.asyncio + async def test_allowlist_dm_sets_home(self, monkeypatch, tmp_path): + """Allowlisted senders may auto-designate the home channel.""" + monkeypatch.delenv("YUANBAO_HOME_CHANNEL", raising=False) + monkeypatch.setattr( + "hermes_constants.get_hermes_home", + lambda: tmp_path, + ) + + adapter = make_adapter() + adapter._auto_sethome_done = False + adapter._access_policy = AccessPolicy( + dm_policy="allowlist", + dm_allow_from=["alice"], + group_policy="pairing", + group_allow_from=[], + ) + ctx = make_ctx( + adapter=adapter, + chat_type="dm", + chat_id="direct:alice", + from_account="alice", + chat_name="Alice", + ) + next_fn = AsyncMock() + + await AutoSetHomeMiddleware()(ctx, next_fn) + + assert os.environ.get("YUANBAO_HOME_CHANNEL") == "direct:alice" + next_fn.assert_awaited_once() + + +class TestSenderMayDesignateHome: + def test_pairing_unapproved_sender_denied(self, monkeypatch): + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="pairing", + dm_allow_from=[], + group_policy="pairing", + group_allow_from=[], + ) + ctx = make_ctx( + adapter=adapter, + chat_type="dm", + from_account="unapproved-sender", + ) + + with patch("gateway.pairing.PairingStore") as mock_store_cls: + mock_store_cls.return_value.is_approved.return_value = False + assert adapter._sender_may_designate_home(ctx) is False + + def test_pairing_approved_sender_allowed(self): + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="pairing", + dm_allow_from=[], + group_policy="pairing", + group_allow_from=[], + ) + ctx = make_ctx( + adapter=adapter, + chat_type="dm", + from_account="approved-sender", + ) + + with patch("gateway.pairing.PairingStore") as mock_store_cls: + mock_store_cls.return_value.is_approved.return_value = True + assert adapter._sender_may_designate_home(ctx) is True + + def test_allowlist_sender_allowed(self): + adapter = make_adapter() + adapter._access_policy = AccessPolicy( + dm_policy="allowlist", + dm_allow_from=["alice"], + group_policy="pairing", + group_allow_from=[], + ) + ctx = make_ctx( + adapter=adapter, + chat_type="dm", + from_account="alice", + ) + assert adapter._sender_may_designate_home(ctx) is True + class TestExtractContentMiddleware: @pytest.mark.asyncio @@ -679,6 +967,41 @@ class TestGroupAtGuardMiddleware: next_fn.assert_awaited_once() +class TestAutoSetHomeAfterGroupAtGuard: + @pytest.mark.asyncio + async def test_unaddressed_group_does_not_set_home(self, monkeypatch, tmp_path): + """Group traffic dropped by GroupAtGuard must not persist YUANBAO_HOME_CHANNEL.""" + monkeypatch.delenv("YUANBAO_HOME_CHANNEL", raising=False) + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") + monkeypatch.setattr( + "hermes_constants.get_hermes_home", + lambda: tmp_path, + ) + + adapter = make_adapter() + adapter._auto_sethome_done = False + adapter._access_policy = AccessPolicy( + dm_policy="pairing", + dm_allow_from=[], + group_policy="open", + group_allow_from=[], + ) + adapter._session_store = None + + push_data = make_json_push( + from_account="alice", + group_code="grp-1", + text="hello group", + msg_id="msg-group-001", + ) + ctx = InboundContext(adapter=adapter, raw_frames=[push_data]) + pipeline = InboundPipelineBuilder.build() + await pipeline.execute(ctx) + + assert "YUANBAO_HOME_CHANNEL" not in os.environ + assert not (tmp_path / "config.yaml").exists() + + # ============================================================ # 4. Factory Tests # ============================================================ @@ -695,12 +1018,12 @@ class TestCreateInboundPipeline: "skip-self", "chat-routing", "access-guard", - "auto-sethome", "extract-content", "placeholder-filter", "owner-command", "build-source", "group-at-guard", + "auto-sethome", "group-attribution", "classify-msg-type", "quote-context", @@ -718,8 +1041,9 @@ class TestCreateInboundPipeline: class TestPipelineIntegration: @pytest.mark.asyncio - async def test_full_dm_message_flow(self): + async def test_full_dm_message_flow(self, monkeypatch): """Full pipeline processes a DM message end-to-end.""" + monkeypatch.setenv("GATEWAY_ALLOW_ALL_USERS", "true") adapter = make_adapter() adapter._bot_id = "bot_123" adapter._access_policy = AccessPolicy(dm_policy="open", dm_allow_from=[], group_policy="open", group_allow_from=[]) @@ -745,6 +1069,36 @@ class TestPipelineIntegration: assert "Hello bot!" in ctx.raw_text assert ctx.source is not None + @pytest.mark.asyncio + async def test_pairing_blank_sender_stops_at_access_guard(self, monkeypatch): + """Whitespace-only C2C senders must not pass pairing intake into dispatch.""" + monkeypatch.delenv("GATEWAY_ALLOW_ALL_USERS", raising=False) + monkeypatch.delenv("YUANBAO_ALLOW_ALL_USERS", raising=False) + adapter = make_adapter() + adapter._bot_id = "bot_123" + adapter._access_policy = AccessPolicy( + dm_policy="pairing", dm_allow_from=[], + group_policy="pairing", group_allow_from=[], + ) + adapter.handle_message = AsyncMock() + + push_data = make_json_push( + from_account=" ", + to_account="bot_123", + text="Hello bot!", + msg_id="msg-blank-001", + ) + + ctx = InboundContext(adapter=adapter, raw_frames=[push_data]) + pipeline = InboundPipelineBuilder.build() + await pipeline.execute(ctx) + + assert ctx.from_account == " " + assert ctx.chat_type == "dm" + assert ctx.chat_id == "direct: " + assert ctx.source is None + adapter.handle_message.assert_not_awaited() + @pytest.mark.asyncio async def test_self_message_filtered(self): """Pipeline stops when message is from bot itself."""