diff --git a/cli.py b/cli.py index ca01b82d5..bc4f4a76b 100644 --- a/cli.py +++ b/cli.py @@ -977,6 +977,11 @@ def _run_cleanup(*, notify_session_finalize: bool = True): _cleanup_all_terminals() except Exception: pass + try: + from tools.async_delegation import interrupt_all as _interrupt_async_delegations + _interrupt_async_delegations(reason="CLI shutdown") + except Exception: + pass try: _cleanup_all_browsers() except Exception: diff --git a/gateway/run.py b/gateway/run.py index 4541e0fa6..1650851fb 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1921,9 +1921,42 @@ def _format_gateway_process_notification(evt: dict) -> "str | None": text += "]" return text + if evt_type == "async_delegation": + # Reuse the shared rich formatter (self-contained task-source block). + from tools.process_registry import format_process_notification + return format_process_notification(evt) + return None +def _drain_gateway_watch_events(completion_queue) -> "list[dict]": + """Drain gateway-owned watch events without spinning on requeued events. + + Watch events are handled by the post-turn gateway drain. Process + completions are owned by their per-process watcher task, and async + delegation completions are owned by ``_async_delegation_watcher``. + Requeueing async events inside ``while not queue.empty()`` would make the + loop non-terminating, so detach the current batch first, then requeue any + events this drain does not own after the queue is empty. + """ + watch_events: list[dict] = [] + requeue: list[dict] = [] + while not completion_queue.empty(): + try: + evt = completion_queue.get_nowait() + except Exception: + break + evt_type = evt.get("type", "completion") + if evt_type in {"watch_match", "watch_disabled"}: + watch_events.append(evt) + elif evt_type == "async_delegation": + requeue.append(evt) + # else: process completion events are handled by the watcher task + for evt in requeue: + completion_queue.put(evt) + return watch_events + + # Module-level weak reference to the active GatewayRunner instance. # Used by tools (e.g. send_message) that need to route through a live # adapter for plugin platforms. Set in GatewayRunner.__init__(). @@ -5353,6 +5386,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # turn so the agent kicks off the new chat. asyncio.create_task(self._handoff_watcher()) + # Start background async-delegation watcher — drains completion events + # from delegate_task(background=true) subagents and injects each + # result back into its originating session as a new turn, covering the + # idle case where the subagent finishes with no agent turn running. + asyncio.create_task(self._async_delegation_watcher()) + logger.info("Press Ctrl+C to stop") return True @@ -5989,6 +6028,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) except Exception as _e: logger.debug("process_registry.kill_all (%s) error: %s", phase, _e) + try: + from tools.async_delegation import interrupt_all as _interrupt_async + _async_n = _interrupt_async(reason=f"gateway shutdown ({phase})") + if _async_n: + logger.info( + "Shutdown (%s): interrupted %d background delegation(s)", + phase, _async_n, + ) + except Exception as _e: + logger.debug("async interrupt_all (%s) error: %s", phase, _e) try: from tools.terminal_tool import cleanup_all_environments cleanup_all_environments() @@ -8995,18 +9044,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew logger.error("Process watcher setup error: %s", e) # Drain watch pattern notifications that arrived during the agent run. - # Watch events and completions share the same queue; completions are - # already handled by the per-process watcher task above, so we only - # inject watch-type events here. + # Watch events and completions share the same queue; process + # completions are already handled by the per-process watcher task + # above, so we only inject watch-type events here. + # + # Async-delegation completions ALSO ride this shared queue but are + # owned by the dedicated _async_delegation_watcher (started at + # boot), which covers both the idle and post-turn cases with a + # single consumer — so we leave them on the queue here. try: from tools.process_registry import process_registry as _pr - _watch_events = [] - while not _pr.completion_queue.empty(): - evt = _pr.completion_queue.get_nowait() - evt_type = evt.get("type", "completion") - if evt_type in {"watch_match", "watch_disabled"}: - _watch_events.append(evt) - # else: completion events are handled by the watcher task + _watch_events = _drain_gateway_watch_events(_pr.completion_queue) for evt in _watch_events: synth_text = _format_gateway_process_notification(evt) if synth_text: @@ -12265,6 +12313,74 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception as e: logger.error("Watch notification injection error: %s", e) + def _enrich_async_delegation_routing(self, evt: dict) -> None: + """Fill platform/chat_id/thread_id/chat_type on an async-delegation event. + + Async-delegation completion events only carry ``session_key`` (the + daemon worker has no access to the per-message routing metadata the + terminal background watcher captures at spawn time). Parse the + session_key into the routing fields ``_build_process_event_source`` + expects. Best-effort: a CLI-origin event (empty session_key) is left + as-is and simply won't route on the gateway. + """ + if evt.get("platform"): + return # already enriched + parsed = _parse_session_key(evt.get("session_key", "") or "") + if not parsed: + return + evt["platform"] = parsed.get("platform", "") + evt["chat_type"] = parsed.get("chat_type", "") + evt["chat_id"] = parsed.get("chat_id", "") + if parsed.get("thread_id"): + evt["thread_id"] = parsed["thread_id"] + + async def _async_delegation_watcher(self, interval: float = 2.0) -> None: + """Drain async-delegation completions and inject them as new turns. + + Background subagents (``delegate_task(background=true)``) run on the + async-delegation daemon executor — they have no per-process watcher + task, so their completion events would only be seen by the post-turn + queue drain. This watcher covers the IDLE case: when a background + subagent finishes while no agent turn is running, its result still + re-enters the originating session promptly. + + Mirrors the CLI's idle ``process_loop`` drain. Stays silent when the + queue has nothing for us; ignores non-async event types (those are + handled by ``_run_process_watcher`` / the post-turn drain). + """ + await asyncio.sleep(3) # let platforms finish connecting + from tools.process_registry import process_registry as _pr + while self._running: + try: + # Peek the queue for async-delegation events. We must NOT + # consume watch/completion events here (other drains own them), + # so requeue anything that isn't ours. + requeue = [] + async_events = [] + while not _pr.completion_queue.empty(): + try: + evt = _pr.completion_queue.get_nowait() + except Exception: + break + if evt.get("type") == "async_delegation": + async_events.append(evt) + else: + requeue.append(evt) + for evt in requeue: + _pr.completion_queue.put(evt) + for evt in async_events: + self._enrich_async_delegation_routing(evt) + synth_text = _format_gateway_process_notification(evt) + if not synth_text: + continue + try: + await self._inject_watch_notification(synth_text, evt) + except Exception as e: + logger.error("Async delegation injection error: %s", e) + except Exception as e: + logger.debug("Async delegation watcher error: %s", e) + await asyncio.sleep(interval) + async def _run_process_watcher(self, watcher: dict) -> None: """ Periodically check a background process and push updates to the user. diff --git a/hermes_cli/cli_commands_mixin.py b/hermes_cli/cli_commands_mixin.py index b52c6de80..499f8e9a1 100644 --- a/hermes_cli/cli_commands_mixin.py +++ b/hermes_cli/cli_commands_mixin.py @@ -225,7 +225,8 @@ class CLICommandsMixin: print(" Usage: /snapshot [list|create [label]|restore |prune [N]]") def _handle_stop_command(self): - """Handle /stop — kill all running background processes. + """Handle /stop — kill all running background processes and + background (async) delegations. Inspired by OpenAI Codex's separation of interrupt (stop current turn) from /stop (clean up background processes). See openai/codex#14602. @@ -235,13 +236,26 @@ class CLICommandsMixin: processes = process_registry.list_sessions() running = [p for p in processes if p.get("status") == "running"] - if not running: + # Background subagents dispatched via delegate_task(background=true) + # live in their own registry, not the process registry. + try: + from tools.async_delegation import active_count, interrupt_all + n_async = active_count() + except Exception: + n_async = 0 + interrupt_all = None + + if not running and not n_async: print(" No running background processes.") return - print(f" Stopping {len(running)} background process(es)...") - killed = process_registry.kill_all() - print(f" ✅ Stopped {killed} process(es).") + if running: + print(f" Stopping {len(running)} background process(es)...") + killed = process_registry.kill_all() + print(f" ✅ Stopped {killed} process(es).") + if n_async and interrupt_all is not None: + stopped = interrupt_all(reason="/stop") + print(f" ✅ Interrupted {stopped} background delegation(s).") def _handle_agents_command(self): """Handle /agents — show background processes and agent status.""" @@ -261,6 +275,22 @@ class CLICommandsMixin: if finished: _cprint(f" Recently finished: {len(finished)}") + # Background (async) delegations — delegate_task(background=true) + try: + from tools.async_delegation import list_async_delegations + delegations = list_async_delegations() + except Exception: + delegations = [] + running_d = [d for d in delegations if d.get("status") == "running"] + if delegations: + _cprint(f" Background delegations: {len(running_d)} running") + for d in delegations: + goal = (d.get("goal") or "")[:60] + _cprint( + f" {d.get('delegation_id', '?')} · " + f"{d.get('status', '?')} · {goal}" + ) + agent_running = getattr(self, "_agent_running", False) _cprint(f" Agent: {'running' if agent_running else 'idle'}") diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 7ee1f8690..3a0982562 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1775,6 +1775,7 @@ DEFAULT_CONFIG = { "reasoning_effort": "", # reasoning effort for subagents: "xhigh", "high", "medium", # "low", "minimal", "none" (empty = inherit parent's level) "max_concurrent_children": 3, # max parallel children per batch; floor of 1 enforced, no ceiling + "max_async_children": 3, # max concurrent background (background=true) subagents; new dispatches rejected at capacity # Orchestrator role controls (see tools/delegate_tool.py:_get_max_spawn_depth # and _get_orchestrator_enabled). Floored at 1, no upper ceiling — # raise deliberately, each level multiplies API cost. diff --git a/tests/tools/test_async_delegation.py b/tests/tools/test_async_delegation.py new file mode 100644 index 000000000..5dbecfc4b --- /dev/null +++ b/tests/tools/test_async_delegation.py @@ -0,0 +1,473 @@ +"""Tests for async (background) delegation — tools/async_delegation.py. + +Covers the dispatch handle, non-blocking behavior, completion-event delivery +onto the shared process_registry.completion_queue, the rich re-injection block +formatting, capacity rejection, and crash handling. +""" + +import queue +import threading +import time + +import pytest + +from tools import async_delegation as ad +from tools.process_registry import process_registry, format_process_notification + + +@pytest.fixture(autouse=True) +def _clean_state(): + ad._reset_for_tests() + while not process_registry.completion_queue.empty(): + process_registry.completion_queue.get_nowait() + yield + ad._reset_for_tests() + while not process_registry.completion_queue.empty(): + process_registry.completion_queue.get_nowait() + + +def _drain_one(timeout=5.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if not process_registry.completion_queue.empty(): + return process_registry.completion_queue.get_nowait() + time.sleep(0.02) + return None + + +def test_dispatch_returns_immediately_without_blocking(): + gate = threading.Event() + + def runner(): + gate.wait(timeout=5) + return {"status": "completed", "summary": "done", "api_calls": 1, + "duration_seconds": 0.1, "model": "m"} + + t0 = time.monotonic() + res = ad.dispatch_async_delegation( + goal="g", context=None, toolsets=None, role="leaf", model="m", + session_key="", runner=runner, max_async_children=3, + ) + elapsed = time.monotonic() - t0 + + assert res["status"] == "dispatched" + assert res["delegation_id"].startswith("deleg_") + # Non-blocking invariant: dispatch returned while the runner is still + # gated (active), so it cannot have waited on the gate. The active_count + # check is the environment-independent proof; the generous wall-clock + # bound is a loose sanity backstop, not the primary assertion (a loaded + # CI runner can be slow but never anywhere near the runner's 5s gate). + assert ad.active_count() == 1 + assert elapsed < 4.0, f"dispatch blocked {elapsed:.2f}s (gate is 5s)" + gate.set() + + +def test_async_executor_workers_are_daemon_threads(): + gate = threading.Event() + + def runner(): + gate.wait(timeout=5) + return {"status": "completed", "summary": "done"} + + res = ad.dispatch_async_delegation( + goal="daemon check", context=None, toolsets=None, role="leaf", model="m", + session_key="", runner=runner, max_async_children=1, + ) + assert res["status"] == "dispatched" + + deadline = time.monotonic() + 2 + worker = None + while time.monotonic() < deadline: + worker = next( + (t for t in threading.enumerate() if t.name.startswith("async-delegate")), + None, + ) + if worker is not None: + break + time.sleep(0.02) + assert worker is not None + assert worker.daemon is True + gate.set() + assert _drain_one() is not None + + +def test_completion_event_lands_on_shared_queue_with_session_key(): + def runner(): + return {"status": "completed", "summary": "the result", + "api_calls": 3, "duration_seconds": 2.0, "model": "test-model"} + + res = ad.dispatch_async_delegation( + goal="compute X", context="some context", toolsets=["web", "file"], + role="leaf", model="test-model", session_key="agent:main:cli:dm:local", + runner=runner, max_async_children=3, + ) + assert res["status"] == "dispatched" + + evt = _drain_one() + assert evt is not None + assert evt["type"] == "async_delegation" + assert evt["summary"] == "the result" + assert evt["session_key"] == "agent:main:cli:dm:local" + assert evt["delegation_id"] == res["delegation_id"] + + +def test_rich_reinjection_block_is_self_contained(): + def runner(): + return {"status": "completed", "summary": "The answer is 42.", + "api_calls": 7, "duration_seconds": 3.5, "model": "test-model"} + + ad.dispatch_async_delegation( + goal="Compute the meaning of life", + context="User is a philosopher. Respond tersely.", + toolsets=["web"], role="leaf", model="test-model", + session_key="", runner=runner, max_async_children=3, + ) + evt = _drain_one() + assert evt is not None + text = format_process_notification(evt) + assert text is not None + for needle in [ + "ASYNC DELEGATION COMPLETE", + "Compute the meaning of life", + "User is a philosopher", + "Toolsets: web", + "The answer is 42.", + "Status: completed", + "API calls: 7", + ]: + assert needle in text, f"missing {needle!r}" + + +def test_dispatch_rejected_at_capacity(): + ev = threading.Event() + + def blocker(): + ev.wait(timeout=5) + return {"status": "completed", "summary": "x"} + + for i in range(2): + r = ad.dispatch_async_delegation( + goal=f"task{i}", context=None, toolsets=None, role="leaf", + model="m", session_key="", runner=blocker, max_async_children=2, + ) + assert r["status"] == "dispatched" + + r3 = ad.dispatch_async_delegation( + goal="task3", context=None, toolsets=None, role="leaf", model="m", + session_key="", runner=blocker, max_async_children=2, + ) + assert r3["status"] == "rejected" + assert "capacity reached" in r3["error"] + ev.set() + + +def test_crashed_runner_produces_error_completion(): + def boom(): + raise RuntimeError("subagent exploded") + + r = ad.dispatch_async_delegation( + goal="risky", context=None, toolsets=None, role="leaf", model="m", + session_key="", runner=boom, max_async_children=3, + ) + assert r["status"] == "dispatched" + evt = _drain_one() + assert evt is not None + assert evt["status"] == "error" + text = format_process_notification(evt) + assert text is not None + assert "did not complete successfully" in text + assert "subagent exploded" in text + + +def test_interrupt_all_signals_running_children(): + ev = threading.Event() + interrupted = {"count": 0} + + def blocker(): + ev.wait(timeout=5) + return {"status": "interrupted", "summary": None, + "error": "cancelled"} + + def interrupt_fn(): + interrupted["count"] += 1 + ev.set() + + ad.dispatch_async_delegation( + goal="long task", context=None, toolsets=None, role="leaf", + model="m", session_key="", runner=blocker, + interrupt_fn=interrupt_fn, max_async_children=3, + ) + n = ad.interrupt_all(reason="test") + assert n == 1 + assert interrupted["count"] == 1 + # child still emits a completion event after interrupt + evt = _drain_one() + assert evt is not None + assert evt["status"] == "interrupted" + + +def test_completed_records_pruned_to_cap(): + # Run more than the retention cap quickly; ensure list doesn't grow forever. + for i in range(ad._MAX_RETAINED_COMPLETED + 10): + ad.dispatch_async_delegation( + goal=f"t{i}", context=None, toolsets=None, role="leaf", model="m", + session_key="", runner=lambda: {"status": "completed", "summary": "ok"}, + max_async_children=ad._MAX_RETAINED_COMPLETED + 20, + ) + # let workers finish + deadline = time.monotonic() + 10 + while time.monotonic() < deadline and ad.active_count() > 0: + time.sleep(0.05) + assert len(ad.list_async_delegations()) <= ad._MAX_RETAINED_COMPLETED + + +# --------------------------------------------------------------------------- +# Integration: delegate_task(background=True) routing +# --------------------------------------------------------------------------- + +def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch): + """delegate_task(background=True) returns a handle without running the + child synchronously, and the child completes on the background thread.""" + from unittest.mock import MagicMock, patch + import tools.delegate_tool as dt + + parent = MagicMock() + parent._delegate_depth = 0 + parent.session_id = "sess" + parent._interrupt_requested = False + fake_child = MagicMock() + fake_child._delegate_role = "leaf" + fake_child._subagent_id = "s1" + + gate = threading.Event() + + def slow_child(task_index, goal, child=None, parent_agent=None, **kw): + gate.wait(timeout=5) # a sync impl would hang delegate_task here + return { + "task_index": 0, "status": "completed", "summary": f"done: {goal}", + "api_calls": 1, "duration_seconds": 0.1, "model": "m", + "exit_reason": "completed", + } + + creds = { + "model": "m", "provider": None, "base_url": None, "api_key": None, + "api_mode": None, "command": None, "args": None, + } + with patch.object(dt, "_build_child_agent", return_value=fake_child), \ + patch.object(dt, "_run_single_child", side_effect=slow_child), \ + patch.object(dt, "_resolve_delegation_credentials", return_value=creds): + out = dt.delegate_task( + goal="the real task", context="ctx", toolsets=["web"], + background=True, parent_agent=parent, + ) + + import json + parsed = json.loads(out) + assert parsed["status"] == "dispatched" + assert parsed["mode"] == "background" + assert parsed["delegation_id"].startswith("deleg_") + # The real non-blocking invariant (environment-independent — no wall-clock + # threshold that flakes on a loaded CI runner): delegate_task returned + # while the child is STILL blocked on the closed gate, so no completion + # event exists yet. A synchronous impl could not have returned here — it + # would still be inside slow_child waiting on the gate. + assert process_registry.completion_queue.empty() + assert ad.active_count() == 1 # child running in background, not finished + + gate.set() + evt = _drain_one() + assert evt is not None + assert evt["type"] == "async_delegation" + assert evt["summary"] == "done: the real task" + text = format_process_notification(evt) + assert text is not None + assert "the real task" in text and "ctx" in text + + +def test_delegate_task_background_rejects_batch(monkeypatch): + """background=True with a multi-item tasks batch is rejected (v1: single-task only).""" + import json + from unittest.mock import MagicMock + import tools.delegate_tool as dt + + parent = MagicMock() + parent._delegate_depth = 0 + parent.session_id = "sess" + + out = dt.delegate_task( + tasks=[{"goal": "a"}, {"goal": "b"}], + background=True, + parent_agent=parent, + ) + parsed = json.loads(out) + assert "error" in parsed + assert "single-task only" in parsed["error"] + + +def test_delegate_task_background_detaches_child_from_parent(monkeypatch): + """A background child must NOT remain in parent._active_children — + otherwise parent-turn interrupts / cache evicts / session close would + kill the detached subagent mid-run.""" + from unittest.mock import MagicMock, patch + import tools.delegate_tool as dt + + parent = MagicMock() + parent._delegate_depth = 0 + parent.session_id = "sess" + parent._active_children = [] + parent._active_children_lock = threading.Lock() + fake_child = MagicMock() + fake_child._delegate_role = "leaf" + fake_child._subagent_id = "s1" + + gate = threading.Event() + + def slow_child(task_index, goal, child=None, parent_agent=None, **kw): + gate.wait(timeout=5) + return {"task_index": 0, "status": "completed", "summary": "ok"} + + def build_and_register(**kw): + # Mirror what the real _build_child_agent does: register the child + # for interrupt propagation. + parent._active_children.append(fake_child) + return fake_child + + creds = { + "model": "m", "provider": None, "base_url": None, "api_key": None, + "api_mode": None, "command": None, "args": None, + } + with patch.object(dt, "_build_child_agent", side_effect=build_and_register), \ + patch.object(dt, "_run_single_child", side_effect=slow_child), \ + patch.object(dt, "_resolve_delegation_credentials", return_value=creds): + out = dt.delegate_task(goal="bg task", background=True, parent_agent=parent) + + import json + assert json.loads(out)["status"] == "dispatched" + # Child detached immediately at dispatch, while it is still running. + assert fake_child not in parent._active_children + gate.set() + assert _drain_one() is not None + + +def test_concurrent_dispatch_respects_capacity(): + """Two threads racing dispatch with cap=1 must yield exactly one accept + (capacity check and record insert are atomic under the records lock).""" + gate = threading.Event() + + def blocker(): + gate.wait(timeout=5) + return {"status": "completed", "summary": "x"} + + results = [] + barrier = threading.Barrier(2) + + def racer(): + barrier.wait(timeout=5) + results.append( + ad.dispatch_async_delegation( + goal="race", context=None, toolsets=None, role="leaf", + model="m", session_key="", runner=blocker, + max_async_children=1, + ) + ) + + threads = [threading.Thread(target=racer) for _ in range(2)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + statuses = sorted(r["status"] for r in results) + assert statuses == ["dispatched", "rejected"] + gate.set() + + +# --------------------------------------------------------------------------- +# Gateway routing: session_key -> platform/chat_id, rich formatting, injection +# --------------------------------------------------------------------------- + +def _make_async_evt(**over): + evt = { + "type": "async_delegation", + "delegation_id": "deleg_x1", + "session_key": "agent:main:telegram:dm:12345:678", + "goal": "Investigate flaky test", + "context": "repo /tmp/p", + "toolsets": ["terminal"], + "role": "leaf", + "model": "m", + "status": "completed", + "summary": "Found the bug in test_foo", + "api_calls": 4, + "duration_seconds": 12.0, + "dispatched_at": 1000.0, + "completed_at": 1012.0, + } + evt.update(over) + return evt + + +def test_gateway_enriches_routing_from_session_key(): + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + evt = _make_async_evt() + runner._enrich_async_delegation_routing(evt) + assert evt["platform"] == "telegram" + assert evt["chat_id"] == "12345" + assert evt["thread_id"] == "678" + + +def test_gateway_formatter_renders_async_block(): + from gateway.run import _format_gateway_process_notification + + txt = _format_gateway_process_notification(_make_async_evt()) + assert txt is not None + assert "ASYNC DELEGATION COMPLETE" in txt + assert "Found the bug in test_foo" in txt + assert "Investigate flaky test" in txt + + +def test_gateway_watch_drain_requeues_async_without_looping(): + from gateway.run import _drain_gateway_watch_events + + q = queue.Queue() + async_evt = _make_async_evt() + watch_evt = { + "type": "watch_match", + "session_id": "proc_1", + "command": "pytest", + "pattern": "READY", + "output": "READY", + } + q.put(async_evt) + q.put(watch_evt) + + watch_events = _drain_gateway_watch_events(q) + + assert watch_events == [watch_evt] + assert q.qsize() == 1 + assert q.get_nowait() == async_evt + + +def test_gateway_builds_routable_source_from_enriched_event(): + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + evt = _make_async_evt() + runner._enrich_async_delegation_routing(evt) + src = runner._build_process_event_source(evt) + assert src is not None + assert src.platform.value == "telegram" + assert src.chat_id == "12345" + + +def test_gateway_cli_origin_event_left_unrouted(): + """An empty session_key (CLI origin) is left without routing fields.""" + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + evt = _make_async_evt(session_key="") + runner._enrich_async_delegation_routing(evt) + assert "platform" not in evt + + diff --git a/tools/async_delegation.py b/tools/async_delegation.py new file mode 100644 index 000000000..5975e9b13 --- /dev/null +++ b/tools/async_delegation.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python3 +""" +Async (background) delegation registry. + +Backs ``delegate_task(background=true)``: the parent agent dispatches a +subagent that runs on a module-level daemon executor and returns a handle +immediately, so the user and the model can keep working while the child runs. + +When the child finishes, a completion event is pushed onto the SHARED +``process_registry.completion_queue`` with ``type="async_delegation"``. The +CLI (``cli.py`` process_loop) and gateway (``_run_process_watcher`` / +``completion_queue`` drain) already poll that queue while the agent is idle +and forge a fresh user/internal turn from each event. We deliberately reuse +that rail rather than reaching into a running agent loop: + + - completions surface as a NEW turn when the agent is idle, never spliced + between a tool result and an assistant message. That keeps strict + message-role alternation legal and the prompt cache intact (hard + invariant: never mutate past context). + - we inherit the queue's de-dup, crash-recovery checkpoint, and the + existing CLI + gateway drain wiring for free — no new drain loops in the + two largest files in the repo. + +The completion payload carries a RICH, self-contained task-source block (the +original goal, the context the parent supplied, toolsets, model, dispatch +time, status, and the full result summary). When the result re-enters the +conversation the parent may be deep in unrelated context and won't remember +why the subagent existed; the block lets it either use the result or +re-dispatch if the world has moved on. + +This module owns ONLY the async lifecycle. The actual child build + run is +delegated back to ``delegate_tool._run_single_child`` via an injected +runner, so all the credential leasing, heartbeat, timeout, and result-shaping +logic stays in one place. +""" + +from __future__ import annotations + +import logging +import threading +import time +import uuid +import weakref +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures.thread import _worker +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +class _DaemonThreadPoolExecutor(ThreadPoolExecutor): + """ThreadPoolExecutor variant whose workers do not block process exit. + + Stdlib ``ThreadPoolExecutor`` workers are non-daemon. Background + delegation is explicitly best-effort detached work, so a long child should + be interruptible by ``/stop``/shutdown but must not keep a CLI process alive + after the user exits. + """ + + def _adjust_thread_count(self) -> None: + if self._idle_semaphore.acquire(timeout=0): + return + + def weakref_cb(_, q=self._work_queue): + q.put(None) + + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads) + t = threading.Thread( + name=thread_name, + target=_worker, + args=( + weakref.ref(self, weakref_cb), + self._work_queue, + self._initializer, + self._initargs, + ), + daemon=True, + ) + t.start() + self._threads.add(t) + + +# --------------------------------------------------------------------------- +# Module-level state +# --------------------------------------------------------------------------- +# A persistent daemon executor (NOT a `with ThreadPoolExecutor()` block, which +# would join on exit and defeat the whole point of async). Workers are daemon +# threads so a hard process exit doesn't hang on an in-flight child. +_executor: Optional[ThreadPoolExecutor] = None +_executor_lock = threading.Lock() +_executor_max_workers: int = 0 + +_records_lock = threading.Lock() +# delegation_id -> record dict. Kept for the lifetime of the run plus a short +# tail after completion so `list_async_delegations()` can show recent results. +_records: Dict[str, Dict[str, Any]] = {} + +_DEFAULT_MAX_ASYNC_CHILDREN = 3 +# How many completed records to retain for status queries before pruning. +_MAX_RETAINED_COMPLETED = 50 + + +def _get_executor(max_workers: int) -> ThreadPoolExecutor: + """Lazily create (or grow) the shared daemon executor. + + We never shrink — ThreadPoolExecutor can't resize — but if the configured + cap grows between calls we rebuild a larger pool. Existing in-flight + futures keep running on the old pool until it's garbage collected. + """ + global _executor, _executor_max_workers + with _executor_lock: + if _executor is None or max_workers > _executor_max_workers: + # Daemon threads: thread_name_prefix aids debugging in stack dumps. + _executor = _DaemonThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="async-delegate", + ) + _executor_max_workers = max_workers + return _executor + + +def active_count() -> int: + """Number of async delegations currently running.""" + with _records_lock: + return sum(1 for r in _records.values() if r.get("status") == "running") + + +def _new_delegation_id() -> str: + return f"deleg_{uuid.uuid4().hex[:8]}" + + +def _prune_completed_locked() -> None: + """Drop the oldest completed records beyond the retention cap. + + Caller must hold ``_records_lock``. + """ + completed = [ + (rid, r) + for rid, r in _records.items() + if r.get("status") != "running" + ] + if len(completed) <= _MAX_RETAINED_COMPLETED: + return + # Oldest-first by completion time (fall back to dispatch time). + completed.sort(key=lambda kv: kv[1].get("completed_at") or kv[1].get("dispatched_at") or 0) + for rid, _ in completed[: len(completed) - _MAX_RETAINED_COMPLETED]: + _records.pop(rid, None) + + +def dispatch_async_delegation( + *, + goal: str, + context: Optional[str], + toolsets: Optional[List[str]], + role: str, + model: Optional[str], + session_key: str, + runner: Callable[[], Dict[str, Any]], + interrupt_fn: Optional[Callable[[], None]] = None, + max_async_children: int = _DEFAULT_MAX_ASYNC_CHILDREN, +) -> Dict[str, Any]: + """Spawn ``runner`` on the daemon executor and return a handle immediately. + + Parameters + ---------- + goal, context, toolsets, role, model + The dispatch-time task spec, captured verbatim for the rich + completion block. + session_key + The gateway session_key (from ``tools.approval.get_current_session_key``) + captured on the parent thread BEFORE dispatch, because the daemon + worker thread won't carry the contextvar. Used to route the + completion back to the originating session. + runner + Zero-arg callable that builds + runs the child and returns the same + result dict ``_run_single_child`` produces. Runs on the worker thread. + interrupt_fn + Optional callable to signal the child to stop (used on shutdown / + explicit cancel). + max_async_children + Concurrency cap. When at capacity the dispatch is REJECTED (the caller + should fall back to sync or tell the user) rather than queued, so a + runaway model can't pile up unbounded background work. + + Returns + ------- + dict + ``{"status": "dispatched", "delegation_id": ...}`` on success, or + ``{"status": "rejected", "error": ...}`` when at capacity. + """ + delegation_id = _new_delegation_id() + dispatched_at = time.time() + record: Dict[str, Any] = { + "delegation_id": delegation_id, + "goal": goal, + "context": context, + "toolsets": list(toolsets) if toolsets else None, + "role": role, + "model": model, + "session_key": session_key, + "status": "running", + "dispatched_at": dispatched_at, + "completed_at": None, + "interrupt_fn": interrupt_fn, + } + # Capacity check and record insert under ONE lock hold — checking + # active_count() separately would let two concurrent dispatches (e.g. + # from different gateway sessions) both pass the check and exceed the cap. + with _records_lock: + running = sum( + 1 for r in _records.values() if r.get("status") == "running" + ) + if running >= max_async_children: + return { + "status": "rejected", + "error": ( + f"Async delegation capacity reached ({max_async_children} " + f"running). Wait for one to finish (its result will re-enter " + f"the chat), or run this task synchronously " + f"(background=false). Raise delegation.max_async_children in " + f"config.yaml to allow more concurrent background subagents." + ), + } + _records[delegation_id] = record + + executor = _get_executor(max_async_children) + + def _worker() -> None: + result: Dict[str, Any] = {} + status = "error" + try: + result = runner() or {} + status = result.get("status") or "completed" + except Exception as exc: # noqa: BLE001 — must never crash the worker + logger.exception("Async delegation %s crashed", delegation_id) + result = { + "status": "error", + "summary": None, + "error": f"{type(exc).__name__}: {exc}", + "api_calls": 0, + "duration_seconds": round(time.time() - dispatched_at, 2), + } + status = "error" + finally: + _finalize(delegation_id, result, status) + + try: + executor.submit(_worker) + except Exception as exc: # pragma: no cover — pool submit failure is rare + with _records_lock: + _records.pop(delegation_id, None) + return { + "status": "rejected", + "error": f"Failed to schedule async delegation: {exc}", + } + + logger.info( + "Dispatched async delegation %s (session_key=%s): %s", + delegation_id, session_key or "", (goal or "")[:80], + ) + return {"status": "dispatched", "delegation_id": delegation_id} + + +def _finalize(delegation_id: str, result: Dict[str, Any], status: str) -> None: + """Mark a record complete and push the completion event onto the queue.""" + with _records_lock: + record = _records.get(delegation_id) + if record is None: + return + record["status"] = status + record["completed_at"] = time.time() + record["interrupt_fn"] = None # drop the closure; child is done + # Snapshot fields needed for the event while holding the lock. + event_record = dict(record) + _prune_completed_locked() + + _push_completion_event(event_record, result, status) + + +def _push_completion_event( + record: Dict[str, Any], result: Dict[str, Any], status: str +) -> None: + """Push a type='async_delegation' event onto the shared completion queue. + + Best-effort: a failure here must not crash the worker, but it WOULD mean a + silently-lost result, so we log loudly. + """ + try: + from tools.process_registry import process_registry + except Exception as exc: # pragma: no cover + logger.error( + "Async delegation %s finished but process_registry import failed; " + "result lost: %s", + record.get("delegation_id"), exc, + ) + return + + summary = result.get("summary") + error = result.get("error") + dispatched_at = record.get("dispatched_at") or time.time() + completed_at = record.get("completed_at") or time.time() + + evt = { + "type": "async_delegation", + "delegation_id": record.get("delegation_id"), + # session_key routes the completion back to the originating gateway + # session; empty string => CLI (single-session) path. + "session_key": record.get("session_key", ""), + "goal": record.get("goal", ""), + "context": record.get("context"), + "toolsets": record.get("toolsets"), + "role": record.get("role"), + "model": result.get("model") or record.get("model"), + "status": status, + "summary": summary, + "error": error, + "api_calls": result.get("api_calls", 0), + "duration_seconds": result.get( + "duration_seconds", round(completed_at - dispatched_at, 2) + ), + "dispatched_at": dispatched_at, + "completed_at": completed_at, + "exit_reason": result.get("exit_reason"), + } + try: + process_registry.completion_queue.put(evt) + except Exception as exc: # pragma: no cover + logger.error( + "Async delegation %s: failed to enqueue completion event; " + "result lost: %s", + record.get("delegation_id"), exc, + ) + + +def list_async_delegations() -> List[Dict[str, Any]]: + """Snapshot of async delegations (running + recently completed). + + Safe to call from any thread. Excludes the non-serialisable interrupt_fn. + """ + with _records_lock: + return [ + {k: v for k, v in r.items() if k != "interrupt_fn"} + for r in _records.values() + ] + + +def interrupt_all(reason: str = "shutdown") -> int: + """Signal every running async delegation to stop. Returns how many. + + Used on ``/stop`` and gateway shutdown so a dangling background subagent + can't keep burning tokens with no one listening. The child still emits a + completion event (status='interrupted') via the normal finalize path. + """ + count = 0 + with _records_lock: + targets = [ + r for r in _records.values() if r.get("status") == "running" + ] + for r in targets: + fn = r.get("interrupt_fn") + if callable(fn): + try: + fn() + count += 1 + except Exception as exc: + logger.debug( + "interrupt_all: %s interrupt failed: %s", + r.get("delegation_id"), exc, + ) + if count: + logger.info("Interrupted %d async delegation(s) (%s)", count, reason) + return count + + +def _reset_for_tests() -> None: + """Test-only: clear all state and tear down the executor.""" + global _executor, _executor_max_workers + with _executor_lock: + if _executor is not None: + _executor.shutdown(wait=False) + _executor = None + _executor_max_workers = 0 + with _records_lock: + _records.clear() diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index fb17c537b..7fc82c72f 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -397,6 +397,38 @@ def _get_max_concurrent_children() -> int: return _DEFAULT_MAX_CONCURRENT_CHILDREN +_DEFAULT_MAX_ASYNC_CHILDREN = 3 + + +def _get_max_async_children() -> int: + """Read delegation.max_async_children from config (floor 1, no ceiling). + + Caps how many background (``background=true``) subagents can run at once. + When at capacity, a new async dispatch is REJECTED (not queued) so a + runaway model can't pile up unbounded background work. Separate from + max_concurrent_children, which bounds a single synchronous batch. + """ + cfg = _load_config() + val = cfg.get("max_async_children") + if val is not None: + try: + return max(1, int(val)) + except (TypeError, ValueError): + logger.warning( + "delegation.max_async_children=%r is not a valid integer; " + "using default %d", + val, _DEFAULT_MAX_ASYNC_CHILDREN, + ) + return _DEFAULT_MAX_ASYNC_CHILDREN + env_val = os.getenv("DELEGATION_MAX_ASYNC_CHILDREN") + if env_val: + try: + return max(1, int(env_val)) + except (TypeError, ValueError): + return _DEFAULT_MAX_ASYNC_CHILDREN + return _DEFAULT_MAX_ASYNC_CHILDREN + + def _get_child_timeout() -> Optional[float]: """Read delegation.child_timeout_seconds from config. @@ -2018,6 +2050,7 @@ def delegate_task( acp_command: Optional[str] = None, acp_args: Optional[List[str]] = None, role: Optional[str] = None, + background: Optional[bool] = None, parent_agent=None, ) -> str: """ @@ -2049,6 +2082,19 @@ def delegate_task( # Normalise the top-level role once; per-task overrides re-normalise. top_role = _normalize_role(role) + # Async (background) delegation is single-task only in v1. A batch carries + # fan-out semantics (N handles, partial completion) that double the state + # model — reject early with a clear message rather than silently running + # the batch synchronously. + background = is_truthy_value(background, default=False) if background is not None else False + if background and tasks and isinstance(tasks, list) and len(tasks) > 1: + return tool_error( + "background=true is single-task only. Dispatch one background " + "subagent per delegate_task call (each returns its own handle and " + "re-enters the conversation independently), or run the batch " + "synchronously with background=false." + ) + # Depth limit — configurable via delegation.max_spawn_depth, # default 2 for parity with the original MAX_DEPTH constant. depth = getattr(parent_agent, "_delegate_depth", 0) @@ -2186,6 +2232,90 @@ def delegate_task( if n_tasks == 1: # Single task -- run directly (no thread pool overhead) _i, _t, child = children[0] + + # ----- Async / background dispatch ----- + # When background=true, hand the already-built child to the async + # delegation registry and return a handle immediately. The child runs + # on a daemon executor; its result re-enters the conversation as a + # fresh turn via process_registry.completion_queue (see + # tools/async_delegation.py). Batch async is intentionally NOT + # supported in v1 — the rejection is handled before we get here. + if background: + from tools.async_delegation import dispatch_async_delegation + from tools.approval import get_current_session_key + + # Capture the gateway routing key on THIS (parent) thread — the + # daemon worker won't carry the session contextvar. + _session_key = get_current_session_key(default="") + + # Detach the child from the parent's interrupt-propagation list. + # _build_child_agent registered it there (correct for sync + # children, which block the parent's turn), but a BACKGROUND + # child must survive parent-turn interrupts (Ctrl+C, mid-turn + # steering), cache evicts (release_clients), and session close + # (/new) — otherwise the detached subagent dies with whatever + # the parent was doing when it was dispatched. Its lifecycle is + # owned by the async-delegation registry (interrupt_fn below), + # and _run_single_child's finally block closes its resources + # when it finishes. + if hasattr(parent_agent, "_active_children"): + try: + _ac_lock = getattr(parent_agent, "_active_children_lock", None) + if _ac_lock: + with _ac_lock: + parent_agent._active_children.remove(child) + else: + parent_agent._active_children.remove(child) + except ValueError: + pass + + def _async_runner(_child=child, _goal=_t["goal"]): + return _run_single_child(0, _goal, _child, parent_agent) + + def _async_interrupt(_child=child): + try: + if hasattr(_child, "interrupt"): + _child.interrupt("Async delegation cancelled") + elif hasattr(_child, "_interrupt_requested"): + _child._interrupt_requested = True + except Exception: + pass + + dispatch = dispatch_async_delegation( + goal=_t["goal"], + context=_t.get("context"), + toolsets=_t.get("toolsets") or toolsets, + role=_normalize_role(_t.get("role") or top_role), + model=creds["model"], + session_key=_session_key, + runner=_async_runner, + interrupt_fn=_async_interrupt, + max_async_children=_get_max_async_children(), + ) + + if dispatch.get("status") == "dispatched": + return json.dumps( + { + "status": "dispatched", + "delegation_id": dispatch["delegation_id"], + "goal": _t["goal"], + "mode": "background", + "note": ( + "Subagent is running in the background. You and the " + "user can keep working; the full task source and " + "result will re-enter the conversation as a new " + "message when it finishes. Do not wait or poll — " + "just continue." + ), + }, + ensure_ascii=False, + ) + # Rejected (at capacity or schedule failure) — surface as a tool + # error so the model can fall back to synchronous delegation. + return tool_error( + dispatch.get("error", "Async delegation could not be scheduled.") + ) + result = _run_single_child(0, _t["goal"], child, parent_agent) results.append(result) else: @@ -2904,6 +3034,24 @@ DELEGATE_TASK_SCHEMA = { "enum": ["leaf", "orchestrator"], "description": "(rebuilt at get_definitions() time)", }, + "background": { + "type": "boolean", + "description": ( + "Run the subagent asynchronously in the BACKGROUND " + "instead of blocking this turn. When true, delegate_task " + "returns immediately with a delegation_id; you and the " + "user keep working while the subagent runs, and its full " + "result re-enters the conversation as a new message when " + "it finishes (similar to terminal background=true + " + "notify_on_complete). The re-injected message includes the " + "original goal/context so you can act on it even after " + "moving on. Single-task only — cannot be combined with the " + "'tasks' batch array. Use for long-running independent work " + "the user shouldn't have to wait on (research, builds, " + "multi-step investigations). Do NOT poll or wait after " + "dispatching — just continue; the result will come to you." + ), + }, "acp_command": { "type": "string", "description": ( @@ -2948,6 +3096,7 @@ registry.register( acp_command=args.get("acp_command"), acp_args=args.get("acp_args"), role=args.get("role"), + background=args.get("background"), parent_agent=kw.get("parent_agent"), ), check_fn=check_delegate_requirements, diff --git a/tools/process_registry.py b/tools/process_registry.py index 6c3d61ce5..e9f3276ff 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -1531,6 +1531,91 @@ class ProcessRegistry: process_registry = ProcessRegistry() +def _format_age(seconds: float) -> str: + """Human-friendly elapsed string ('18m', '2h3m', '45s').""" + try: + s = int(max(0, seconds)) + except (TypeError, ValueError): + return "?" + if s < 60: + return f"{s}s" + m, s = divmod(s, 60) + if m < 60: + return f"{m}m" if s == 0 else f"{m}m{s}s" + h, m = divmod(m, 60) + return f"{h}h" if m == 0 else f"{h}h{m}m" + + +def _format_async_delegation(evt: dict) -> str: + """Format an async-delegation completion into a self-contained re-injection. + + Carries the FULL original task source (goal, the context the parent + supplied, toolsets, role, model) plus dispatch time, status, and the + complete result summary. When this re-enters the conversation the agent + may be deep in unrelated context and won't remember why the subagent + existed, so the block is written to stand entirely on its own — enough to + use the result OR re-dispatch if the world has moved on. + """ + import time as _time + + deleg_id = evt.get("delegation_id", "unknown") + goal = evt.get("goal", "") or "" + context = evt.get("context") + toolsets = evt.get("toolsets") + role = evt.get("role") or "leaf" + model = evt.get("model") or "?" + status = evt.get("status") or "completed" + summary = evt.get("summary") + error = evt.get("error") + api_calls = evt.get("api_calls", 0) + duration = evt.get("duration_seconds", "?") + dispatched_at = evt.get("dispatched_at") + completed_at = evt.get("completed_at") or _time.time() + + age = "" + if isinstance(dispatched_at, (int, float)): + age = f" ({_format_age(completed_at - dispatched_at)} ago)" + + lines = [ + f"[ASYNC DELEGATION COMPLETE — {deleg_id}]", + "A background subagent you dispatched earlier has finished. You may " + "have moved on since dispatching it; the full task source is below so " + "you can act on the result or re-dispatch if things have changed.", + "", + ] + if isinstance(dispatched_at, (int, float)): + ts = _time.strftime("%Y-%m-%d %H:%M:%S", _time.localtime(dispatched_at)) + lines.append(f"Dispatched: {ts}{age}") + lines.append(f"Original goal: {goal}") + if context: + lines.append(f"Context you provided: {context}") + if toolsets: + lines.append(f"Toolsets: {', '.join(toolsets)}") + lines.append(f"Role: {role} Model: {model}") + lines.append(f"Status: {status} API calls: {api_calls} Duration: {duration}s") + lines.append("--- RESULT ---") + if status in ("completed", "success") and summary: + lines.append(summary) + elif status == "interrupted": + lines.append( + "The subagent was interrupted before completing" + + (f": {error}" if error else ".") + ) + if summary: + lines.append("Partial output:") + lines.append(summary) + else: + # error / timeout / failed + lines.append( + f"The subagent did not complete successfully (status={status})." + + (f"\n{error}" if error else "") + ) + if summary: + lines.append("Partial output:") + lines.append(summary) + return "\n".join(lines) + + def format_process_notification(evt: dict) -> "str | None": """Format a process notification event into a [IMPORTANT: ...] message. @@ -1559,6 +1644,9 @@ def format_process_notification(evt: dict) -> "str | None": text += "]" return text + if evt_type == "async_delegation": + return _format_async_delegation(evt) + _exit = evt.get("exit_code", "?") _out = evt.get("output", "") _reason = evt.get("completion_reason") or "exited" diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 715ca8b48..4d12a1a41 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -5595,6 +5595,11 @@ def _notification_event_dedup_key(evt: dict) -> tuple: evt.get("message", ""), evt.get("suppressed", 0), ) + if evt_type == "async_delegation": + # Async-delegation completions have no process session_id; without + # this the fallthrough keys every one as ("", "async_delegation") + # and the second completion's status update is suppressed forever. + return (evt.get("delegation_id", ""), evt_type) return (evt_sid, evt_type)