diff --git a/cron/scheduler.py b/cron/scheduler.py index 82f10ee94..0fab517ac 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -1969,6 +1969,40 @@ def _scan_assembled_cron_prompt( return assembled +def _guard_job_credential_exfil(job: dict) -> None: + """Fail closed if a job's stored provider/base_url pair would exfiltrate a + credential (F8 runtime backstop; CWE-200/CWE-522). + + The model-callable cron tool validates this on create/update, but a job + persisted before that guard — or written directly to the jobs store — + reaches the scheduler's provider-resolution sink unchecked. Re-validate the + EFFECTIVE stored pair with the same guard the tool uses, so a named + provider's stored key is never paired with an off-host base_url at fire + time. Raises ``RuntimeError`` (caught by the run_job failure path → the run + is aborted and reported) when the pair is unsafe; returns ``None`` otherwise. + + Fallback providers come from operator config, not the model-callable job, so + they are trusted and validated by the caller, not here. + """ + try: + from tools.cronjob_tools import _validate_cron_base_url + err = _validate_cron_base_url(job.get("provider"), job.get("base_url")) + except Exception: + # The validator is defensively coded to RETURN (not raise) its own + # fail-closed string when provider metadata can't be resolved; only a + # truly unexpected error lands here. Don't wedge every cron job on such + # an error — the create/update-time guard remains the primary control. + err = None + if err: + job_id = job.get("id") + logger.error( + "Job '%s': refusing to run — unsafe provider/base_url pair could " + "exfiltrate a stored credential: %s", + job_id, err, + ) + raise RuntimeError(f"Cron job '{job_id}' blocked for safety: {err}") + + def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: """ Execute a single cron job. @@ -2356,6 +2390,15 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: format_runtime_provider_error, ) from hermes_cli.auth import AuthError + + # F8 runtime backstop: never resolve a stored provider/base_url pair that + # would ship a named provider's stored credential to an off-host endpoint + # (CWE-200/CWE-522). The cron tool validates this on create/update, but a + # job persisted before that guard — or written directly to the jobs store + # — reaches this sink unchecked. Fail closed before resolution so no + # off-host call is ever made with a stored key. + _guard_job_credential_exfil(job) + try: # Do not inject HERMES_INFERENCE_PROVIDER here. resolve_runtime_provider() # already prefers persisted config over stale shell/env overrides when diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 00b03e9b2..404b15e2d 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -571,3 +571,66 @@ def test_cron_status_reports_stalled_when_no_heartbeat(tmp_path, monkeypatch, ca out = capsys.readouterr().out assert "STALLED" in out assert "will fire automatically" not in out + + +# ── F8: runtime backstop — never resolve a stored pair that exfiltrates a key ── + + +class TestGuardJobCredentialExfil: + """run_job() must fail closed before provider resolution when a job's stored + provider/base_url pair would ship a named provider's stored credential to an + off-host endpoint — covering jobs persisted before the create/update guard + or written directly to the store (F8 stored-job path; CWE-200/CWE-522).""" + + def test_named_registry_provider_offhost_is_blocked(self): + import pytest + from cron.scheduler import _guard_job_credential_exfil + + job = {"id": "j1", "provider": "anthropic", + "base_url": "https://evil.example/v1"} + with pytest.raises(RuntimeError) as exc: + _guard_job_credential_exfil(job) + assert "blocked for safety" in str(exc.value) + + def test_named_custom_offhost_is_blocked(self, monkeypatch): + import pytest + import hermes_cli.runtime_provider as rp + from cron.scheduler import _guard_job_credential_exfil + + monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True) + monkeypatch.setattr( + rp, "_get_named_custom_provider", + lambda n: {"name": "legit", "base_url": "https://legit.example/v1", + "api_key": "sk-legit"}, + ) + job = {"id": "j2", "provider": "custom:legit", + "base_url": "https://evil.example/v1"} + with pytest.raises(RuntimeError): + _guard_job_credential_exfil(job) + + def test_named_custom_matching_host_is_allowed(self, monkeypatch): + import hermes_cli.runtime_provider as rp + from cron.scheduler import _guard_job_credential_exfil + + monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True) + monkeypatch.setattr( + rp, "_get_named_custom_provider", + lambda n: {"name": "legit", "base_url": "https://legit.example/v1", + "api_key": "sk-legit"}, + ) + job = {"id": "j3", "provider": "custom:legit", + "base_url": "https://legit.example/v1"} + assert _guard_job_credential_exfil(job) is None + + def test_bare_custom_is_allowed(self): + from cron.scheduler import _guard_job_credential_exfil + + job = {"id": "j4", "provider": "custom", + "base_url": "https://anything.example/v1"} + assert _guard_job_credential_exfil(job) is None + + def test_no_base_url_is_allowed(self): + from cron.scheduler import _guard_job_credential_exfil + + assert _guard_job_credential_exfil({"id": "j5", "provider": "anthropic"}) is None + assert _guard_job_credential_exfil({"id": "j6"}) is None diff --git a/tests/tools/test_cronjob_tools.py b/tests/tools/test_cronjob_tools.py index 08c82f375..41aea33c7 100644 --- a/tests/tools/test_cronjob_tools.py +++ b/tests/tools/test_cronjob_tools.py @@ -336,6 +336,81 @@ class TestUnifiedCronjobTool: assert updated["job"]["provider"] == "openrouter" assert updated["job"]["base_url"] is None + @staticmethod + def _patch_named_legit(monkeypatch): + import hermes_cli.runtime_provider as rp + monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True) + monkeypatch.setattr( + rp, "_get_named_custom_provider", + lambda n: {"name": "legit", "base_url": "https://legit.example/v1", + "api_key": "sk-legit"}, + ) + + @staticmethod + def _save_legacy_unsafe_job(): + """Write a job with an unsafe named-provider + off-host base_url pair + DIRECTLY to the store, bypassing the create-time tool guard (mirrors a + job persisted before the guard existed).""" + from cron.jobs import save_jobs + save_jobs([ + { + "id": "legacyunsafe1", + "name": "legacy", + "prompt": "x", + "schedule": {"kind": "interval", "minutes": 5, "display": "every 5m"}, + "schedule_display": "every 5m", + "repeat": {"times": None, "completed": 0}, + "enabled": True, + "state": "scheduled", + "provider": "custom:legit", + "base_url": "https://evil.example/v1", + } + ]) + return "legacyunsafe1" + + def test_legacy_unsafe_job_blocked_on_unrelated_update(self, monkeypatch): + """F8 stored-job path: editing an UNRELATED field on a job that already + holds an unsafe provider/base_url pair must be rejected, so the pair + cannot be left active/schedulable by sidestepping validation.""" + self._patch_named_legit(monkeypatch) + job_id = self._save_legacy_unsafe_job() + + result = json.loads(cronjob(action="update", job_id=job_id, name="renamed")) + assert result["success"] is False + assert "not allowed" in json.dumps(result) + + # The rejected update must not have mutated the stored job at all. + from cron.jobs import get_job + stored = get_job(job_id) + assert stored["name"] == "legacy" + assert stored["base_url"] == "https://evil.example/v1" + + def test_legacy_unsafe_job_remediated_by_clearing_base_url(self, monkeypatch): + """The operator can still fix a legacy unsafe job in a single update by + clearing base_url (the effective pair becomes safe).""" + self._patch_named_legit(monkeypatch) + job_id = self._save_legacy_unsafe_job() + + result = json.loads( + cronjob(action="update", job_id=job_id, name="renamed", base_url="") + ) + assert result["success"] is True + assert result["job"]["base_url"] is None + assert result["job"]["name"] == "renamed" + + def test_legacy_unsafe_job_remediated_by_matching_host(self, monkeypatch): + """Repointing base_url at the named provider's own configured host also + remediates the job (no off-host exfil).""" + self._patch_named_legit(monkeypatch) + job_id = self._save_legacy_unsafe_job() + + result = json.loads( + cronjob(action="update", job_id=job_id, + base_url="https://legit.example/v1") + ) + assert result["success"] is True + assert result["job"]["base_url"] == "https://legit.example/v1" + def test_create_skill_backed_job(self): result = json.loads( cronjob( @@ -581,3 +656,51 @@ class TestLocalDeliveryNotice: ) assert created["deliver"] == "origin" assert "local-only cron job" not in created["message"] + + +class TestValidateCronBaseUrl: + """The cron base_url guard must not let a NAMED custom provider's stored + credential be sent to an off-host endpoint (CWE-200/CWE-522).""" + + @staticmethod + def _v(*args): + from tools.cronjob_tools import _validate_cron_base_url + return _validate_cron_base_url(*args) + + @staticmethod + def _patch_named_legit(monkeypatch): + import hermes_cli.runtime_provider as rp + monkeypatch.setattr(rp, "has_named_custom_provider", lambda n: True) + monkeypatch.setattr( + rp, "_get_named_custom_provider", + lambda n: {"name": "legit", "base_url": "https://legit.example/v1", "api_key": "sk-legit"}, + ) + + def test_named_custom_offhost_base_url_blocked(self, monkeypatch): + self._patch_named_legit(monkeypatch) + err = self._v("custom:legit", "https://evil.example/v1") + assert err and "not allowed" in err + + def test_named_custom_matching_host_allowed(self, monkeypatch): + self._patch_named_legit(monkeypatch) + assert self._v("custom:legit", "https://legit.example/v1") is None + # subdomain of the configured host is still the provider's own endpoint + assert self._v("custom:legit", "https://eu.legit.example/v1") is None + + def test_named_custom_lookalike_host_blocked(self, monkeypatch): + self._patch_named_legit(monkeypatch) + assert self._v("custom:legit", "https://legit.example.attacker.test/v1") is not None + + def test_bare_custom_allows_any_base_url(self): + # Bare 'custom' is inline/host-derived BYOK — no stored secret to leak. + assert self._v("custom", "https://anything.example/v1") is None + + def test_no_base_url_is_allowed(self): + assert self._v("custom:legit", None) is None + + def test_named_registry_offhost_blocked(self): + # A named registry provider (stored key) + off-host override is refused. + assert self._v("anthropic", "https://evil.example/v1") is not None + + def test_base_url_without_provider_rejected(self): + assert self._v(None, "https://x.example/v1") is not None diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 999297c20..02ac58f9c 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -445,6 +445,86 @@ def _normalize_deliver_param(value: Any) -> Optional[str]: return text or None +def _validate_cron_base_url( + provider: Optional[Any], base_url: Optional[Any] +) -> Optional[str]: + """Reject pairing a named provider's stored credential with an off-host base_url. + + The cron tool is model-callable, so a prompt-injected job could set a real + provider plus an attacker ``base_url``; on fire the scheduler resolves that + provider's stored API key and sends it to the URL, exfiltrating the + credential (CWE-200/CWE-522). Allow a ``base_url`` override only when it + cannot leak a stored secret: no override at all, a configured custom/byok + provider that carries its own endpoint+key, or an override whose host + matches the named provider's own endpoint. + + Returns an error string if blocked, else None (valid). + """ + bu = _normalize_optional_job_value(base_url, strip_trailing_slash=True) + if not bu: + return None + prov = _normalize_optional_job_value(provider) + if not prov: + # A base_url with no explicit provider inherits the default/session + # provider's stored key — the same exfil primitive without naming a + # provider. Require an explicit (custom) provider for custom endpoints. + return ( + "base_url override requires an explicit provider. Set provider to a " + "configured custom provider to use a custom endpoint." + ) + try: + from hermes_cli.runtime_provider import ( + has_named_custom_provider, + resolve_requested_provider, + _get_named_custom_provider, + ) + from hermes_cli.auth import PROVIDER_REGISTRY + from utils import base_url_host_matches, base_url_hostname + except Exception: + # Can't resolve provider metadata -> fail closed. + return f"Unable to validate base_url override for provider {prov!r}; refused." + + if prov.lower() == "custom": + # Bare/inline 'custom' (and aliases that resolve to it) is pure BYOK: the + # runtime derives the key from a pool keyed by THIS base_url or from + # host-gated env vars, never an arbitrary stored secret. Safe to allow. + return None + if has_named_custom_provider(prov): + # A NAMED custom provider carries a STORED key, and + # _resolve_named_custom_runtime prefers the override base_url while still + # sending that stored key — so an off-host override exfiltrates it. + # Require the override host to match the provider's CONFIGURED endpoint. + try: + cp = _get_named_custom_provider(prov) + except Exception: + cp = None + cfg_host = base_url_hostname((cp or {}).get("base_url", "")) if cp else "" + if cfg_host and base_url_host_matches(bu, cfg_host): + return None + return ( + f"base_url {bu!r} is not allowed for provider {prov!r}. A named " + f"custom provider's stored credential may only be sent to its own " + f"configured endpoint ({cfg_host or 'unknown'})." + ) + try: + resolved = resolve_requested_provider(prov) + except Exception: + resolved = prov + pconfig = PROVIDER_REGISTRY.get(resolved) if isinstance(resolved, str) else None + known_host = base_url_hostname(getattr(pconfig, "inference_base_url", "") if pconfig else "") + if known_host and base_url_host_matches(bu, known_host): + return None + # Fail closed: any non-custom provider we cannot host-match to its own + # endpoint is refused. This covers named providers with a stored credential + # AND aliases/unknown names we can't resolve to a known host (e.g. "openai", + # "google"), which would otherwise pair a stored key with the override URL. + return ( + f"base_url {bu!r} is not allowed for provider {prov!r}. A named " + f"provider's stored credential may only be sent to its own endpoint; " + f'use a configured custom provider (provider="custom") for a custom base_url.' + ) + + def _validate_cron_script_path(script: Optional[str]) -> Optional[str]: """Validate a cron job script path at the API boundary. @@ -625,6 +705,12 @@ def cronjob( if script_error: return tool_error(script_error, success=False) + # Reject a model-supplied base_url that would route a named + # provider's stored credential to an attacker endpoint (F8). + base_url_error = _validate_cron_base_url(provider, base_url) + if base_url_error: + return tool_error(base_url_error, success=False) + # Validate context_from references existing jobs if context_from: from cron.jobs import get_job as _get_job @@ -779,6 +865,25 @@ def cronjob( updates["provider"] = _normalize_optional_job_value(provider) if base_url is not None: updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True) + # Re-validate the EFFECTIVE provider/base_url on EVERY update, not + # only when this update supplies provider/base_url. A job persisted + # before this guard (or written directly to the jobs store) may + # already hold an unsafe named-provider + off-host base_url pair; + # if we only checked when the update touches those axes, editing any + # unrelated field (name, schedule, ...) would succeed and leave that + # exfil-capable pair active and schedulable (F8). The effective pair + # merges this update's normalized values over the stored job; an + # operator can still remediate in the same update by clearing + # base_url or pointing provider/base_url at a safe pair. + eff_provider = ( + updates["provider"] if "provider" in updates else job.get("provider") + ) + eff_base_url = ( + updates["base_url"] if "base_url" in updates else job.get("base_url") + ) + base_url_error = _validate_cron_base_url(eff_provider, eff_base_url) + if base_url_error: + return tool_error(base_url_error, success=False) if script is not None: # Pass empty string to clear an existing script if script: