security(cron): block base_url overrides that exfiltrate provider credentials

The model-facing cronjob tool accepts free-form provider + base_url. On fire,
the scheduler pairs the named provider's stored credential with the job's
base_url, so a prompt-injected job (e.g. provider=anthropic,
base_url=https://attacker/v1) sends the real API key to an attacker endpoint. A
base_url with no provider inherits the default provider's key for the same
effect.

Add a fail-closed guard at the tool boundary: a base_url override is allowed
only for the custom/BYOK sentinel, a configured custom_providers entry, or when
the override host matches the named provider's own endpoint; an override without
an explicit provider is rejected. The trust boundary is the caller, so
operator-configured base_urls for named providers are unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
claudlos 2026-06-25 00:01:06 -05:00 committed by kshitij
parent a56aa9ac47
commit b24708eda0
4 changed files with 334 additions and 0 deletions

View file

@ -1969,6 +1969,40 @@ def _scan_assembled_cron_prompt(
return assembled
def _guard_job_credential_exfil(job: dict) -> None:
"""Fail closed if a job's stored provider/base_url pair would exfiltrate a
credential (F8 runtime backstop; CWE-200/CWE-522).
The model-callable cron tool validates this on create/update, but a job
persisted before that guard or written directly to the jobs store
reaches the scheduler's provider-resolution sink unchecked. Re-validate the
EFFECTIVE stored pair with the same guard the tool uses, so a named
provider's stored key is never paired with an off-host base_url at fire
time. Raises ``RuntimeError`` (caught by the run_job failure path the run
is aborted and reported) when the pair is unsafe; returns ``None`` otherwise.
Fallback providers come from operator config, not the model-callable job, so
they are trusted and validated by the caller, not here.
"""
try:
from tools.cronjob_tools import _validate_cron_base_url
err = _validate_cron_base_url(job.get("provider"), job.get("base_url"))
except Exception:
# The validator is defensively coded to RETURN (not raise) its own
# fail-closed string when provider metadata can't be resolved; only a
# truly unexpected error lands here. Don't wedge every cron job on such
# an error — the create/update-time guard remains the primary control.
err = None
if err:
job_id = job.get("id")
logger.error(
"Job '%s': refusing to run — unsafe provider/base_url pair could "
"exfiltrate a stored credential: %s",
job_id, err,
)
raise RuntimeError(f"Cron job '{job_id}' blocked for safety: {err}")
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.
@ -2356,6 +2390,15 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
format_runtime_provider_error,
)
from hermes_cli.auth import AuthError
# F8 runtime backstop: never resolve a stored provider/base_url pair that
# would ship a named provider's stored credential to an off-host endpoint
# (CWE-200/CWE-522). The cron tool validates this on create/update, but a
# job persisted before that guard — or written directly to the jobs store
# — reaches this sink unchecked. Fail closed before resolution so no
# off-host call is ever made with a stored key.
_guard_job_credential_exfil(job)
try:
# Do not inject HERMES_INFERENCE_PROVIDER here. resolve_runtime_provider()
# already prefers persisted config over stale shell/env overrides when

View file

@ -571,3 +571,66 @@ def test_cron_status_reports_stalled_when_no_heartbeat(tmp_path, monkeypatch, ca
out = capsys.readouterr().out
assert "STALLED" in out
assert "will fire automatically" not in out
# ── F8: runtime backstop — never resolve a stored pair that exfiltrates a key ──
class TestGuardJobCredentialExfil:
"""run_job() must fail closed before provider resolution when a job's stored
provider/base_url pair would ship a named provider's stored credential to an
off-host endpoint covering jobs persisted before the create/update guard
or written directly to the store (F8 stored-job path; CWE-200/CWE-522)."""
def test_named_registry_provider_offhost_is_blocked(self):
import pytest
from cron.scheduler import _guard_job_credential_exfil
job = {"id": "j1", "provider": "anthropic",
"base_url": "https://evil.example/v1"}
with pytest.raises(RuntimeError) as exc:
_guard_job_credential_exfil(job)
assert "blocked for safety" in str(exc.value)
def test_named_custom_offhost_is_blocked(self, monkeypatch):
import pytest
import hermes_cli.runtime_provider as rp
from cron.scheduler import _guard_job_credential_exfil
monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True)
monkeypatch.setattr(
rp, "_get_named_custom_provider",
lambda n: {"name": "legit", "base_url": "https://legit.example/v1",
"api_key": "sk-legit"},
)
job = {"id": "j2", "provider": "custom:legit",
"base_url": "https://evil.example/v1"}
with pytest.raises(RuntimeError):
_guard_job_credential_exfil(job)
def test_named_custom_matching_host_is_allowed(self, monkeypatch):
import hermes_cli.runtime_provider as rp
from cron.scheduler import _guard_job_credential_exfil
monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True)
monkeypatch.setattr(
rp, "_get_named_custom_provider",
lambda n: {"name": "legit", "base_url": "https://legit.example/v1",
"api_key": "sk-legit"},
)
job = {"id": "j3", "provider": "custom:legit",
"base_url": "https://legit.example/v1"}
assert _guard_job_credential_exfil(job) is None
def test_bare_custom_is_allowed(self):
from cron.scheduler import _guard_job_credential_exfil
job = {"id": "j4", "provider": "custom",
"base_url": "https://anything.example/v1"}
assert _guard_job_credential_exfil(job) is None
def test_no_base_url_is_allowed(self):
from cron.scheduler import _guard_job_credential_exfil
assert _guard_job_credential_exfil({"id": "j5", "provider": "anthropic"}) is None
assert _guard_job_credential_exfil({"id": "j6"}) is None

