fix(vision): narrow the fan-out cap to the CPU encode burst only
The original cap held a process-global slot across the WHOLE vision analysis (image load + encode + LLM call) with a default of min(CPUs, 4). That serialized legitimate multi-image workflows — "compare these 6 screenshots", "read this 10-page scan", "analyze every frame" — behind a 4-wide gate, and on the native fast path it even throttled calls that make no LLM request at all. Excess calls queued (blocking acquire, nothing dropped), but the latency hit on real fan-out was the wrong tradeoff. The incident was CPU exhaustion, not call count: concurrent base64/resize bursts saturated every core and left none to service the shared event loop serving /api/status. So cap ONLY that: - A dedicated, bounded ThreadPoolExecutor (_vision_cpu_executor) runs the encode/resize/dimension-check off the caller's loop, sized to the host's usable core count with NO fixed ceiling — the cap tracks the actual exhausted resource (cores), not a magic number. Excess encodes queue on the executor; cores stay free for the loop. - The LLM call is deliberately OUTSIDE the executor, so multi-image workflows keep full request concurrency. - Override via auxiliary.vision.max_concurrency / HERMES_VISION_MAX_CONCURRENCY (honored verbatim, including above core count); sub-1 ignored. - _vision_concurrency_slot() is now a no-op shim for back-compat. Tests assert: resolver defaults to host cores with no ceiling; env/config override (incl. above cores); sub-1 rejection; the executor is dedicated and core-sized; encode runs on a vision-encode thread; and crucially that encode bursts are bounded to the cap while the analyses themselves stay fully concurrent (calls_peak > cap).
This commit is contained in:
parent
eddfecd2ce
commit
75317d82d0
4 changed files with 216 additions and 190 deletions
|
|
@ -1085,28 +1085,29 @@ class TestDownloadRetryClassification:
|
|||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fan-out concurrency cap — a single turn (or several concurrent sessions in
|
||||
# one process) can launch dozens of vision_analyze calls at once. The
|
||||
# process-global semaphore must bound how many run simultaneously so a video-
|
||||
# frame storm can't pin a worker thread and starve the dashboard event loop.
|
||||
# CPU-burst concurrency cap — a single turn (or several concurrent sessions in
|
||||
# one process) can launch dozens of vision_analyze calls at once. Only the
|
||||
# CPU-bound encode/resize is bounded (to host cores), so a video-frame storm
|
||||
# can't saturate every core and starve the dashboard event loop — while the
|
||||
# network-bound LLM calls stay fully concurrent for legitimate multi-image work.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestVisionFanoutConcurrencyCap:
|
||||
"""The process-global semaphore bounds concurrent vision analyses."""
|
||||
class TestVisionCpuBurstCap:
|
||||
"""The bounded CPU executor caps concurrent encode/resize, not LLM calls."""
|
||||
|
||||
def test_resolver_defaults_to_min_cpus_and_ceiling(self):
|
||||
def test_resolver_defaults_to_host_cpus_no_ceiling(self):
|
||||
from tools import vision_tools as vt
|
||||
|
||||
with (
|
||||
patch.dict(os.environ, {}, clear=False),
|
||||
patch("tools.vision_tools._detect_host_cpus", return_value=64),
|
||||
patch("hermes_cli.config.load_config", side_effect=Exception),
|
||||
):
|
||||
os.environ.pop("HERMES_VISION_MAX_CONCURRENCY", None)
|
||||
# No config override available in the test env → falls to default,
|
||||
# which is clamped to the ceiling even on a 64-core host.
|
||||
with patch("hermes_cli.config.load_config", side_effect=Exception):
|
||||
assert vt._resolve_vision_max_concurrency() == vt._VISION_DEFAULT_CONCURRENCY_CEILING
|
||||
# No fixed ceiling: a 64-core host gets 64 encode workers. The cap
|
||||
# tracks the actual resource (cores), not a magic number.
|
||||
assert vt._resolve_vision_cpu_workers() == 64
|
||||
|
||||
def test_resolver_respects_low_host_cpu_count(self):
|
||||
from tools import vision_tools as vt
|
||||
|
|
@ -1117,14 +1118,15 @@ class TestVisionFanoutConcurrencyCap:
|
|||
patch("hermes_cli.config.load_config", side_effect=Exception),
|
||||
):
|
||||
os.environ.pop("HERMES_VISION_MAX_CONCURRENCY", None)
|
||||
# 2-core host → cap is 2 (host limit, below the ceiling of 4).
|
||||
assert vt._resolve_vision_max_concurrency() == 2
|
||||
assert vt._resolve_vision_cpu_workers() == 2
|
||||
|
||||
def test_resolver_env_override(self):
|
||||
from tools import vision_tools as vt
|
||||
|
||||
with patch.dict(os.environ, {"HERMES_VISION_MAX_CONCURRENCY": "1"}):
|
||||
assert vt._resolve_vision_max_concurrency() == 1
|
||||
with patch.dict(os.environ, {"HERMES_VISION_MAX_CONCURRENCY": "16"}):
|
||||
# Explicit override is honored verbatim — including ABOVE core count,
|
||||
# so operators can raise it for heavy multi-image workloads.
|
||||
assert vt._resolve_vision_cpu_workers() == 16
|
||||
|
||||
def test_resolver_rejects_sub_one_override(self):
|
||||
from tools import vision_tools as vt
|
||||
|
|
@ -1134,55 +1136,95 @@ class TestVisionFanoutConcurrencyCap:
|
|||
patch("tools.vision_tools._detect_host_cpus", return_value=2),
|
||||
patch("hermes_cli.config.load_config", side_effect=Exception),
|
||||
):
|
||||
# 0 is ignored (cap can never be disabled) → falls back to default.
|
||||
assert vt._resolve_vision_max_concurrency() == 2
|
||||
# 0 is ignored (cap can never be disabled) → falls back to host cores.
|
||||
assert vt._resolve_vision_cpu_workers() == 2
|
||||
|
||||
def test_cpu_executor_is_dedicated_and_sized_to_workers(self):
|
||||
"""The encode executor must be dedicated, not the shared default pool."""
|
||||
import importlib
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
vt = importlib.import_module("tools.vision_tools")
|
||||
assert isinstance(vt._vision_cpu_executor, ThreadPoolExecutor)
|
||||
assert vt._vision_cpu_executor._max_workers == vt._VISION_CPU_WORKERS
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fanout_is_bounded_by_semaphore(self):
|
||||
"""Firing many concurrent vision calls must never exceed the cap in flight.
|
||||
async def test_encode_runs_on_dedicated_cpu_executor(self):
|
||||
"""Encode/resize must execute on a ``vision-encode`` thread, off the loop.
|
||||
|
||||
This is the regression guard for the prod incident: an unbounded
|
||||
fan-out pinned the event loop. With the cap, peak concurrency is
|
||||
clamped to the semaphore value regardless of how many calls launch.
|
||||
Regression guard: the CPU burst is what saturated cores and starved the
|
||||
loop. It must run on the bounded vision executor, not the caller's loop
|
||||
thread nor the shared default pool.
|
||||
"""
|
||||
import importlib
|
||||
import threading
|
||||
|
||||
vt = importlib.import_module("tools.vision_tools")
|
||||
|
||||
seen_threads = []
|
||||
|
||||
def fake_encode(path, mime_type=None):
|
||||
seen_threads.append(threading.current_thread().name)
|
||||
return "data:image/jpeg;base64,AAAA"
|
||||
|
||||
result = await vt._run_encode_on_cpu_executor(fake_encode, "p", mime_type="image/jpeg")
|
||||
assert result == "data:image/jpeg;base64,AAAA"
|
||||
assert len(seen_threads) == 1
|
||||
assert seen_threads[0].startswith("vision-encode"), seen_threads
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_encode_bursts_bounded_but_llm_stays_concurrent(self):
|
||||
"""Encode concurrency is clamped to the cap; the LLM call is not.
|
||||
|
||||
Drives many native-path calls whose encode step is the only thing on
|
||||
the CPU executor. With the executor sized to CAP, no more than CAP
|
||||
encodes ever run at once — even though all N calls are in flight
|
||||
simultaneously (proving the analyses themselves are NOT serialized).
|
||||
"""
|
||||
import asyncio
|
||||
import importlib
|
||||
import threading
|
||||
# Resolve the module fresh and drive BOTH the handler and the patch
|
||||
# targets through that SAME module object. Sibling suites
|
||||
# (test_vision_routing_31179) delete tools.vision_tools from
|
||||
# sys.modules, so the top-level ``_handle_vision_analyze`` import can
|
||||
# be bound to a stale module while ``patch`` hits the current one —
|
||||
# patching the wrong object lets the real function run (peak stays 0).
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
vt = importlib.import_module("tools.vision_tools")
|
||||
|
||||
CAP = 3
|
||||
in_flight = 0
|
||||
peak = 0
|
||||
lock = asyncio.Lock()
|
||||
N = 12
|
||||
enc_inflight = 0
|
||||
enc_peak = 0
|
||||
calls_inflight = 0
|
||||
calls_peak = 0
|
||||
import threading as _t
|
||||
enc_lock = _t.Lock()
|
||||
|
||||
def slow_encode(path, mime_type=None):
|
||||
nonlocal enc_inflight, enc_peak
|
||||
with enc_lock:
|
||||
enc_inflight += 1
|
||||
enc_peak = max(enc_peak, enc_inflight)
|
||||
try:
|
||||
_t.Event().wait(0.04) # simulate CPU burst
|
||||
finally:
|
||||
with enc_lock:
|
||||
enc_inflight -= 1
|
||||
return "data:image/jpeg;base64,AAAA"
|
||||
|
||||
async def fake_native(image_url, question):
|
||||
nonlocal in_flight, peak
|
||||
async with lock:
|
||||
in_flight += 1
|
||||
peak = max(peak, in_flight)
|
||||
nonlocal calls_inflight, calls_peak
|
||||
calls_inflight += 1
|
||||
calls_peak = max(calls_peak, calls_inflight)
|
||||
try:
|
||||
# Hold the slot long enough that, without a cap, all callers
|
||||
# would overlap and drive peak up to N.
|
||||
await asyncio.sleep(0.05)
|
||||
# The encode is the capped CPU step.
|
||||
await vt._run_encode_on_cpu_executor(slow_encode, "p", mime_type="image/jpeg")
|
||||
# The "LLM call" is NOT capped — overlaps freely.
|
||||
await asyncio.sleep(0.02)
|
||||
finally:
|
||||
async with lock:
|
||||
in_flight -= 1
|
||||
calls_inflight -= 1
|
||||
return json.dumps({"ok": True})
|
||||
|
||||
N = 12
|
||||
# Install a fresh semaphore at the test cap so the assertion is
|
||||
# deterministic regardless of the host's core count.
|
||||
with (
|
||||
patch.object(vt, "_vision_concurrency_semaphore",
|
||||
threading.BoundedSemaphore(CAP)),
|
||||
patch.object(vt, "_should_use_native_vision_fast_path",
|
||||
return_value=True),
|
||||
patch.object(vt, "_vision_cpu_executor",
|
||||
ThreadPoolExecutor(max_workers=CAP, thread_name_prefix="vision-encode")),
|
||||
patch.object(vt, "_should_use_native_vision_fast_path", return_value=True),
|
||||
patch.object(vt, "_vision_analyze_native", side_effect=fake_native),
|
||||
):
|
||||
await asyncio.gather(*[
|
||||
|
|
@ -1193,58 +1235,12 @@ class TestVisionFanoutConcurrencyCap:
|
|||
for i in range(N)
|
||||
])
|
||||
|
||||
assert peak <= CAP, f"peak concurrency {peak} exceeded cap {CAP}"
|
||||
# Sanity: with N > CAP and a real wait, we should have actually
|
||||
# saturated the cap (otherwise the test proves nothing).
|
||||
assert peak == CAP, f"expected to saturate cap {CAP}, only reached {peak}"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unbounded_fanout_would_exceed_cap_without_semaphore(self):
|
||||
"""Control: with a no-op (effectively unbounded) semaphore, peak blows past CAP.
|
||||
|
||||
Proves the guard above would fail if the semaphore weren't enforcing
|
||||
the limit — i.e. the test is actually exercising the cap.
|
||||
"""
|
||||
import asyncio
|
||||
import importlib
|
||||
import threading
|
||||
vt = importlib.import_module("tools.vision_tools")
|
||||
|
||||
CAP = 3
|
||||
in_flight = 0
|
||||
peak = 0
|
||||
lock = asyncio.Lock()
|
||||
|
||||
async def fake_native(image_url, question):
|
||||
nonlocal in_flight, peak
|
||||
async with lock:
|
||||
in_flight += 1
|
||||
peak = max(peak, in_flight)
|
||||
try:
|
||||
await asyncio.sleep(0.05)
|
||||
finally:
|
||||
async with lock:
|
||||
in_flight -= 1
|
||||
return json.dumps({"ok": True})
|
||||
|
||||
N = 12
|
||||
# A semaphore sized to N imposes no real limit for this workload.
|
||||
with (
|
||||
patch.object(vt, "_vision_concurrency_semaphore",
|
||||
threading.BoundedSemaphore(N)),
|
||||
patch.object(vt, "_should_use_native_vision_fast_path",
|
||||
return_value=True),
|
||||
patch.object(vt, "_vision_analyze_native", side_effect=fake_native),
|
||||
):
|
||||
await asyncio.gather(*[
|
||||
vt._handle_vision_analyze(
|
||||
{"image_url": f"https://example.com/frame_{i}.png",
|
||||
"question": "what is this"}
|
||||
)
|
||||
for i in range(N)
|
||||
])
|
||||
|
||||
assert peak > CAP, (
|
||||
"control failed: peak did not exceed CAP even without a real cap "
|
||||
f"(peak={peak})"
|
||||
assert enc_peak <= CAP, f"encode peak {enc_peak} exceeded cap {CAP}"
|
||||
assert enc_peak == CAP, f"expected to saturate encode cap {CAP}, got {enc_peak}"
|
||||
# The analyses themselves were NOT serialized to the cap — all N ran
|
||||
# concurrently, which is the whole point (multi-image workflows keep
|
||||
# their concurrency; only the CPU burst is bounded).
|
||||
assert calls_peak > CAP, (
|
||||
f"analyses were serialized to the cap (peak={calls_peak}); only the "
|
||||
"encode burst should be bounded, not the whole call"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import base64
|
|||
import contextlib
|
||||
import asyncio
|
||||
import json
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
|
|
@ -77,36 +78,35 @@ _VISION_MAX_DOWNLOAD_BYTES = 50 * 1024 * 1024
|
|||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fan-out concurrency cap
|
||||
# CPU-burst concurrency cap (vision encode/resize)
|
||||
# ---------------------------------------------------------------------------
|
||||
# A single agent turn can fan out N vision_analyze calls at once (the classic
|
||||
# trigger is "analyze every frame of this video" — ffmpeg explodes a clip into
|
||||
# dozens of frames, the model then calls vision_analyze on each). Every call
|
||||
# does a CPU-heavy base64-encode/resize burst AND holds a long-lived LLM stream
|
||||
# open. The tool executor runs concurrent tool calls on a ThreadPoolExecutor
|
||||
# (agent.tool_executor._MAX_TOOL_WORKERS = 8) PER SESSION, and several agent
|
||||
# sessions share one process (the dashboard runs the agent in-process). With no
|
||||
# global ceiling, a video-frame fan-out across one or more sessions pins a
|
||||
# worker thread at ~100% CPU and starves the shared asyncio event loop that also
|
||||
# serves the dashboard's /api/status liveness probe — so the instance flaps to
|
||||
# UNHEALTHY even though nothing has actually crashed (observed in prod, June
|
||||
# 2026).
|
||||
# dozens of frames, the model then calls vision_analyze on each). Each call does
|
||||
# a CPU-heavy base64-encode + (sometimes) Pillow resize. The tool executor runs
|
||||
# concurrent tool calls on a ThreadPoolExecutor (agent.tool_executor =
|
||||
# 8 workers) PER SESSION, and several agent sessions share one process (the
|
||||
# dashboard runs the agent in-process). Unbounded, a video-frame fan-out across
|
||||
# one or more sessions runs *every* encode at once, saturates all cores, and
|
||||
# leaves no CPU to service the shared asyncio event loop that serves the
|
||||
# dashboard's /api/status liveness probe — so the instance flaps to UNHEALTHY
|
||||
# even though nothing has crashed (observed in prod, June 2026).
|
||||
#
|
||||
# This semaphore bounds the number of vision analyses running concurrently
|
||||
# across the WHOLE process, regardless of how many sessions or worker threads
|
||||
# issue them. It is a threading.Semaphore (NOT asyncio.Semaphore): each vision
|
||||
# call is dispatched through model_tools._run_async on a PER-THREAD event loop,
|
||||
# so an asyncio primitive bound to one loop cannot coordinate across them. A
|
||||
# threading semaphore is loop- and thread-agnostic, which is exactly what we
|
||||
# need here.
|
||||
# The fix is NOT to cap how many vision analyses run — multi-image workflows
|
||||
# ("compare these 6 screenshots", "read this 10-page scan") legitimately want
|
||||
# high concurrency, and the slow part (the LLM stream) is network-bound and
|
||||
# harmless to the loop. We cap ONLY the CPU burst: the encode/resize is offloaded
|
||||
# to a dedicated, bounded executor sized to the host's usable core count. That
|
||||
# is the resource the incident actually exhausted (cores), so bounding it to
|
||||
# cores is *correct*, not an arbitrary number — excess encodes queue on the
|
||||
# executor instead of all running at once, the LLM calls stay fully concurrent,
|
||||
# and the loop always keeps a core. No fixed ceiling: the limit tracks the host.
|
||||
#
|
||||
# Default: min(host CPU count, 4), floored at 1 — "respect the host's
|
||||
# concurrency, or lower". 4 is a conservative ceiling: vision work is a mix of
|
||||
# CPU (encode/resize) and network (LLM stream), and we would rather under-
|
||||
# subscribe than let a frame storm wedge the loop. Override with
|
||||
# HERMES_VISION_MAX_CONCURRENCY (env) or auxiliary.vision.max_concurrency
|
||||
# (config.yaml). 0 / negative / unparseable falls back to the default.
|
||||
import threading
|
||||
# A threading primitive (NOT asyncio) is required: each vision call is dispatched
|
||||
# through model_tools._run_async on a PER-THREAD event loop, so an asyncio
|
||||
# executor/semaphore bound to one loop cannot coordinate across them. A
|
||||
# ThreadPoolExecutor is loop- and thread-agnostic.
|
||||
import threading # noqa: F401 (kept for downstream importers / patch targets)
|
||||
|
||||
|
||||
def _detect_host_cpus() -> int:
|
||||
|
|
@ -122,19 +122,19 @@ def _detect_host_cpus() -> int:
|
|||
return max(1, os.cpu_count() or 1)
|
||||
|
||||
|
||||
# Absolute ceiling for the default (not for explicit overrides): even on a
|
||||
# many-core host, more than this many simultaneous in-process vision analyses
|
||||
# is rarely worth the event-loop pressure.
|
||||
_VISION_DEFAULT_CONCURRENCY_CEILING = 4
|
||||
def _resolve_vision_cpu_workers() -> int:
|
||||
"""Resolve how many vision encode/resize bursts may run concurrently.
|
||||
|
||||
Defaults to the host's usable core count (``_detect_host_cpus``) — no fixed
|
||||
ceiling, because the cap tracks the actual exhausted resource (CPU cores),
|
||||
not a magic number. The LLM call is NOT covered by this limit, so legitimate
|
||||
multi-image fan-out keeps full request concurrency; only the simultaneous
|
||||
CPU bursts are bounded so the event loop always keeps a core.
|
||||
|
||||
def _resolve_vision_max_concurrency() -> int:
|
||||
"""Resolve the max concurrent vision analyses for this process.
|
||||
|
||||
Resolution order: HERMES_VISION_MAX_CONCURRENCY env → config.yaml
|
||||
auxiliary.vision.max_concurrency → default ``min(host_cpus, 4)``. Any
|
||||
value that parses to < 1 is ignored in favor of the next source so the
|
||||
cap can never be disabled into an unbounded fan-out.
|
||||
Resolution order: HERMES_VISION_MAX_CONCURRENCY env →
|
||||
config.yaml auxiliary.vision.max_concurrency → host core count. Any value
|
||||
that parses to < 1 is ignored in favor of the next source so the cap can
|
||||
never be disabled into an unbounded encode storm.
|
||||
"""
|
||||
env_val = os.getenv("HERMES_VISION_MAX_CONCURRENCY", "").strip()
|
||||
if env_val:
|
||||
|
|
@ -154,11 +154,39 @@ def _resolve_vision_max_concurrency() -> int:
|
|||
return parsed
|
||||
except Exception:
|
||||
pass
|
||||
return max(1, min(_detect_host_cpus(), _VISION_DEFAULT_CONCURRENCY_CEILING))
|
||||
return _detect_host_cpus()
|
||||
|
||||
|
||||
_VISION_MAX_CONCURRENCY = _resolve_vision_max_concurrency()
|
||||
_vision_concurrency_semaphore = threading.BoundedSemaphore(_VISION_MAX_CONCURRENCY)
|
||||
_VISION_CPU_WORKERS = _resolve_vision_cpu_workers()
|
||||
|
||||
# Dedicated, bounded executor for the CPU-bound encode/resize burst ONLY. We do
|
||||
# NOT use the default executor (run_in_executor(None, ...)) — that pool is shared
|
||||
# with the gateway and web server, so a fan-out would park encode work there and
|
||||
# starve those callers. Sizing it to the usable core count means at most
|
||||
# _VISION_CPU_WORKERS encodes run at once; further encodes queue on this
|
||||
# executor's work queue, leaving cores free for the event loop. The LLM call is
|
||||
# deliberately left OUTSIDE this executor so multi-image workflows keep full
|
||||
# request concurrency.
|
||||
_vision_cpu_executor = ThreadPoolExecutor(
|
||||
max_workers=_VISION_CPU_WORKERS,
|
||||
thread_name_prefix="vision-encode",
|
||||
)
|
||||
|
||||
|
||||
async def _run_encode_on_cpu_executor(fn, *args, **kwargs):
|
||||
"""Run a sync encode/resize callable on the bounded vision CPU executor.
|
||||
|
||||
Offloads CPU-bound image work to :data:`_vision_cpu_executor` so it (a)
|
||||
never runs on the caller's event-loop thread and (b) is bounded to the
|
||||
host's usable core count process-wide. Excess encodes queue on the
|
||||
executor instead of all running at once, leaving cores free for the loop.
|
||||
The LLM call must NOT be routed through here — only the encode/resize.
|
||||
"""
|
||||
import functools
|
||||
loop = asyncio.get_running_loop()
|
||||
return await loop.run_in_executor(
|
||||
_vision_cpu_executor, functools.partial(fn, *args, **kwargs)
|
||||
)
|
||||
|
||||
|
||||
def _image_url_shape_ok(url: str) -> bool:
|
||||
|
|
@ -774,22 +802,17 @@ def _build_native_vision_tool_result(
|
|||
|
||||
@contextlib.asynccontextmanager
|
||||
async def _vision_concurrency_slot():
|
||||
"""Hold one process-global vision-concurrency slot for the duration.
|
||||
"""Deprecated no-op shim kept for backward compatibility.
|
||||
|
||||
Acquires :data:`_vision_concurrency_semaphore` before yielding and always
|
||||
releases it on exit. The blocking acquire is offloaded to a worker thread
|
||||
via ``run_in_executor`` so that waiting for a slot never blocks the calling
|
||||
event loop (callers run on per-thread loops; blocking the acquire on the
|
||||
loop thread would freeze that loop's other tasks while we wait). The
|
||||
semaphore is a ``BoundedSemaphore`` so a double-release would raise rather
|
||||
than silently inflate the limit.
|
||||
The fan-out cap was narrowed to the CPU-bound encode/resize burst only
|
||||
(see :data:`_vision_cpu_executor` / :func:`_run_encode_on_cpu_executor`).
|
||||
Holding a slot across the whole analysis serialized legitimate multi-image
|
||||
workflows behind the slow LLM call, which is exactly what we don't want.
|
||||
This context manager no longer gates anything; encode/resize is bounded
|
||||
where it actually runs. Retained only so any external caller importing it
|
||||
keeps working.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, _vision_concurrency_semaphore.acquire)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_vision_concurrency_semaphore.release()
|
||||
yield
|
||||
|
||||
|
||||
async def _vision_analyze_native(
|
||||
|
|
@ -851,7 +874,8 @@ async def _vision_analyze_native(
|
|||
success=False,
|
||||
)
|
||||
|
||||
image_data_url = _image_to_base64_data_url(
|
||||
image_data_url = await _run_encode_on_cpu_executor(
|
||||
_image_to_base64_data_url,
|
||||
temp_image_path, mime_type=detected_mime_type,
|
||||
)
|
||||
|
||||
|
|
@ -864,9 +888,12 @@ async def _vision_analyze_native(
|
|||
# target (4 MB / 7900px, headroom under both ceilings) whenever the
|
||||
# payload exceeds either limit, not just at the 20 MB hard ceiling.
|
||||
_over_bytes = len(image_data_url) > _EMBED_TARGET_BYTES
|
||||
_over_dims = _image_exceeds_dimension(temp_image_path, _EMBED_MAX_DIMENSION)
|
||||
_over_dims = await _run_encode_on_cpu_executor(
|
||||
_image_exceeds_dimension, temp_image_path, _EMBED_MAX_DIMENSION,
|
||||
)
|
||||
if _over_bytes or _over_dims:
|
||||
image_data_url = _resize_image_for_vision(
|
||||
image_data_url = await _run_encode_on_cpu_executor(
|
||||
_resize_image_for_vision,
|
||||
temp_image_path, mime_type=detected_mime_type,
|
||||
max_base64_bytes=_EMBED_TARGET_BYTES,
|
||||
max_dimension=_EMBED_MAX_DIMENSION,
|
||||
|
|
@ -1008,15 +1035,19 @@ async def vision_analyze_tool(
|
|||
|
||||
# Convert image to base64 — send at full resolution first.
|
||||
# If the provider rejects it as too large, we auto-resize and retry.
|
||||
# Offloaded to the bounded vision CPU executor so a fan-out of encodes
|
||||
# can't saturate every core and starve the event loop.
|
||||
logger.info("Converting image to base64...")
|
||||
image_data_url = _image_to_base64_data_url(temp_image_path, mime_type=detected_mime_type)
|
||||
image_data_url = await _run_encode_on_cpu_executor(
|
||||
_image_to_base64_data_url, temp_image_path, mime_type=detected_mime_type)
|
||||
data_size_kb = len(image_data_url) / 1024
|
||||
logger.info("Image converted to base64 (%.1f KB)", data_size_kb)
|
||||
|
||||
# Hard limit (20 MB) — no provider accepts payloads this large.
|
||||
if len(image_data_url) > _MAX_BASE64_BYTES:
|
||||
# Try to resize down to 5 MB before giving up.
|
||||
image_data_url = _resize_image_for_vision(
|
||||
image_data_url = await _run_encode_on_cpu_executor(
|
||||
_resize_image_for_vision,
|
||||
temp_image_path, mime_type=detected_mime_type)
|
||||
if len(image_data_url) > _MAX_BASE64_BYTES:
|
||||
raise ValueError(
|
||||
|
|
@ -1092,7 +1123,8 @@ async def vision_analyze_tool(
|
|||
len(image_data_url) / (1024 * 1024),
|
||||
_RESIZE_TARGET_BYTES / (1024 * 1024),
|
||||
)
|
||||
image_data_url = _resize_image_for_vision(
|
||||
image_data_url = await _run_encode_on_cpu_executor(
|
||||
_resize_image_for_vision,
|
||||
temp_image_path, mime_type=detected_mime_type)
|
||||
messages[0]["content"][1]["image_url"]["url"] = image_data_url
|
||||
response = await async_call_llm(**call_kwargs)
|
||||
|
|
@ -1305,32 +1337,28 @@ async def _handle_vision_analyze(args: Dict[str, Any], **kw: Any) -> str:
|
|||
image_url = args.get("image_url", "")
|
||||
question = args.get("question", "")
|
||||
|
||||
# Bound process-wide vision fan-out: a single turn (or several concurrent
|
||||
# sessions sharing this process) can launch dozens of vision_analyze calls
|
||||
# at once — e.g. "analyze every frame of this video". Each one is a
|
||||
# CPU-heavy encode/resize plus a long LLM stream; unbounded, they pin a
|
||||
# worker thread and starve the shared event loop that serves /api/status,
|
||||
# flapping the instance to UNHEALTHY. The slot is held across the WHOLE
|
||||
# analysis (image load + encode + LLM call), and acquiring it waits off the
|
||||
# event loop, so excess calls queue instead of piling on simultaneously.
|
||||
async with _vision_concurrency_slot():
|
||||
# Fast path: when native image routing is in effect for the active main
|
||||
# model (provider accepts images in tool results, or the user set the
|
||||
# model.supports_vision override), short-circuit the auxiliary LLM and
|
||||
# return the image bytes as a multimodal tool-result envelope. The main
|
||||
# model sees the pixels directly on its next turn — no aux call, no
|
||||
# information loss, no extra latency.
|
||||
if _should_use_native_vision_fast_path():
|
||||
logger.info("vision_analyze: native fast path")
|
||||
return await _vision_analyze_native(image_url, question)
|
||||
# The fan-out cap lives inside the encode/resize step (offloaded to the
|
||||
# bounded _vision_cpu_executor), NOT around the whole analysis — so a
|
||||
# legitimate multi-image workflow keeps full request concurrency while the
|
||||
# CPU bursts that actually starve the loop are bounded to host cores.
|
||||
#
|
||||
# Fast path: when native image routing is in effect for the active main
|
||||
# model (provider accepts images in tool results, or the user set the
|
||||
# model.supports_vision override), short-circuit the auxiliary LLM and
|
||||
# return the image bytes as a multimodal tool-result envelope. The main
|
||||
# model sees the pixels directly on its next turn — no aux call, no
|
||||
# information loss, no extra latency.
|
||||
if _should_use_native_vision_fast_path():
|
||||
logger.info("vision_analyze: native fast path")
|
||||
return await _vision_analyze_native(image_url, question)
|
||||
|
||||
# Legacy path: aux LLM describes the image and we return its text.
|
||||
full_prompt = (
|
||||
"Fully describe and explain everything about this image, then answer the "
|
||||
f"following question:\n\n{question}"
|
||||
)
|
||||
model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
|
||||
return await vision_analyze_tool(image_url, full_prompt, model)
|
||||
# Legacy path: aux LLM describes the image and we return its text.
|
||||
full_prompt = (
|
||||
"Fully describe and explain everything about this image, then answer the "
|
||||
f"following question:\n\n{question}"
|
||||
)
|
||||
model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
|
||||
return await vision_analyze_tool(image_url, full_prompt, model)
|
||||
|
||||
|
||||
registry.register(
|
||||
|
|
|
|||
|
|
@ -684,7 +684,7 @@ Advanced per-platform knobs for throttling the outbound message batcher. Most us
|
|||
| `HERMES_FEISHU_DEDUP_CACHE_SIZE` | Size of the Feishu webhook dedup cache (default: `1024`). |
|
||||
| `HERMES_WECOM_TEXT_BATCH_DELAY_SECONDS` / `_SPLIT_DELAY_SECONDS` | WeCom batcher tuning. |
|
||||
| `HERMES_VISION_DOWNLOAD_TIMEOUT` | Timeout in seconds for downloading an image before handing it to vision models (default: `30`). |
|
||||
| `HERMES_VISION_MAX_CONCURRENCY` | Max vision analyses running concurrently across the whole process (override for `auxiliary.vision.max_concurrency`; default `min(host CPUs, 4)`). Bounds video-frame fan-out so it can't saturate the event loop. Values `< 1` are ignored. |
|
||||
| `HERMES_VISION_MAX_CONCURRENCY` | Max concurrent image **encode/resize** bursts across the whole process (override for `auxiliary.vision.max_concurrency`; default: host CPU core count, no ceiling). Bounds only the CPU-bound encode step so a video-frame fan-out can't saturate every core and starve the event loop — the LLM calls stay fully concurrent. Values `< 1` are ignored. |
|
||||
| `HERMES_RESTART_DRAIN_TIMEOUT` | Gateway: seconds to wait for active runs to drain on `/restart` before forcing the restart (default: `900`). |
|
||||
| `HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT` | Per-platform connect timeout during gateway startup (seconds). |
|
||||
| `HERMES_GATEWAY_BUSY_INPUT_MODE` | Default gateway busy-input behavior: `queue`, `steer`, or `interrupt`. Can be overridden per chat with `/busy`. |
|
||||
|
|
|
|||
|
|
@ -1005,9 +1005,11 @@ auxiliary:
|
|||
api_key: "" # API key for base_url (falls back to OPENAI_API_KEY)
|
||||
timeout: 120 # seconds — LLM API call timeout; vision payloads need generous timeout
|
||||
download_timeout: 30 # seconds — image HTTP download; increase for slow connections
|
||||
max_concurrency: 4 # max vision analyses running at once across the whole process
|
||||
# (default: min(host CPUs, 4)) — bounds video-frame fan-out so it
|
||||
# can't saturate the event loop. Minimum 1; values < 1 are ignored.
|
||||
max_concurrency: 8 # max concurrent image encode/resize bursts across the process
|
||||
# (default: host CPU core count, no ceiling) — bounds only the
|
||||
# CPU-bound encode step so a video-frame fan-out can't saturate
|
||||
# every core and starve the event loop; LLM calls stay fully
|
||||
# concurrent. Minimum 1; values < 1 are ignored.
|
||||
|
||||
# Web page summarization + browser page text extraction
|
||||
web_extract:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue