fix(auxiliary): retry transient blips harder + isolate client cache per model (#56889)
Two related hardening fixes for auxiliary calls (which include MoA reference advisors — a pinned-model path where provider fallback is not a meaningful recovery): 1. Transient-transport retries: the same-provider retry on a connection reset / timeout / 5xx / 408 was a single attempt, then fallback. For a pinned aux call a second blip silently loses the call (root of the run2 double-advisor 'Connection error' collapse — a genuine upstream blip). Now retries N times with exponential backoff, N = auxiliary.transient_retries (default 2 -> 3 total attempts, clamped [0,6]). Compression-on-timeout fast-fail carve-out preserved. 2. Per-model client-cache isolation: _client_cache_key excluded the model, so two concurrent auxiliary calls to the same provider/base_url/key but different models (e.g. an opus + gpt-5.5 MoA fan-out) shared one cache entry and could race each other's client lifecycle. Model now participates in the key -> distinct clients, no cross-call races. Same-model reuse unchanged. - agent/auxiliary_client.py: _transient_retry_count() + backoff loop; model in _client_cache_key and both call sites. - hermes_cli/config.py: auxiliary.transient_retries default (2). - tests: new retry/isolation tests; updated 2 stale-expectation tests to the corrected behavior (per-model resolve; N-retry escalation). Backoff base is overridable (_TRANSIENT_RETRY_BACKOFF_BASE) so tests don't sleep.
This commit is contained in:
parent
71c0622122
commit
fb403a3a73
4 changed files with 178 additions and 20 deletions
|
|
@ -2743,7 +2743,7 @@ def _is_connection_error(exc: Exception) -> bool:
|
|||
|
||||
|
||||
def _is_transient_transport_error(exc: Exception) -> bool:
|
||||
"""Return True for a one-off transport blip worth retrying ONCE on the
|
||||
"""Return True for a one-off transport blip worth retrying ON the
|
||||
same provider before any provider/model fallback.
|
||||
|
||||
Covers connection/streaming-close errors (via the canonical
|
||||
|
|
@ -2761,6 +2761,34 @@ def _is_transient_transport_error(exc: Exception) -> bool:
|
|||
return isinstance(status, int) and (status == 408 or 500 <= status < 600)
|
||||
|
||||
|
||||
_DEFAULT_TRANSIENT_RETRIES = 2
|
||||
# Base for exponential backoff between transient retries (seconds). Overridable
|
||||
# so tests can zero it out and not sleep real wall-clock time.
|
||||
_TRANSIENT_RETRY_BACKOFF_BASE = 1.0
|
||||
|
||||
|
||||
def _transient_retry_count() -> int:
|
||||
"""Number of same-provider retries for a transient transport blip.
|
||||
|
||||
Read from ``auxiliary.transient_retries`` in config.yaml (default 2 →
|
||||
3 total attempts). Clamped to [0, 6] to bound worst-case wall time. A
|
||||
connection blip to a pinned auxiliary target (e.g. a MoA reference
|
||||
advisor) has no meaningful provider fallback, so a couple of retries with
|
||||
backoff is the difference between recovering and silently losing the call.
|
||||
Best-effort: any config-read failure falls back to the default.
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.config import cfg_get, load_config
|
||||
|
||||
val = cfg_get(load_config(), "auxiliary", "transient_retries")
|
||||
if val is None:
|
||||
return _DEFAULT_TRANSIENT_RETRIES
|
||||
n = int(val)
|
||||
return max(0, min(n, 6))
|
||||
except Exception:
|
||||
return _DEFAULT_TRANSIENT_RETRIES
|
||||
|
||||
|
||||
def _is_auth_error(exc: Exception) -> bool:
|
||||
"""Detect auth failures that should trigger provider-specific refresh."""
|
||||
status = getattr(exc, "status_code", None)
|
||||
|
|
@ -5036,6 +5064,7 @@ def _client_cache_key(
|
|||
main_runtime: Optional[Dict[str, Any]] = None,
|
||||
is_vision: bool = False,
|
||||
task: Optional[str] = None,
|
||||
model: Optional[str] = None,
|
||||
) -> tuple:
|
||||
runtime = _normalize_main_runtime(main_runtime)
|
||||
runtime_key = tuple(runtime.get(field, "") for field in _MAIN_RUNTIME_FIELDS) if provider == "auto" else ()
|
||||
|
|
@ -5044,7 +5073,17 @@ def _client_cache_key(
|
|||
# old cache shape because the explicit provider/model tuple is sufficient.
|
||||
task_key = (task or "") if provider == "auto" else ""
|
||||
pool_hint = _pool_cache_hint(provider, main_runtime=main_runtime)
|
||||
return (provider, async_mode, base_url or "", api_key or "", api_mode or "", runtime_key, is_vision, task_key, pool_hint)
|
||||
# The model MUST participate in the key. Two concurrent auxiliary calls to
|
||||
# the SAME provider/base_url/key but DIFFERENT models (e.g. a MoA reference
|
||||
# fan-out running opus + gpt-5.5 in parallel threads) would otherwise share
|
||||
# one cache entry. On a cache MISS both build a client for the same key; the
|
||||
# second's _store_cached_client sees the first as the "old" entry and CLOSES
|
||||
# it — while the first call is still mid-request on it — yielding a spurious
|
||||
# APIConnectionError that fails the sibling advisor (root cause of the run2
|
||||
# double-advisor "Connection error" collapse). Keying on model gives each
|
||||
# model its own client, so concurrent fan-out calls never cross-close.
|
||||
model_key = model or ""
|
||||
return (provider, async_mode, base_url or "", api_key or "", api_mode or "", runtime_key, is_vision, task_key, pool_hint, model_key)
|
||||
|
||||
|
||||
def _store_cached_client(cache_key: tuple, client: Any, default_model: Optional[str], *, bound_loop: Any = None) -> None:
|
||||
|
|
@ -5100,6 +5139,7 @@ def _refresh_nous_auxiliary_client(
|
|||
api_mode=api_mode,
|
||||
main_runtime=main_runtime,
|
||||
is_vision=is_vision,
|
||||
model=final_model,
|
||||
)
|
||||
_store_cached_client(cache_key, client, final_model, bound_loop=current_loop)
|
||||
return client, final_model
|
||||
|
|
@ -5276,6 +5316,7 @@ def _get_cached_client(
|
|||
main_runtime=main_runtime,
|
||||
is_vision=is_vision,
|
||||
task=task,
|
||||
model=model,
|
||||
)
|
||||
with _client_cache_lock:
|
||||
if cache_key in _client_cache:
|
||||
|
|
@ -6005,15 +6046,22 @@ def call_llm(
|
|||
# Handle unsupported temperature, max_tokens vs max_completion_tokens retry,
|
||||
# then payment fallback.
|
||||
try:
|
||||
# Retry ONCE on the same provider for a one-off transient transport
|
||||
# blip (streaming-close / incomplete chunked read / 5xx / 408) before
|
||||
# the except-chain below escalates to provider/model fallback. A
|
||||
# single dropped connection shouldn't abandon an otherwise-healthy
|
||||
# provider. A second failure (or any non-transient error) falls
|
||||
# through to ``first_err`` and the existing fallback handling
|
||||
# unchanged. This is the unified home for the transient retry that
|
||||
# every auxiliary task (compression, memory flush, title-gen,
|
||||
# session-search, vision) shares. (PR #16587)
|
||||
# Retry on the same provider for a transient transport blip
|
||||
# (connection reset / streaming-close / incomplete chunked read / 5xx /
|
||||
# 408) before the except-chain below escalates to provider/model
|
||||
# fallback. A dropped connection shouldn't abandon an otherwise-healthy
|
||||
# provider — this especially matters for pinned auxiliary calls like MoA
|
||||
# reference advisors, where "fallback to another provider" is not a
|
||||
# meaningful recovery (the advisor is a specific model), so a transient
|
||||
# blip that isn't retried simply loses that advisor for the turn (root
|
||||
# of the run2 double-advisor "Connection error" collapse — a genuine
|
||||
# upstream blip hitting both parallel advisors at once).
|
||||
#
|
||||
# Attempts are bounded and use exponential backoff. Count is configurable
|
||||
# via auxiliary.transient_retries (default 2 retries → 3 total attempts);
|
||||
# a second/third failure or any non-transient error falls through to
|
||||
# ``first_err`` and the existing fallback handling unchanged. Unified home
|
||||
# for the transient retry every auxiliary task shares. (PR #16587)
|
||||
try:
|
||||
return _validate_llm_response(
|
||||
client.chat.completions.create(**kwargs), task)
|
||||
|
|
@ -6035,13 +6083,26 @@ def call_llm(
|
|||
transient_err,
|
||||
)
|
||||
raise
|
||||
logger.info(
|
||||
"Auxiliary %s: transient transport error; retrying once on "
|
||||
"the same provider before fallback: %s",
|
||||
task or "call", transient_err,
|
||||
)
|
||||
return _validate_llm_response(
|
||||
client.chat.completions.create(**kwargs), task)
|
||||
_max_transient_retries = _transient_retry_count()
|
||||
_last_transient = transient_err
|
||||
for _attempt in range(1, _max_transient_retries + 1):
|
||||
_backoff = min(_TRANSIENT_RETRY_BACKOFF_BASE * (2.0 ** (_attempt - 1)), 8.0)
|
||||
logger.info(
|
||||
"Auxiliary %s: transient transport error (attempt %d/%d); "
|
||||
"retrying same provider after %.1fs before fallback: %s",
|
||||
task or "call", _attempt, _max_transient_retries, _backoff,
|
||||
_last_transient,
|
||||
)
|
||||
time.sleep(_backoff)
|
||||
try:
|
||||
return _validate_llm_response(
|
||||
client.chat.completions.create(**kwargs), task)
|
||||
except Exception as retry_transient:
|
||||
if not _is_transient_transport_error(retry_transient):
|
||||
raise
|
||||
_last_transient = retry_transient
|
||||
# Retries exhausted — fall through to first_err fallback handling.
|
||||
raise _last_transient
|
||||
except Exception as first_err:
|
||||
if "temperature" in kwargs and _is_unsupported_temperature_error(first_err):
|
||||
retry_kwargs = dict(kwargs)
|
||||
|
|
|
|||
|
|
@ -1465,6 +1465,13 @@ DEFAULT_CONFIG = {
|
|||
# Each aux task is independent — main-agent provider_routing and
|
||||
# openrouter.min_coding_score do NOT propagate to aux calls by design.
|
||||
"auxiliary": {
|
||||
# Same-provider retries for a transient transport blip (connection
|
||||
# reset / timeout / 5xx / 408) on ANY auxiliary call before falling
|
||||
# back. Default 2 (→ 3 total attempts), clamped [0,6]. Matters most for
|
||||
# pinned calls like MoA reference advisors, where provider fallback is
|
||||
# not a meaningful recovery, so an unretried blip silently loses the
|
||||
# call.
|
||||
"transient_retries": 2,
|
||||
"vision": {
|
||||
"provider": "auto", # auto | openrouter | nous | codex | custom
|
||||
"model": "", # e.g. "google/gemini-2.5-flash", "gpt-4o"
|
||||
|
|
|
|||
|
|
@ -1523,7 +1523,13 @@ class TestAuxiliaryPoolAwareness:
|
|||
|
||||
assert client is fake_client
|
||||
assert model == "openai/gpt-5.4-mini"
|
||||
assert mock_resolve.call_count == 1
|
||||
# A DIFFERENT model resolves its own client (model participates in the
|
||||
# cache key). This isolation is what stops two concurrent advisors on
|
||||
# the same provider/base_url/key (e.g. a MoA fan-out) from sharing — and
|
||||
# racing the lifecycle of — one cached client. Same-model reuse is still
|
||||
# a single resolve (verified elsewhere); distinct models => distinct
|
||||
# resolves.
|
||||
assert mock_resolve.call_count == 2
|
||||
|
||||
|
||||
# ── Payment / credit exhaustion fallback ─────────────────────────────────
|
||||
|
|
@ -2581,6 +2587,8 @@ class TestTransientTransportRetry:
|
|||
p1, p2, p3 = self._patches(primary)
|
||||
with (
|
||||
p1, p2, p3,
|
||||
patch("agent.auxiliary_client._transient_retry_count", return_value=1),
|
||||
patch("agent.auxiliary_client._TRANSIENT_RETRY_BACKOFF_BASE", 0.0),
|
||||
patch(
|
||||
"agent.auxiliary_client._try_configured_fallback_chain",
|
||||
return_value=(None, None, ""),
|
||||
|
|
@ -2592,7 +2600,7 @@ class TestTransientTransportRetry:
|
|||
):
|
||||
result = call_llm(task="compression", messages=[{"role": "user", "content": "hi"}])
|
||||
assert result == {"fallback": True}
|
||||
# Primary tried twice (initial + same-target retry), then fallback.
|
||||
# Primary tried twice (initial + one same-target retry), then fallback.
|
||||
assert primary.chat.completions.create.call_count == 2
|
||||
assert fb_client.chat.completions.create.call_count == 1
|
||||
|
||||
|
|
|
|||
82
tests/agent/test_auxiliary_transient_retry.py
Normal file
82
tests/agent/test_auxiliary_transient_retry.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
"""Transient-transport retry count + per-model client-cache isolation.
|
||||
|
||||
Two related hardening behaviors for auxiliary calls (which include MoA
|
||||
reference advisors, a pinned-model path where provider fallback is not a
|
||||
meaningful recovery):
|
||||
|
||||
1. A transient transport blip (connection reset / timeout / 5xx) is retried
|
||||
on the SAME provider several times with backoff before giving up — a single
|
||||
upstream blip should not silently lose a pinned auxiliary call (root of the
|
||||
run2 double-advisor "Connection error" collapse).
|
||||
2. Two auxiliary calls to the same provider/base_url/key but DIFFERENT models
|
||||
get DISTINCT client-cache keys, so a concurrent fan-out (e.g. opus + gpt-5.5
|
||||
advisors) never shares one client entry.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import types
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class _ConnErr(Exception):
|
||||
"""Stand-in that the transient detector recognizes as a connection blip."""
|
||||
|
||||
|
||||
def test_transient_retry_count_default(monkeypatch):
|
||||
from agent import auxiliary_client as ac
|
||||
|
||||
# No config value -> default.
|
||||
monkeypatch.setattr(ac, "load_config", lambda: {}, raising=False)
|
||||
with patch("hermes_cli.config.load_config", return_value={}), \
|
||||
patch("hermes_cli.config.cfg_get", return_value=None):
|
||||
assert ac._transient_retry_count() == ac._DEFAULT_TRANSIENT_RETRIES
|
||||
|
||||
|
||||
def test_transient_retry_count_configurable_and_clamped():
|
||||
from agent import auxiliary_client as ac
|
||||
|
||||
with patch("hermes_cli.config.cfg_get", return_value=4):
|
||||
assert ac._transient_retry_count() == 4
|
||||
with patch("hermes_cli.config.cfg_get", return_value=100):
|
||||
assert ac._transient_retry_count() == 6 # clamped high
|
||||
with patch("hermes_cli.config.cfg_get", return_value=-3):
|
||||
assert ac._transient_retry_count() == 0 # clamped low
|
||||
with patch("hermes_cli.config.cfg_get", side_effect=RuntimeError):
|
||||
assert ac._transient_retry_count() == ac._DEFAULT_TRANSIENT_RETRIES
|
||||
|
||||
|
||||
def test_model_participates_in_client_cache_key():
|
||||
"""Same provider/base_url/key, different model -> different cache key.
|
||||
|
||||
This is what stops two concurrent advisors from sharing (and racing on)
|
||||
one cached client entry."""
|
||||
from agent.auxiliary_client import _client_cache_key
|
||||
|
||||
k_opus = _client_cache_key(
|
||||
"openrouter", async_mode=False, base_url="https://openrouter.ai/api/v1",
|
||||
api_key="K", model="anthropic/claude-opus-4.8",
|
||||
)
|
||||
k_gpt = _client_cache_key(
|
||||
"openrouter", async_mode=False, base_url="https://openrouter.ai/api/v1",
|
||||
api_key="K", model="openai/gpt-5.5",
|
||||
)
|
||||
assert k_opus != k_gpt
|
||||
# Same model still collides (cache still works for reuse).
|
||||
k_opus2 = _client_cache_key(
|
||||
"openrouter", async_mode=False, base_url="https://openrouter.ai/api/v1",
|
||||
api_key="K", model="anthropic/claude-opus-4.8",
|
||||
)
|
||||
assert k_opus == k_opus2
|
||||
|
||||
|
||||
def test_missing_model_key_is_stable():
|
||||
"""Omitting model (legacy callers) is still a valid, stable key."""
|
||||
from agent.auxiliary_client import _client_cache_key
|
||||
|
||||
a = _client_cache_key("openrouter", async_mode=False, base_url="u", api_key="k")
|
||||
b = _client_cache_key("openrouter", async_mode=False, base_url="u", api_key="k")
|
||||
assert a == b
|
||||
Loading…
Add table
Add a link
Reference in a new issue