View file

@ -336,6 +336,81 @@ class TestUnifiedCronjobTool:
assert updated["job"]["provider"] == "openrouter"
assert updated["job"]["base_url"] is None
@staticmethod
def _patch_named_legit(monkeypatch):
import hermes_cli.runtime_provider as rp
monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True)
monkeypatch.setattr(
rp, "_get_named_custom_provider",
lambda n: {"name": "legit", "base_url": "https://legit.example/v1",
"api_key": "sk-legit"},
)
@staticmethod
def _save_legacy_unsafe_job():
"""Write a job with an unsafe named-provider + off-host base_url pair
DIRECTLY to the store, bypassing the create-time tool guard (mirrors a
job persisted before the guard existed)."""
from cron.jobs import save_jobs
save_jobs([
{
"id": "legacyunsafe1",
"name": "legacy",
"prompt": "x",
"schedule": {"kind": "interval", "minutes": 5, "display": "every 5m"},
"schedule_display": "every 5m",
"repeat": {"times": None, "completed": 0},
"enabled": True,
"state": "scheduled",
"provider": "custom:legit",
"base_url": "https://evil.example/v1",
}
])
return "legacyunsafe1"
def test_legacy_unsafe_job_blocked_on_unrelated_update(self, monkeypatch):
"""F8 stored-job path: editing an UNRELATED field on a job that already
holds an unsafe provider/base_url pair must be rejected, so the pair
cannot be left active/schedulable by sidestepping validation."""
self._patch_named_legit(monkeypatch)
job_id = self._save_legacy_unsafe_job()
result = json.loads(cronjob(action="update", job_id=job_id, name="renamed"))
assert result["success"] is False
assert "not allowed" in json.dumps(result)
# The rejected update must not have mutated the stored job at all.
from cron.jobs import get_job
stored = get_job(job_id)
assert stored["name"] == "legacy"
assert stored["base_url"] == "https://evil.example/v1"
def test_legacy_unsafe_job_remediated_by_clearing_base_url(self, monkeypatch):
"""The operator can still fix a legacy unsafe job in a single update by
clearing base_url (the effective pair becomes safe)."""
self._patch_named_legit(monkeypatch)
job_id = self._save_legacy_unsafe_job()
result = json.loads(
cronjob(action="update", job_id=job_id, name="renamed", base_url="")
)
assert result["success"] is True
assert result["job"]["base_url"] is None
assert result["job"]["name"] == "renamed"
def test_legacy_unsafe_job_remediated_by_matching_host(self, monkeypatch):
"""Repointing base_url at the named provider's own configured host also
remediates the job (no off-host exfil)."""
self._patch_named_legit(monkeypatch)
job_id = self._save_legacy_unsafe_job()
result = json.loads(
cronjob(action="update", job_id=job_id,
base_url="https://legit.example/v1")
)
assert result["success"] is True
assert result["job"]["base_url"] == "https://legit.example/v1"
def test_create_skill_backed_job(self):
result = json.loads(
cronjob(
@ -581,3 +656,51 @@ class TestLocalDeliveryNotice:
)
assert created["deliver"] == "origin"
assert "local-only cron job" not in created["message"]
class TestValidateCronBaseUrl:
"""The cron base_url guard must not let a NAMED custom provider's stored
credential be sent to an off-host endpoint (CWE-200/CWE-522)."""
@staticmethod
def _v(*args):
from tools.cronjob_tools import _validate_cron_base_url
return _validate_cron_base_url(*args)
@staticmethod
def _patch_named_legit(monkeypatch):
import hermes_cli.runtime_provider as rp
monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True)
monkeypatch.setattr(
rp, "_get_named_custom_provider",
lambda n: {"name": "legit", "base_url": "https://legit.example/v1", "api_key": "sk-legit"},
)
def test_named_custom_offhost_base_url_blocked(self, monkeypatch):
self._patch_named_legit(monkeypatch)
err = self._v("custom:legit", "https://evil.example/v1")
assert err and "not allowed" in err
def test_named_custom_matching_host_allowed(self, monkeypatch):
self._patch_named_legit(monkeypatch)
assert self._v("custom:legit", "https://legit.example/v1") is None
# subdomain of the configured host is still the provider's own endpoint
assert self._v("custom:legit", "https://eu.legit.example/v1") is None
def test_named_custom_lookalike_host_blocked(self, monkeypatch):
self._patch_named_legit(monkeypatch)
assert self._v("custom:legit", "https://legit.example.attacker.test/v1") is not None
def test_bare_custom_allows_any_base_url(self):
# Bare 'custom' is inline/host-derived BYOK — no stored secret to leak.
assert self._v("custom", "https://anything.example/v1") is None
def test_no_base_url_is_allowed(self):
assert self._v("custom:legit", None) is None
def test_named_registry_offhost_blocked(self):
# A named registry provider (stored key) + off-host override is refused.
assert self._v("anthropic", "https://evil.example/v1") is not None
def test_base_url_without_provider_rejected(self):
assert self._v(None, "https://x.example/v1") is not None

