From c1826e2690fe7e4813913f9b230b357b07626e19 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 3 Jul 2026 18:34:14 +0530 Subject: [PATCH] fix(image-gen): route local-input credential guard through one shared chokepoint + cover xai (#57698) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the per-provider guards. Three improvements from review: 1. Extract agent.file_safety.raise_if_read_blocked() as a single shared chokepoint and route the OpenAI, OpenRouter, and (newly) xAI image providers through it, replacing the 3x-duplicated inline try/except. Fixes the whole bug class: xai/_xai_image_field read a model-supplied local path via open() with no guard — the same vulnerability the PR fixed for OpenAI/OpenRouter, in a sibling provider it missed. 2. Strengthen the regression tests from pass-on-any-ValueError to true security invariants: spy open()/read_bytes() and assert the blocked credential is NEVER read; add negative controls (legit local image still loads; remote/data: URIs pass through unguarded) so a block-everything regression can't pass. 3. Guard is best-effort by design (defense-in-depth, not a security boundary) — documented on the shared helper. - agent/file_safety.py: raise_if_read_blocked() - plugins/image_gen/{openai,openrouter,xai}: route through helper - tests: no-read spies + negative controls across all three providers --- agent/file_safety.py | 24 +++++++++ plugins/image_gen/openai/__init__.py | 13 ++--- plugins/image_gen/openrouter/__init__.py | 12 ++--- plugins/image_gen/xai/__init__.py | 5 ++ .../plugins/image_gen/test_openai_provider.py | 50 +++++++++++++++++++ .../test_openrouter_compat_provider.py | 25 ++++++++++ tests/plugins/image_gen/test_xai_provider.py | 47 +++++++++++++++++ 7 files changed, 157 insertions(+), 19 deletions(-) diff --git a/agent/file_safety.py b/agent/file_safety.py index 02e1eba2a..d7e20ee5f 100644 --- a/agent/file_safety.py +++ b/agent/file_safety.py @@ -304,6 +304,30 @@ def get_read_block_error(path: str) -> Optional[str]: return None +def raise_if_read_blocked(path: str) -> None: + """Raise ``ValueError`` if ``path`` is a denied Hermes read (see + :func:`get_read_block_error`), else return. + + Shared chokepoint for provider input-loading sites that read a local + file the model/tool supplied (e.g. image-gen ``image_url`` / + ``reference_image_urls`` paths). Centralizes the guard so every provider + enforces the same read boundary with identical semantics instead of each + open-coding the try/except block (#57698). + + Best-effort by design: if ``agent.file_safety`` machinery is somehow + unavailable at the call site the guard no-ops rather than breaking local + image loading — consistent with the defense-in-depth (not security + boundary) framing of the denylist itself. The blocking ``ValueError`` from + a real hit still propagates; only unexpected internal errors are swallowed. + """ + try: + blocked = get_read_block_error(path) + except Exception: # noqa: BLE001 - guard must never break local-file loading + return + if blocked: + raise ValueError(blocked) + + # --------------------------------------------------------------------------- # Cross-profile write guard (#TBD) # diff --git a/plugins/image_gen/openai/__init__.py b/plugins/image_gen/openai/__init__.py index cecdac206..cfa9e42c9 100644 --- a/plugins/image_gen/openai/__init__.py +++ b/plugins/image_gen/openai/__init__.py @@ -146,17 +146,10 @@ def _load_image_bytes(ref: str) -> Tuple[bytes, str]: if "image/" in header: ext = header.split("image/", 1)[1].split(";", 1)[0] or "png" return base64.b64decode(b64), f"image.{ext}" - # Local file path. - try: - from agent.file_safety import get_read_block_error + # Local file path — enforce the shared credential-read guard before reading. + from agent.file_safety import raise_if_read_blocked - blocked = get_read_block_error(ref) - if blocked: - raise ValueError(blocked) - except ValueError: - raise - except Exception as exc: # noqa: BLE001 - preserve existing local-file behavior - logger.debug("OpenAI image input read guard unavailable: %s", exc) + raise_if_read_blocked(ref) with open(ref, "rb") as fh: data = fh.read() name = os.path.basename(ref) or "image.png" diff --git a/plugins/image_gen/openrouter/__init__.py b/plugins/image_gen/openrouter/__init__.py index a08bae614..a5c6c164d 100644 --- a/plugins/image_gen/openrouter/__init__.py +++ b/plugins/image_gen/openrouter/__init__.py @@ -97,16 +97,10 @@ def _to_image_url_part(ref: str) -> Optional[str]: if ref.startswith(("http://", "https://", "data:")): return ref path = Path(ref) - try: - from agent.file_safety import get_read_block_error + # Enforce the shared credential-read guard before inlining local bytes. + from agent.file_safety import raise_if_read_blocked - blocked = get_read_block_error(ref) - if blocked: - raise ValueError(blocked) - except ValueError: - raise - except Exception as exc: # noqa: BLE001 - keep local image refs best-effort - logger.debug("OpenRouter image input read guard unavailable: %s", exc) + raise_if_read_blocked(ref) try: raw = path.read_bytes() except OSError as exc: diff --git a/plugins/image_gen/xai/__init__.py b/plugins/image_gen/xai/__init__.py index 31a0b719b..5ce9f26cb 100644 --- a/plugins/image_gen/xai/__init__.py +++ b/plugins/image_gen/xai/__init__.py @@ -137,6 +137,11 @@ def _xai_image_field(source: str) -> Dict[str, str]: import base64 import os as _os + # Enforce the shared credential-read guard before reading local bytes + # (same boundary the OpenAI / OpenRouter / Codex image providers apply). + from agent.file_safety import raise_if_read_blocked + + raise_if_read_blocked(source) with open(_os.path.expanduser(source), "rb") as fh: # windows-footgun: ok raw = fh.read() ext = (_os.path.splitext(source)[1].lstrip(".") or "png").lower() diff --git a/tests/plugins/image_gen/test_openai_provider.py b/tests/plugins/image_gen/test_openai_provider.py index abd867823..2ac61c54e 100644 --- a/tests/plugins/image_gen/test_openai_provider.py +++ b/tests/plugins/image_gen/test_openai_provider.py @@ -135,6 +135,56 @@ class TestSourceImageLoading: with pytest.raises(ValueError, match="credential store"): openai_plugin._load_image_bytes(str(auth_json)) + def test_load_image_bytes_never_opens_blocked_credential(self, tmp_path, monkeypatch): + """The guard must fire BEFORE the file is opened — a credential store + must never be read into memory (#57698). Spy builtins.open and assert + it is never called for the blocked path.""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + auth_json = hermes_home / "auth.json" + auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + import builtins + + real_open = builtins.open + opened: list = [] + + def _spy_open(file, *a, **k): + opened.append(str(file)) + return real_open(file, *a, **k) + + monkeypatch.setattr(builtins, "open", _spy_open) + with pytest.raises(ValueError, match="credential store"): + openai_plugin._load_image_bytes(str(auth_json)) + assert str(auth_json) not in opened, "blocked credential must never be opened" + + def test_load_image_bytes_allows_legit_local_image(self, tmp_path, monkeypatch): + """Negative control: a legitimate local image path is NOT blocked and + loads normally — proves the guard doesn't over-fire on everything.""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + img = tmp_path / "pic.png" + img.write_bytes(b"\x89PNG\r\n\x1a\nfake-image-bytes") + + data, name = openai_plugin._load_image_bytes(str(img)) + assert data == b"\x89PNG\r\n\x1a\nfake-image-bytes" + assert name == "pic.png" + + def test_load_image_bytes_passthrough_data_uri_not_blocked(self, tmp_path, monkeypatch): + """Negative control: data: URIs are decoded, never routed through the + local-path guard (the guard only applies to local file reads).""" + import base64 + + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + b64 = base64.b64encode(b"xyz").decode("ascii") + data, name = openai_plugin._load_image_bytes(f"data:image/png;base64,{b64}") + assert data == b"xyz" + assert name.endswith(".png") + class TestGenerate: def test_empty_prompt_rejected(self, provider): diff --git a/tests/plugins/image_gen/test_openrouter_compat_provider.py b/tests/plugins/image_gen/test_openrouter_compat_provider.py index 149fbc891..95052a9f8 100644 --- a/tests/plugins/image_gen/test_openrouter_compat_provider.py +++ b/tests/plugins/image_gen/test_openrouter_compat_provider.py @@ -181,6 +181,31 @@ class TestHelpers: with pytest.raises(ValueError, match="credential store"): _to_image_url_part(str(auth_json)) + def test_to_image_url_part_never_reads_blocked_credential(self, tmp_path, monkeypatch): + """The guard must fire BEFORE path.read_bytes() — the credential store + must never be inlined into a provider request (#57698).""" + from pathlib import Path as _P + + from plugins.image_gen.openrouter import _to_image_url_part + + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + auth_json = hermes_home / "auth.json" + auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + real_read_bytes = _P.read_bytes + read: list = [] + + def _spy_read_bytes(self, *a, **k): + read.append(str(self)) + return real_read_bytes(self, *a, **k) + + monkeypatch.setattr(_P, "read_bytes", _spy_read_bytes) + with pytest.raises(ValueError, match="credential store"): + _to_image_url_part(str(auth_json)) + assert str(auth_json) not in read, "blocked credential must never be read" + def test_extract_images(self): from plugins.image_gen.openrouter import _extract_images diff --git a/tests/plugins/image_gen/test_xai_provider.py b/tests/plugins/image_gen/test_xai_provider.py index cf9708dae..a233240cf 100644 --- a/tests/plugins/image_gen/test_xai_provider.py +++ b/tests/plugins/image_gen/test_xai_provider.py @@ -492,3 +492,50 @@ def test_xai_image_field_expands_user_home(tmp_path, monkeypatch): field = _xai_image_field("~/pic.png") assert field["type"] == "image_url" assert field["url"].startswith("data:image/png;base64,") + + +class TestXAIImageFieldReadGuard: + """#57698: local image inputs must not read Hermes credential stores.""" + + def test_xai_image_field_blocks_credential_store(self, tmp_path, monkeypatch): + from plugins.image_gen.xai import _xai_image_field + + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + auth_json = hermes_home / "auth.json" + auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + with pytest.raises(ValueError, match="credential store"): + _xai_image_field(str(auth_json)) + + def test_xai_image_field_never_opens_blocked_credential(self, tmp_path, monkeypatch): + """Guard fires before open() — credential store never read into memory.""" + import builtins + + from plugins.image_gen.xai import _xai_image_field + + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + auth_json = hermes_home / "auth.json" + auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + real_open = builtins.open + opened: list = [] + + def _spy_open(file, *a, **k): + opened.append(str(file)) + return real_open(file, *a, **k) + + monkeypatch.setattr(builtins, "open", _spy_open) + with pytest.raises(ValueError, match="credential store"): + _xai_image_field(str(auth_json)) + assert str(auth_json) not in opened, "blocked credential must never be opened" + + def test_xai_image_field_passthrough_url_not_blocked(self, monkeypatch): + """Negative control: remote URLs and data: URIs pass through unguarded.""" + from plugins.image_gen.xai import _xai_image_field + + assert _xai_image_field("https://example.com/pic.png")["url"] == "https://example.com/pic.png" + assert _xai_image_field("data:image/png;base64,eHl6")["url"].startswith("data:image/png")