diff --git a/tests/tools/test_browser_eval_ssrf.py b/tests/tools/test_browser_eval_ssrf.py index 654e61b98..64b11aa8c 100644 --- a/tests/tools/test_browser_eval_ssrf.py +++ b/tests/tools/test_browser_eval_ssrf.py @@ -141,6 +141,85 @@ class TestExpressionPreScan: # --------------------------------------------------------------------------- +class TestCamofoxEvalGuard: + def _guard_on(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: True) + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + + def test_camofox_blocks_private_fetch_literal_before_request(self, monkeypatch): + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + + import tools.browser_camofox as camofox + + def fail_session(*_args, **_kwargs): + raise AssertionError("Camofox request should not run for a private URL literal") + + monkeypatch.setattr(camofox, "_ensure_tab", fail_session) + + result = _eval(f"fetch('{PRIVATE_URL}').then(r => r.text())") + + assert result["success"] is False + assert "private or internal address" in result["error"] + assert PRIVATE_URL in result["error"] + + def test_camofox_blocks_when_current_page_is_private(self, monkeypatch): + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + + import tools.browser_camofox as camofox + + monkeypatch.setattr(camofox, "_ensure_tab", lambda task_id: {"tab_id": "tab-1", "user_id": "user-1"}) + + def fake_post(path, body=None, **_kwargs): + if body and body.get("expression") == "window.location.href": + return {"result": PRIVATE_URL} + return {"result": "secret DOM text"} + + monkeypatch.setattr(camofox, "_post", fake_post) + + result = _eval("document.body.innerText") + + assert result["success"] is False + assert "private or internal address" in result["error"] + assert PRIVATE_URL in result["error"] + assert "secret DOM text" not in json.dumps(result) + + def test_camofox_uses_raw_task_id_not_resolved_session_key(self, monkeypatch): + # Camofox keeps its own raw-task_id-keyed session map; eval must pass the + # raw task_id (like every sibling Camofox tool), NOT the agent-browser + # _last_session_key-resolved key, or it can hit a different/new tab and + # skip the pre-scan via a mismatched _is_local_sidecar_key check. + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + monkeypatch.setattr( + browser_tool, "_last_session_key", lambda task_id: "resolved-agent-browser-key" + ) + + import tools.browser_camofox as camofox + + seen = {} + + def record_tab(task_id): + seen["task_id"] = task_id + return {"tab_id": "tab-1", "user_id": "user-1"} + + monkeypatch.setattr(camofox, "_ensure_tab", record_tab) + monkeypatch.setattr( + camofox, "_post", lambda path, body=None, **_kw: {"result": "https://example.com"} + ) + + result = _eval("document.title", task_id="test") + + assert result["success"] is True + assert seen["task_id"] == "test" + + class TestPostEvalPageRecheck: def _guard_on(self, monkeypatch): monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) diff --git a/tools/browser_tool.py b/tools/browser_tool.py index 3604450c9..82c248a3f 100644 --- a/tools/browser_tool.py +++ b/tools/browser_tool.py @@ -3536,19 +3536,8 @@ def _enforce_browser_eval_policy(expression: str) -> Optional[str]: def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: """Evaluate a JavaScript expression in the page context and return the result.""" - if _is_camofox_mode(): - return _camofox_eval(expression, task_id) - effective_task_id = _last_session_key(task_id or "default") - # ── Private-network guard (eval return-value path) ────────────────────── - # browser_snapshot / browser_vision re-check the page URL before returning - # content, but eval returns arbitrary JS results directly — an attacker can - # read a private page via `fetch('http://127.0.0.1/secret')` or by reading - # the DOM after `location.href = 'http://127.0.0.1/'`, never touching - # snapshot/vision. Close both sub-paths on the same gating condition: - # 1. Pre-scan the expression for private-host URL literals (direct fetch). - # 2. After eval, re-check the page URL (navigate-then-read). if _eval_ssrf_guard_active(effective_task_id): blocked_literal = _expression_targets_private_url(expression) if blocked_literal: @@ -3562,6 +3551,19 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: ), }, ensure_ascii=False) + # Camofox keeps its own raw-``task_id``-keyed session map, so pass the raw + # id (matching every other Camofox tool) rather than the resolved + # agent-browser session key. The literal pre-scan above already ran. + if _is_camofox_mode(): + return _camofox_eval(expression, task_id) + + # ── Private-network guard (eval return-value path) ────────────────────── + # The literal pre-scan above closes the direct-fetch sub-path + # (`fetch('http://127.0.0.1/secret')`). The post-eval page-URL recheck + # below closes the navigate-then-read sub-path (`location.href = '...'` + # then read the DOM) — eval returns arbitrary JS results directly, never + # touching snapshot/vision, so both sub-paths gate on the same condition. + # --- Fast path: route through the supervisor's persistent CDP WS --------- # When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs # on the already-connected WebSocket — zero subprocess startup cost vs @@ -3688,13 +3690,39 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str) +def _camofox_current_page_private_url(tab_id: str, user_id: str) -> Optional[str]: + """Return the Camofox page URL when it targets a private/internal address. + + Camofox analogue of ``_current_page_private_url`` (evaluate endpoint instead + of the agent-browser CLI). Returns ``None`` when the page is public, the URL + can't be determined, or the probe errors (fail-open on probe failure, + matching the snapshot/vision guards — do not change to fail-closed without + also changing the sibling). + """ + try: + from tools.browser_camofox import _post + + data = _post( + f"/tabs/{tab_id}/evaluate", + body={"expression": "window.location.href", "userId": user_id}, + ) + current_url = str(data.get("result") if isinstance(data, dict) else data or "") + current_url = current_url.strip().strip('"').strip("'") + if current_url and (_is_always_blocked_url(current_url) or not _is_safe_url(current_url)): + return current_url + except Exception as exc: + logger.debug("_camofox_current_page_private_url: probe failed (%s)", exc) + return None + + def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str: - """Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available).""" + """Evaluate JS via Camofox's /tabs/{tab_id}/evaluate endpoint (if available).""" from tools.browser_camofox import _ensure_tab, _post try: tab_info = _ensure_tab(task_id or "default") tab_id = tab_info.get("tab_id") or tab_info.get("id") - resp = _post(f"/tabs/{tab_id}/evaluate", body={"expression": expression, "userId": tab_info["user_id"]}) + user_id = tab_info["user_id"] + resp = _post(f"/tabs/{tab_id}/evaluate", body={"expression": expression, "userId": user_id}) # Camofox returns the result in a JSON envelope raw_result = resp.get("result") if isinstance(resp, dict) else resp @@ -3705,6 +3733,18 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str: except (json.JSONDecodeError, ValueError): pass + if _eval_ssrf_guard_active(task_id or "default"): + _blocked_url = _camofox_current_page_private_url(tab_id, user_id) + if _blocked_url: + return json.dumps({ + "success": False, + "error": ( + "Blocked: page URL targets a private or internal address " + f"({_blocked_url}). This may have been caused by a " + "JavaScript navigation via browser_console." + ), + }, ensure_ascii=False) + return json.dumps({ "success": True, "result": _redact_browser_output(parsed),