feat(agent): add resolve_httpx_verify for custom CA bundle TLS

Introduce a shared helper that maps HERMES_CA_BUNDLE, SSL_CERT_FILE, and
per-provider ssl_ca_cert settings to httpx verify contexts.
This commit is contained in:
HexLab98 2026-07-01 19:51:59 +07:00 committed by kshitij
parent b3bc302370
commit 7e957cbd0b
2 changed files with 92 additions and 0 deletions

52
agent/ssl_verify.py Normal file
View file

@ -0,0 +1,52 @@
"""TLS verify resolution for httpx/OpenAI provider clients."""
from __future__ import annotations
import logging
import os
import ssl
from pathlib import Path
from typing import Any, Optional
logger = logging.getLogger(__name__)
def _coerce_insecure(ssl_verify: Any) -> bool:
if ssl_verify is False:
return True
if isinstance(ssl_verify, str) and ssl_verify.strip().lower() in {"false", "0", "no", "off"}:
return True
return False
def resolve_httpx_verify(
*,
ca_bundle: Optional[str] = None,
ssl_verify: Any = None,
) -> bool | ssl.SSLContext:
"""Resolve httpx ``verify`` for provider HTTP clients.
Priority:
1. ``ssl_verify: false`` disable verification (local dev only)
2. explicit ``ca_bundle`` (per-provider ``ssl_ca_cert`` config field)
3. ``HERMES_CA_BUNDLE``, ``SSL_CERT_FILE``, ``REQUESTS_CA_BUNDLE`` env vars
4. ``True`` (httpx/certifi default)
"""
if _coerce_insecure(ssl_verify):
return False
effective_ca = (
(ca_bundle or "").strip()
or os.getenv("HERMES_CA_BUNDLE", "").strip()
or os.getenv("SSL_CERT_FILE", "").strip()
or os.getenv("REQUESTS_CA_BUNDLE", "").strip()
)
if effective_ca:
ca_path = str(Path(effective_ca).expanduser())
if os.path.isfile(ca_path):
return ssl.create_default_context(cafile=ca_path)
logger.warning(
"CA bundle path does not exist: %s — falling back to default certificates",
effective_ca,
)
return True

View file

@ -0,0 +1,40 @@
"""Tests for agent.ssl_verify.resolve_httpx_verify."""
import ssl
import certifi
import pytest
from agent.ssl_verify import resolve_httpx_verify
_CA_ENV_VARS = ("HERMES_CA_BUNDLE", "SSL_CERT_FILE", "REQUESTS_CA_BUNDLE")
@pytest.fixture
def clean_ca_env(monkeypatch):
for var in _CA_ENV_VARS:
monkeypatch.delenv(var, raising=False)
def test_ssl_verify_false_disables_verification(clean_ca_env):
assert resolve_httpx_verify(ssl_verify=False) is False
def test_hermes_ca_bundle_returns_ssl_context(clean_ca_env, monkeypatch):
monkeypatch.setenv("HERMES_CA_BUNDLE", certifi.where())
result = resolve_httpx_verify()
assert isinstance(result, ssl.SSLContext)
def test_explicit_ca_bundle_param(clean_ca_env):
result = resolve_httpx_verify(ca_bundle=certifi.where())
assert isinstance(result, ssl.SSLContext)
def test_missing_ca_bundle_falls_back_to_true(clean_ca_env, monkeypatch):
monkeypatch.setenv("HERMES_CA_BUNDLE", "/nonexistent/root-ca.pem")
assert resolve_httpx_verify() is True
def test_default_without_env_is_true(clean_ca_env):
assert resolve_httpx_verify() is True