diff --git a/gateway/kanban_watchers.py b/gateway/kanban_watchers.py index 5bcf70c8d..52b86c3b9 100644 --- a/gateway/kanban_watchers.py +++ b/gateway/kanban_watchers.py @@ -160,7 +160,9 @@ class GatewayKanbanWatchersMixin: logger.warning("kanban notifier: kanban_db not importable; notifier disabled") return - TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out") + # "status" covers dashboard drag-drop and `_set_status_direct()` + # writes — surface those transitions to subscribers too. + TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out", "status", "archived", "unblocked") # Subscriptions are removed only when the task reaches a truly final # status (done / archived). We used to also unsub on any terminal # event kind (gave_up / crashed / timed_out / blocked), but that @@ -250,11 +252,13 @@ class GatewayKanbanWatchersMixin: for sub in subs: owner_profile = sub.get("notifier_profile") or None if owner_profile and owner_profile != notifier_profile: - logger.debug( - "kanban notifier: subscription for %s owned by profile %s; current profile %s skipping", - sub.get("task_id"), owner_profile, notifier_profile, - ) - continue + _owner_adapters = getattr(self, "_profile_adapters", {}).get(owner_profile) + if not _owner_adapters: + logger.debug( + "kanban notifier: subscription for %s owned by profile %s; current profile %s has no adapter for it, skipping", + sub.get("task_id"), owner_profile, notifier_profile, + ) + continue platform = (sub.get("platform") or "").lower() if platform not in active_platforms: logger.debug( @@ -304,7 +308,14 @@ class GatewayKanbanWatchersMixin: self._kanban_advance, sub, d["cursor"], board_slug, ) continue - adapter = self.adapters.get(plat) + sub_profile = sub.get("notifier_profile") or "" + adapter = None + if sub_profile: + _profile_map = getattr(self, "_profile_adapters", {}).get(sub_profile) + if _profile_map: + adapter = _profile_map.get(plat) + if adapter is None: + adapter = self.adapters.get(plat) if adapter is None: logger.debug( "kanban notifier: adapter %s disconnected before delivery for %s; rewinding claim", @@ -319,6 +330,7 @@ class GatewayKanbanWatchersMixin: ) continue title = (task.title if task else sub["task_id"])[:120] + board_tag = f"[{board_slug}] " if board_slug else "" for ev in d["events"]: kind = ev.kind # Identity prefix: attribute terminal pings to the @@ -345,25 +357,25 @@ class GatewayKanbanWatchersMixin: r = lines[0][:160] if lines else task.result[:160] handoff = f"\n{r}" msg = ( - f"✔ {tag}Kanban {sub['task_id']} done" + f"✔ {board_tag}{tag}Kanban {sub['task_id']} done" f" — {title}{handoff}" ) elif kind == "blocked": reason = "" if ev.payload and ev.payload.get("reason"): reason = f": {str(ev.payload['reason'])[:160]}" - msg = f"⏸ {tag}Kanban {sub['task_id']} blocked{reason}" + msg = f"⏸ {board_tag}{tag}Kanban {sub['task_id']} blocked{reason}" elif kind == "gave_up": err = "" if ev.payload and ev.payload.get("error"): err = f"\n{str(ev.payload['error'])[:200]}" msg = ( - f"✖ {tag}Kanban {sub['task_id']} gave up " + f"✖ {board_tag}{tag}Kanban {sub['task_id']} gave up " f"after repeated spawn failures{err}" ) elif kind == "crashed": msg = ( - f"✖ {tag}Kanban {sub['task_id']} worker crashed " + f"✖ {board_tag}{tag}Kanban {sub['task_id']} worker crashed " f"(pid gone); dispatcher will retry" ) elif kind == "timed_out": @@ -371,9 +383,14 @@ class GatewayKanbanWatchersMixin: if ev.payload and ev.payload.get("limit_seconds"): limit = int(ev.payload["limit_seconds"]) msg = ( - f"⏱ {tag}Kanban {sub['task_id']} timed out " + f"⏱ {board_tag}{tag}Kanban {sub['task_id']} timed out " f"(max_runtime={limit}s); will retry" ) + elif kind == "status": + new_status = "" + if ev.payload and ev.payload.get("status"): + new_status = str(ev.payload["status"]) + msg = f"🔄 {board_tag}{tag}Kanban {sub['task_id']} → {new_status}" else: continue metadata: dict[str, Any] = {} @@ -460,6 +477,53 @@ class GatewayKanbanWatchersMixin: # same state. See the longer comment on TERMINAL_KINDS # above for the failure mode this prevents. task_terminal = task and task.status in {"done", "archived"} + _WAKE_KINDS = ("completed", "gave_up", "crashed", "timed_out", "blocked") + _wake_kinds = {ev.kind for ev in d["events"] if ev.kind in _WAKE_KINDS} + if _wake_kinds: + try: + _session_key = getattr(task, "session_id", None) or "" + if _session_key: + _title = (task.title if task else sub["task_id"])[:120] + _assignee = task.assignee if task else "" + _parts = [] + if "completed" in _wake_kinds: _parts.append("已完成") + if "gave_up" in _wake_kinds: _parts.append("已放弃(重试次数耗尽)") + if "crashed" in _wake_kinds: _parts.append("崩溃(worker 异常退出),dispatcher 将重试") + if "timed_out" in _wake_kinds: _parts.append("超时,dispatcher 将重试") + if "blocked" in _wake_kinds: _parts.append("被阻塞,需要处理") + _status = ",".join(_parts) or "状态变化" + _synth = ( + f"[kanban] 任务 {sub['task_id']} {_status}。\n" + f"标题: {_title}\n执行者: @{_assignee}\n" + f"看板: {board_slug}\n\n" + f"请检查结果或决定下一步动作。" + ) + from gateway.session import SessionSource + from gateway.platforms.base import MessageEvent, MessageType + _source = SessionSource( + platform=plat, + chat_id=sub["chat_id"], + chat_type="group", + thread_id=sub.get("thread_id") or None, + user_id=sub.get("user_id"), + profile=sub_profile or None, + ) + _synth_event = MessageEvent( + text=_synth, + message_type=MessageType.TEXT, + source=_source, + internal=True, + ) + await adapter.handle_message(_synth_event) + logger.info( + "kanban notifier: woke agent for %s on %s/%s profile=%s events=%s", + sub["task_id"], platform_str, sub["chat_id"], sub_profile or "default", _wake_kinds, + ) + except Exception as _wk_err: + logger.debug( + "kanban notifier: wakeup injection failed for %s: %s", + sub["task_id"], _wk_err, + ) if task_terminal: await asyncio.to_thread( self._kanban_unsub, sub, board_slug, diff --git a/gateway/run.py b/gateway/run.py index e87739c14..a5fe58f97 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -14134,6 +14134,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew user_name=str(context.source.user_name) if context.source.user_name else "", session_key=context.session_key, message_id=str(context.source.message_id) if context.source.message_id else "", + profile=getattr(context.source, "profile", "") or "", async_delivery=_async_delivery, ) diff --git a/gateway/session_context.py b/gateway/session_context.py index a61ceb6c3..cdd1a8bfa 100644 --- a/gateway/session_context.py +++ b/gateway/session_context.py @@ -84,6 +84,8 @@ _SESSION_ID: ContextVar = ContextVar("HERMES_SESSION_ID", default=_UNSET) # private-chat topic (those lanes route only with thread id + reply anchor). _SESSION_MESSAGE_ID: ContextVar = ContextVar("HERMES_SESSION_MESSAGE_ID", default=_UNSET) +_SESSION_PROFILE: ContextVar = ContextVar("HERMES_SESSION_PROFILE", default=_UNSET) + # Whether the current session's delivery channel can route an ASYNC completion # back to the agent AFTER the current turn ends (i.e. wake a fresh turn). # @@ -122,6 +124,7 @@ _VAR_MAP = { "HERMES_SESSION_KEY": _SESSION_KEY, "HERMES_SESSION_ID": _SESSION_ID, "HERMES_SESSION_MESSAGE_ID": _SESSION_MESSAGE_ID, + "HERMES_SESSION_PROFILE": _SESSION_PROFILE, "HERMES_CRON_AUTO_DELIVER_PLATFORM": _CRON_AUTO_DELIVER_PLATFORM, "HERMES_CRON_AUTO_DELIVER_CHAT_ID": _CRON_AUTO_DELIVER_CHAT_ID, "HERMES_CRON_AUTO_DELIVER_THREAD_ID": _CRON_AUTO_DELIVER_THREAD_ID, @@ -154,6 +157,7 @@ def set_session_vars( session_key: str = "", session_id: str = "", message_id: str = "", + profile: str = "", cwd: str = "", async_delivery: bool = True, ) -> list: @@ -188,6 +192,7 @@ def set_session_vars( _SESSION_KEY.set(session_key), _SESSION_ID.set(session_id), _SESSION_MESSAGE_ID.set(message_id), + _SESSION_PROFILE.set(profile), _SESSION_ASYNC_DELIVERY.set(bool(async_delivery)), ] try: @@ -221,6 +226,7 @@ def clear_session_vars(tokens: list) -> None: _SESSION_KEY, _SESSION_ID, _SESSION_MESSAGE_ID, + _SESSION_PROFILE, ): var.set("") # Reset async-delivery capability to the "never set" sentinel rather than a diff --git a/tools/kanban_tools.py b/tools/kanban_tools.py index fca0ae7a8..2e81f9442 100644 --- a/tools/kanban_tools.py +++ b/tools/kanban_tools.py @@ -1027,7 +1027,10 @@ def _maybe_auto_subscribe(conn: Any, task_id: str) -> bool: chat_id = session_key thread_id = get_session_env("HERMES_SESSION_THREAD_ID", "") or None user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None - notifier_profile = os.environ.get("HERMES_PROFILE") + notifier_profile = ( + get_session_env("HERMES_SESSION_PROFILE", "") + or os.environ.get("HERMES_PROFILE") + ) # Lazy-import to keep the module-level dependency light from hermes_cli import kanban_db as _kb