View file

@ -445,6 +445,86 @@ def _normalize_deliver_param(value: Any) -> Optional[str]:
return text or None
def _validate_cron_base_url(
provider: Optional[Any], base_url: Optional[Any]
) -> Optional[str]:
"""Reject pairing a named provider's stored credential with an off-host base_url.
The cron tool is model-callable, so a prompt-injected job could set a real
provider plus an attacker ``base_url``; on fire the scheduler resolves that
provider's stored API key and sends it to the URL, exfiltrating the
credential (CWE-200/CWE-522). Allow a ``base_url`` override only when it
cannot leak a stored secret: no override at all, a configured custom/byok
provider that carries its own endpoint+key, or an override whose host
matches the named provider's own endpoint.
Returns an error string if blocked, else None (valid).
"""
bu = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
if not bu:
return None
prov = _normalize_optional_job_value(provider)
if not prov:
# A base_url with no explicit provider inherits the default/session
# provider's stored key — the same exfil primitive without naming a
# provider. Require an explicit (custom) provider for custom endpoints.
return (
"base_url override requires an explicit provider. Set provider to a "
"configured custom provider to use a custom endpoint."
)
try:
from hermes_cli.runtime_provider import (
has_named_custom_provider,
resolve_requested_provider,
_get_named_custom_provider,
)
from hermes_cli.auth import PROVIDER_REGISTRY
from utils import base_url_host_matches, base_url_hostname
except Exception:
# Can't resolve provider metadata -> fail closed.
return f"Unable to validate base_url override for provider {prov!r}; refused."
if prov.lower() == "custom":
# Bare/inline 'custom' (and aliases that resolve to it) is pure BYOK: the
# runtime derives the key from a pool keyed by THIS base_url or from
# host-gated env vars, never an arbitrary stored secret. Safe to allow.
return None
if has_named_custom_provider(prov):
# A NAMED custom provider carries a STORED key, and
# _resolve_named_custom_runtime prefers the override base_url while still
# sending that stored key — so an off-host override exfiltrates it.
# Require the override host to match the provider's CONFIGURED endpoint.
try:
cp = _get_named_custom_provider(prov)
except Exception:
cp = None
cfg_host = base_url_hostname((cp or {}).get("base_url", "")) if cp else ""
if cfg_host and base_url_host_matches(bu, cfg_host):
return None
return (
f"base_url {bu!r} is not allowed for provider {prov!r}. A named "
f"custom provider's stored credential may only be sent to its own "
f"configured endpoint ({cfg_host or 'unknown'})."
)
try:
resolved = resolve_requested_provider(prov)
except Exception:
resolved = prov
pconfig = PROVIDER_REGISTRY.get(resolved) if isinstance(resolved, str) else None
known_host = base_url_hostname(getattr(pconfig, "inference_base_url", "") if pconfig else "")
if known_host and base_url_host_matches(bu, known_host):
return None
# Fail closed: any non-custom provider we cannot host-match to its own
# endpoint is refused. This covers named providers with a stored credential
# AND aliases/unknown names we can't resolve to a known host (e.g. "openai",
# "google"), which would otherwise pair a stored key with the override URL.
return (
f"base_url {bu!r} is not allowed for provider {prov!r}. A named "
f"provider's stored credential may only be sent to its own endpoint; "
f'use a configured custom provider (provider="custom") for a custom base_url.'
)
def _validate_cron_script_path(script: Optional[str]) -> Optional[str]:
"""Validate a cron job script path at the API boundary.
@ -625,6 +705,12 @@ def cronjob(
if script_error:
return tool_error(script_error, success=False)
# Reject a model-supplied base_url that would route a named
# provider's stored credential to an attacker endpoint (F8).
base_url_error = _validate_cron_base_url(provider, base_url)
if base_url_error:
return tool_error(base_url_error, success=False)
# Validate context_from references existing jobs
if context_from:
from cron.jobs import get_job as _get_job
@ -779,6 +865,25 @@ def cronjob(
updates["provider"] = _normalize_optional_job_value(provider)
if base_url is not None:
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
# Re-validate the EFFECTIVE provider/base_url on EVERY update, not
# only when this update supplies provider/base_url. A job persisted
# before this guard (or written directly to the jobs store) may
# already hold an unsafe named-provider + off-host base_url pair;
# if we only checked when the update touches those axes, editing any
# unrelated field (name, schedule, ...) would succeed and leave that
# exfil-capable pair active and schedulable (F8). The effective pair
# merges this update's normalized values over the stored job; an
# operator can still remediate in the same update by clearing
# base_url or pointing provider/base_url at a safe pair.
eff_provider = (
updates["provider"] if "provider" in updates else job.get("provider")
)
eff_base_url = (
updates["base_url"] if "base_url" in updates else job.get("base_url")
)
base_url_error = _validate_cron_base_url(eff_provider, eff_base_url)
if base_url_error:
return tool_error(base_url_error, success=False)
if script is not None:
# Pass empty string to clear an existing script
if script: