fix(dashboard): enforce plugin disabled gate at request time and for bundled assets

Address two residual bypasses identified in review:

1. Add _plugin_api_runtime_gate middleware that checks plugins.enabled/
   plugins.disabled on every request to /api/plugins/{name}/... routes.
   Previously, disabling a plugin at runtime had no effect on its already-
   mounted API routes until a restart.

2. Extend serve_plugin_asset to check plugins.disabled for bundled plugins.
   Previously, only user plugins were gated — a bundled plugin in
   plugins.disabled would still serve assets from the unauthenticated
   /dashboard-plugins/{name}/... endpoint.

Both fixes ensure the enabled/disabled policy is evaluated live at request
time, not just at startup.

Adds regression tests covering:
- Middleware blocks disabled user plugin API routes (404)
- Middleware blocks user plugin removed from enabled set (404)
- Middleware passes enabled user plugin API routes
- Middleware blocks disabled bundled plugin API routes (404)
- Bundled plugin assets return 404 when disabled
- Bundled plugin assets served normally when not disabled
- User plugin asset gating still works correctly
This commit is contained in:
manusjs 2026-06-23 11:50:38 +00:00 committed by Teknium
parent 7cff95644d
commit b2e0086f1b
2 changed files with 457 additions and 8 deletions

View file

