feat(kanban): route notifications via owning profile + wake creator agent

Three connected changes that fix kanban notifications in multiplex_profile
gateways and enable event-driven agent collaboration:

1. Session profile propagation
   - Add HERMES_SESSION_PROFILE ContextVar (session_context.py)
   - Gateway stamps source.profile at dispatch time (run.py)
   - _maybe_auto_subscribe reads the profile from the ContextVar first
     and falls back to os.environ["HERMES_PROFILE"], which is unset in
     the gateway main process (kanban_tools.py)

2. Notifier profile-aware routing (kanban_watchers.py)
   - Adapter selection: prefer _profile_adapters[sub.notifier_profile]
     so each profile's bot delivers its own task notifications
   - Relax profile skip-filter: process cross-profile subscriptions when
     the gateway has an adapter for the owning profile
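   - Extend TERMINAL_KINDS with status/archived/unblocked (in the hunks
     shown here only "status" gains a message template; events of the
     other two kinds still reach the formatter's `else: continue` branch)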

3. Creator agent wakeup on terminal events (kanban_watchers.py)
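   - After delivering completed/blocked/gave_up/crashed/timed_out
     notifications, inject a synthetic MessageEvent into the creator's
     session via adapter.handle_message to trigger the creator's agent
     loop (skipped when the task carries no session_id; failures are
     logged at debug level and do not interrupt the notifier)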
   - SessionSource built from subscription metadata — no session_store
     lookup needed
This commit is contained in:
张满良 2026-06-29 19:46:14 +08:00 committed by kshitij
parent 7322da487f
commit c69643026a
4 changed files with 87 additions and 13 deletions

View file

@ -160,7 +160,9 @@ class GatewayKanbanWatchersMixin:
logger.warning("kanban notifier: kanban_db not importable; notifier disabled")
return
TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out")
# "status" covers dashboard drag-drop and `_set_status_direct()`
# writes — surface those transitions to subscribers too.
TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out", "status", "archived", "unblocked")
# Subscriptions are removed only when the task reaches a truly final
# status (done / archived). We used to also unsub on any terminal
# event kind (gave_up / crashed / timed_out / blocked), but that
@ -250,11 +252,13 @@ class GatewayKanbanWatchersMixin:
for sub in subs:
owner_profile = sub.get("notifier_profile") or None
if owner_profile and owner_profile != notifier_profile:
logger.debug(
"kanban notifier: subscription for %s owned by profile %s; current profile %s skipping",
sub.get("task_id"), owner_profile, notifier_profile,
)
continue
_owner_adapters = getattr(self, "_profile_adapters", {}).get(owner_profile)
if not _owner_adapters:
logger.debug(
"kanban notifier: subscription for %s owned by profile %s; current profile %s has no adapter for it, skipping",
sub.get("task_id"), owner_profile, notifier_profile,
)
continue
platform = (sub.get("platform") or "").lower()
if platform not in active_platforms:
logger.debug(
@ -304,7 +308,14 @@ class GatewayKanbanWatchersMixin:
self._kanban_advance, sub, d["cursor"], board_slug,
)
continue
adapter = self.adapters.get(plat)
sub_profile = sub.get("notifier_profile") or ""
adapter = None
if sub_profile:
_profile_map = getattr(self, "_profile_adapters", {}).get(sub_profile)
if _profile_map:
adapter = _profile_map.get(plat)
if adapter is None:
adapter = self.adapters.get(plat)
if adapter is None:
logger.debug(
"kanban notifier: adapter %s disconnected before delivery for %s; rewinding claim",
@ -319,6 +330,7 @@ class GatewayKanbanWatchersMixin:
)
continue
title = (task.title if task else sub["task_id"])[:120]
board_tag = f"[{board_slug}] " if board_slug else ""
for ev in d["events"]:
kind = ev.kind
# Identity prefix: attribute terminal pings to the
@ -345,25 +357,25 @@ class GatewayKanbanWatchersMixin:
r = lines[0][:160] if lines else task.result[:160]
handoff = f"\n{r}"
msg = (
f"{tag}Kanban {sub['task_id']} done"
f"{board_tag}{tag}Kanban {sub['task_id']} done"
f"{title}{handoff}"
)
elif kind == "blocked":
reason = ""
if ev.payload and ev.payload.get("reason"):
reason = f": {str(ev.payload['reason'])[:160]}"
msg = f"{tag}Kanban {sub['task_id']} blocked{reason}"
msg = f"{board_tag}{tag}Kanban {sub['task_id']} blocked{reason}"
elif kind == "gave_up":
err = ""
if ev.payload and ev.payload.get("error"):
err = f"\n{str(ev.payload['error'])[:200]}"
msg = (
f"{tag}Kanban {sub['task_id']} gave up "
f"{board_tag}{tag}Kanban {sub['task_id']} gave up "
f"after repeated spawn failures{err}"
)
elif kind == "crashed":
msg = (
f"{tag}Kanban {sub['task_id']} worker crashed "
f"{board_tag}{tag}Kanban {sub['task_id']} worker crashed "
f"(pid gone); dispatcher will retry"
)
elif kind == "timed_out":
@ -371,9 +383,14 @@ class GatewayKanbanWatchersMixin:
if ev.payload and ev.payload.get("limit_seconds"):
limit = int(ev.payload["limit_seconds"])
msg = (
f"{tag}Kanban {sub['task_id']} timed out "
f"{board_tag}{tag}Kanban {sub['task_id']} timed out "
f"(max_runtime={limit}s); will retry"
)
elif kind == "status":
new_status = ""
if ev.payload and ev.payload.get("status"):
new_status = str(ev.payload["status"])
msg = f"🔄 {board_tag}{tag}Kanban {sub['task_id']}{new_status}"
else:
continue
metadata: dict[str, Any] = {}
@ -460,6 +477,53 @@ class GatewayKanbanWatchersMixin:
# same state. See the longer comment on TERMINAL_KINDS
# above for the failure mode this prevents.
task_terminal = task and task.status in {"done", "archived"}
_WAKE_KINDS = ("completed", "gave_up", "crashed", "timed_out", "blocked")
_wake_kinds = {ev.kind for ev in d["events"] if ev.kind in _WAKE_KINDS}
if _wake_kinds:
try:
_session_key = getattr(task, "session_id", None) or ""
if _session_key:
_title = (task.title if task else sub["task_id"])[:120]
_assignee = task.assignee if task else ""
_parts = []
if "completed" in _wake_kinds: _parts.append("已完成")
if "gave_up" in _wake_kinds: _parts.append("已放弃(重试次数耗尽)")
if "crashed" in _wake_kinds: _parts.append("崩溃worker 异常退出dispatcher 将重试")
if "timed_out" in _wake_kinds: _parts.append("超时dispatcher 将重试")
if "blocked" in _wake_kinds: _parts.append("被阻塞,需要处理")
_status = "".join(_parts) or "状态变化"
_synth = (
f"[kanban] 任务 {sub['task_id']} {_status}\n"
f"标题: {_title}\n执行者: @{_assignee}\n"
f"看板: {board_slug}\n\n"
f"请检查结果或决定下一步动作。"
)
from gateway.session import SessionSource
from gateway.platforms.base import MessageEvent, MessageType
_source = SessionSource(
platform=plat,
chat_id=sub["chat_id"],
chat_type="group",
thread_id=sub.get("thread_id") or None,
user_id=sub.get("user_id"),
profile=sub_profile or None,
)
_synth_event = MessageEvent(
text=_synth,
message_type=MessageType.TEXT,
source=_source,
internal=True,
)
await adapter.handle_message(_synth_event)
logger.info(
"kanban notifier: woke agent for %s on %s/%s profile=%s events=%s",
sub["task_id"], platform_str, sub["chat_id"], sub_profile or "default", _wake_kinds,
)
except Exception as _wk_err:
logger.debug(
"kanban notifier: wakeup injection failed for %s: %s",
sub["task_id"], _wk_err,
)
if task_terminal:
await asyncio.to_thread(
self._kanban_unsub, sub, board_slug,

View file

@ -14134,6 +14134,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
user_name=str(context.source.user_name) if context.source.user_name else "",
session_key=context.session_key,
message_id=str(context.source.message_id) if context.source.message_id else "",
profile=getattr(context.source, "profile", "") or "",
async_delivery=_async_delivery,
)

View file

@ -84,6 +84,8 @@ _SESSION_ID: ContextVar = ContextVar("HERMES_SESSION_ID", default=_UNSET)
# private-chat topic (those lanes route only with thread id + reply anchor).
_SESSION_MESSAGE_ID: ContextVar = ContextVar("HERMES_SESSION_MESSAGE_ID", default=_UNSET)
_SESSION_PROFILE: ContextVar = ContextVar("HERMES_SESSION_PROFILE", default=_UNSET)
# Whether the current session's delivery channel can route an ASYNC completion
# back to the agent AFTER the current turn ends (i.e. wake a fresh turn).
#
@ -122,6 +124,7 @@ _VAR_MAP = {
"HERMES_SESSION_KEY": _SESSION_KEY,
"HERMES_SESSION_ID": _SESSION_ID,
"HERMES_SESSION_MESSAGE_ID": _SESSION_MESSAGE_ID,
"HERMES_SESSION_PROFILE": _SESSION_PROFILE,
"HERMES_CRON_AUTO_DELIVER_PLATFORM": _CRON_AUTO_DELIVER_PLATFORM,
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": _CRON_AUTO_DELIVER_CHAT_ID,
"HERMES_CRON_AUTO_DELIVER_THREAD_ID": _CRON_AUTO_DELIVER_THREAD_ID,
@ -154,6 +157,7 @@ def set_session_vars(
session_key: str = "",
session_id: str = "",
message_id: str = "",
profile: str = "",
cwd: str = "",
async_delivery: bool = True,
) -> list:
@ -188,6 +192,7 @@ def set_session_vars(
_SESSION_KEY.set(session_key),
_SESSION_ID.set(session_id),
_SESSION_MESSAGE_ID.set(message_id),
_SESSION_PROFILE.set(profile),
_SESSION_ASYNC_DELIVERY.set(bool(async_delivery)),
]
try:
@ -221,6 +226,7 @@ def clear_session_vars(tokens: list) -> None:
_SESSION_KEY,
_SESSION_ID,
_SESSION_MESSAGE_ID,
_SESSION_PROFILE,
):
var.set("")
# Reset async-delivery capability to the "never set" sentinel rather than a

View file

@ -1027,7 +1027,10 @@ def _maybe_auto_subscribe(conn: Any, task_id: str) -> bool:
chat_id = session_key
thread_id = get_session_env("HERMES_SESSION_THREAD_ID", "") or None
user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None
notifier_profile = os.environ.get("HERMES_PROFILE")
notifier_profile = (
get_session_env("HERMES_SESSION_PROFILE", "")
or os.environ.get("HERMES_PROFILE")
)
# Lazy-import to keep the module-level dependency light
from hermes_cli import kanban_db as _kb