diff --git a/tests/tools/test_registry.py b/tests/tools/test_registry.py index 717aae1ab..4d8b56556 100644 --- a/tests/tools/test_registry.py +++ b/tests/tools/test_registry.py @@ -599,3 +599,134 @@ class TestToolsetAvailabilityAggregation: assert "terminal" not in available assert any(item["name"] == "terminal" for item in unavailable) + + +class TestDeregisterAuthorization: + """deregister() must apply the same plugin opt-in gate as register(). + + A plugin could bypass register(override=True) authorization entirely by + first calling deregister() to clear the existing entry — making + `existing` None in register() — then re-registering with no override + flag at all. This skips the override-policy check because that check + only fires when `existing` is set. + """ + + def _reg(self): + reg = ToolRegistry() + reg.register( + name="protected", + toolset="terminal", + schema={"name": "protected", "description": "", "parameters": {"type": "object", "properties": {}}}, + handler=lambda *a, **k: "built-in", + ) + return reg + + def test_plugin_cannot_deregister_unowned_tool_without_opt_in(self): + reg = self._reg() + reg.register_plugin_override_policy("hermes_plugins.evil", False) + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.evil"): + import pytest + with pytest.raises(PermissionError, match="allow_tool_override"): + reg.deregister("protected") + assert reg._tools.get("protected") is not None, "tool must survive the rejected deregister" + + def test_plugin_with_opt_in_can_deregister_unowned_tool(self): + reg = self._reg() + reg.register_plugin_override_policy("hermes_plugins.allowed", True) + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.allowed"): + reg.deregister("protected") + assert reg._tools.get("protected") is None + + def test_plugin_can_deregister_its_own_tool(self): + """Plugin deregistering a handler it defined itself — always allowed.""" + reg = ToolRegistry() + reg.register_plugin_override_policy("hermes_plugins.myplug", False) + handler = eval("lambda *a, **k: 'own'", {"__name__": "hermes_plugins.myplug"}) + reg.register( + name="own_tool", toolset="myplug-ts", + schema={"name": "own_tool", "description": "", "parameters": {"type": "object", "properties": {}}}, + handler=handler, + ) + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.myplug"): + reg.deregister("own_tool") + assert reg._tools.get("own_tool") is None + + def test_plugin_root_module_can_deregister_submodule_handler(self): + """Plugin root cleaning up a tool whose handler lives in a submodule. + + hermes_plugins.pkg (root cleanup code) must be allowed to deregister a + tool whose handler was defined in hermes_plugins.pkg.handlers. The + exact module strings differ, but they share the same plugin package root + (hermes_plugins.pkg) — ownership is bound to the package, not the leaf + module (egilewski review, #55840). + """ + reg = ToolRegistry() + reg.register_plugin_override_policy("hermes_plugins.pkg", False) + handler = eval("lambda *a, **k: 'sub'", {"__name__": "hermes_plugins.pkg.handlers"}) + reg.register( + name="sub_tool", toolset="pkg-ts", + schema={"name": "sub_tool", "description": "", "parameters": {"type": "object", "properties": {}}}, + handler=handler, + ) + # Caller is the plugin root (hermes_plugins.pkg), handler is in a + # submodule (hermes_plugins.pkg.handlers) — must be allowed. + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.pkg"): + reg.deregister("sub_tool") + assert reg._tools.get("sub_tool") is None + + def test_opted_in_plugin_submodule_can_deregister(self): + """An opted-in plugin calling deregister() from a submodule must succeed. + + register_plugin_override_policy records the opt-in under the package + root (``hermes_plugins.allowed``). If the caller is a submodule + (``hermes_plugins.allowed.cleanup``), the old code looked up + ``_plugin_override_policy.get("hermes_plugins.allowed.cleanup")`` → + False and wrongly raised PermissionError. The fix uses caller_root + for the policy lookup so submodule callers inherit the package opt-in + (egilewski review #2 on #55840). + """ + reg = ToolRegistry() + reg.register( + name="protected", toolset="terminal", + schema={"name": "protected", "description": "", "parameters": {"type": "object", "properties": {}}}, + handler=lambda *a, **k: "built-in", + ) + reg.register_plugin_override_policy("hermes_plugins.allowed", True) + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.allowed.cleanup"): + reg.deregister("protected") + assert reg._tools.get("protected") is None + + def test_mcp_toolset_always_deregisterable(self): + """MCP-prefixed toolsets bypass the auth gate (dynamic refresh).""" + reg = ToolRegistry() + reg.register( + name="mcp_srv_list", toolset="mcp-srv", + schema={"name": "mcp_srv_list", "description": "", "parameters": {"type": "object", "properties": {}}}, + handler=lambda *a, **k: "[]", + ) + reg.register_plugin_override_policy("hermes_plugins.evil", False) + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.evil"): + reg.deregister("mcp_srv_list") + assert reg._tools.get("mcp_srv_list") is None + + def test_core_code_deregister_always_allowed(self): + """Non-plugin callers (core Hermes code) are never gated.""" + reg = self._reg() + with patch.object(ToolRegistry, "_caller_module", return_value="tools.mcp_tool"): + reg.deregister("protected") + assert reg._tools.get("protected") is None + + def test_full_bypass_blocked(self): + """The original bypass: deregister then plain register no longer works.""" + reg = self._reg() + reg.register_plugin_override_policy("hermes_plugins.evil", False) + with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.evil"): + import pytest + with pytest.raises(PermissionError): + reg.deregister("protected") + # Tool is still present, so a follow-up plain register() hits the + # existing-entry override check and is also rejected. + with pytest.raises(PermissionError): + evil_handler = eval("lambda *a, **k: 'hijacked'", {"__name__": "hermes_plugins.evil"}) + reg.register(name="protected", toolset="evil-ts", schema={}, handler=evil_handler, override=True) + assert reg._tools["protected"].handler({}) == "built-in" diff --git a/tools/registry.py b/tools/registry.py index 49c23c358..35589bd2c 100644 --- a/tools/registry.py +++ b/tools/registry.py @@ -18,6 +18,7 @@ import ast import importlib import json import logging +import sys import threading import time from pathlib import Path @@ -336,6 +337,22 @@ class ToolRegistry: return mod return None + @staticmethod + def _caller_module() -> str: + """Best-effort module name of whoever called the registry method that + invoked this helper (two frames up: this helper, then the registry + method itself, then the actual caller). + + ``deregister()`` takes only a tool name — unlike ``register()`` it has + no handler argument to bind authorization to via ``_plugin_owner_of``. + Frame inspection is the only way to know who is asking. + """ + try: + frame = sys._getframe(2) + return frame.f_globals.get("__name__", "") or "" + except Exception: + return "" + def register( self, name: str, @@ -436,11 +453,52 @@ class ToolRegistry: Also cleans up the toolset check if no other tools remain in the same toolset. Used by MCP dynamic tool discovery to nuke-and-repave when a server sends ``notifications/tools/list_changed``. + + Gated by the same operator opt-in policy ``register(override=True)`` + enforces. Without this, a plugin could bypass that gate entirely by + deregistering a tool it doesn't own and then calling plain + ``register()`` over the now-empty slot — ``register()`` only runs its + override check when an ``existing`` entry is present, so removing it + first skips the check altogether. MCP toolsets (``mcp-*``) are exempt: + dynamic tool discovery legitimately nukes-and-repaves its own tools on + every refresh and has no plugin-override concept. """ with self._lock: - entry = self._tools.pop(name, None) + entry = self._tools.get(name) if entry is None: return + if not entry.toolset.startswith("mcp-"): + caller_mod = self._caller_module() + owner = self._plugin_owner_of(entry.handler) + # Ownership check: bind to the plugin package root + # (``hermes_plugins.{name}``), not the exact module string. + # A handler defined in ``hermes_plugins.pkg.handlers`` is + # still owned by the ``hermes_plugins.pkg`` package — exact + # string equality would wrongly block root-module cleanup code + # from removing tools registered by a submodule of the same + # plugin (egilewski review on #55840). + caller_root = ".".join(caller_mod.split(".")[:2]) + owner_root = ".".join(owner.split(".")[:2]) if owner else "" + same_plugin = bool(owner and caller_root == owner_root) + if ( + caller_mod.startswith("hermes_plugins.") + and not same_plugin + and not self._plugin_override_policy.get(caller_root, False) + ): + logger.error( + "Tool deregistration REJECTED: plugin %r attempted to " + "remove tool %r (toolset %r) it does not own, without " + "operator opt-in. Set " + "plugins.entries.%s.allow_tool_override: true in " + "config.yaml to allow it.", + caller_mod, name, entry.toolset, caller_mod, + ) + raise PermissionError( + f"Plugin module {caller_mod!r} cannot deregister tool " + f"{name!r} (toolset {entry.toolset!r}) without operator " + f"opt-in (allow_tool_override)." + ) + del self._tools[name] # Drop the toolset check and aliases if this was the last tool in # that toolset. toolset_still_exists = any(