@ -515,6 +515,57 @@ async def auth_middleware(request: Request, call_next):
return await call_next(request)
@app.middleware("http")
async def _plugin_api_runtime_gate(request: Request, call_next):
"""Block requests to disabled plugin API routes at request time.
:func:`_mount_plugin_api_routes` gates at import time, but if a plugin
is disabled *after* the dashboard is already running, its FastAPI router
remains mounted until restart. This middleware enforces the enabled/
disabled policy on every request to ``/api/plugins/{name}/...`` so that
runtime config changes take effect immediately.
"""
path = request.url.path
if path.startswith("/api/plugins/"):
# Extract plugin name from /api/plugins/<name>/...
parts = path.split("/")
# parts: ['', 'api', 'plugins', '<name>', ...]
if len(parts) >= 4:
plugin_name = parts[3]
if plugin_name:
try:
from hermes_cli.plugins_cmd import (
_get_enabled_set,
_get_disabled_set,
)
enabled_set = _get_enabled_set()
disabled_set = _get_disabled_set()
except Exception:
enabled_set = set()
disabled_set = set()
# Determine plugin source. Check the cached plugin list;
# if not found, assume user plugin (safe default — blocks).
plugins = _get_dashboard_plugins()
plugin = next(
(p for p in plugins if p.get("name") == plugin_name),
None,
)
source = plugin.get("source") if plugin else "user"
if source == "user":
if plugin_name in disabled_set or plugin_name not in enabled_set:
return JSONResponse(
status_code=404,
content={"detail": "Plugin not found"},
)
elif source == "bundled":
if plugin_name in disabled_set:
return JSONResponse(
status_code=404,
content={"detail": "Plugin not found"},
)
return await call_next(request)
@app.middleware("http")
async def _token_auth_seam(request: Request, call_next):
"""Outermost auth seam: non-interactive bearer-token auth for opted-in routes.
@ -13793,17 +13844,21 @@ async def serve_plugin_asset(plugin_name: str, file_path: str):
if not plugin:
raise HTTPException(status_code=404, detail="Plugin not found")
# Gate: user plugins must be enabled to serve assets.
# Gate: user plugins must be enabled to serve assets;
# bundled plugins must not be explicitly disabled.
try:
from hermes_cli.plugins_cmd import _get_enabled_set, _get_disabled_set
enabled_set = _get_enabled_set()
disabled_set = _get_disabled_set()
except Exception:
enabled_set = set()
disabled_set = set()
if plugin.get("source") == "user":
try:
from hermes_cli.plugins_cmd import _get_enabled_set, _get_disabled_set
enabled_set = _get_enabled_set()
disabled_set = _get_disabled_set()
except Exception:
enabled_set = set()
disabled_set = set()
if plugin_name in disabled_set or plugin_name not in enabled_set:
raise HTTPException(status_code=404, detail="Plugin not found")
elif plugin.get("source") == "bundled":
if plugin_name in disabled_set:
raise HTTPException(status_code=404, detail="Plugin not found")
base = Path(plugin["_dir"])
target = (base / file_path).resolve()

View file

@ -0,0 +1,394 @@
"""Regression tests for runtime plugin disable gating.
Covers two residual bypasses addressed in the PR:
1. Plugin API routes mounted at startup remain callable even after the
plugin is added to ``plugins.disabled`` at runtime. The new
``_plugin_api_runtime_gate`` middleware blocks these requests.
2. Bundled plugin assets were served from the unauthenticated
``/dashboard-plugins/{name}/{path}`` route even when the bundled
plugin was in ``plugins.disabled``. The updated ``serve_plugin_asset``
now applies the disabled check to bundled plugins too.
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch, AsyncMock
import pytest
from hermes_cli import web_server
@pytest.fixture(autouse=True)
def _reset_plugin_cache():
"""Bust the plugin cache before and after each test."""
web_server._dashboard_plugins_cache = None
yield
web_server._dashboard_plugins_cache = None
@pytest.fixture
def test_client(monkeypatch, tmp_path):
"""Set up a Starlette TestClient with auth bypassed."""
try:
from starlette.testclient import TestClient
except ImportError:
pytest.skip("fastapi/starlette not installed")
from hermes_cli.web_server import app, _SESSION_HEADER_NAME, _SESSION_TOKEN
# Isolate HERMES_HOME so config reads go to our tmp.
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))
(tmp_path / "home").mkdir(parents=True)
client = TestClient(app)
client.headers[_SESSION_HEADER_NAME] = _SESSION_TOKEN
return client
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_user_plugin(tmp_path, name="hot"):
"""Create a minimal user plugin with a JS asset."""
dashboard_dir = tmp_path / "plugins" / name / "dashboard"
dashboard_dir.mkdir(parents=True)
dist_dir = dashboard_dir / "dist"
dist_dir.mkdir()
(dist_dir / "index.js").write_text("console.log('hello');")
(dashboard_dir / "manifest.json").write_text(json.dumps({
"name": name,
"label": name.title(),
"entry": "dist/index.js",
}))
return dashboard_dir
def _make_bundled_plugin(tmp_path, name="bundledx"):
"""Create a minimal bundled plugin with a JS asset."""
dashboard_dir = tmp_path / "bundled" / name / "dashboard"
dashboard_dir.mkdir(parents=True)
dist_dir = dashboard_dir / "dist"
dist_dir.mkdir()
(dist_dir / "index.js").write_text("console.log('bundled');")
(dashboard_dir / "manifest.json").write_text(json.dumps({
"name": name,
"label": name.title(),
"entry": "dist/index.js",
}))
return dashboard_dir
# ---------------------------------------------------------------------------
# Test 1: Runtime-disabled user plugin API routes return 404
# ---------------------------------------------------------------------------
class TestPluginApiRuntimeGate:
"""After a user plugin is disabled at runtime, its mounted API routes
must return 404 not 200 even though the router was already
included at startup. The _plugin_api_runtime_gate middleware enforces
this at request time."""
@pytest.mark.asyncio
async def test_middleware_blocks_disabled_user_plugin(self):
"""Middleware returns 404 for a user plugin added to disabled set."""
from starlette.requests import Request
from starlette.responses import JSONResponse
fake_plugin = {
"name": "hot",
"source": "user",
}
# Simulate a request to /api/plugins/hot/probe
scope = {
"type": "http",
"method": "GET",
"path": "/api/plugins/hot/probe",
"query_string": b"",
"headers": [],
}
request = Request(scope)
call_next = AsyncMock(return_value=JSONResponse({"ok": True}))
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]), \
patch("hermes_cli.plugins_cmd._get_enabled_set", return_value={"hot"}), \
patch("hermes_cli.plugins_cmd._get_disabled_set", return_value={"hot"}):
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response.status_code == 404
call_next.assert_not_called()
@pytest.mark.asyncio
async def test_middleware_blocks_unenabled_user_plugin(self):
"""Middleware returns 404 when user plugin not in enabled set."""
from starlette.requests import Request
from starlette.responses import JSONResponse
fake_plugin = {
"name": "hot",
"source": "user",
}
scope = {
"type": "http",
"method": "GET",
"path": "/api/plugins/hot/probe",
"query_string": b"",
"headers": [],
}
request = Request(scope)
call_next = AsyncMock(return_value=JSONResponse({"ok": True}))
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]), \
patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()), \
patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()):
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response.status_code == 404
call_next.assert_not_called()
@pytest.mark.asyncio
async def test_middleware_passes_enabled_user_plugin(self):
"""Middleware passes through for an enabled user plugin."""
from starlette.requests import Request
from starlette.responses import JSONResponse
fake_plugin = {
"name": "hot",
"source": "user",
}
scope = {
"type": "http",
"method": "GET",
"path": "/api/plugins/hot/probe",
"query_string": b"",
"headers": [],
}
request = Request(scope)
expected_resp = JSONResponse({"ok": True})
call_next = AsyncMock(return_value=expected_resp)
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]), \
patch("hermes_cli.plugins_cmd._get_enabled_set", return_value={"hot"}), \
patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()):
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response is expected_resp
call_next.assert_called_once()
@pytest.mark.asyncio
async def test_middleware_blocks_disabled_bundled_plugin(self):
"""Middleware returns 404 for a bundled plugin in disabled set."""
from starlette.requests import Request
from starlette.responses import JSONResponse
fake_plugin = {
"name": "bundledx",
"source": "bundled",
}
scope = {
"type": "http",
"method": "GET",
"path": "/api/plugins/bundledx/probe",
"query_string": b"",
"headers": [],
}
request = Request(scope)
call_next = AsyncMock(return_value=JSONResponse({"ok": True}))
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]), \
patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()), \
patch("hermes_cli.plugins_cmd._get_disabled_set", return_value={"bundledx"}):
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response.status_code == 404
call_next.assert_not_called()
@pytest.mark.asyncio
async def test_middleware_passes_enabled_bundled_plugin(self):
"""Middleware passes through for a bundled plugin not in disabled set."""
from starlette.requests import Request
from starlette.responses import JSONResponse
fake_plugin = {
"name": "bundledx",
"source": "bundled",
}
scope = {
"type": "http",
"method": "GET",
"path": "/api/plugins/bundledx/probe",
"query_string": b"",
"headers": [],
}
request = Request(scope)
expected_resp = JSONResponse({"ok": True})
call_next = AsyncMock(return_value=expected_resp)
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]), \
patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()), \
patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()):
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response is expected_resp
call_next.assert_called_once()
@pytest.mark.asyncio
async def test_middleware_passes_non_plugin_api_routes(self):
"""Middleware ignores non-plugin API routes."""
from starlette.requests import Request
from starlette.responses import JSONResponse
scope = {
"type": "http",
"method": "GET",
"path": "/api/status",
"query_string": b"",
"headers": [],
}
request = Request(scope)
expected_resp = JSONResponse({"ok": True})
call_next = AsyncMock(return_value=expected_resp)
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response is expected_resp
call_next.assert_called_once()
@pytest.mark.asyncio
async def test_middleware_unknown_plugin_defaults_to_user_blocks(self):
"""Unknown plugin name (not in discovery cache) is treated as user
plugin and blocked when not enabled."""
from starlette.requests import Request
from starlette.responses import JSONResponse
scope = {
"type": "http",
"method": "GET",
"path": "/api/plugins/unknown/action",
"query_string": b"",
"headers": [],
}
request = Request(scope)
call_next = AsyncMock(return_value=JSONResponse({"ok": True}))
with patch.object(web_server, "_get_dashboard_plugins", return_value=[]), \
patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()), \
patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()):
response = await web_server._plugin_api_runtime_gate(request, call_next)
assert response.status_code == 404
call_next.assert_not_called()
# ---------------------------------------------------------------------------
# Test 2: Disabled bundled plugin assets return 404
# ---------------------------------------------------------------------------
class TestBundledPluginAssetGate:
"""Bundled plugins in ``plugins.disabled`` must have their static
assets blocked not just hidden from the listing endpoint."""
def test_bundled_asset_returns_404_when_disabled(self, test_client, tmp_path, monkeypatch):
"""A disabled bundled plugin's JS asset must return 404."""
plugin_dir = _make_bundled_plugin(tmp_path, "bundledx")
fake_plugin = {
"name": "bundledx",
"label": "Bundledx",
"source": "bundled",
"entry": "dist/index.js",
"_dir": str(plugin_dir),
}
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]):
# Sanity: asset is served when not disabled.
with patch(
"hermes_cli.plugins_cmd._get_enabled_set", return_value=set()
), patch(
"hermes_cli.plugins_cmd._get_disabled_set", return_value=set()
):
resp = test_client.get("/dashboard-plugins/bundledx/dist/index.js")
assert resp.status_code == 200, (
"Sanity: bundled plugin asset should be served when not disabled"
)
# Disable it.
with patch(
"hermes_cli.plugins_cmd._get_enabled_set", return_value=set()
), patch(
"hermes_cli.plugins_cmd._get_disabled_set", return_value={"bundledx"}
):
resp = test_client.get("/dashboard-plugins/bundledx/dist/index.js")
assert resp.status_code == 404, (
"Disabled bundled plugin asset must return 404"
)
def test_bundled_asset_served_when_not_disabled(self, test_client, tmp_path, monkeypatch):
"""Bundled plugin assets are served normally when not in disabled set."""
plugin_dir = _make_bundled_plugin(tmp_path, "goodbundled")
fake_plugin = {
"name": "goodbundled",
"label": "Good Bundled",
"source": "bundled",
"entry": "dist/index.js",
"_dir": str(plugin_dir),
}
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]):
with patch(
"hermes_cli.plugins_cmd._get_enabled_set", return_value=set()
), patch(
"hermes_cli.plugins_cmd._get_disabled_set", return_value=set()
):
resp = test_client.get("/dashboard-plugins/goodbundled/dist/index.js")
assert resp.status_code == 200
def test_user_plugin_asset_still_gated(self, test_client, tmp_path, monkeypatch):
"""User plugins still require enabled set membership for assets."""
plugin_dir = _make_user_plugin(tmp_path, "userplugin")
fake_plugin = {
"name": "userplugin",
"label": "User Plugin",
"source": "user",
"entry": "dist/index.js",
"_dir": str(plugin_dir),
}
with patch.object(web_server, "_get_dashboard_plugins", return_value=[fake_plugin]):
# Not in enabled set → 404.
with patch(
"hermes_cli.plugins_cmd._get_enabled_set", return_value=set()
), patch(
"hermes_cli.plugins_cmd._get_disabled_set", return_value=set()
):
resp = test_client.get("/dashboard-plugins/userplugin/dist/index.js")
assert resp.status_code == 404
# In enabled set → 200.
with patch(
"hermes_cli.plugins_cmd._get_enabled_set", return_value={"userplugin"}
), patch(
"hermes_cli.plugins_cmd._get_disabled_set", return_value=set()
):
resp = test_client.get("/dashboard-plugins/userplugin/dist/index.js")
assert resp.status_code == 200