fix(browser): guard Camofox eval private pages
Extends the browser private-network eval guard to the Camofox backend. On main, _browser_eval() returned early in Camofox mode before running the shared private-URL literal pre-scan and before re-checking the page URL after eval, leaving Camofox as a sibling backend that could execute browser_console(expression=...) against private/internal targets. - move the eval private-URL literal pre-scan before the Camofox early return - add a Camofox current-page private-URL probe via the evaluate endpoint - withhold Camofox eval results when the page is now private/internal Follow-up to browser private-network hardening in #56173, #56526, #56664. Salvage of #56764 by @rayjun (rayoo), cherry-picked to preserve authorship.
This commit is contained in:
parent
f2b8a5d541
commit
6a58badfdc
2 changed files with 132 additions and 13 deletions
|
|
@ -141,6 +141,85 @@ class TestExpressionPreScan:
|
|||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCamofoxEvalGuard:
|
||||
def _guard_on(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: True)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
|
||||
def test_camofox_blocks_private_fetch_literal_before_request(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
|
||||
import tools.browser_camofox as camofox
|
||||
|
||||
def fail_session(*_args, **_kwargs):
|
||||
raise AssertionError("Camofox request should not run for a private URL literal")
|
||||
|
||||
monkeypatch.setattr(camofox, "_ensure_tab", fail_session)
|
||||
|
||||
result = _eval(f"fetch('{PRIVATE_URL}').then(r => r.text())")
|
||||
|
||||
assert result["success"] is False
|
||||
assert "private or internal address" in result["error"]
|
||||
assert PRIVATE_URL in result["error"]
|
||||
|
||||
def test_camofox_blocks_when_current_page_is_private(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
|
||||
import tools.browser_camofox as camofox
|
||||
|
||||
monkeypatch.setattr(camofox, "_ensure_tab", lambda task_id: {"tab_id": "tab-1", "user_id": "user-1"})
|
||||
|
||||
def fake_post(path, body=None, **_kwargs):
|
||||
if body and body.get("expression") == "window.location.href":
|
||||
return {"result": PRIVATE_URL}
|
||||
return {"result": "secret DOM text"}
|
||||
|
||||
monkeypatch.setattr(camofox, "_post", fake_post)
|
||||
|
||||
result = _eval("document.body.innerText")
|
||||
|
||||
assert result["success"] is False
|
||||
assert "private or internal address" in result["error"]
|
||||
assert PRIVATE_URL in result["error"]
|
||||
assert "secret DOM text" not in json.dumps(result)
|
||||
|
||||
def test_camofox_uses_raw_task_id_not_resolved_session_key(self, monkeypatch):
|
||||
# Camofox keeps its own raw-task_id-keyed session map; eval must pass the
|
||||
# raw task_id (like every sibling Camofox tool), NOT the agent-browser
|
||||
# _last_session_key-resolved key, or it can hit a different/new tab and
|
||||
# skip the pre-scan via a mismatched _is_local_sidecar_key check.
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_last_session_key", lambda task_id: "resolved-agent-browser-key"
|
||||
)
|
||||
|
||||
import tools.browser_camofox as camofox
|
||||
|
||||
seen = {}
|
||||
|
||||
def record_tab(task_id):
|
||||
seen["task_id"] = task_id
|
||||
return {"tab_id": "tab-1", "user_id": "user-1"}
|
||||
|
||||
monkeypatch.setattr(camofox, "_ensure_tab", record_tab)
|
||||
monkeypatch.setattr(
|
||||
camofox, "_post", lambda path, body=None, **_kw: {"result": "https://example.com"}
|
||||
)
|
||||
|
||||
result = _eval("document.title", task_id="test")
|
||||
|
||||
assert result["success"] is True
|
||||
assert seen["task_id"] == "test"
|
||||
|
||||
|
||||
class TestPostEvalPageRecheck:
|
||||
def _guard_on(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
|
|
|
|||
|
|
@ -3536,19 +3536,8 @@ def _enforce_browser_eval_policy(expression: str) -> Optional[str]:
|
|||
|
||||
def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||
"""Evaluate a JavaScript expression in the page context and return the result."""
|
||||
if _is_camofox_mode():
|
||||
return _camofox_eval(expression, task_id)
|
||||
|
||||
effective_task_id = _last_session_key(task_id or "default")
|
||||
|
||||
# ── Private-network guard (eval return-value path) ──────────────────────
|
||||
# browser_snapshot / browser_vision re-check the page URL before returning
|
||||
# content, but eval returns arbitrary JS results directly — an attacker can
|
||||
# read a private page via `fetch('http://127.0.0.1/secret')` or by reading
|
||||
# the DOM after `location.href = 'http://127.0.0.1/'`, never touching
|
||||
# snapshot/vision. Close both sub-paths on the same gating condition:
|
||||
# 1. Pre-scan the expression for private-host URL literals (direct fetch).
|
||||
# 2. After eval, re-check the page URL (navigate-then-read).
|
||||
if _eval_ssrf_guard_active(effective_task_id):
|
||||
blocked_literal = _expression_targets_private_url(expression)
|
||||
if blocked_literal:
|
||||
|
|
@ -3562,6 +3551,19 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
|||
),
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Camofox keeps its own raw-``task_id``-keyed session map, so pass the raw
|
||||
# id (matching every other Camofox tool) rather than the resolved
|
||||
# agent-browser session key. The literal pre-scan above already ran.
|
||||
if _is_camofox_mode():
|
||||
return _camofox_eval(expression, task_id)
|
||||
|
||||
# ── Private-network guard (eval return-value path) ──────────────────────
|
||||
# The literal pre-scan above closes the direct-fetch sub-path
|
||||
# (`fetch('http://127.0.0.1/secret')`). The post-eval page-URL recheck
|
||||
# below closes the navigate-then-read sub-path (`location.href = '...'`
|
||||
# then read the DOM) — eval returns arbitrary JS results directly, never
|
||||
# touching snapshot/vision, so both sub-paths gate on the same condition.
|
||||
|
||||
# --- Fast path: route through the supervisor's persistent CDP WS ---------
|
||||
# When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs
|
||||
# on the already-connected WebSocket — zero subprocess startup cost vs
|
||||
|
|
@ -3688,13 +3690,39 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
|||
return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str)
|
||||
|
||||
|
||||
def _camofox_current_page_private_url(tab_id: str, user_id: str) -> Optional[str]:
|
||||
"""Return the Camofox page URL when it targets a private/internal address.
|
||||
|
||||
Camofox analogue of ``_current_page_private_url`` (evaluate endpoint instead
|
||||
of the agent-browser CLI). Returns ``None`` when the page is public, the URL
|
||||
can't be determined, or the probe errors (fail-open on probe failure,
|
||||
matching the snapshot/vision guards — do not change to fail-closed without
|
||||
also changing the sibling).
|
||||
"""
|
||||
try:
|
||||
from tools.browser_camofox import _post
|
||||
|
||||
data = _post(
|
||||
f"/tabs/{tab_id}/evaluate",
|
||||
body={"expression": "window.location.href", "userId": user_id},
|
||||
)
|
||||
current_url = str(data.get("result") if isinstance(data, dict) else data or "")
|
||||
current_url = current_url.strip().strip('"').strip("'")
|
||||
if current_url and (_is_always_blocked_url(current_url) or not _is_safe_url(current_url)):
|
||||
return current_url
|
||||
except Exception as exc:
|
||||
logger.debug("_camofox_current_page_private_url: probe failed (%s)", exc)
|
||||
return None
|
||||
|
||||
|
||||
def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||
"""Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available)."""
|
||||
"""Evaluate JS via Camofox's /tabs/{tab_id}/evaluate endpoint (if available)."""
|
||||
from tools.browser_camofox import _ensure_tab, _post
|
||||
try:
|
||||
tab_info = _ensure_tab(task_id or "default")
|
||||
tab_id = tab_info.get("tab_id") or tab_info.get("id")
|
||||
resp = _post(f"/tabs/{tab_id}/evaluate", body={"expression": expression, "userId": tab_info["user_id"]})
|
||||
user_id = tab_info["user_id"]
|
||||
resp = _post(f"/tabs/{tab_id}/evaluate", body={"expression": expression, "userId": user_id})
|
||||
|
||||
# Camofox returns the result in a JSON envelope
|
||||
raw_result = resp.get("result") if isinstance(resp, dict) else resp
|
||||
|
|
@ -3705,6 +3733,18 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
|
|||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
if _eval_ssrf_guard_active(task_id or "default"):
|
||||
_blocked_url = _camofox_current_page_private_url(tab_id, user_id)
|
||||
if _blocked_url:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": (
|
||||
"Blocked: page URL targets a private or internal address "
|
||||
f"({_blocked_url}). This may have been caused by a "
|
||||
"JavaScript navigation via browser_console."
|
||||
),
|
||||
}, ensure_ascii=False)
|
||||
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"result": _redact_browser_output(parsed),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue