fix(image-gen): route local-input credential guard through one shared chokepoint + cover xai (#57698)

Follow-up to the per-provider guards. Three improvements from review:

1. Extract agent.file_safety.raise_if_read_blocked() as a single shared
   chokepoint and route the OpenAI, OpenRouter, and (newly) xAI image
   providers through it, replacing the 3x-duplicated inline try/except.
   Fixes the whole bug class: xai/_xai_image_field read a model-supplied
   local path via open() with no guard — the same vulnerability the PR
   fixed for OpenAI/OpenRouter, in a sibling provider it missed.
2. Strengthen the regression tests from pass-on-any-ValueError to true
   security invariants: spy open()/read_bytes() and assert the blocked
   credential is NEVER read; add negative controls (legit local image
   still loads; remote/data: URIs pass through unguarded) so a
   block-everything regression can't pass.
3. Guard is best-effort by design (defense-in-depth, not a security
   boundary) — documented on the shared helper.

- agent/file_safety.py: raise_if_read_blocked()
- plugins/image_gen/{openai,openrouter,xai}: route through helper
- tests: no-read spies + negative controls across all three providers
This commit is contained in:
kshitijk4poor 2026-07-03 18:34:14 +05:30 committed by kshitij
parent 587be5b5b4
commit c1826e2690
7 changed files with 157 additions and 19 deletions

View file

@ -304,6 +304,30 @@ def get_read_block_error(path: str) -> Optional[str]:
return None
def raise_if_read_blocked(path: str) -> None:
"""Raise ``ValueError`` if ``path`` is a denied Hermes read (see
:func:`get_read_block_error`), else return.
Shared chokepoint for provider input-loading sites that read a local
file the model/tool supplied (e.g. image-gen ``image_url`` /
``reference_image_urls`` paths). Centralizes the guard so every provider
enforces the same read boundary with identical semantics instead of each
open-coding the try/except block (#57698).
Best-effort by design: if ``agent.file_safety`` machinery is somehow
unavailable at the call site the guard no-ops rather than breaking local
image loading consistent with the defense-in-depth (not security
boundary) framing of the denylist itself. The blocking ``ValueError`` from
a real hit still propagates; only unexpected internal errors are swallowed.
"""
try:
blocked = get_read_block_error(path)
except Exception: # noqa: BLE001 - guard must never break local-file loading
return
if blocked:
raise ValueError(blocked)
# ---------------------------------------------------------------------------
# Cross-profile write guard (#TBD)
#

View file

@ -146,17 +146,10 @@ def _load_image_bytes(ref: str) -> Tuple[bytes, str]:
if "image/" in header:
ext = header.split("image/", 1)[1].split(";", 1)[0] or "png"
return base64.b64decode(b64), f"image.{ext}"
# Local file path.
try:
from agent.file_safety import get_read_block_error
# Local file path — enforce the shared credential-read guard before reading.
from agent.file_safety import raise_if_read_blocked
blocked = get_read_block_error(ref)
if blocked:
raise ValueError(blocked)
except ValueError:
raise
except Exception as exc: # noqa: BLE001 - preserve existing local-file behavior
logger.debug("OpenAI image input read guard unavailable: %s", exc)
raise_if_read_blocked(ref)
with open(ref, "rb") as fh:
data = fh.read()
name = os.path.basename(ref) or "image.png"

View file

@ -97,16 +97,10 @@ def _to_image_url_part(ref: str) -> Optional[str]:
if ref.startswith(("http://", "https://", "data:")):
return ref
path = Path(ref)
try:
from agent.file_safety import get_read_block_error
# Enforce the shared credential-read guard before inlining local bytes.
from agent.file_safety import raise_if_read_blocked
blocked = get_read_block_error(ref)
if blocked:
raise ValueError(blocked)
except ValueError:
raise
except Exception as exc: # noqa: BLE001 - keep local image refs best-effort
logger.debug("OpenRouter image input read guard unavailable: %s", exc)
raise_if_read_blocked(ref)
try:
raw = path.read_bytes()
except OSError as exc:

View file

@ -137,6 +137,11 @@ def _xai_image_field(source: str) -> Dict[str, str]:
import base64
import os as _os
# Enforce the shared credential-read guard before reading local bytes
# (same boundary the OpenAI / OpenRouter / Codex image providers apply).
from agent.file_safety import raise_if_read_blocked
raise_if_read_blocked(source)
with open(_os.path.expanduser(source), "rb") as fh: # windows-footgun: ok
raw = fh.read()
ext = (_os.path.splitext(source)[1].lstrip(".") or "png").lower()

View file

@ -135,6 +135,56 @@ class TestSourceImageLoading:
with pytest.raises(ValueError, match="credential store"):
openai_plugin._load_image_bytes(str(auth_json))
def test_load_image_bytes_never_opens_blocked_credential(self, tmp_path, monkeypatch):
"""The guard must fire BEFORE the file is opened — a credential store
must never be read into memory (#57698). Spy builtins.open and assert
it is never called for the blocked path."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
auth_json = hermes_home / "auth.json"
auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import builtins
real_open = builtins.open
opened: list = []
def _spy_open(file, *a, **k):
opened.append(str(file))
return real_open(file, *a, **k)
monkeypatch.setattr(builtins, "open", _spy_open)
with pytest.raises(ValueError, match="credential store"):
openai_plugin._load_image_bytes(str(auth_json))
assert str(auth_json) not in opened, "blocked credential must never be opened"
def test_load_image_bytes_allows_legit_local_image(self, tmp_path, monkeypatch):
"""Negative control: a legitimate local image path is NOT blocked and
loads normally proves the guard doesn't over-fire on everything."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
img = tmp_path / "pic.png"
img.write_bytes(b"\x89PNG\r\n\x1a\nfake-image-bytes")
data, name = openai_plugin._load_image_bytes(str(img))
assert data == b"\x89PNG\r\n\x1a\nfake-image-bytes"
assert name == "pic.png"
def test_load_image_bytes_passthrough_data_uri_not_blocked(self, tmp_path, monkeypatch):
"""Negative control: data: URIs are decoded, never routed through the
local-path guard (the guard only applies to local file reads)."""
import base64
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
b64 = base64.b64encode(b"xyz").decode("ascii")
data, name = openai_plugin._load_image_bytes(f"data:image/png;base64,{b64}")
assert data == b"xyz"
assert name.endswith(".png")
class TestGenerate:
def test_empty_prompt_rejected(self, provider):

View file

@ -181,6 +181,31 @@ class TestHelpers:
with pytest.raises(ValueError, match="credential store"):
_to_image_url_part(str(auth_json))
def test_to_image_url_part_never_reads_blocked_credential(self, tmp_path, monkeypatch):
"""The guard must fire BEFORE path.read_bytes() — the credential store
must never be inlined into a provider request (#57698)."""
from pathlib import Path as _P
from plugins.image_gen.openrouter import _to_image_url_part
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
auth_json = hermes_home / "auth.json"
auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
real_read_bytes = _P.read_bytes
read: list = []
def _spy_read_bytes(self, *a, **k):
read.append(str(self))
return real_read_bytes(self, *a, **k)
monkeypatch.setattr(_P, "read_bytes", _spy_read_bytes)
with pytest.raises(ValueError, match="credential store"):
_to_image_url_part(str(auth_json))
assert str(auth_json) not in read, "blocked credential must never be read"
def test_extract_images(self):
from plugins.image_gen.openrouter import _extract_images

View file

@ -492,3 +492,50 @@ def test_xai_image_field_expands_user_home(tmp_path, monkeypatch):
field = _xai_image_field("~/pic.png")
assert field["type"] == "image_url"
assert field["url"].startswith("data:image/png;base64,")
class TestXAIImageFieldReadGuard:
"""#57698: local image inputs must not read Hermes credential stores."""
def test_xai_image_field_blocks_credential_store(self, tmp_path, monkeypatch):
from plugins.image_gen.xai import _xai_image_field
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
auth_json = hermes_home / "auth.json"
auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
with pytest.raises(ValueError, match="credential store"):
_xai_image_field(str(auth_json))
def test_xai_image_field_never_opens_blocked_credential(self, tmp_path, monkeypatch):
"""Guard fires before open() — credential store never read into memory."""
import builtins
from plugins.image_gen.xai import _xai_image_field
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
auth_json = hermes_home / "auth.json"
auth_json.write_text('{"api_key":"sk-secret"}', encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
real_open = builtins.open
opened: list = []
def _spy_open(file, *a, **k):
opened.append(str(file))
return real_open(file, *a, **k)
monkeypatch.setattr(builtins, "open", _spy_open)
with pytest.raises(ValueError, match="credential store"):
_xai_image_field(str(auth_json))
assert str(auth_json) not in opened, "blocked credential must never be opened"
def test_xai_image_field_passthrough_url_not_blocked(self, monkeypatch):
"""Negative control: remote URLs and data: URIs pass through unguarded."""
from plugins.image_gen.xai import _xai_image_field
assert _xai_image_field("https://example.com/pic.png")["url"] == "https://example.com/pic.png"
assert _xai_image_field("data:image/png;base64,eHl6")["url"].startswith("data:image/png")