feat(gateway): add Raft bundled platform plugin with activity hooks

Adds a Raft platform adapter as a bundled plugin (plugins/platforms/raft/)
connecting Hermes to Raft as an external agent via a wake-channel bridge.
The adapter starts a loopback HTTP endpoint, spawns 'raft agent bridge' as a
child process, and injects content-free wake hints into the gateway session
pipeline. The agent reads/sends messages through the Raft CLI; the adapter
never touches message bodies or delivery cursors. Activity observer hooks
report tool/LLM/session lifecycle events via a bounded at-most-once queue.
Auto-enables when RAFT_PROFILE is set.

Cherry-picked from PR #47629. Authored by skyzh (@xxchan).
This commit is contained in:
skyzh 2026-06-17 21:15:55 -07:00 committed by Teknium
parent 2a5e9d994a
commit 9026a8c789
6 changed files with 1323 additions and 1 deletions

View file

@ -0,0 +1,3 @@
from .adapter import register
__all__ = ["register"]

View file

@ -0,0 +1,772 @@
"""Raft channel platform adapter.
Starts a local wake endpoint, spawns ``raft agent bridge`` as a child process,
and injects content-free wake hints into Hermes' normal gateway session pipeline.
Token and port are auto-generated when not provided via env/config.
The bridge remains responsible for Raft message cursors and body materialization;
the agent uses the Raft CLI according to the Raft manual.
"""
from __future__ import annotations
import asyncio
from collections import deque
from datetime import datetime, timezone
import hmac
import json
import logging
import os
import re
import secrets
import shutil
import subprocess
import threading
import time
import uuid
import weakref
from typing import Any, Deque, Dict, List, Optional
try:
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
import sys
from pathlib import Path as _Path
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
merge_pending_message_event,
)
from gateway.session import build_session_key
logger = logging.getLogger(__name__)
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 0
DEFAULT_PATH = "/wake"
DEFAULT_RUNTIME_SESSION = "default"
DEFAULT_MAX_BODY_BYTES = 16_384
DEFAULT_ACTIVITY_QUEUE_CAP = 500
ACTIVITY_CONTENT_CAP = 4096
ACTIVITY_EVENT_SCHEMA = "raft-activity.v1"
ACTIVITY_DRAIN_SCHEMA = "raft-activity-drain.v1"
BRIDGE_TOKEN_HEADER = "x-raft-bridge-token"
_CONTENT_FIELD_NAMES = {
"body",
"content",
"message",
"messages",
"preview",
"snippet",
"text",
}
_SAFE_SCALAR_RE = re.compile(r"^[a-zA-Z0-9._:@/ -]+$")
_MAX_SCALAR_LENGTH = 120
_ACTIVITY_ALLOWED_FIELDS = {
"schema",
"eventId",
"sessionId",
"hookEventName",
"status",
"occurredAt",
"toolName",
"toolInput",
"toolOutput",
"toolInputTruncated",
"toolOutputTruncated",
"truncated",
"errorClass",
"durationMs",
}
_ACTIVE_ADAPTERS: "weakref.WeakSet[RaftAdapter]" = weakref.WeakSet()
_ACTIVE_ADAPTERS_LOCK = threading.Lock()
_RAFT_CONTEXT_LOCK = threading.Lock()
_RAFT_SESSION_IDS: set[str] = set()
_RAFT_TURN_IDS: set[str] = set()
_RAFT_PROMPT_TURN_IDS: set[str] = set()
def check_raft_requirements() -> bool:
"""Check if Raft channel dependencies are available."""
if not AIOHTTP_AVAILABLE:
logger.warning("[raft] aiohttp is not installed — install with: pip install aiohttp")
return False
if not shutil.which("raft"):
logger.warning("[raft] raft CLI not found in PATH — install from https://raft.build")
return False
return True
def _path_value(value: Any) -> str:
path = str(value or DEFAULT_PATH).strip() or DEFAULT_PATH
if not path.startswith("/"):
path = f"/{path}"
return path
def _has_content_field(value: Any) -> bool:
if isinstance(value, dict):
for key, nested in value.items():
if str(key).strip().lower() in _CONTENT_FIELD_NAMES:
return True
if _has_content_field(nested):
return True
elif isinstance(value, list):
return any(_has_content_field(item) for item in value)
return False
def _platform_value(value: Any) -> str:
return str(getattr(value, "value", value) or "")
def _safe_scalar(value: Any, default: Optional[str] = None) -> Optional[str]:
if not isinstance(value, str):
return default
if not value or len(value) > _MAX_SCALAR_LENGTH:
return default
if not _SAFE_SCALAR_RE.match(value):
return default
return value
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _content_string(value: Any) -> Optional[tuple[str, bool]]:
if value is None:
return None
if isinstance(value, str):
text = value
else:
try:
text = json.dumps(value, ensure_ascii=False, sort_keys=True)
except Exception:
return None
if not text:
return None
if len(text) > ACTIVITY_CONTENT_CAP:
return text[:ACTIVITY_CONTENT_CAP], True
return text, False
def _duration_ms(value: Any) -> Optional[int]:
if not isinstance(value, (int, float)) or isinstance(value, bool):
return None
duration = int(value)
if duration < 0:
return None
return duration
def _make_activity_event(
*,
hook_event_name: str,
session_id: Any,
status: str = "ok",
tool_name: Any = None,
tool_input: Any = None,
tool_output: Any = None,
error_class: Any = None,
duration_ms: Any = None,
) -> Dict[str, Any]:
event: Dict[str, Any] = {
"schema": ACTIVITY_EVENT_SCHEMA,
"eventId": f"hermes-{uuid.uuid4()}",
"sessionId": _safe_scalar(session_id, "unknown") or "unknown",
"hookEventName": hook_event_name,
"status": "error" if status == "error" else "ok",
"occurredAt": _now_iso(),
}
safe_tool_name = _safe_scalar(tool_name)
if safe_tool_name:
event["toolName"] = safe_tool_name
safe_error_class = _safe_scalar(error_class)
if safe_error_class:
event["errorClass"] = safe_error_class
safe_duration_ms = _duration_ms(duration_ms)
if safe_duration_ms is not None:
event["durationMs"] = safe_duration_ms
truncated = False
input_value = _content_string(tool_input)
if input_value:
event["toolInput"], input_truncated = input_value
if input_truncated:
event["toolInputTruncated"] = True
truncated = True
output_value = _content_string(tool_output)
if output_value:
event["toolOutput"], output_truncated = output_value
if output_truncated:
event["toolOutputTruncated"] = True
truncated = True
if truncated:
event["truncated"] = True
return event
def _validate_activity_event(value: Any) -> Dict[str, Any]:
if not isinstance(value, dict):
raise ValueError("activity event must be an object")
if value.get("schema") != ACTIVITY_EVENT_SCHEMA:
raise ValueError("unsupported activity event schema")
unknown = set(value) - _ACTIVITY_ALLOWED_FIELDS
if unknown:
raise ValueError(f"activity event field {sorted(unknown)[0]} is not allowed")
for key in ("eventId", "sessionId", "hookEventName", "occurredAt"):
if not _safe_scalar(value.get(key)):
raise ValueError(f"activity event {key} must be a safe non-empty string")
if value.get("status") not in {"ok", "error"}:
raise ValueError("activity event status must be ok|error")
if value.get("toolName") is not None and not _safe_scalar(value.get("toolName")):
raise ValueError("activity event toolName must be a safe string")
if value.get("errorClass") is not None and not _safe_scalar(value.get("errorClass")):
raise ValueError("activity event errorClass must be a safe string")
if value.get("durationMs") is not None and _duration_ms(value.get("durationMs")) is None:
raise ValueError("activity event durationMs must be a non-negative number")
for key in ("truncated", "toolInputTruncated", "toolOutputTruncated"):
if value.get(key) is not None and not isinstance(value.get(key), bool):
raise ValueError(f"activity event {key} must be a boolean")
event = dict(value)
if event.get("durationMs") is not None:
event["durationMs"] = _duration_ms(event["durationMs"])
for key in ("toolInput", "toolOutput"):
content = event.get(key)
if content is None:
continue
if not isinstance(content, str):
raise ValueError(f"activity event {key} must be a string")
if len(content) > ACTIVITY_CONTENT_CAP:
event[key] = content[:ACTIVITY_CONTENT_CAP]
event["truncated"] = True
event[f"{key}Truncated"] = True
return event
class ActivityQueue:
"""Bounded at-most-once queue for Raft external activity telemetry."""
def __init__(self, cap: int = DEFAULT_ACTIVITY_QUEUE_CAP):
self._cap = max(1, int(cap or DEFAULT_ACTIVITY_QUEUE_CAP))
self._events: Deque[Dict[str, Any]] = deque()
self._dropped_since_drain = 0
self._lock = threading.Lock()
def push(self, event: Dict[str, Any]) -> None:
validated = _validate_activity_event(event)
with self._lock:
self._events.append(validated)
while len(self._events) > self._cap:
self._events.popleft()
self._dropped_since_drain += 1
def drain(self, max_events: int = 200) -> Dict[str, Any]:
limit = max(1, int(max_events or 200))
with self._lock:
events: List[Dict[str, Any]] = []
while self._events and len(events) < limit:
events.append(self._events.popleft())
dropped = self._dropped_since_drain
self._dropped_since_drain = 0
return {"schema": ACTIVITY_DRAIN_SCHEMA, "events": events, "dropped": dropped}
@property
def size(self) -> int:
with self._lock:
return len(self._events)
def _remember_raft_context(session_id: Any, turn_id: Any = None) -> None:
safe_session_id = _safe_scalar(session_id)
safe_turn_id = _safe_scalar(turn_id)
with _RAFT_CONTEXT_LOCK:
if safe_session_id:
_RAFT_SESSION_IDS.add(safe_session_id)
if safe_turn_id:
_RAFT_TURN_IDS.add(safe_turn_id)
def _forget_raft_context(session_id: Any, turn_id: Any = None, *, forget_session: bool = False) -> None:
safe_session_id = _safe_scalar(session_id)
safe_turn_id = _safe_scalar(turn_id)
with _RAFT_CONTEXT_LOCK:
if safe_turn_id:
_RAFT_TURN_IDS.discard(safe_turn_id)
_RAFT_PROMPT_TURN_IDS.discard(safe_turn_id)
if forget_session and safe_session_id:
_RAFT_SESSION_IDS.discard(safe_session_id)
def _is_raft_context(**kwargs: Any) -> bool:
if _platform_value(kwargs.get("platform")) == "raft":
_remember_raft_context(kwargs.get("session_id"), kwargs.get("turn_id"))
return True
safe_session_id = _safe_scalar(kwargs.get("session_id"))
safe_turn_id = _safe_scalar(kwargs.get("turn_id"))
with _RAFT_CONTEXT_LOCK:
return bool(
(safe_turn_id and safe_turn_id in _RAFT_TURN_IDS)
or (safe_session_id and safe_session_id in _RAFT_SESSION_IDS)
)
def _report_activity(event: Dict[str, Any]) -> None:
with _ACTIVE_ADAPTERS_LOCK:
adapters = list(_ACTIVE_ADAPTERS)
for adapter in adapters:
adapter.report_activity(event)
def _on_session_start(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
try:
from tools.env_passthrough import register_env_passthrough
register_env_passthrough(["RAFT_PROFILE"])
except Exception:
logger.debug("[raft] failed to register RAFT_PROFILE env passthrough", exc_info=True)
_report_activity(
_make_activity_event(
hook_event_name="SessionStart",
session_id=kwargs.get("session_id"),
)
)
def _on_pre_llm_call(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
safe_turn_id = _safe_scalar(kwargs.get("turn_id"))
if safe_turn_id:
with _RAFT_CONTEXT_LOCK:
if safe_turn_id in _RAFT_PROMPT_TURN_IDS:
return
_RAFT_PROMPT_TURN_IDS.add(safe_turn_id)
_report_activity(
_make_activity_event(
hook_event_name="UserPromptSubmit",
session_id=kwargs.get("session_id"),
)
)
def _on_pre_tool_call(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
_report_activity(
_make_activity_event(
hook_event_name="PreToolUse",
session_id=kwargs.get("session_id"),
tool_name=kwargs.get("tool_name"),
tool_input=kwargs.get("args"),
)
)
def _on_post_tool_call(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
status = "error" if kwargs.get("status") in {"error", "blocked"} or kwargs.get("error_type") else "ok"
hook_name = "PostToolUseFailure" if status == "error" else "PostToolUse"
_report_activity(
_make_activity_event(
hook_event_name=hook_name,
session_id=kwargs.get("session_id"),
status=status,
tool_name=kwargs.get("tool_name"),
tool_input=kwargs.get("args"),
tool_output=kwargs.get("error_message") or kwargs.get("result"),
error_class=kwargs.get("error_type") or ("tool_failure" if status == "error" else None),
duration_ms=kwargs.get("duration_ms"),
)
)
def _on_post_llm_call(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
_report_activity(
_make_activity_event(
hook_event_name="Stop",
session_id=kwargs.get("session_id"),
)
)
def _on_session_end(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
if kwargs.get("interrupted") or kwargs.get("completed") is False:
_report_activity(
_make_activity_event(
hook_event_name="Stop",
session_id=kwargs.get("session_id"),
status="error",
error_class="interrupted" if kwargs.get("interrupted") else "incomplete",
)
)
_forget_raft_context(kwargs.get("session_id"), kwargs.get("turn_id"))
def _on_session_finalize(**kwargs: Any) -> None:
if not _is_raft_context(**kwargs):
return
_report_activity(
_make_activity_event(
hook_event_name="SessionEnd",
session_id=kwargs.get("session_id"),
)
)
_forget_raft_context(kwargs.get("session_id"), kwargs.get("turn_id"), forget_session=True)
class RaftAdapter(BasePlatformAdapter):
"""Local HTTP endpoint for Raft channel bridge delivery."""
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform("raft"))
extra = config.extra or {}
self._host: str = str(extra.get("host", DEFAULT_HOST))
self._port: int = int(extra.get("port", DEFAULT_PORT))
self._path: str = _path_value(extra.get("path", DEFAULT_PATH))
self._bridge_token: str = str(extra.get("bridge_token", ""))
self._runtime_session: str = str(
extra.get("runtime_session", DEFAULT_RUNTIME_SESSION)
or DEFAULT_RUNTIME_SESSION
)
self._max_body_bytes: int = int(
extra.get("max_body_bytes", DEFAULT_MAX_BODY_BYTES)
)
self._runner = None
self._bridge_process: Optional[subprocess.Popen] = None
self._activity_queue = ActivityQueue()
@property
def runtime_session(self) -> str:
return self._runtime_session
async def connect(self) -> bool:
if not self._bridge_token:
self._bridge_token = secrets.token_hex(32)
logger.info("[raft] Auto-generated bridge token")
app = web.Application()
app.router.add_get("/health", self._handle_health)
app.router.add_post(self._path, self._handle_wake)
app.router.add_post("/activity", self._handle_activity)
app.router.add_get("/activity/drain", self._handle_activity_drain)
if self._port != 0:
import socket as _socket
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as sock:
sock.settimeout(1)
sock.connect(("127.0.0.1", self._port))
logger.error(
"[raft] Port %d already in use. Set platforms.raft.extra.port in config",
self._port,
)
return False
except (ConnectionRefusedError, OSError):
pass
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._host, self._port)
await site.start()
bound_port = self._port
if bound_port == 0 and site._server and site._server.sockets:
bound_port = site._server.sockets[0].getsockname()[1]
self._mark_connected()
with _ACTIVE_ADAPTERS_LOCK:
_ACTIVE_ADAPTERS.add(self)
logger.info("[raft] Raft channel listening on %s:%d%s", self._host, bound_port, self._path)
self._spawn_bridge(bound_port)
return True
async def disconnect(self) -> None:
self._stop_bridge()
if self._runner:
await self._runner.cleanup()
self._runner = None
with _ACTIVE_ADAPTERS_LOCK:
_ACTIVE_ADAPTERS.discard(self)
self._mark_disconnected()
logger.info("[raft] Disconnected")
def _spawn_bridge(self, port: int) -> None:
raft_bin = shutil.which("raft")
if not raft_bin:
logger.warning("[raft] raft CLI not found in PATH; bridge not spawned — wake-only polling mode")
return
profile = os.environ.get("RAFT_PROFILE", "")
if not profile:
logger.warning("[raft] RAFT_PROFILE not set; bridge not spawned")
return
endpoint = f"http://{self._host}:{port}{self._path}"
cmd: List[str] = [
raft_bin, "--profile", profile,
"agent", "bridge",
"--wake-adapter", "wake-channel",
"--wake-channel-endpoint", endpoint,
]
env = {**os.environ, "RAFT_CHANNEL_TOKEN": self._bridge_token}
try:
self._bridge_process = subprocess.Popen(cmd, env=env)
logger.info("[raft] Spawned bridge pid=%d profile=%s endpoint=%s", self._bridge_process.pid, profile, endpoint)
except Exception:
logger.exception("[raft] Failed to spawn bridge")
def _stop_bridge(self) -> None:
proc = self._bridge_process
if proc is None:
return
self._bridge_process = None
try:
proc.terminate()
proc.wait(timeout=5)
logger.info("[raft] Bridge process terminated (pid=%d)", proc.pid)
except subprocess.TimeoutExpired:
proc.kill()
logger.warning("[raft] Bridge process killed after timeout (pid=%d)", proc.pid)
except Exception:
logger.exception("[raft] Error stopping bridge")
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
logger.debug("[raft] adapter send is a no-op; agent delivers via raft CLI")
return SendResult(success=True)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
return {"name": f"raft/{chat_id}", "type": "raft"}
async def _handle_health(self, request: "web.Request") -> "web.Response":
return web.json_response(
{
"status": "ok",
"platform": "raft",
"runtimeSession": self._runtime_session,
"activity": {
"queueSize": self._activity_queue.size,
"endpoint": "/activity",
"drainEndpoint": "/activity/drain",
},
}
)
async def _handle_wake(self, request: "web.Request") -> "web.Response":
if not self._validate_bridge_token(request.headers.get(BRIDGE_TOKEN_HEADER, "")):
return web.json_response({"ok": False, "error": "unauthorized"}, status=401)
content_length = request.content_length or 0
if content_length > self._max_body_bytes:
return web.json_response({"ok": False, "error": "payload_too_large"}, status=413)
try:
raw_body = await request.read()
except Exception:
return web.json_response({"ok": False, "error": "bad_request"}, status=400)
payload: Dict[str, Any] = {}
if raw_body.strip():
try:
parsed = json.loads(raw_body)
except json.JSONDecodeError:
return web.json_response({"ok": False, "error": "invalid_json"}, status=400)
if not isinstance(parsed, dict):
return web.json_response({"ok": False, "error": "invalid_payload"}, status=400)
payload = parsed
# Do not gate on payload["schema"]: the bridge owns schema evolution;
# Hermes only verifies that wake hints are content-free.
if _has_content_field(payload):
return web.json_response({"ok": False, "error": "content_not_allowed"}, status=400)
accepted = await self._accept_wake(payload)
if not accepted:
return web.json_response(
{
"ok": False,
"error": "not_ready",
"runtimeSession": self._runtime_session,
},
status=503,
)
return web.json_response(
{
"ok": True,
"runtimeSession": self._runtime_session,
},
status=202,
)
async def _handle_activity(self, request: "web.Request") -> "web.Response":
if not self._validate_bridge_token(request.headers.get(BRIDGE_TOKEN_HEADER, "")):
return web.json_response({"ok": False, "error": "unauthorized"}, status=401)
content_length = request.content_length or 0
if content_length > self._max_body_bytes:
return web.json_response({"ok": False, "error": "payload_too_large"}, status=413)
try:
payload = json.loads(await request.text())
self._activity_queue.push(payload)
except json.JSONDecodeError:
return web.json_response({"ok": False, "error": "invalid_json"}, status=400)
except Exception as exc:
return web.json_response({"ok": False, "error": str(exc)}, status=400)
return web.json_response({"ok": True}, status=202)
async def _handle_activity_drain(self, request: "web.Request") -> "web.Response":
if not self._validate_bridge_token(request.headers.get(BRIDGE_TOKEN_HEADER, "")):
return web.json_response({"ok": False, "error": "unauthorized"}, status=401)
try:
max_events = int(request.query.get("max", "200"))
except ValueError:
max_events = 200
return web.json_response(self._activity_queue.drain(max_events))
def _validate_bridge_token(self, token: str) -> bool:
if not self._bridge_token or not token:
return False
return hmac.compare_digest(token, self._bridge_token)
async def _accept_wake(self, payload: Dict[str, Any]) -> bool:
if not self._message_handler:
logger.warning("[raft] Wake received before gateway message handler was attached")
return False
delivery_id = str(
payload.get("eventId")
or payload.get("attemptId")
or payload.get("messageId")
or payload.get("delivery_id")
or payload.get("wake_id")
or payload.get("id")
or f"raft-wake-{int(time.time() * 1000)}"
)
source = self.build_source(
chat_id=self._runtime_session,
chat_name="Raft channel",
chat_type="dm",
user_id="raft-bridge",
user_name="Raft Bridge",
)
event = MessageEvent(
text=self._wake_prompt(),
message_type=MessageType.TEXT,
source=source,
raw_message=payload,
message_id=delivery_id,
internal=True,
)
try:
await self.handle_message(event)
except Exception:
logger.exception("[raft] Failed to inject wake event")
return False
return True
async def handle_message(self, event: MessageEvent) -> None:
"""Accept Raft wake hints without interrupting an active Hermes turn."""
if not self._message_handler:
return
session_key = build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
if session_key in self._active_sessions:
logger.debug("[raft] Wake queued for busy session %s", session_key)
merge_pending_message_event(self._pending_messages, session_key, event)
return
await super().handle_message(event)
@staticmethod
def _wake_prompt() -> str:
return (
"Raft wake hint received. New Raft messages may be pending. "
"If you have not read the Raft manual in this session, run "
"`raft manual get raft-cli-overview` before using Raft commands."
)
def report_activity(self, event: Dict[str, Any]) -> None:
try:
self._activity_queue.push(event)
except Exception:
logger.debug("[raft] activity event dropped during validation", exc_info=True)
def _is_connected(config: PlatformConfig) -> bool:
extra = config.extra or {}
return bool(extra.get("enabled") or extra.get("bridge_token"))
def _env_enablement() -> Optional[dict]:
"""Seed PlatformConfig.extra from env vars during gateway config load.
Auto-enables when RAFT_PROFILE is set (the adapter needs it anyway).
"""
if not os.getenv("RAFT_PROFILE"):
return None
return {"enabled": True}
def register(ctx) -> None:
"""Plugin entry point — called by the Hermes plugin system."""
ctx.register_platform(
name="raft",
label="Raft",
adapter_factory=lambda cfg: RaftAdapter(cfg),
check_fn=check_raft_requirements,
is_connected=_is_connected,
required_env=["RAFT_PROFILE"],
install_hint="Install the Raft CLI from https://raft.build",
env_enablement_fn=_env_enablement,
emoji="🔔",
platform_hint=(
"You are connected to Raft via an external-agent channel. "
"Run `raft --profile {profile} profile show` to confirm which agent profile is active. "
"Run `raft --profile {profile} manual get raft-cli-overview` to learn available Raft commands. "
"Always pass `--profile {profile}` to every raft CLI call."
).format(profile=os.environ.get("RAFT_PROFILE", "your-agent-profile")),
)
ctx.register_hook("on_session_start", _on_session_start)
ctx.register_hook("pre_llm_call", _on_pre_llm_call)
ctx.register_hook("pre_tool_call", _on_pre_tool_call)
ctx.register_hook("post_tool_call", _on_post_tool_call)
ctx.register_hook("post_llm_call", _on_post_llm_call)
ctx.register_hook("on_session_end", _on_session_end)
ctx.register_hook("on_session_finalize", _on_session_finalize)

View file

@ -0,0 +1,19 @@
name: raft-platform
label: Raft
kind: platform
version: 1.0.0
description: >
Raft gateway adapter for Hermes Agent.
Connects to a Raft workspace as an external agent via a local
wake-channel bridge. The adapter starts a loopback HTTP endpoint
that receives content-free wake hints from the bridge, then
injects them into the Hermes gateway session pipeline. The agent
reads and sends messages through the Raft CLI — the adapter never
touches message bodies or delivery cursors.
author: botiverse
requires_env:
- name: RAFT_PROFILE
description: "Raft agent profile slug — auto-enables the adapter when set"
prompt: "Raft agent profile"
password: false
category: setting

View file

@ -0,0 +1,455 @@
"""Tests for the Raft channel adapter."""
import os
from unittest.mock import AsyncMock, patch
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from gateway.config import Platform, PlatformConfig
from plugins.platforms.raft.adapter import (
ACTIVITY_DRAIN_SCHEMA,
ACTIVITY_EVENT_SCHEMA,
ActivityQueue,
BRIDGE_TOKEN_HEADER,
DEFAULT_PATH,
RaftAdapter,
_ACTIVE_ADAPTERS,
_ACTIVE_ADAPTERS_LOCK,
_RAFT_CONTEXT_LOCK,
_RAFT_PROMPT_TURN_IDS,
_RAFT_SESSION_IDS,
_RAFT_TURN_IDS,
_has_content_field,
_env_enablement,
_is_connected,
_on_session_start,
_on_pre_llm_call,
_on_pre_tool_call,
_on_post_llm_call,
_on_post_tool_call,
_on_session_end,
_on_session_finalize,
check_raft_requirements,
register,
)
from gateway.session import build_session_key
RAFT_CHANNEL_SCHEMA = "raft-channel-wake.v1"
FUTURE_RAFT_CHANNEL_SCHEMA = "raft-channel-wake.v2"
def _make_config(**extra):
data = {
"bridge_token": "bridge-secret",
"runtime_session": "default",
"port": 0,
}
data.update(extra)
return PlatformConfig(enabled=True, extra=data)
def _make_adapter(**extra):
return RaftAdapter(_make_config(**extra))
def _create_app(adapter: RaftAdapter) -> web.Application:
app = web.Application()
app.router.add_get("/health", adapter._handle_health)
app.router.add_post(adapter._path, adapter._handle_wake)
app.router.add_post("/activity", adapter._handle_activity)
app.router.add_get("/activity/drain", adapter._handle_activity_drain)
return app
def _activity_event(event_id: str, **overrides):
event = {
"schema": ACTIVITY_EVENT_SCHEMA,
"eventId": event_id,
"sessionId": "session-1",
"hookEventName": "PreToolUse",
"status": "ok",
"occurredAt": "2026-06-16T06:00:00Z",
"toolName": "execute_code",
}
event.update(overrides)
return event
class TestRaftWakePayload:
def test_detects_content_fields(self):
assert _has_content_field({"text": "hello"}) is True
assert _has_content_field({"nested": {"messages": []}}) is True
assert _has_content_field({"eventId": "evt-1", "messageId": "msg-1"}) is False
class TestRaftWakeHttp:
@pytest.mark.asyncio
async def test_send_is_noop_success(self):
adapter = _make_adapter()
result = await adapter.send("default", "hello")
assert result.success is True
assert result.message_id is None
@pytest.mark.asyncio
async def test_rejects_missing_bridge_token(self):
adapter = _make_adapter()
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as client:
resp = await client.post(DEFAULT_PATH, json={"eventId": "wake-1"})
assert resp.status == 401
body = await resp.json()
assert body["ok"] is False
adapter.handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_rejects_content_bearing_payload(self):
adapter = _make_adapter()
adapter.set_message_handler(AsyncMock())
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as client:
resp = await client.post(
DEFAULT_PATH,
json={"eventId": "wake-1", "text": "do work"},
headers={BRIDGE_TOKEN_HEADER: "bridge-secret"},
)
assert resp.status == 400
body = await resp.json()
assert body == {"ok": False, "error": "content_not_allowed"}
adapter.handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_returns_not_ready_without_gateway_handler(self):
adapter = _make_adapter()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as client:
resp = await client.post(
DEFAULT_PATH,
json={"eventId": "wake-1"},
headers={BRIDGE_TOKEN_HEADER: "bridge-secret"},
)
assert resp.status == 503
body = await resp.json()
assert body["ok"] is False
assert body["runtimeSession"] == "default"
@pytest.mark.asyncio
@pytest.mark.parametrize("schema", [RAFT_CHANNEL_SCHEMA, FUTURE_RAFT_CHANNEL_SCHEMA])
async def test_accepts_content_free_wake_as_internal_event(self, schema):
adapter = _make_adapter()
adapter.set_message_handler(AsyncMock())
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as client:
resp = await client.post(
DEFAULT_PATH,
json={
"schema": schema,
"attemptId": "attempt-1",
"eventId": "wake-1",
"messageId": "msg-1",
"agentId": "agent-1",
"profile": "dev",
"coreSessionId": "default",
"adapterInstance": "hermes",
"occurredAt": "2026-06-11T08:00:00Z",
},
headers={BRIDGE_TOKEN_HEADER: "bridge-secret"},
)
assert resp.status == 202
body = await resp.json()
assert body == {"ok": True, "runtimeSession": "default"}
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.internal is True
assert event.message_id == "wake-1"
assert event.raw_message["schema"] == schema
assert event.raw_message["eventId"] == "wake-1"
assert event.raw_message["attemptId"] == "attempt-1"
assert event.raw_message["messageId"] == "msg-1"
assert event.source.platform == Platform("raft")
assert event.source.chat_id == "default"
assert "raft manual get" in event.text
@pytest.mark.asyncio
async def test_busy_session_queues_without_interrupt(self):
handler = AsyncMock()
adapter = _make_adapter()
adapter.set_message_handler(handler)
source = adapter.build_source(
chat_id="default",
chat_name="Raft channel",
chat_type="dm",
user_id="raft-bridge",
user_name="Raft Bridge",
)
session_key = build_session_key(source)
adapter._active_sessions[session_key] = __import__("asyncio").Event()
accepted = await adapter._accept_wake({"eventId": "wake-busy"})
assert accepted is True
handler.assert_not_called()
assert session_key in adapter._pending_messages
pending = adapter._pending_messages[session_key]
assert pending.message_id == "wake-busy"
assert "raft manual get" in pending.text
class TestRaftActivityHttp:
@pytest.mark.asyncio
async def test_activity_endpoint_auth_validation_and_drain(self):
adapter = _make_adapter()
adapter._activity_queue = ActivityQueue(cap=2)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as client:
unauthorized = await client.post("/activity", json=_activity_event("evt-1"))
assert unauthorized.status == 401
unknown = await client.post(
"/activity",
json={**_activity_event("evt-1"), "transcript_path": "/tmp/session.jsonl"},
headers={BRIDGE_TOKEN_HEADER: "bridge-secret"},
)
assert unknown.status == 400
for event_id in ["evt-1", "evt-2", "evt-3"]:
resp = await client.post(
"/activity",
json=_activity_event(event_id),
headers={BRIDGE_TOKEN_HEADER: "bridge-secret"},
)
assert resp.status == 202
drain = await client.get(
"/activity/drain?max=10",
headers={BRIDGE_TOKEN_HEADER: "bridge-secret"},
)
assert drain.status == 200
body = await drain.json()
assert body["schema"] == ACTIVITY_DRAIN_SCHEMA
assert body["dropped"] == 1
assert [event["eventId"] for event in body["events"]] == ["evt-2", "evt-3"]
def test_hook_mapping_reports_only_raft_context(self):
adapter = _make_adapter()
with _RAFT_CONTEXT_LOCK:
_RAFT_PROMPT_TURN_IDS.clear()
_RAFT_SESSION_IDS.clear()
_RAFT_TURN_IDS.clear()
with _ACTIVE_ADAPTERS_LOCK:
_ACTIVE_ADAPTERS.add(adapter)
try:
_on_pre_tool_call(
session_id="session-1",
turn_id="turn-1",
tool_name="execute_code",
args={"cmd": "echo nope"},
)
assert adapter._activity_queue.drain(10)["events"] == []
_on_pre_llm_call(
platform="raft",
session_id="session-1",
turn_id="turn-1",
user_message="run a probe",
)
_on_pre_llm_call(
platform="raft",
session_id="session-1",
turn_id="turn-1",
user_message="run a follow-up LLM call in the same turn",
)
_on_pre_tool_call(
session_id="session-1",
turn_id="turn-1",
tool_name="execute_code",
args={"cmd": "echo ok"},
)
_on_post_tool_call(
session_id="session-1",
turn_id="turn-1",
tool_name="execute_code",
args={"cmd": "echo ok"},
result="ok",
status="ok",
duration_ms=321,
)
_on_post_llm_call(
platform="raft",
session_id="session-1",
turn_id="turn-1",
assistant_response="done",
)
_on_session_end(
platform="raft",
session_id="session-1",
turn_id="turn-1",
completed=True,
interrupted=False,
)
_on_session_finalize(
platform="raft",
session_id="session-1",
reason="shutdown",
)
drain = adapter._activity_queue.drain(10)
finally:
with _ACTIVE_ADAPTERS_LOCK:
_ACTIVE_ADAPTERS.discard(adapter)
with _RAFT_CONTEXT_LOCK:
_RAFT_PROMPT_TURN_IDS.clear()
_RAFT_SESSION_IDS.clear()
_RAFT_TURN_IDS.clear()
assert [event["hookEventName"] for event in drain["events"]] == [
"UserPromptSubmit",
"PreToolUse",
"PostToolUse",
"Stop",
"SessionEnd",
]
tool_start = drain["events"][1]
assert tool_start["toolName"] == "execute_code"
assert '"cmd": "echo ok"' in tool_start["toolInput"]
tool_result = drain["events"][2]
assert tool_result["durationMs"] == 321
def test_session_start_registers_raft_profile_env_passthrough(self):
import tools.env_passthrough as env_passthrough_mod
from tools.code_execution_tool import _scrub_child_env
from tools.environments.local import _make_run_env
from tools.env_passthrough import clear_env_passthrough, is_env_passthrough
previous_config_passthrough = env_passthrough_mod._config_passthrough
clear_env_passthrough()
env_passthrough_mod._config_passthrough = frozenset()
with _RAFT_CONTEXT_LOCK:
_RAFT_PROMPT_TURN_IDS.clear()
_RAFT_SESSION_IDS.clear()
_RAFT_TURN_IDS.clear()
try:
assert "RAFT_PROFILE" not in _scrub_child_env(
{"RAFT_PROFILE": "dev"},
is_windows=False,
)
_on_session_start(session_id="session-1", turn_id="turn-1")
assert not is_env_passthrough("RAFT_PROFILE")
_on_session_start(platform="raft", session_id="session-1", turn_id="turn-1")
assert is_env_passthrough("RAFT_PROFILE")
assert _scrub_child_env({"RAFT_PROFILE": "dev"}, is_windows=False)["RAFT_PROFILE"] == "dev"
with patch.dict(os.environ, {"PATH": "/usr/bin", "RAFT_PROFILE": "dev"}, clear=True):
assert _make_run_env({})["RAFT_PROFILE"] == "dev"
finally:
clear_env_passthrough()
env_passthrough_mod._config_passthrough = previous_config_passthrough
with _RAFT_CONTEXT_LOCK:
_RAFT_PROMPT_TURN_IDS.clear()
_RAFT_SESSION_IDS.clear()
_RAFT_TURN_IDS.clear()
def test_interrupted_turn_reports_error_stop(self):
adapter = _make_adapter()
with _RAFT_CONTEXT_LOCK:
_RAFT_PROMPT_TURN_IDS.clear()
_RAFT_SESSION_IDS.clear()
_RAFT_TURN_IDS.clear()
with _ACTIVE_ADAPTERS_LOCK:
_ACTIVE_ADAPTERS.add(adapter)
try:
_on_pre_llm_call(
platform="raft",
session_id="session-1",
turn_id="turn-1",
)
_on_session_end(
platform="raft",
session_id="session-1",
turn_id="turn-1",
completed=False,
interrupted=True,
)
drain = adapter._activity_queue.drain(10)
finally:
with _ACTIVE_ADAPTERS_LOCK:
_ACTIVE_ADAPTERS.discard(adapter)
with _RAFT_CONTEXT_LOCK:
_RAFT_PROMPT_TURN_IDS.clear()
_RAFT_SESSION_IDS.clear()
_RAFT_TURN_IDS.clear()
assert [event["hookEventName"] for event in drain["events"]] == [
"UserPromptSubmit",
"Stop",
]
assert drain["events"][1]["status"] == "error"
assert drain["events"][1]["errorClass"] == "interrupted"
class TestRaftConfig:
def test_env_enablement_auto_enables_with_raft_profile(self, monkeypatch):
monkeypatch.setenv("RAFT_PROFILE", "my-agent")
extra = _env_enablement()
assert extra is not None
assert extra["enabled"] is True
def test_env_enablement_returns_none_without_profile(self, monkeypatch):
monkeypatch.delenv("RAFT_PROFILE", raising=False)
assert _env_enablement() is None
def test_is_connected_checks_bridge_token_or_enabled(self):
assert _is_connected(PlatformConfig(enabled=True, extra={"bridge_token": "tok"})) is True
assert _is_connected(PlatformConfig(enabled=True, extra={"enabled": True})) is True
assert _is_connected(PlatformConfig(enabled=True, extra={})) is False
def test_register_calls_register_platform(self):
registered = {}
hooks = {}
class FakeCtx:
def register_platform(self, **kwargs):
registered.update(kwargs)
def register_hook(self, name, handler):
hooks[name] = handler
register(FakeCtx())
assert registered["name"] == "raft"
assert registered["label"] == "Raft"
assert registered["emoji"] == "🔔"
assert "profile show" in registered["platform_hint"]
assert "manual get" in registered["platform_hint"]
assert "--profile" in registered["platform_hint"]
assert hooks == {
"on_session_start": _on_session_start,
"pre_llm_call": _on_pre_llm_call,
"pre_tool_call": _on_pre_tool_call,
"post_tool_call": _on_post_tool_call,
"post_llm_call": _on_post_llm_call,
"on_session_end": _on_session_end,
"on_session_finalize": _on_session_finalize,
}

View file

@ -1,7 +1,7 @@
---
sidebar_position: 1
title: "Messaging Gateway"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Yuanbao, Microsoft Teams, LINE, Webhooks, or any OpenAI-compatible frontend via the API server — architecture and setup overview"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Yuanbao, Microsoft Teams, LINE, Raft, Webhooks, or any OpenAI-compatible frontend via the API server — architecture and setup overview"
---
# Messaging Gateway
@ -40,6 +40,7 @@ Bots need both a model provider and tool providers (TTS, web). A [Nous Portal](/
| Microsoft Teams | — | ✅ | — | ✅ | — | ✅ | — |
| LINE | — | ✅ | ✅ | — | — | ✅ | — |
| ntfy | — | — | — | — | — | — | — |
| Raft | — | — | — | — | — | — | — |
**Voice** = TTS audio replies and/or voice message transcription. **Images** = send/receive images. **Files** = send/receive file attachments. **Threads** = threaded conversations. **Reactions** = emoji reactions on messages. **Typing** = typing indicator while processing. **Streaming** = progressive message updates via editing.
@ -511,6 +512,7 @@ Each platform has its own toolset:
| Microsoft Teams | `hermes-teams` | Full tools including terminal |
| API Server | `hermes-api-server` | Full tools (drops `clarify`, `send_message`, `text_to_speech` — programmatic access doesn't have an interactive user) |
| Webhooks | `hermes-webhook` | Full tools including terminal |
| Raft | `hermes-raft` | Wake-only channel; agent uses Raft CLI for message I/O |
## Operating a multi-platform gateway
@ -639,4 +641,5 @@ Defaults to `false`. Only platforms whose adapter implements `delete_message` ho
- [Microsoft Teams Setup](teams.md)
- [Teams Meetings Pipeline](teams-meetings.md)
- [Open WebUI + API Server](open-webui.md)
- [Raft Setup](raft.md)
- [Webhooks](webhooks.md)

View file

@ -0,0 +1,70 @@
---
sidebar_position: 19
title: "Raft"
description: "Connect Hermes Agent to Raft as an external agent via wake-channel bridge"
---
# Raft Setup
Hermes connects to [Raft](https://raft.build) as an external agent through a local wake-channel bridge. The adapter starts a loopback HTTP endpoint that receives content-free wake hints from the bridge, then injects them into the Hermes gateway session pipeline. The agent reads and sends messages through the Raft CLI — the adapter never touches message bodies or delivery cursors.
:::info Division of Labor
- **The bridge** owns: wake-hint consumption, dedup, backoff, reconnection, at-least-once delivery, and proof logging.
- **The Hermes adapter** owns: a localhost wake endpoint and injecting a short notice into the agent's context.
- **The agent** owns: pulling messages (`raft message check`), replying (`raft message send`), and all other Raft interactions via the CLI.
The adapter holds no Raft credentials — only a per-session shared token for localhost auth between the bridge and the endpoint.
:::
---
## Prerequisites
- A **Raft workspace** where you can create an External Agent
- The **Raft CLI** installed and logged in to that External Agent profile
- **aiohttp** — Python package (included in Hermes `[all]` extras)
In Raft, open the Agents menu, create an External Agent, and follow the setup card to install the Raft CLI and log in the agent profile. Once the agent is created, Raft shows a Hermes setup guide with the environment variables and configuration needed to start the gateway.
---
## Setup
Add to `~/.hermes/.env`:
```bash
RAFT_PROFILE=your-agent-profile
```
That's it — the adapter auto-enables when `RAFT_PROFILE` is set. It generates a per-session bridge token, picks an ephemeral port, and spawns the bridge child process automatically when the gateway starts.
---
## How It Works
```
Raft Server → Bridge (wake-hints SSE) → POST /wake → Hermes Adapter → Agent context
Agent → raft message check → Raft Server (message bodies)
Agent → raft message send → Raft Server (replies)
```
1. The Raft server sends wake hints to the bridge process via SSE.
2. The bridge forwards each hint as a `POST /wake` to the adapter's loopback endpoint.
3. The adapter validates the bridge token, verifies the payload is content-free, and injects a wake notice into the Hermes session.
4. The agent sees the wake notice and uses the Raft CLI to read messages and reply.
Wake payloads are **content-free by contract** — they carry metadata (event ID, message ID, timestamps) but never message bodies, channel names, or sender identities. The adapter rejects any payload containing content-shaped fields (`text`, `body`, `content`, `messages`, etc.).
---
## Bridge
The adapter automatically spawns `raft agent bridge` as a child process, passing the endpoint URL and token. The bridge connects to the Raft server using the configured profile and begins forwarding wake hints. It is terminated when the gateway shuts down.
---
## Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `RAFT_PROFILE` | Raft agent profile slug — auto-enables the adapter when set | _(required)_ |