fix(security): use caller package root for deregister opt-in policy lookup
_plugin_override_policy is keyed by the plugin package root
(e.g. hermes_plugins.allowed), but the lookup used caller_mod
(the exact leaf module string). A call from hermes_plugins.allowed.cleanup
would evaluate _plugin_override_policy.get("hermes_plugins.allowed.cleanup")
→ False and raise PermissionError even when the plugin registered opt-in
under its package root.
Switch the policy lookup to caller_root (.join of the first two segments)
so submodule callers inherit the package-level allow_tool_override grant.
Adds a focused regression test for the opted-in submodule case.
This commit is contained in:
parent
e07768a53f
commit
db0fd8f290
2 changed files with 190 additions and 1 deletions
|
|
@ -599,3 +599,134 @@ class TestToolsetAvailabilityAggregation:
|
|||
|
||||
assert "terminal" not in available
|
||||
assert any(item["name"] == "terminal" for item in unavailable)
|
||||
|
||||
|
||||
class TestDeregisterAuthorization:
|
||||
"""deregister() must apply the same plugin opt-in gate as register().
|
||||
|
||||
A plugin could bypass register(override=True) authorization entirely by
|
||||
first calling deregister() to clear the existing entry — making
|
||||
`existing` None in register() — then re-registering with no override
|
||||
flag at all. This skips the override-policy check because that check
|
||||
only fires when `existing` is set.
|
||||
"""
|
||||
|
||||
def _reg(self):
|
||||
reg = ToolRegistry()
|
||||
reg.register(
|
||||
name="protected",
|
||||
toolset="terminal",
|
||||
schema={"name": "protected", "description": "", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda *a, **k: "built-in",
|
||||
)
|
||||
return reg
|
||||
|
||||
def test_plugin_cannot_deregister_unowned_tool_without_opt_in(self):
|
||||
reg = self._reg()
|
||||
reg.register_plugin_override_policy("hermes_plugins.evil", False)
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.evil"):
|
||||
import pytest
|
||||
with pytest.raises(PermissionError, match="allow_tool_override"):
|
||||
reg.deregister("protected")
|
||||
assert reg._tools.get("protected") is not None, "tool must survive the rejected deregister"
|
||||
|
||||
def test_plugin_with_opt_in_can_deregister_unowned_tool(self):
|
||||
reg = self._reg()
|
||||
reg.register_plugin_override_policy("hermes_plugins.allowed", True)
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.allowed"):
|
||||
reg.deregister("protected")
|
||||
assert reg._tools.get("protected") is None
|
||||
|
||||
def test_plugin_can_deregister_its_own_tool(self):
|
||||
"""Plugin deregistering a handler it defined itself — always allowed."""
|
||||
reg = ToolRegistry()
|
||||
reg.register_plugin_override_policy("hermes_plugins.myplug", False)
|
||||
handler = eval("lambda *a, **k: 'own'", {"__name__": "hermes_plugins.myplug"})
|
||||
reg.register(
|
||||
name="own_tool", toolset="myplug-ts",
|
||||
schema={"name": "own_tool", "description": "", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=handler,
|
||||
)
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.myplug"):
|
||||
reg.deregister("own_tool")
|
||||
assert reg._tools.get("own_tool") is None
|
||||
|
||||
def test_plugin_root_module_can_deregister_submodule_handler(self):
|
||||
"""Plugin root cleaning up a tool whose handler lives in a submodule.
|
||||
|
||||
hermes_plugins.pkg (root cleanup code) must be allowed to deregister a
|
||||
tool whose handler was defined in hermes_plugins.pkg.handlers. The
|
||||
exact module strings differ, but they share the same plugin package root
|
||||
(hermes_plugins.pkg) — ownership is bound to the package, not the leaf
|
||||
module (egilewski review, #55840).
|
||||
"""
|
||||
reg = ToolRegistry()
|
||||
reg.register_plugin_override_policy("hermes_plugins.pkg", False)
|
||||
handler = eval("lambda *a, **k: 'sub'", {"__name__": "hermes_plugins.pkg.handlers"})
|
||||
reg.register(
|
||||
name="sub_tool", toolset="pkg-ts",
|
||||
schema={"name": "sub_tool", "description": "", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=handler,
|
||||
)
|
||||
# Caller is the plugin root (hermes_plugins.pkg), handler is in a
|
||||
# submodule (hermes_plugins.pkg.handlers) — must be allowed.
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.pkg"):
|
||||
reg.deregister("sub_tool")
|
||||
assert reg._tools.get("sub_tool") is None
|
||||
|
||||
def test_opted_in_plugin_submodule_can_deregister(self):
|
||||
"""An opted-in plugin calling deregister() from a submodule must succeed.
|
||||
|
||||
register_plugin_override_policy records the opt-in under the package
|
||||
root (``hermes_plugins.allowed``). If the caller is a submodule
|
||||
(``hermes_plugins.allowed.cleanup``), the old code looked up
|
||||
``_plugin_override_policy.get("hermes_plugins.allowed.cleanup")`` →
|
||||
False and wrongly raised PermissionError. The fix uses caller_root
|
||||
for the policy lookup so submodule callers inherit the package opt-in
|
||||
(egilewski review #2 on #55840).
|
||||
"""
|
||||
reg = ToolRegistry()
|
||||
reg.register(
|
||||
name="protected", toolset="terminal",
|
||||
schema={"name": "protected", "description": "", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda *a, **k: "built-in",
|
||||
)
|
||||
reg.register_plugin_override_policy("hermes_plugins.allowed", True)
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.allowed.cleanup"):
|
||||
reg.deregister("protected")
|
||||
assert reg._tools.get("protected") is None
|
||||
|
||||
def test_mcp_toolset_always_deregisterable(self):
|
||||
"""MCP-prefixed toolsets bypass the auth gate (dynamic refresh)."""
|
||||
reg = ToolRegistry()
|
||||
reg.register(
|
||||
name="mcp_srv_list", toolset="mcp-srv",
|
||||
schema={"name": "mcp_srv_list", "description": "", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda *a, **k: "[]",
|
||||
)
|
||||
reg.register_plugin_override_policy("hermes_plugins.evil", False)
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.evil"):
|
||||
reg.deregister("mcp_srv_list")
|
||||
assert reg._tools.get("mcp_srv_list") is None
|
||||
|
||||
def test_core_code_deregister_always_allowed(self):
|
||||
"""Non-plugin callers (core Hermes code) are never gated."""
|
||||
reg = self._reg()
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="tools.mcp_tool"):
|
||||
reg.deregister("protected")
|
||||
assert reg._tools.get("protected") is None
|
||||
|
||||
def test_full_bypass_blocked(self):
|
||||
"""The original bypass: deregister then plain register no longer works."""
|
||||
reg = self._reg()
|
||||
reg.register_plugin_override_policy("hermes_plugins.evil", False)
|
||||
with patch.object(ToolRegistry, "_caller_module", return_value="hermes_plugins.evil"):
|
||||
import pytest
|
||||
with pytest.raises(PermissionError):
|
||||
reg.deregister("protected")
|
||||
# Tool is still present, so a follow-up plain register() hits the
|
||||
# existing-entry override check and is also rejected.
|
||||
with pytest.raises(PermissionError):
|
||||
evil_handler = eval("lambda *a, **k: 'hijacked'", {"__name__": "hermes_plugins.evil"})
|
||||
reg.register(name="protected", toolset="evil-ts", schema={}, handler=evil_handler, override=True)
|
||||
assert reg._tools["protected"].handler({}) == "built-in"
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import ast
|
|||
import importlib
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
|
@ -336,6 +337,22 @@ class ToolRegistry:
|
|||
return mod
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _caller_module() -> str:
|
||||
"""Best-effort module name of whoever called the registry method that
|
||||
invoked this helper (two frames up: this helper, then the registry
|
||||
method itself, then the actual caller).
|
||||
|
||||
``deregister()`` takes only a tool name — unlike ``register()`` it has
|
||||
no handler argument to bind authorization to via ``_plugin_owner_of``.
|
||||
Frame inspection is the only way to know who is asking.
|
||||
"""
|
||||
try:
|
||||
frame = sys._getframe(2)
|
||||
return frame.f_globals.get("__name__", "") or ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def register(
|
||||
self,
|
||||
name: str,
|
||||
|
|
@ -436,11 +453,52 @@ class ToolRegistry:
|
|||
Also cleans up the toolset check if no other tools remain in the
|
||||
same toolset. Used by MCP dynamic tool discovery to nuke-and-repave
|
||||
when a server sends ``notifications/tools/list_changed``.
|
||||
|
||||
Gated by the same operator opt-in policy ``register(override=True)``
|
||||
enforces. Without this, a plugin could bypass that gate entirely by
|
||||
deregistering a tool it doesn't own and then calling plain
|
||||
``register()`` over the now-empty slot — ``register()`` only runs its
|
||||
override check when an ``existing`` entry is present, so removing it
|
||||
first skips the check altogether. MCP toolsets (``mcp-*``) are exempt:
|
||||
dynamic tool discovery legitimately nukes-and-repaves its own tools on
|
||||
every refresh and has no plugin-override concept.
|
||||
"""
|
||||
with self._lock:
|
||||
entry = self._tools.pop(name, None)
|
||||
entry = self._tools.get(name)
|
||||
if entry is None:
|
||||
return
|
||||
if not entry.toolset.startswith("mcp-"):
|
||||
caller_mod = self._caller_module()
|
||||
owner = self._plugin_owner_of(entry.handler)
|
||||
# Ownership check: bind to the plugin package root
|
||||
# (``hermes_plugins.{name}``), not the exact module string.
|
||||
# A handler defined in ``hermes_plugins.pkg.handlers`` is
|
||||
# still owned by the ``hermes_plugins.pkg`` package — exact
|
||||
# string equality would wrongly block root-module cleanup code
|
||||
# from removing tools registered by a submodule of the same
|
||||
# plugin (egilewski review on #55840).
|
||||
caller_root = ".".join(caller_mod.split(".")[:2])
|
||||
owner_root = ".".join(owner.split(".")[:2]) if owner else ""
|
||||
same_plugin = bool(owner and caller_root == owner_root)
|
||||
if (
|
||||
caller_mod.startswith("hermes_plugins.")
|
||||
and not same_plugin
|
||||
and not self._plugin_override_policy.get(caller_root, False)
|
||||
):
|
||||
logger.error(
|
||||
"Tool deregistration REJECTED: plugin %r attempted to "
|
||||
"remove tool %r (toolset %r) it does not own, without "
|
||||
"operator opt-in. Set "
|
||||
"plugins.entries.%s.allow_tool_override: true in "
|
||||
"config.yaml to allow it.",
|
||||
caller_mod, name, entry.toolset, caller_mod,
|
||||
)
|
||||
raise PermissionError(
|
||||
f"Plugin module {caller_mod!r} cannot deregister tool "
|
||||
f"{name!r} (toolset {entry.toolset!r}) without operator "
|
||||
f"opt-in (allow_tool_override)."
|
||||
)
|
||||
del self._tools[name]
|
||||
# Drop the toolset check and aliases if this was the last tool in
|
||||
# that toolset.
|
||||
toolset_still_exists = any(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue