fix(cli): reliable interrupts, bounded exit, and exit feedback (#57000)

Three CLI reliability fixes:

1. Interrupt reliability: chat() only re-queued the user's interrupt
   message when the turn result carried interrupted=True. When the agent
   thread raced past its last interrupt check (or finished) before the
   interrupt landed, the message was silently dropped — and the stale
   _interrupt_requested flag left on the agent instantly aborted the
   NEXT turn. Un-acknowledged interrupt messages are now re-queued as
   the next turn and the stale flag is cleared (only when the agent
   thread actually exited). The clarify-race path also parks the message
   in _pending_input instead of dropping it.

2. Slow exit (5+ min): stdlib ThreadPoolExecutor workers are non-daemon
   and joined unconditionally by concurrent.futures' atexit hook — even
   after shutdown(wait=False). One wedged tool worker (abandoned after
   interrupt/timeout) held the process open forever. Promoted
   async_delegation's daemon executor to a shared tools/daemon_pool
   module and adopted it in tool_executor (concurrent tool batches),
   memory_manager (background sync), delegate_tool (child timeout wrapper
   + batch fan-out), and skills_hub (source fan-out). Added a 30s exit
   watchdog (HERMES_EXIT_WATCHDOG_S) armed at _run_cleanup start as a
   backstop for wedged cleanup steps.

3. Exit jank: after prompt_toolkit tears down the input/status bars the
   terminal sat silent for the whole cleanup window, looking hung. Print
   'Shutting down… (finalizing session)' immediately at exit start.

E2E: live PTY interrupt of a foreground 'sleep 120' terminal tool now
aborts in ~1s and the typed message runs as the next turn; wedged-worker
+ wedged-cleanup subprocess exits in 5.8s (watchdog) instead of hanging.
This commit is contained in:
Teknium 2026-07-02 04:20:43 -07:00 committed by GitHub
parent 2068754d6f
commit 3f2a56d1a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 495 additions and 43 deletions

View file

@ -651,7 +651,12 @@ class MemoryManager:
with self._sync_executor_lock:
if self._sync_executor is None:
try:
self._sync_executor = ThreadPoolExecutor(
# Daemon workers (see tools.daemon_pool): a provider wedged
# on a network call must never block interpreter exit —
# stdlib ThreadPoolExecutor's atexit hook would join it
# unconditionally even after shutdown(wait=False).
from tools.daemon_pool import DaemonThreadPoolExecutor
self._sync_executor = DaemonThreadPoolExecutor(
max_workers=1,
thread_name_prefix="mem-sync",
)

View file

@ -638,7 +638,13 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe
deadline = time.monotonic() + timeout_s if timeout_s is not None else None
if runnable_calls:
max_workers = min(len(runnable_calls), _MAX_TOOL_WORKERS)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
# Daemon workers: an interrupted/timed-out batch is abandoned with
# shutdown(wait=False), but stdlib ThreadPoolExecutor workers are
# non-daemon and registered in concurrent.futures' atexit hook,
# which joins them unconditionally — so one wedged tool thread
# would block interpreter exit forever (multi-minute CLI exits).
from tools.daemon_pool import DaemonThreadPoolExecutor
executor = DaemonThreadPoolExecutor(max_workers=max_workers)
abandon_executor = False
try:
for submit_index, (i, tc, name, args) in enumerate(runnable_calls):

115
cli.py
View file

@ -982,6 +982,71 @@ def _prepare_deferred_agent_startup() -> None:
exc_info=True,
)
def _arm_exit_watchdog(timeout_s: float | None = None) -> None:
"""Guarantee the process actually exits once shutdown has begun.
Two hang classes have kept "dead" CLI processes alive for minutes:
1. A cleanup step wedged on network I/O (memory provider
``on_session_end``, MCP teardown, remote terminal cleanup).
2. Interpreter teardown blocked joining non-daemon threads
stdlib ``ThreadPoolExecutor`` workers are joined unconditionally
by ``concurrent.futures``' atexit hook even after
``shutdown(wait=False)``, so one tool thread wedged on a socket
held the process open forever (#27563 class).
The shared daemon pool (``tools.daemon_pool``) removes the main cause
of (2); this watchdog is the backstop for both. It arms a daemon
timer when ``_run_cleanup`` starts; if the process is still alive
after ``timeout_s`` it flushes logging/stdio and calls ``os._exit(0)``.
Daemon threads keep running through ``Py_FinalizeEx``'s thread joins,
so the timer fires even when the main thread is stuck in teardown.
Tune with ``HERMES_EXIT_WATCHDOG_S`` (seconds); ``0`` disables.
"""
if timeout_s is None:
try:
timeout_s = float(os.getenv("HERMES_EXIT_WATCHDOG_S", "30"))
except (TypeError, ValueError):
timeout_s = 30.0
if timeout_s <= 0:
return
# Never arm under pytest: tests invoke _run_cleanup() directly and a
# 30s-delayed os._exit(0) would silently kill the test worker.
if os.environ.get("PYTEST_CURRENT_TEST"):
return
def _watchdog():
time.sleep(timeout_s)
# Still alive — cleanup or interpreter teardown is wedged.
try:
logger.warning(
"Exit watchdog fired after %.0fs — forcing process exit "
"(a cleanup step or non-daemon thread is wedged).",
timeout_s,
)
except Exception:
pass
try:
import logging as _lg
_lg.shutdown()
except Exception:
pass
for _stream in (sys.stdout, sys.stderr):
try:
_stream.flush()
except Exception:
pass
os._exit(0)
try:
threading.Thread(
target=_watchdog, daemon=True, name="exit-watchdog"
).start()
except Exception:
pass # best-effort — never block shutdown on watchdog setup
def _run_cleanup(*, notify_session_finalize: bool = True):
"""Run resource cleanup exactly once."""
global _cleanup_done
@ -989,6 +1054,11 @@ def _run_cleanup(*, notify_session_finalize: bool = True):
return
_cleanup_done = True
# Bound total shutdown time: if cleanup (or the interpreter's
# thread-join teardown after it) wedges, force-exit instead of
# leaving a zombie CLI holding the terminal for minutes.
_arm_exit_watchdog()
# Reset terminal input modes first, before the slower resource teardown
# below (MCP / browser / memory shutdown can take seconds). On Ctrl+C the
# user's terminal becomes usable immediately, and a later step raising
@ -12161,8 +12231,15 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
if interrupt_msg:
# If clarify is active, the Enter handler routes
# input directly; this queue shouldn't have anything.
# But if it does (race condition), don't interrupt.
# But if it does (race condition), don't interrupt —
# and don't drop the message either: park it in
# _pending_input so it runs as the next turn.
if self._clarify_state or self._clarify_freetext:
try:
self._pending_input.put(interrupt_msg)
except Exception:
pass
interrupt_msg = None
continue
print("\n⚡ New message detected, interrupting...")
# Signal TTS to stop on interrupt
@ -12334,6 +12411,33 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
# Add indicator that we were interrupted
if response and pending_message:
response = response + "\n\n---\n_[Interrupted - processing new message]_"
elif interrupt_msg:
# We fired agent.interrupt(interrupt_msg) but the turn result
# doesn't acknowledge it. Two ways this happens, both racy:
# 1. The agent thread had already passed its last interrupt
# check (or finished) when the interrupt landed — the turn
# completed normally and finalize_turn() never saw the flag.
# 2. The 10s post-interrupt wait above expired and we
# abandoned the daemon thread; `result` is still None.
# In both cases the user's message must NOT be dropped —
# re-queue it as the next turn (#interrupt-vacuumed-into-void).
pending_message = interrupt_msg
# If the interrupt landed after finalize_turn()'s
# clear_interrupt(), the stale flag would instantly abort the
# NEXT turn at its first loop check. Clear it now that we've
# claimed the message — but ONLY if the agent thread actually
# exited. If it's still alive (abandoned after the 10s wait),
# the flag is what makes the wedged tool eventually unwind;
# clearing it would un-signal that thread.
try:
if (
not agent_thread.is_alive()
and self.agent
and getattr(self.agent, "_interrupt_requested", False)
):
self.agent.clear_interrupt()
except Exception:
pass
response_previewed = result.get("response_previewed", False) if result else False
@ -15240,6 +15344,15 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
finally:
self._should_exit = True
self._pet_stop_anim()
# Immediate feedback: prompt_toolkit has just torn down the input
# box + status bar, so without a line here the terminal sits
# silent for the whole cleanup window (session flush, memory
# shutdown, MCP/browser/terminal teardown) and the exit looks
# hung. Print before any potentially-slow step.
try:
print(f"{_DIM}Shutting down… (finalizing session){_RST}", flush=True)
except Exception:
pass
# Interrupt the agent immediately so its daemon thread stops making
# API calls and exits promptly (agent_thread is daemon, so the
# process will exit once the main thread finishes, but interrupting

View file

@ -0,0 +1,194 @@
"""Regression tests for the CLI interrupt-acknowledgement race.
Symptom (user report, July 2026): interrupting an active turn is
unreliable — the interrupt message is sometimes "vacuumed into the void".
Root cause: ``HermesCLI.chat()`` fires ``agent.interrupt(msg)`` from its
monitor loop, but only re-queued the message when the turn RESULT carried
``interrupted=True``. Two races defeat that:
1. The agent thread passes its last ``_interrupt_requested`` check (or
finishes entirely) just before the interrupt lands — the turn
completes "normally", ``finalize_turn()`` never acknowledges the
interrupt, and the user's message was silently dropped.
2. Worse, when the interrupt lands *after* ``finalize_turn()``'s
``clear_interrupt()``, the stale ``_interrupt_requested`` flag
survives on the agent and instantly aborts the NEXT turn at its
first loop check.
The fix: when ``chat()`` consumed an ``interrupt_msg`` but the result
doesn't acknowledge the interrupt, re-queue the message as the next turn
and clear the stale agent flag (only when the agent thread has exited).
"""
from __future__ import annotations
import importlib
import queue
import sys
import time
from unittest.mock import MagicMock, patch
def _make_cli():
    """Return a fresh ``HermesCLI`` built against stubbed prompt_toolkit.

    Every prompt_toolkit module the CLI imports is replaced by a
    ``MagicMock`` in ``sys.modules``, the ``cli`` module is reloaded under
    a neutral environment, and ``CLI_CONFIG`` / ``get_tool_definitions``
    are patched for the duration of construction (same pattern as
    test_cli_interrupt_drain_regression.py).
    """
    stubbed_module_names = (
        "prompt_toolkit",
        "prompt_toolkit.history",
        "prompt_toolkit.styles",
        "prompt_toolkit.patch_stdout",
        "prompt_toolkit.application",
        "prompt_toolkit.layout",
        "prompt_toolkit.layout.processors",
        "prompt_toolkit.filters",
        "prompt_toolkit.layout.dimension",
        "prompt_toolkit.layout.menus",
        "prompt_toolkit.widgets",
        "prompt_toolkit.key_binding",
        "prompt_toolkit.completion",
        "prompt_toolkit.formatted_text",
        "prompt_toolkit.auto_suggest",
    )
    # One independent mock per module, exactly as a literal dict would give.
    module_stubs = {name: MagicMock() for name in stubbed_module_names}

    neutral_env = {"LLM_MODEL": "", "HERMES_MAX_ITERATIONS": ""}
    neutral_config = {
        "model": {
            "default": "anthropic/claude-opus-4.6",
            "base_url": "https://openrouter.ai/api/v1",
            "provider": "auto",
        },
        "display": {"compact": False, "tool_progress": "all"},
        "agent": {},
        "terminal": {"env_type": "local"},
    }

    with patch.dict(sys.modules, module_stubs), patch.dict(
        "os.environ", neutral_env, clear=False
    ):
        import cli as cli_module

        # Reload so module-level state is rebuilt against the stubs above.
        cli_module = importlib.reload(cli_module)
        with patch.object(
            cli_module, "get_tool_definitions", return_value=[]
        ), patch.dict(cli_module.__dict__, {"CLI_CONFIG": neutral_config}):
            return cli_module.HermesCLI()
class _StubAgent:
    """Fake agent whose turn ends normally and never acknowledges an interrupt.

    ``interrupt`` / ``clear_interrupt`` only record what was asked of them,
    so a test can inspect the flag state and call history afterwards.
    """

    def __init__(self, session_id, turn_seconds=0.5):
        # Identity / display attributes.
        self.session_id = session_id
        self.model = "test/model"
        self.platform = "cli"
        self.max_iterations = 90
        # How long run_conversation() pretends to work.
        self.turn_seconds = turn_seconds
        # Interrupt bookkeeping.
        self._interrupt_requested = False
        self._interrupt_message = None
        self._active_children = []
        self.interrupt_calls = []
        self.clear_calls = 0

    def run_conversation(self, **kwargs):
        """Sleep for ``turn_seconds`` and return a normal-completion result."""
        # The turn "raced past" its last interrupt check: it just finishes.
        time.sleep(self.turn_seconds)
        reply = "turn finished normally"
        transcript = [
            {"role": "user", "content": "original"},
            {"role": "assistant", "content": reply},
        ]
        # Deliberately no "interrupted" key — the simulated race means the
        # interrupt flag was never observed by the turn.
        return {
            "final_response": reply,
            "messages": transcript,
            "api_calls": 1,
            "completed": True,
            # Keeps the auto-title thread out of the test.
            "partial": True,
            # Bypasses the Rich Panel rendering path, which crashes under
            # the prompt_toolkit/skin mocks and is irrelevant here.
            "response_previewed": True,
        }

    def interrupt(self, message=None):
        """Record the interrupt and raise the flag."""
        self._interrupt_message = message
        self._interrupt_requested = True
        self.interrupt_calls.append(message)

    def clear_interrupt(self):
        """Lower the flag, forget the message, and count the call."""
        self._interrupt_message = None
        self._interrupt_requested = False
        self.clear_calls += 1
def test_unacknowledged_interrupt_message_is_requeued_not_dropped():
    """An interrupt the turn result never acknowledges must be re-queued.

    The stub agent finishes its turn without reporting ``interrupted``;
    ``chat()`` must still hand the message to ``_pending_input`` and clear
    the agent's leftover interrupt flag.
    """
    cli = _make_cli()
    stub = _StubAgent(cli.session_id)
    cli.agent = stub
    cli._pending_input = queue.Queue()
    cli._interrupt_queue = queue.Queue()
    cli._interrupt_queue.put("urgent new message")

    turn_config = {
        "signature": cli._active_agent_route_signature,
        "model": None, "runtime": None, "request_overrides": None,
    }
    with patch.object(cli, "_ensure_runtime_credentials", return_value=True), \
            patch.object(cli, "_resolve_turn_agent_config", return_value=turn_config), \
            patch.object(cli, "_init_agent", return_value=True):
        cli.chat("original")

    # The monitor loop delivered the interrupt to the agent...
    assert stub.interrupt_calls == ["urgent new message"]

    # ...but the result carried no acknowledgement, so the message has to
    # show up as the next queued turn rather than vanish.
    drained = []
    while True:
        try:
            drained.append(cli._pending_input.get_nowait())
        except queue.Empty:
            break
    assert any("urgent new message" in str(item) for item in drained), (
        f"interrupt message was dropped; pending_input={drained!r}"
    )

    # The leftover flag must be gone, otherwise the NEXT turn would abort
    # at its first _interrupt_requested check.
    assert stub._interrupt_requested is False
    assert stub.clear_calls >= 1
def test_acknowledged_interrupt_still_requeues_message():
    """A result that carries ``interrupted=True`` still re-queues the message."""
    cli = _make_cli()

    class _AckAgent(_StubAgent):
        """Stub that waits for the interrupt and then acknowledges it."""

        def run_conversation(self, **kwargs):
            # Poll (at most 100 x 50 ms) until the monitor loop has
            # delivered the interrupt.
            polls_left = 100
            while polls_left and not self._interrupt_requested:
                time.sleep(0.05)
                polls_left -= 1
            partial_reply = "partial work"
            return {
                "final_response": partial_reply,
                "messages": [{"role": "assistant", "content": partial_reply}],
                "api_calls": 1,
                "completed": False,
                "interrupted": True,
                "interrupt_message": self._interrupt_message,
                "partial": True,
            }

    ack_agent = _AckAgent(cli.session_id)
    cli.agent = ack_agent
    cli._pending_input = queue.Queue()
    cli._interrupt_queue = queue.Queue()
    cli._interrupt_queue.put("redirect please")

    turn_config = {
        "signature": cli._active_agent_route_signature,
        "model": None, "runtime": None, "request_overrides": None,
    }
    with patch.object(cli, "_ensure_runtime_credentials", return_value=True), \
            patch.object(cli, "_resolve_turn_agent_config", return_value=turn_config), \
            patch.object(cli, "_init_agent", return_value=True):
        cli.chat("original")

    drained = []
    while True:
        try:
            drained.append(cli._pending_input.get_nowait())
        except queue.Empty:
            break
    assert any("redirect please" in str(item) for item in drained)
    assert cli._last_turn_interrupted is True

View file

@ -2731,7 +2731,7 @@ class TestConcurrentToolExecution:
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch("agent.tool_executor.concurrent.futures.ThreadPoolExecutor", ShutdownExecutor):
with patch("tools.daemon_pool.DaemonThreadPoolExecutor", ShutdownExecutor):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2

View file

@ -0,0 +1,89 @@
"""Tests for tools.daemon_pool.DaemonThreadPoolExecutor.
The daemon pool exists so abandoned workers (interrupted/timed-out tool
batches, wedged memory-provider syncs) can never block interpreter exit:
stdlib ThreadPoolExecutor workers are non-daemon AND registered in
concurrent.futures.thread._threads_queues, whose atexit hook joins every
worker unconditionally even after shutdown(wait=False).
"""
import subprocess
import sys
import threading
import time
from concurrent.futures.thread import _threads_queues
from tools.daemon_pool import DaemonThreadPoolExecutor
def test_workers_are_daemon_threads():
    """Workers are daemon threads and absent from the atexit join registry."""

    def _describe_current_thread():
        current = threading.current_thread()
        return current.daemon, current

    pool = DaemonThreadPoolExecutor(max_workers=2)
    try:
        is_daemon, worker = pool.submit(_describe_current_thread).result(timeout=10)
        assert is_daemon is True
        # concurrent.futures' atexit hook only joins threads listed here.
        assert worker not in _threads_queues
    finally:
        pool.shutdown(wait=True)
def test_results_and_initializer_work_like_stdlib():
    """Futures return results and ``initializer``/``initargs`` run per worker."""
    init_tags = []

    # list.append is the initializer: it is called once with initargs when
    # the single worker thread starts.
    pool = DaemonThreadPoolExecutor(
        max_workers=1,
        initializer=init_tags.append,
        initargs=("t",),
    )
    try:
        answer = pool.submit(lambda: 41 + 1).result(timeout=10)
        assert answer == 42
        assert init_tags == ["t"]
    finally:
        pool.shutdown(wait=True)
def test_idle_worker_reuse():
    """A second task submitted to an idle pool runs on the same worker."""
    pool = DaemonThreadPoolExecutor(max_workers=4)
    try:
        first_worker_id = pool.submit(threading.get_ident).result(timeout=10)
        # Give the worker a moment to park on the idle semaphore before the
        # next submit decides whether to spawn another thread.
        time.sleep(0.05)
        second_worker_id = pool.submit(threading.get_ident).result(timeout=10)
        assert first_worker_id == second_worker_id
    finally:
        pool.shutdown(wait=True)
def test_wedged_worker_does_not_block_interpreter_exit():
    """A worker stuck in a long sleep must not hold the process open.

    The child interpreter submits a 120 s sleep to the daemon pool, calls
    ``shutdown(wait=False)`` and returns from its main thread. With a
    stdlib ``ThreadPoolExecutor`` the atexit hook would join the worker
    and the child would outlive the 30 s timeout below; with the daemon
    pool it exits as soon as the main thread is done.
    """
    path_setup = "import sys; sys.path.insert(0, %r)\n" % (str(_repo_root()),)
    child_source = path_setup + (
        "from tools.daemon_pool import DaemonThreadPoolExecutor\n"
        "import time\n"
        "pool = DaemonThreadPoolExecutor(max_workers=1)\n"
        "pool.submit(time.sleep, 120)\n"
        "time.sleep(0.3)\n"
        "pool.shutdown(wait=False)\n"
        "print('main-done', flush=True)\n"
    )
    child = subprocess.run(
        [sys.executable, "-c", child_source],
        capture_output=True,
        text=True,
        timeout=30,
    )
    assert child.returncode == 0
    assert "main-done" in child.stdout
def _repo_root():
    """Return the directory three levels above this file (``parents[2]``)."""
    import pathlib

    this_file = pathlib.Path(__file__).resolve()
    return this_file.parents[2]

View file

@ -40,48 +40,18 @@ import logging
import threading
import time
import uuid
import weakref
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import _worker
from typing import Any, Callable, Dict, List, Optional
from tools.daemon_pool import DaemonThreadPoolExecutor
from tools.thread_context import propagate_context_to_thread
logger = logging.getLogger(__name__)
class _DaemonThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor variant whose workers do not block process exit.

    Stdlib ``ThreadPoolExecutor`` workers are non-daemon. Background
    delegation is explicitly best-effort detached work, so a long child should
    be interruptible by ``/stop``/shutdown but must not keep a CLI process alive
    after the user exits.

    Only ``_adjust_thread_count`` is overridden; it starts each worker with
    ``daemon=True``.
    """

    def _adjust_thread_count(self) -> None:
        # An idle worker is already waiting for work: no new thread needed.
        if self._idle_semaphore.acquire(timeout=0):
            return

        # Runs when the executor object is garbage-collected: a ``None``
        # item is pushed onto the work queue for the workers to pick up.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
            # NOTE(review): ``_worker`` is a private stdlib function; this
            # positional argument layout has to match the running
            # interpreter's version of it — confirm on interpreter upgrades.
            t = threading.Thread(
                name=thread_name,
                target=_worker,
                args=(
                    weakref.ref(self, weakref_cb),
                    self._work_queue,
                    self._initializer,
                    self._initargs,
                ),
                daemon=True,
            )
            t.start()
            self._threads.add(t)
# Back-compat alias — the daemon executor now lives in tools.daemon_pool so
# other subsystems (tool_executor, memory_manager, delegate_tool, skills_hub)
# can share it. Existing imports of ``_DaemonThreadPoolExecutor`` keep working.
_DaemonThreadPoolExecutor = DaemonThreadPoolExecutor
# ---------------------------------------------------------------------------

64
tools/daemon_pool.py Normal file
View file

@ -0,0 +1,64 @@
"""Shared daemon-thread ThreadPoolExecutor.
Stdlib ``ThreadPoolExecutor`` workers are non-daemon AND are registered in
``concurrent.futures.thread._threads_queues``, whose atexit hook
(``_python_exit``) joins every worker unconditionally even after
``shutdown(wait=False)``. A single wedged worker (tool blocked on network
I/O, hung provider daemon, stuck subagent) therefore blocks interpreter
exit forever. This is the root cause of multi-minute CLI exits on long
sessions: every abandoned concurrent-tool batch leaves workers that the
exit hook insists on joining.
``DaemonThreadPoolExecutor`` spawns daemon workers and skips the
``_threads_queues`` registration, so:
- ``_python_exit`` never joins them, and
- the interpreter's non-daemon thread join at shutdown skips them.
Semantics are otherwise identical (initializer/initargs, work queue,
idle-thread reuse). Use it for any pool whose work is best-effort or
independently interruptible and must never hold the process open:
concurrent tool execution, background memory sync, catalog fan-out,
subagent timeout wrappers. Do NOT use it for work that must complete
before exit (durable writes) — those belong on foreground threads with
explicit bounded joins.
"""
from __future__ import annotations
import threading
import weakref
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import _worker
__all__ = ["DaemonThreadPoolExecutor"]
class DaemonThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor variant whose workers do not block process exit.

    Only thread creation is overridden: workers are started with
    ``daemon=True`` and are not recorded in
    ``concurrent.futures.thread._threads_queues``. Submitting work,
    ``initializer``/``initargs`` and ``shutdown`` are inherited unchanged.
    """

    def _adjust_thread_count(self) -> None:
        """Start one more daemon worker unless an idle one can take the job."""
        # Mirrors CPython's implementation (3.8–3.13) with two changes:
        # daemon=True and no _threads_queues registration.
        if self._idle_semaphore.acquire(timeout=0):
            return

        # Runs when the executor object is garbage-collected: a ``None``
        # item is pushed onto the work queue for the workers to pick up.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads >= self._max_workers:
            return

        executor_ref = weakref.ref(self, weakref_cb)
        # Newer CPython (3.14 reworked the pool around a per-worker context
        # factory) calls ``_worker(executor_ref, ctx, work_queue)`` and no
        # longer stores ``_initializer``/``_initargs`` on the executor.
        # Detect that by attribute so 3.8–3.13 keep the original call shape.
        # NOTE(review): this relies on private stdlib API — re-check it on
        # every new Python minor release.
        make_worker_context = getattr(self, "_create_worker_context", None)
        if make_worker_context is not None:
            worker_args = (executor_ref, make_worker_context(), self._work_queue)
        else:
            worker_args = (
                executor_ref,
                self._work_queue,
                self._initializer,
                self._initargs,
            )

        thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
        t = threading.Thread(
            name=thread_name,
            target=_worker,
            args=worker_args,
            daemon=True,
        )
        t.start()
        self._threads.add(t)

View file

@ -1887,7 +1887,11 @@ def _run_single_child(
# result(timeout=None) blocks until the child finishes). Stuck-child
# protection comes from the heartbeat staleness monitor instead.
child_timeout = _get_child_timeout()
_timeout_executor = ThreadPoolExecutor(
# Daemon worker (tools.daemon_pool): a timed-out child is abandoned
# below; a stdlib non-daemon worker would then block interpreter
# exit at atexit-join time if the child never unwinds.
from tools.daemon_pool import DaemonThreadPoolExecutor
_timeout_executor = DaemonThreadPoolExecutor(
max_workers=1,
# Install a non-interactive approval callback in the worker thread
# so dangerous-command prompts from the subagent don't fall back to
@ -2535,7 +2539,11 @@ def delegate_task(
completed_count = 0
spinner_ref = getattr(parent_agent, "_delegate_spinner", None)
with ThreadPoolExecutor(max_workers=max_children) as executor:
# Daemon workers (tools.daemon_pool): the `with` block still joins
# normally, but if the parent is interrupted while a child is
# wedged, the abandoned worker must not block interpreter exit.
from tools.daemon_pool import DaemonThreadPoolExecutor
with DaemonThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(

View file

@ -4005,8 +4005,11 @@ def parallel_search_sources(
# worker finishes — so a single slow source (e.g. ClawHub) keeps the
# caller blocked for minutes and renders ``overall_timeout`` a no-op.
# Manage the executor manually and shut it down with ``wait=False`` so
# the timeout is actually honoured.
pool = ThreadPoolExecutor(max_workers=min(len(active), 8))
# the timeout is actually honoured. Daemon workers (tools.daemon_pool):
# an abandoned slow source must not block interpreter exit either —
# stdlib workers are joined unconditionally by the atexit hook.
from tools.daemon_pool import DaemonThreadPoolExecutor
pool = DaemonThreadPoolExecutor(max_workers=min(len(active), 8))
futures = {}
for src in active:
lim = per_source_limits.get(src.source_id(), 50)