feat(delegation): async background subagents via delegate_task(background=true) (#40946)

* feat(delegation): async background subagents via delegate_task(background=true)

delegate_task(background=true) dispatches a subagent that runs in the
background and returns a handle immediately, so the user and model keep
working while it runs. The full result — plus the original task source —
re-enters the conversation as a new turn when the subagent finishes,
riding the same completion-queue rail as terminal background processes.

- tools/async_delegation.py: daemon-executor registry, capacity cap,
  rich self-contained completion event pushed onto the shared
  process_registry.completion_queue (type='async_delegation').
- delegate_tool.py: background param + single-task dispatch branch;
  batch async rejected (v1).
- process_registry.py: format_process_notification renders the rich
  task-source block (goal/context/toolsets/model/status/result).
- gateway/run.py: dedicated _async_delegation_watcher drains + injects
  results into the originating session (idle + post-turn), session_key
  routing enrichment, shutdown interrupt of dangling delegations.
- config: delegation.max_async_children (default 3).

Reuses the existing idle-drain wiring rather than mutating a running
agent loop, preserving message-role alternation and prompt-cache
invariants. 13 targeted tests; CLI + gateway paths E2E-verified.

* test(delegation): make async non-blocking tests environment-independent

CI 'test (5)' flaked on a cold, 8-worker runner: the first
delegate_task(background=true) call measured 2.27s of one-time setup
(config load + child-agent construction + imports), tripping the
elapsed < 1.0 wall-clock assertion. That assertion was testing setup
overhead, not blocking.

Replace the wall-clock thresholds with the real invariant: dispatch
returns while the child is still gated (active_count == 1, completion
queue empty), which a synchronous impl could not do. Keep only a loose
4s sanity backstop well under the runner's 5s gate.

* fix(delegation): harden async background delegation

Follow-up review fixes:
- Detach background child from parent._active_children at dispatch —
  otherwise parent-turn interrupts (Ctrl+C, mid-turn steering), cache
  evicts (release_clients), and session close (/new) kill/close the
  detached subagent mid-run, defeating the point of background mode.
  Lifecycle is owned by the async registry's interrupt_fn.
- Make the capacity check atomic with the record insert (TOCTOU: two
  concurrent dispatches could both pass active_count() and exceed the cap).
- TUI dedup: key async_delegation events by delegation_id — the
  fallthrough keyed them all as ("", type), suppressing every completion
  after the first in the desktop/TUI status feed.
- CLI /stop now interrupts running background delegations and /agents
  lists them (they live outside the process registry and were invisible).
- Drop stray unbalanced ']' line from the re-injection block and the
  unused _ASYNC_DEFAULT import.

Tests: detach-at-dispatch + concurrent-capacity race added (15 total in
test_async_delegation.py); 137 delegate + 140 process-registry/notify/watch
+ 7 TUI dedup tests pass.

* fix(delegation): harden async background completion drains
This commit is contained in:
Teknium 2026-06-15 13:33:12 -07:00 committed by GitHub
parent 368fcf1ff0
commit c66ecf0bc3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 1268 additions and 15 deletions

5
cli.py
View file

@ -977,6 +977,11 @@ def _run_cleanup(*, notify_session_finalize: bool = True):
_cleanup_all_terminals()
except Exception:
pass
try:
from tools.async_delegation import interrupt_all as _interrupt_async_delegations
_interrupt_async_delegations(reason="CLI shutdown")
except Exception:
pass
try:
_cleanup_all_browsers()
except Exception:

View file

@ -1921,9 +1921,42 @@ def _format_gateway_process_notification(evt: dict) -> "str | None":
text += "]"
return text
if evt_type == "async_delegation":
# Reuse the shared rich formatter (self-contained task-source block).
from tools.process_registry import format_process_notification
return format_process_notification(evt)
return None
def _drain_gateway_watch_events(completion_queue) -> "list[dict]":
"""Drain gateway-owned watch events without spinning on requeued events.
Watch events are handled by the post-turn gateway drain. Process
completions are owned by their per-process watcher task, and async
delegation completions are owned by ``_async_delegation_watcher``.
Requeueing async events inside ``while not queue.empty()`` would make the
loop non-terminating, so detach the current batch first, then requeue any
events this drain does not own after the queue is empty.
"""
watch_events: list[dict] = []
requeue: list[dict] = []
while not completion_queue.empty():
try:
evt = completion_queue.get_nowait()
except Exception:
break
evt_type = evt.get("type", "completion")
if evt_type in {"watch_match", "watch_disabled"}:
watch_events.append(evt)
elif evt_type == "async_delegation":
requeue.append(evt)
# else: process completion events are handled by the watcher task
for evt in requeue:
completion_queue.put(evt)
return watch_events
# Module-level weak reference to the active GatewayRunner instance.
# Used by tools (e.g. send_message) that need to route through a live
# adapter for plugin platforms. Set in GatewayRunner.__init__().
@ -5353,6 +5386,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# turn so the agent kicks off the new chat.
asyncio.create_task(self._handoff_watcher())
# Start background async-delegation watcher — drains completion events
# from delegate_task(background=true) subagents and injects each
# result back into its originating session as a new turn, covering the
# idle case where the subagent finishes with no agent turn running.
asyncio.create_task(self._async_delegation_watcher())
logger.info("Press Ctrl+C to stop")
return True
@ -5989,6 +6028,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
)
except Exception as _e:
logger.debug("process_registry.kill_all (%s) error: %s", phase, _e)
try:
from tools.async_delegation import interrupt_all as _interrupt_async
_async_n = _interrupt_async(reason=f"gateway shutdown ({phase})")
if _async_n:
logger.info(
"Shutdown (%s): interrupted %d background delegation(s)",
phase, _async_n,
)
except Exception as _e:
logger.debug("async interrupt_all (%s) error: %s", phase, _e)
try:
from tools.terminal_tool import cleanup_all_environments
cleanup_all_environments()
@ -8995,18 +9044,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
logger.error("Process watcher setup error: %s", e)
# Drain watch pattern notifications that arrived during the agent run.
# Watch events and completions share the same queue; completions are
# already handled by the per-process watcher task above, so we only
# inject watch-type events here.
# Watch events and completions share the same queue; process
# completions are already handled by the per-process watcher task
# above, so we only inject watch-type events here.
#
# Async-delegation completions ALSO ride this shared queue but are
# owned by the dedicated _async_delegation_watcher (started at
# boot), which covers both the idle and post-turn cases with a
# single consumer — so we leave them on the queue here.
try:
from tools.process_registry import process_registry as _pr
_watch_events = []
while not _pr.completion_queue.empty():
evt = _pr.completion_queue.get_nowait()
evt_type = evt.get("type", "completion")
if evt_type in {"watch_match", "watch_disabled"}:
_watch_events.append(evt)
# else: completion events are handled by the watcher task
_watch_events = _drain_gateway_watch_events(_pr.completion_queue)
for evt in _watch_events:
synth_text = _format_gateway_process_notification(evt)
if synth_text:
@ -12265,6 +12313,74 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception as e:
logger.error("Watch notification injection error: %s", e)
def _enrich_async_delegation_routing(self, evt: dict) -> None:
"""Fill platform/chat_id/thread_id/chat_type on an async-delegation event.
Async-delegation completion events only carry ``session_key`` (the
daemon worker has no access to the per-message routing metadata the
terminal background watcher captures at spawn time). Parse the
session_key into the routing fields ``_build_process_event_source``
expects. Best-effort: a CLI-origin event (empty session_key) is left
as-is and simply won't route on the gateway.
"""
if evt.get("platform"):
return # already enriched
parsed = _parse_session_key(evt.get("session_key", "") or "")
if not parsed:
return
evt["platform"] = parsed.get("platform", "")
evt["chat_type"] = parsed.get("chat_type", "")
evt["chat_id"] = parsed.get("chat_id", "")
if parsed.get("thread_id"):
evt["thread_id"] = parsed["thread_id"]
async def _async_delegation_watcher(self, interval: float = 2.0) -> None:
"""Drain async-delegation completions and inject them as new turns.
Background subagents (``delegate_task(background=true)``) run on the
async-delegation daemon executor they have no per-process watcher
task, so their completion events would only be seen by the post-turn
queue drain. This watcher covers the IDLE case: when a background
subagent finishes while no agent turn is running, its result still
re-enters the originating session promptly.
Mirrors the CLI's idle ``process_loop`` drain. Stays silent when the
queue has nothing for us; ignores non-async event types (those are
handled by ``_run_process_watcher`` / the post-turn drain).
"""
await asyncio.sleep(3) # let platforms finish connecting
from tools.process_registry import process_registry as _pr
while self._running:
try:
# Peek the queue for async-delegation events. We must NOT
# consume watch/completion events here (other drains own them),
# so requeue anything that isn't ours.
requeue = []
async_events = []
while not _pr.completion_queue.empty():
try:
evt = _pr.completion_queue.get_nowait()
except Exception:
break
if evt.get("type") == "async_delegation":
async_events.append(evt)
else:
requeue.append(evt)
for evt in requeue:
_pr.completion_queue.put(evt)
for evt in async_events:
self._enrich_async_delegation_routing(evt)
synth_text = _format_gateway_process_notification(evt)
if not synth_text:
continue
try:
await self._inject_watch_notification(synth_text, evt)
except Exception as e:
logger.error("Async delegation injection error: %s", e)
except Exception as e:
logger.debug("Async delegation watcher error: %s", e)
await asyncio.sleep(interval)
async def _run_process_watcher(self, watcher: dict) -> None:
"""
Periodically check a background process and push updates to the user.

View file

@ -225,7 +225,8 @@ class CLICommandsMixin:
print(" Usage: /snapshot [list|create [label]|restore <id>|prune [N]]")
def _handle_stop_command(self):
"""Handle /stop — kill all running background processes.
"""Handle /stop — kill all running background processes and
background (async) delegations.
Inspired by OpenAI Codex's separation of interrupt (stop current turn)
from /stop (clean up background processes). See openai/codex#14602.
@ -235,13 +236,26 @@ class CLICommandsMixin:
processes = process_registry.list_sessions()
running = [p for p in processes if p.get("status") == "running"]
if not running:
# Background subagents dispatched via delegate_task(background=true)
# live in their own registry, not the process registry.
try:
from tools.async_delegation import active_count, interrupt_all
n_async = active_count()
except Exception:
n_async = 0
interrupt_all = None
if not running and not n_async:
print(" No running background processes.")
return
print(f" Stopping {len(running)} background process(es)...")
killed = process_registry.kill_all()
print(f" ✅ Stopped {killed} process(es).")
if running:
print(f" Stopping {len(running)} background process(es)...")
killed = process_registry.kill_all()
print(f" ✅ Stopped {killed} process(es).")
if n_async and interrupt_all is not None:
stopped = interrupt_all(reason="/stop")
print(f" ✅ Interrupted {stopped} background delegation(s).")
def _handle_agents_command(self):
"""Handle /agents — show background processes and agent status."""
@ -261,6 +275,22 @@ class CLICommandsMixin:
if finished:
_cprint(f" Recently finished: {len(finished)}")
# Background (async) delegations — delegate_task(background=true)
try:
from tools.async_delegation import list_async_delegations
delegations = list_async_delegations()
except Exception:
delegations = []
running_d = [d for d in delegations if d.get("status") == "running"]
if delegations:
_cprint(f" Background delegations: {len(running_d)} running")
for d in delegations:
goal = (d.get("goal") or "")[:60]
_cprint(
f" {d.get('delegation_id', '?')} · "
f"{d.get('status', '?')} · {goal}"
)
agent_running = getattr(self, "_agent_running", False)
_cprint(f" Agent: {'running' if agent_running else 'idle'}")

View file

@ -1775,6 +1775,7 @@ DEFAULT_CONFIG = {
"reasoning_effort": "", # reasoning effort for subagents: "xhigh", "high", "medium",
# "low", "minimal", "none" (empty = inherit parent's level)
"max_concurrent_children": 3, # max parallel children per batch; floor of 1 enforced, no ceiling
"max_async_children": 3, # max concurrent background (background=true) subagents; new dispatches rejected at capacity
# Orchestrator role controls (see tools/delegate_tool.py:_get_max_spawn_depth
# and _get_orchestrator_enabled). Floored at 1, no upper ceiling —
# raise deliberately, each level multiplies API cost.

View file

@ -0,0 +1,473 @@
"""Tests for async (background) delegation — tools/async_delegation.py.
Covers the dispatch handle, non-blocking behavior, completion-event delivery
onto the shared process_registry.completion_queue, the rich re-injection block
formatting, capacity rejection, and crash handling.
"""
import queue
import threading
import time
import pytest
from tools import async_delegation as ad
from tools.process_registry import process_registry, format_process_notification
@pytest.fixture(autouse=True)
def _clean_state():
ad._reset_for_tests()
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
yield
ad._reset_for_tests()
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
def _drain_one(timeout=5.0):
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if not process_registry.completion_queue.empty():
return process_registry.completion_queue.get_nowait()
time.sleep(0.02)
return None
def test_dispatch_returns_immediately_without_blocking():
gate = threading.Event()
def runner():
gate.wait(timeout=5)
return {"status": "completed", "summary": "done", "api_calls": 1,
"duration_seconds": 0.1, "model": "m"}
t0 = time.monotonic()
res = ad.dispatch_async_delegation(
goal="g", context=None, toolsets=None, role="leaf", model="m",
session_key="", runner=runner, max_async_children=3,
)
elapsed = time.monotonic() - t0
assert res["status"] == "dispatched"
assert res["delegation_id"].startswith("deleg_")
# Non-blocking invariant: dispatch returned while the runner is still
# gated (active), so it cannot have waited on the gate. The active_count
# check is the environment-independent proof; the generous wall-clock
# bound is a loose sanity backstop, not the primary assertion (a loaded
# CI runner can be slow but never anywhere near the runner's 5s gate).
assert ad.active_count() == 1
assert elapsed < 4.0, f"dispatch blocked {elapsed:.2f}s (gate is 5s)"
gate.set()
def test_async_executor_workers_are_daemon_threads():
gate = threading.Event()
def runner():
gate.wait(timeout=5)
return {"status": "completed", "summary": "done"}
res = ad.dispatch_async_delegation(
goal="daemon check", context=None, toolsets=None, role="leaf", model="m",
session_key="", runner=runner, max_async_children=1,
)
assert res["status"] == "dispatched"
deadline = time.monotonic() + 2
worker = None
while time.monotonic() < deadline:
worker = next(
(t for t in threading.enumerate() if t.name.startswith("async-delegate")),
None,
)
if worker is not None:
break
time.sleep(0.02)
assert worker is not None
assert worker.daemon is True
gate.set()
assert _drain_one() is not None
def test_completion_event_lands_on_shared_queue_with_session_key():
def runner():
return {"status": "completed", "summary": "the result",
"api_calls": 3, "duration_seconds": 2.0, "model": "test-model"}
res = ad.dispatch_async_delegation(
goal="compute X", context="some context", toolsets=["web", "file"],
role="leaf", model="test-model", session_key="agent:main:cli:dm:local",
runner=runner, max_async_children=3,
)
assert res["status"] == "dispatched"
evt = _drain_one()
assert evt is not None
assert evt["type"] == "async_delegation"
assert evt["summary"] == "the result"
assert evt["session_key"] == "agent:main:cli:dm:local"
assert evt["delegation_id"] == res["delegation_id"]
def test_rich_reinjection_block_is_self_contained():
def runner():
return {"status": "completed", "summary": "The answer is 42.",
"api_calls": 7, "duration_seconds": 3.5, "model": "test-model"}
ad.dispatch_async_delegation(
goal="Compute the meaning of life",
context="User is a philosopher. Respond tersely.",
toolsets=["web"], role="leaf", model="test-model",
session_key="", runner=runner, max_async_children=3,
)
evt = _drain_one()
assert evt is not None
text = format_process_notification(evt)
assert text is not None
for needle in [
"ASYNC DELEGATION COMPLETE",
"Compute the meaning of life",
"User is a philosopher",
"Toolsets: web",
"The answer is 42.",
"Status: completed",
"API calls: 7",
]:
assert needle in text, f"missing {needle!r}"
def test_dispatch_rejected_at_capacity():
ev = threading.Event()
def blocker():
ev.wait(timeout=5)
return {"status": "completed", "summary": "x"}
for i in range(2):
r = ad.dispatch_async_delegation(
goal=f"task{i}", context=None, toolsets=None, role="leaf",
model="m", session_key="", runner=blocker, max_async_children=2,
)
assert r["status"] == "dispatched"
r3 = ad.dispatch_async_delegation(
goal="task3", context=None, toolsets=None, role="leaf", model="m",
session_key="", runner=blocker, max_async_children=2,
)
assert r3["status"] == "rejected"
assert "capacity reached" in r3["error"]
ev.set()
def test_crashed_runner_produces_error_completion():
def boom():
raise RuntimeError("subagent exploded")
r = ad.dispatch_async_delegation(
goal="risky", context=None, toolsets=None, role="leaf", model="m",
session_key="", runner=boom, max_async_children=3,
)
assert r["status"] == "dispatched"
evt = _drain_one()
assert evt is not None
assert evt["status"] == "error"
text = format_process_notification(evt)
assert text is not None
assert "did not complete successfully" in text
assert "subagent exploded" in text
def test_interrupt_all_signals_running_children():
ev = threading.Event()
interrupted = {"count": 0}
def blocker():
ev.wait(timeout=5)
return {"status": "interrupted", "summary": None,
"error": "cancelled"}
def interrupt_fn():
interrupted["count"] += 1
ev.set()
ad.dispatch_async_delegation(
goal="long task", context=None, toolsets=None, role="leaf",
model="m", session_key="", runner=blocker,
interrupt_fn=interrupt_fn, max_async_children=3,
)
n = ad.interrupt_all(reason="test")
assert n == 1
assert interrupted["count"] == 1
# child still emits a completion event after interrupt
evt = _drain_one()
assert evt is not None
assert evt["status"] == "interrupted"
def test_completed_records_pruned_to_cap():
# Run more than the retention cap quickly; ensure list doesn't grow forever.
for i in range(ad._MAX_RETAINED_COMPLETED + 10):
ad.dispatch_async_delegation(
goal=f"t{i}", context=None, toolsets=None, role="leaf", model="m",
session_key="", runner=lambda: {"status": "completed", "summary": "ok"},
max_async_children=ad._MAX_RETAINED_COMPLETED + 20,
)
# let workers finish
deadline = time.monotonic() + 10
while time.monotonic() < deadline and ad.active_count() > 0:
time.sleep(0.05)
assert len(ad.list_async_delegations()) <= ad._MAX_RETAINED_COMPLETED
# ---------------------------------------------------------------------------
# Integration: delegate_task(background=True) routing
# ---------------------------------------------------------------------------
def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch):
"""delegate_task(background=True) returns a handle without running the
child synchronously, and the child completes on the background thread."""
from unittest.mock import MagicMock, patch
import tools.delegate_tool as dt
parent = MagicMock()
parent._delegate_depth = 0
parent.session_id = "sess"
parent._interrupt_requested = False
fake_child = MagicMock()
fake_child._delegate_role = "leaf"
fake_child._subagent_id = "s1"
gate = threading.Event()
def slow_child(task_index, goal, child=None, parent_agent=None, **kw):
gate.wait(timeout=5) # a sync impl would hang delegate_task here
return {
"task_index": 0, "status": "completed", "summary": f"done: {goal}",
"api_calls": 1, "duration_seconds": 0.1, "model": "m",
"exit_reason": "completed",
}
creds = {
"model": "m", "provider": None, "base_url": None, "api_key": None,
"api_mode": None, "command": None, "args": None,
}
with patch.object(dt, "_build_child_agent", return_value=fake_child), \
patch.object(dt, "_run_single_child", side_effect=slow_child), \
patch.object(dt, "_resolve_delegation_credentials", return_value=creds):
out = dt.delegate_task(
goal="the real task", context="ctx", toolsets=["web"],
background=True, parent_agent=parent,
)
import json
parsed = json.loads(out)
assert parsed["status"] == "dispatched"
assert parsed["mode"] == "background"
assert parsed["delegation_id"].startswith("deleg_")
# The real non-blocking invariant (environment-independent — no wall-clock
# threshold that flakes on a loaded CI runner): delegate_task returned
# while the child is STILL blocked on the closed gate, so no completion
# event exists yet. A synchronous impl could not have returned here — it
# would still be inside slow_child waiting on the gate.
assert process_registry.completion_queue.empty()
assert ad.active_count() == 1 # child running in background, not finished
gate.set()
evt = _drain_one()
assert evt is not None
assert evt["type"] == "async_delegation"
assert evt["summary"] == "done: the real task"
text = format_process_notification(evt)
assert text is not None
assert "the real task" in text and "ctx" in text
def test_delegate_task_background_rejects_batch(monkeypatch):
"""background=True with a multi-item tasks batch is rejected (v1: single-task only)."""
import json
from unittest.mock import MagicMock
import tools.delegate_tool as dt
parent = MagicMock()
parent._delegate_depth = 0
parent.session_id = "sess"
out = dt.delegate_task(
tasks=[{"goal": "a"}, {"goal": "b"}],
background=True,
parent_agent=parent,
)
parsed = json.loads(out)
assert "error" in parsed
assert "single-task only" in parsed["error"]
def test_delegate_task_background_detaches_child_from_parent(monkeypatch):
"""A background child must NOT remain in parent._active_children —
otherwise parent-turn interrupts / cache evicts / session close would
kill the detached subagent mid-run."""
from unittest.mock import MagicMock, patch
import tools.delegate_tool as dt
parent = MagicMock()
parent._delegate_depth = 0
parent.session_id = "sess"
parent._active_children = []
parent._active_children_lock = threading.Lock()
fake_child = MagicMock()
fake_child._delegate_role = "leaf"
fake_child._subagent_id = "s1"
gate = threading.Event()
def slow_child(task_index, goal, child=None, parent_agent=None, **kw):
gate.wait(timeout=5)
return {"task_index": 0, "status": "completed", "summary": "ok"}
def build_and_register(**kw):
# Mirror what the real _build_child_agent does: register the child
# for interrupt propagation.
parent._active_children.append(fake_child)
return fake_child
creds = {
"model": "m", "provider": None, "base_url": None, "api_key": None,
"api_mode": None, "command": None, "args": None,
}
with patch.object(dt, "_build_child_agent", side_effect=build_and_register), \
patch.object(dt, "_run_single_child", side_effect=slow_child), \
patch.object(dt, "_resolve_delegation_credentials", return_value=creds):
out = dt.delegate_task(goal="bg task", background=True, parent_agent=parent)
import json
assert json.loads(out)["status"] == "dispatched"
# Child detached immediately at dispatch, while it is still running.
assert fake_child not in parent._active_children
gate.set()
assert _drain_one() is not None
def test_concurrent_dispatch_respects_capacity():
"""Two threads racing dispatch with cap=1 must yield exactly one accept
(capacity check and record insert are atomic under the records lock)."""
gate = threading.Event()
def blocker():
gate.wait(timeout=5)
return {"status": "completed", "summary": "x"}
results = []
barrier = threading.Barrier(2)
def racer():
barrier.wait(timeout=5)
results.append(
ad.dispatch_async_delegation(
goal="race", context=None, toolsets=None, role="leaf",
model="m", session_key="", runner=blocker,
max_async_children=1,
)
)
threads = [threading.Thread(target=racer) for _ in range(2)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
statuses = sorted(r["status"] for r in results)
assert statuses == ["dispatched", "rejected"]
gate.set()
# ---------------------------------------------------------------------------
# Gateway routing: session_key -> platform/chat_id, rich formatting, injection
# ---------------------------------------------------------------------------
def _make_async_evt(**over):
evt = {
"type": "async_delegation",
"delegation_id": "deleg_x1",
"session_key": "agent:main:telegram:dm:12345:678",
"goal": "Investigate flaky test",
"context": "repo /tmp/p",
"toolsets": ["terminal"],
"role": "leaf",
"model": "m",
"status": "completed",
"summary": "Found the bug in test_foo",
"api_calls": 4,
"duration_seconds": 12.0,
"dispatched_at": 1000.0,
"completed_at": 1012.0,
}
evt.update(over)
return evt
def test_gateway_enriches_routing_from_session_key():
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
evt = _make_async_evt()
runner._enrich_async_delegation_routing(evt)
assert evt["platform"] == "telegram"
assert evt["chat_id"] == "12345"
assert evt["thread_id"] == "678"
def test_gateway_formatter_renders_async_block():
from gateway.run import _format_gateway_process_notification
txt = _format_gateway_process_notification(_make_async_evt())
assert txt is not None
assert "ASYNC DELEGATION COMPLETE" in txt
assert "Found the bug in test_foo" in txt
assert "Investigate flaky test" in txt
def test_gateway_watch_drain_requeues_async_without_looping():
from gateway.run import _drain_gateway_watch_events
q = queue.Queue()
async_evt = _make_async_evt()
watch_evt = {
"type": "watch_match",
"session_id": "proc_1",
"command": "pytest",
"pattern": "READY",
"output": "READY",
}
q.put(async_evt)
q.put(watch_evt)
watch_events = _drain_gateway_watch_events(q)
assert watch_events == [watch_evt]
assert q.qsize() == 1
assert q.get_nowait() == async_evt
def test_gateway_builds_routable_source_from_enriched_event():
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
evt = _make_async_evt()
runner._enrich_async_delegation_routing(evt)
src = runner._build_process_event_source(evt)
assert src is not None
assert src.platform.value == "telegram"
assert src.chat_id == "12345"
def test_gateway_cli_origin_event_left_unrouted():
"""An empty session_key (CLI origin) is left without routing fields."""
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
evt = _make_async_evt(session_key="")
runner._enrich_async_delegation_routing(evt)
assert "platform" not in evt

386
tools/async_delegation.py Normal file
View file

@ -0,0 +1,386 @@
#!/usr/bin/env python3
"""
Async (background) delegation registry.
Backs ``delegate_task(background=true)``: the parent agent dispatches a
subagent that runs on a module-level daemon executor and returns a handle
immediately, so the user and the model can keep working while the child runs.
When the child finishes, a completion event is pushed onto the SHARED
``process_registry.completion_queue`` with ``type="async_delegation"``. The
CLI (``cli.py`` process_loop) and gateway (``_run_process_watcher`` /
``completion_queue`` drain) already poll that queue while the agent is idle
and forge a fresh user/internal turn from each event. We deliberately reuse
that rail rather than reaching into a running agent loop:
- completions surface as a NEW turn when the agent is idle, never spliced
between a tool result and an assistant message. That keeps strict
message-role alternation legal and the prompt cache intact (hard
invariant: never mutate past context).
- we inherit the queue's de-dup, crash-recovery checkpoint, and the
existing CLI + gateway drain wiring for free no new drain loops in the
two largest files in the repo.
The completion payload carries a RICH, self-contained task-source block (the
original goal, the context the parent supplied, toolsets, model, dispatch
time, status, and the full result summary). When the result re-enters the
conversation the parent may be deep in unrelated context and won't remember
why the subagent existed; the block lets it either use the result or
re-dispatch if the world has moved on.
This module owns ONLY the async lifecycle. The actual child build + run is
delegated back to ``delegate_tool._run_single_child`` via an injected
runner, so all the credential leasing, heartbeat, timeout, and result-shaping
logic stays in one place.
"""
from __future__ import annotations
import logging
import threading
import time
import uuid
import weakref
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import _worker
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
class _DaemonThreadPoolExecutor(ThreadPoolExecutor):
"""ThreadPoolExecutor variant whose workers do not block process exit.
Stdlib ``ThreadPoolExecutor`` workers are non-daemon. Background
delegation is explicitly best-effort detached work, so a long child should
be interruptible by ``/stop``/shutdown but must not keep a CLI process alive
after the user exits.
"""
def _adjust_thread_count(self) -> None:
if self._idle_semaphore.acquire(timeout=0):
return
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
t = threading.Thread(
name=thread_name,
target=_worker,
args=(
weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs,
),
daemon=True,
)
t.start()
self._threads.add(t)
# ---------------------------------------------------------------------------
# Module-level state
# ---------------------------------------------------------------------------
# A persistent daemon executor (NOT a `with ThreadPoolExecutor()` block, which
# would join on exit and defeat the whole point of async). Workers are daemon
# threads so a hard process exit doesn't hang on an in-flight child.
_executor: Optional[ThreadPoolExecutor] = None
_executor_lock = threading.Lock()
_executor_max_workers: int = 0
_records_lock = threading.Lock()
# delegation_id -> record dict. Kept for the lifetime of the run plus a short
# tail after completion so `list_async_delegations()` can show recent results.
_records: Dict[str, Dict[str, Any]] = {}
_DEFAULT_MAX_ASYNC_CHILDREN = 3
# How many completed records to retain for status queries before pruning.
_MAX_RETAINED_COMPLETED = 50
def _get_executor(max_workers: int) -> ThreadPoolExecutor:
"""Lazily create (or grow) the shared daemon executor.
We never shrink ThreadPoolExecutor can't resize — but if the configured
cap grows between calls we rebuild a larger pool. Existing in-flight
futures keep running on the old pool until it's garbage collected.
"""
global _executor, _executor_max_workers
with _executor_lock:
if _executor is None or max_workers > _executor_max_workers:
# Daemon threads: thread_name_prefix aids debugging in stack dumps.
_executor = _DaemonThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="async-delegate",
)
_executor_max_workers = max_workers
return _executor
def active_count() -> int:
"""Number of async delegations currently running."""
with _records_lock:
return sum(1 for r in _records.values() if r.get("status") == "running")
def _new_delegation_id() -> str:
return f"deleg_{uuid.uuid4().hex[:8]}"
def _prune_completed_locked() -> None:
"""Drop the oldest completed records beyond the retention cap.
Caller must hold ``_records_lock``.
"""
completed = [
(rid, r)
for rid, r in _records.items()
if r.get("status") != "running"
]
if len(completed) <= _MAX_RETAINED_COMPLETED:
return
# Oldest-first by completion time (fall back to dispatch time).
completed.sort(key=lambda kv: kv[1].get("completed_at") or kv[1].get("dispatched_at") or 0)
for rid, _ in completed[: len(completed) - _MAX_RETAINED_COMPLETED]:
_records.pop(rid, None)
def dispatch_async_delegation(
*,
goal: str,
context: Optional[str],
toolsets: Optional[List[str]],
role: str,
model: Optional[str],
session_key: str,
runner: Callable[[], Dict[str, Any]],
interrupt_fn: Optional[Callable[[], None]] = None,
max_async_children: int = _DEFAULT_MAX_ASYNC_CHILDREN,
) -> Dict[str, Any]:
"""Spawn ``runner`` on the daemon executor and return a handle immediately.
Parameters
----------
goal, context, toolsets, role, model
The dispatch-time task spec, captured verbatim for the rich
completion block.
session_key
The gateway session_key (from ``tools.approval.get_current_session_key``)
captured on the parent thread BEFORE dispatch, because the daemon
worker thread won't carry the contextvar. Used to route the
completion back to the originating session.
runner
Zero-arg callable that builds + runs the child and returns the same
result dict ``_run_single_child`` produces. Runs on the worker thread.
interrupt_fn
Optional callable to signal the child to stop (used on shutdown /
explicit cancel).
max_async_children
Concurrency cap. When at capacity the dispatch is REJECTED (the caller
should fall back to sync or tell the user) rather than queued, so a
runaway model can't pile up unbounded background work.
Returns
-------
dict
``{"status": "dispatched", "delegation_id": ...}`` on success, or
``{"status": "rejected", "error": ...}`` when at capacity.
"""
delegation_id = _new_delegation_id()
dispatched_at = time.time()
record: Dict[str, Any] = {
"delegation_id": delegation_id,
"goal": goal,
"context": context,
"toolsets": list(toolsets) if toolsets else None,
"role": role,
"model": model,
"session_key": session_key,
"status": "running",
"dispatched_at": dispatched_at,
"completed_at": None,
"interrupt_fn": interrupt_fn,
}
# Capacity check and record insert under ONE lock hold — checking
# active_count() separately would let two concurrent dispatches (e.g.
# from different gateway sessions) both pass the check and exceed the cap.
with _records_lock:
running = sum(
1 for r in _records.values() if r.get("status") == "running"
)
if running >= max_async_children:
return {
"status": "rejected",
"error": (
f"Async delegation capacity reached ({max_async_children} "
f"running). Wait for one to finish (its result will re-enter "
f"the chat), or run this task synchronously "
f"(background=false). Raise delegation.max_async_children in "
f"config.yaml to allow more concurrent background subagents."
),
}
_records[delegation_id] = record
executor = _get_executor(max_async_children)
def _worker() -> None:
result: Dict[str, Any] = {}
status = "error"
try:
result = runner() or {}
status = result.get("status") or "completed"
except Exception as exc: # noqa: BLE001 — must never crash the worker
logger.exception("Async delegation %s crashed", delegation_id)
result = {
"status": "error",
"summary": None,
"error": f"{type(exc).__name__}: {exc}",
"api_calls": 0,
"duration_seconds": round(time.time() - dispatched_at, 2),
}
status = "error"
finally:
_finalize(delegation_id, result, status)
try:
executor.submit(_worker)
except Exception as exc: # pragma: no cover — pool submit failure is rare
with _records_lock:
_records.pop(delegation_id, None)
return {
"status": "rejected",
"error": f"Failed to schedule async delegation: {exc}",
}
logger.info(
"Dispatched async delegation %s (session_key=%s): %s",
delegation_id, session_key or "<cli>", (goal or "")[:80],
)
return {"status": "dispatched", "delegation_id": delegation_id}
def _finalize(delegation_id: str, result: Dict[str, Any], status: str) -> None:
"""Mark a record complete and push the completion event onto the queue."""
with _records_lock:
record = _records.get(delegation_id)
if record is None:
return
record["status"] = status
record["completed_at"] = time.time()
record["interrupt_fn"] = None # drop the closure; child is done
# Snapshot fields needed for the event while holding the lock.
event_record = dict(record)
_prune_completed_locked()
_push_completion_event(event_record, result, status)
def _push_completion_event(
record: Dict[str, Any], result: Dict[str, Any], status: str
) -> None:
"""Push a type='async_delegation' event onto the shared completion queue.
Best-effort: a failure here must not crash the worker, but it WOULD mean a
silently-lost result, so we log loudly.
"""
try:
from tools.process_registry import process_registry
except Exception as exc: # pragma: no cover
logger.error(
"Async delegation %s finished but process_registry import failed; "
"result lost: %s",
record.get("delegation_id"), exc,
)
return
summary = result.get("summary")
error = result.get("error")
dispatched_at = record.get("dispatched_at") or time.time()
completed_at = record.get("completed_at") or time.time()
evt = {
"type": "async_delegation",
"delegation_id": record.get("delegation_id"),
# session_key routes the completion back to the originating gateway
# session; empty string => CLI (single-session) path.
"session_key": record.get("session_key", ""),
"goal": record.get("goal", ""),
"context": record.get("context"),
"toolsets": record.get("toolsets"),
"role": record.get("role"),
"model": result.get("model") or record.get("model"),
"status": status,
"summary": summary,
"error": error,
"api_calls": result.get("api_calls", 0),
"duration_seconds": result.get(
"duration_seconds", round(completed_at - dispatched_at, 2)
),
"dispatched_at": dispatched_at,
"completed_at": completed_at,
"exit_reason": result.get("exit_reason"),
}
try:
process_registry.completion_queue.put(evt)
except Exception as exc: # pragma: no cover
logger.error(
"Async delegation %s: failed to enqueue completion event; "
"result lost: %s",
record.get("delegation_id"), exc,
)
def list_async_delegations() -> List[Dict[str, Any]]:
"""Snapshot of async delegations (running + recently completed).
Safe to call from any thread. Excludes the non-serialisable interrupt_fn.
"""
with _records_lock:
return [
{k: v for k, v in r.items() if k != "interrupt_fn"}
for r in _records.values()
]
def interrupt_all(reason: str = "shutdown") -> int:
"""Signal every running async delegation to stop. Returns how many.
Used on ``/stop`` and gateway shutdown so a dangling background subagent
can't keep burning tokens with no one listening. The child still emits a
completion event (status='interrupted') via the normal finalize path.
"""
count = 0
with _records_lock:
targets = [
r for r in _records.values() if r.get("status") == "running"
]
for r in targets:
fn = r.get("interrupt_fn")
if callable(fn):
try:
fn()
count += 1
except Exception as exc:
logger.debug(
"interrupt_all: %s interrupt failed: %s",
r.get("delegation_id"), exc,
)
if count:
logger.info("Interrupted %d async delegation(s) (%s)", count, reason)
return count
def _reset_for_tests() -> None:
"""Test-only: clear all state and tear down the executor."""
global _executor, _executor_max_workers
with _executor_lock:
if _executor is not None:
_executor.shutdown(wait=False)
_executor = None
_executor_max_workers = 0
with _records_lock:
_records.clear()

View file

@ -397,6 +397,38 @@ def _get_max_concurrent_children() -> int:
return _DEFAULT_MAX_CONCURRENT_CHILDREN
_DEFAULT_MAX_ASYNC_CHILDREN = 3
def _get_max_async_children() -> int:
"""Read delegation.max_async_children from config (floor 1, no ceiling).
Caps how many background (``background=true``) subagents can run at once.
When at capacity, a new async dispatch is REJECTED (not queued) so a
runaway model can't pile up unbounded background work. Separate from
max_concurrent_children, which bounds a single synchronous batch.
"""
cfg = _load_config()
val = cfg.get("max_async_children")
if val is not None:
try:
return max(1, int(val))
except (TypeError, ValueError):
logger.warning(
"delegation.max_async_children=%r is not a valid integer; "
"using default %d",
val, _DEFAULT_MAX_ASYNC_CHILDREN,
)
return _DEFAULT_MAX_ASYNC_CHILDREN
env_val = os.getenv("DELEGATION_MAX_ASYNC_CHILDREN")
if env_val:
try:
return max(1, int(env_val))
except (TypeError, ValueError):
return _DEFAULT_MAX_ASYNC_CHILDREN
return _DEFAULT_MAX_ASYNC_CHILDREN
def _get_child_timeout() -> Optional[float]:
"""Read delegation.child_timeout_seconds from config.
@ -2018,6 +2050,7 @@ def delegate_task(
acp_command: Optional[str] = None,
acp_args: Optional[List[str]] = None,
role: Optional[str] = None,
background: Optional[bool] = None,
parent_agent=None,
) -> str:
"""
@ -2049,6 +2082,19 @@ def delegate_task(
# Normalise the top-level role once; per-task overrides re-normalise.
top_role = _normalize_role(role)
# Async (background) delegation is single-task only in v1. A batch carries
# fan-out semantics (N handles, partial completion) that double the state
# model — reject early with a clear message rather than silently running
# the batch synchronously.
background = is_truthy_value(background, default=False) if background is not None else False
if background and tasks and isinstance(tasks, list) and len(tasks) > 1:
return tool_error(
"background=true is single-task only. Dispatch one background "
"subagent per delegate_task call (each returns its own handle and "
"re-enters the conversation independently), or run the batch "
"synchronously with background=false."
)
# Depth limit — configurable via delegation.max_spawn_depth,
# default 2 for parity with the original MAX_DEPTH constant.
depth = getattr(parent_agent, "_delegate_depth", 0)
@ -2186,6 +2232,90 @@ def delegate_task(
if n_tasks == 1:
# Single task -- run directly (no thread pool overhead)
_i, _t, child = children[0]
# ----- Async / background dispatch -----
# When background=true, hand the already-built child to the async
# delegation registry and return a handle immediately. The child runs
# on a daemon executor; its result re-enters the conversation as a
# fresh turn via process_registry.completion_queue (see
# tools/async_delegation.py). Batch async is intentionally NOT
# supported in v1 — the rejection is handled before we get here.
if background:
from tools.async_delegation import dispatch_async_delegation
from tools.approval import get_current_session_key
# Capture the gateway routing key on THIS (parent) thread — the
# daemon worker won't carry the session contextvar.
_session_key = get_current_session_key(default="")
# Detach the child from the parent's interrupt-propagation list.
# _build_child_agent registered it there (correct for sync
# children, which block the parent's turn), but a BACKGROUND
# child must survive parent-turn interrupts (Ctrl+C, mid-turn
# steering), cache evicts (release_clients), and session close
# (/new) — otherwise the detached subagent dies with whatever
# the parent was doing when it was dispatched. Its lifecycle is
# owned by the async-delegation registry (interrupt_fn below),
# and _run_single_child's finally block closes its resources
# when it finishes.
if hasattr(parent_agent, "_active_children"):
try:
_ac_lock = getattr(parent_agent, "_active_children_lock", None)
if _ac_lock:
with _ac_lock:
parent_agent._active_children.remove(child)
else:
parent_agent._active_children.remove(child)
except ValueError:
pass
def _async_runner(_child=child, _goal=_t["goal"]):
return _run_single_child(0, _goal, _child, parent_agent)
def _async_interrupt(_child=child):
try:
if hasattr(_child, "interrupt"):
_child.interrupt("Async delegation cancelled")
elif hasattr(_child, "_interrupt_requested"):
_child._interrupt_requested = True
except Exception:
pass
dispatch = dispatch_async_delegation(
goal=_t["goal"],
context=_t.get("context"),
toolsets=_t.get("toolsets") or toolsets,
role=_normalize_role(_t.get("role") or top_role),
model=creds["model"],
session_key=_session_key,
runner=_async_runner,
interrupt_fn=_async_interrupt,
max_async_children=_get_max_async_children(),
)
if dispatch.get("status") == "dispatched":
return json.dumps(
{
"status": "dispatched",
"delegation_id": dispatch["delegation_id"],
"goal": _t["goal"],
"mode": "background",
"note": (
"Subagent is running in the background. You and the "
"user can keep working; the full task source and "
"result will re-enter the conversation as a new "
"message when it finishes. Do not wait or poll — "
"just continue."
),
},
ensure_ascii=False,
)
# Rejected (at capacity or schedule failure) — surface as a tool
# error so the model can fall back to synchronous delegation.
return tool_error(
dispatch.get("error", "Async delegation could not be scheduled.")
)
result = _run_single_child(0, _t["goal"], child, parent_agent)
results.append(result)
else:
@ -2904,6 +3034,24 @@ DELEGATE_TASK_SCHEMA = {
"enum": ["leaf", "orchestrator"],
"description": "(rebuilt at get_definitions() time)",
},
"background": {
"type": "boolean",
"description": (
"Run the subagent asynchronously in the BACKGROUND "
"instead of blocking this turn. When true, delegate_task "
"returns immediately with a delegation_id; you and the "
"user keep working while the subagent runs, and its full "
"result re-enters the conversation as a new message when "
"it finishes (similar to terminal background=true + "
"notify_on_complete). The re-injected message includes the "
"original goal/context so you can act on it even after "
"moving on. Single-task only — cannot be combined with the "
"'tasks' batch array. Use for long-running independent work "
"the user shouldn't have to wait on (research, builds, "
"multi-step investigations). Do NOT poll or wait after "
"dispatching — just continue; the result will come to you."
),
},
"acp_command": {
"type": "string",
"description": (
@ -2948,6 +3096,7 @@ registry.register(
acp_command=args.get("acp_command"),
acp_args=args.get("acp_args"),
role=args.get("role"),
background=args.get("background"),
parent_agent=kw.get("parent_agent"),
),
check_fn=check_delegate_requirements,

View file

@ -1531,6 +1531,91 @@ class ProcessRegistry:
process_registry = ProcessRegistry()
def _format_age(seconds: float) -> str:
"""Human-friendly elapsed string ('18m', '2h3m', '45s')."""
try:
s = int(max(0, seconds))
except (TypeError, ValueError):
return "?"
if s < 60:
return f"{s}s"
m, s = divmod(s, 60)
if m < 60:
return f"{m}m" if s == 0 else f"{m}m{s}s"
h, m = divmod(m, 60)
return f"{h}h" if m == 0 else f"{h}h{m}m"
def _format_async_delegation(evt: dict) -> str:
"""Format an async-delegation completion into a self-contained re-injection.
Carries the FULL original task source (goal, the context the parent
supplied, toolsets, role, model) plus dispatch time, status, and the
complete result summary. When this re-enters the conversation the agent
may be deep in unrelated context and won't remember why the subagent
existed, so the block is written to stand entirely on its own enough to
use the result OR re-dispatch if the world has moved on.
"""
import time as _time
deleg_id = evt.get("delegation_id", "unknown")
goal = evt.get("goal", "") or ""
context = evt.get("context")
toolsets = evt.get("toolsets")
role = evt.get("role") or "leaf"
model = evt.get("model") or "?"
status = evt.get("status") or "completed"
summary = evt.get("summary")
error = evt.get("error")
api_calls = evt.get("api_calls", 0)
duration = evt.get("duration_seconds", "?")
dispatched_at = evt.get("dispatched_at")
completed_at = evt.get("completed_at") or _time.time()
age = ""
if isinstance(dispatched_at, (int, float)):
age = f" ({_format_age(completed_at - dispatched_at)} ago)"
lines = [
f"[ASYNC DELEGATION COMPLETE — {deleg_id}]",
"A background subagent you dispatched earlier has finished. You may "
"have moved on since dispatching it; the full task source is below so "
"you can act on the result or re-dispatch if things have changed.",
"",
]
if isinstance(dispatched_at, (int, float)):
ts = _time.strftime("%Y-%m-%d %H:%M:%S", _time.localtime(dispatched_at))
lines.append(f"Dispatched: {ts}{age}")
lines.append(f"Original goal: {goal}")
if context:
lines.append(f"Context you provided: {context}")
if toolsets:
lines.append(f"Toolsets: {', '.join(toolsets)}")
lines.append(f"Role: {role} Model: {model}")
lines.append(f"Status: {status} API calls: {api_calls} Duration: {duration}s")
lines.append("--- RESULT ---")
if status in ("completed", "success") and summary:
lines.append(summary)
elif status == "interrupted":
lines.append(
"The subagent was interrupted before completing"
+ (f": {error}" if error else ".")
)
if summary:
lines.append("Partial output:")
lines.append(summary)
else:
# error / timeout / failed
lines.append(
f"The subagent did not complete successfully (status={status})."
+ (f"\n{error}" if error else "")
)
if summary:
lines.append("Partial output:")
lines.append(summary)
return "\n".join(lines)
def format_process_notification(evt: dict) -> "str | None":
"""Format a process notification event into a [IMPORTANT: ...] message.
@ -1559,6 +1644,9 @@ def format_process_notification(evt: dict) -> "str | None":
text += "]"
return text
if evt_type == "async_delegation":
return _format_async_delegation(evt)
_exit = evt.get("exit_code", "?")
_out = evt.get("output", "")
_reason = evt.get("completion_reason") or "exited"

View file

@ -5595,6 +5595,11 @@ def _notification_event_dedup_key(evt: dict) -> tuple:
evt.get("message", ""),
evt.get("suppressed", 0),
)
if evt_type == "async_delegation":
# Async-delegation completions have no process session_id; without
# this the fallthrough keys every one as ("", "async_delegation")
# and the second completion's status update is suppressed forever.
return (evt.get("delegation_id", ""), evt_type)
return (evt_sid, evt_type)