## Performance Optimizations (3-10x faster responses)
- STT beam_size reduced to 1 (3-5x faster transcription, minimal quality loss)
- Smart query routing: Haiku (simple) → Sonnet (medium) → Opus (complex)
- TTS cache for common phrases (27 pre-generated responses)
- Sentence-level streaming TTS (start playing while generating)
- Sample-based VAD timing (30x improvement in silence detection)

## TTS Engine Upgrade
- Migrated from Chatterbox to Chatterbox-Turbo
- Zero-shot voice cloning (no fine-tuning required)
- Native paralinguistic tag support ([laugh], [sigh], [chuckle], etc.)
- Emotion presets with temperature control
- Improved marker conversion (*action*, (action), ~action~)

## Discord Bot Enhancements
- Multi-agent support (Jarvis, Sage)
- Improved voice receiving with discord-ext-voice-recv
- Enhanced /join, /leave, /status commands
- Per-agent personality configuration
- Better audio sink/receiver implementation

## OpenClaw Integration
- WebSocket support for Gateway communication
- Query complexity routing (auto-select model)
- Improved error handling and retries
- Session management per Discord guild
- Better latency tracking

## Pipeline Improvements
- Sentence splitter for streaming optimization
- Query router for intelligent model selection
- Enhanced VAD receiver with sample-based timing
- Improved audio buffering and format conversion
- Better transcript management

## Documentation
- Added QUICK_START.md (5-minute test guide)
- Added OPTIMIZATION_SUMMARY.md (performance analysis)
- Added DISCORD_OPTIMIZATION_TEST.md (testing guide)
- Added USAGE_GUIDE.md (comprehensive usage)
- Updated README.md with optimization details

## Utilities & Scripts
- Added get_invite_link.py (Discord bot invite)
- Added sync_commands.py, sync_to_guild.py (command sync)
- Added test_gateway.py, test_stt.py (testing utilities)
- Added openclaw_wrapper.py (wrapper script)
- Removed create_mock_turn_model.py (no longer needed)

## Configuration Updates
- STT model: medium → small (faster, acceptable quality)
- TTS engine: chatterbox → coqui (Turbo integration)
- Beam size: 5 → 1 (latency optimization)
- Added emotion_exaggeration per agent
- Updated .gitignore for project files

Total: ~2105 insertions, ~462 deletions across 35 files
Performance: ~5.5s total latency (down from 22-35s)
Target: ~3.5s (achieved in simple queries with cache)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
870 lines
26 KiB
Python
870 lines
26 KiB
Python
"""OpenClaw Gateway WebSocket JSON-RPC client.

Implements the OpenClaw Gateway protocol for agent response generation.
Connects via WebSocket to the OpenClaw Gateway running on a Synology NAS.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import AsyncIterator, Dict, Optional
|
|
|
|
import websockets
|
|
from websockets.exceptions import ConnectionClosed
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
@dataclass
class OpenClawConfig:
    """Configuration for OpenClaw Gateway client.

    Attributes:
        base_url: WebSocket URL of the OpenClaw Gateway.
        auth_token: Token sent in the ``connect`` handshake's ``auth`` object;
            ``None`` sends an empty ``auth``. Nothing here reads the
            environment -- callers are expected to supply it (e.g. from the
            OPENCLAW_AUTH_TOKEN env var).
        timeout: Seconds to wait for a response on the first attempt.
        retry_timeout: Seconds to wait for a response on a retry attempt.
        max_retries: Intended maximum number of retries after the first
            attempt.
        agent_id: Agent ID embedded in gateway session keys.
        session_scope: ``"per-peer"`` gives each speaker their own session
            key; any other value (e.g. ``"shared"``) uses one shared key.
    """

    base_url: str = "ws://192.168.50.9:18789"
    auth_token: Optional[str] = None
    timeout: float = 8.0
    retry_timeout: float = 15.0
    max_retries: int = 1
    agent_id: str = "main"
    session_scope: str = "per-peer"
class OpenClawClient:
    """
    WebSocket client for the OpenClaw Gateway JSON-RPC protocol.

    Manages connection, handshake, and chat message exchange with the
    OpenClaw Gateway running on Synology NAS. A single background listener
    task reads every incoming frame and routes it:

    * ``res`` frames resolve the pending RPC future with the matching ``id``;
    * ``chat`` events resolve a per-run waiter (send_message) or feed a
      per-run queue (send_message_streaming), keyed by the server's ``runId``.

    Chat events that arrive before their run has a consumer registered (they
    can race the ``chat.send`` acknowledgement) are buffered, bounded, and
    replayed as soon as the consumer registers.
    """

    # Agent personalities (for system context).
    # NOTE(review): in this class they are only used to validate agent
    # names; the text itself is never sent to the gateway here.
    AGENT_PERSONALITIES = {
        "main": (
            "You are an intelligent and helpful AI assistant "
            "participating in a Discord voice conversation. You are knowledgeable, "
            "professional, and provide thoughtful, concise responses. "
            "You speak naturally in conversation, avoiding overly formal language."
        ),
        "jarvis": (
            "You are Jarvis, an intelligent and helpful AI assistant "
            "participating in a Discord voice conversation. You are knowledgeable, "
            "professional, and provide thoughtful, concise responses. "
            "You speak naturally in conversation, avoiding overly formal language."
        ),
        "sage": (
            "You are Sage, a wise and insightful AI assistant "
            "participating in a Discord voice conversation. You offer deep insights "
            "and thoughtful perspectives. You are calm, measured, and speak with "
            "clarity and wisdom."
        ),
    }

    # Max number of distinct runs whose unclaimed chat events are buffered.
    _MAX_UNCLAIMED_RUNS = 32
    # Max buffered events per unclaimed run; terminal events are always kept.
    _MAX_UNCLAIMED_EVENTS_PER_RUN = 256

    def __init__(self, config: "OpenClawConfig"):
        """
        Initialize OpenClaw Gateway client (does not connect).

        Args:
            config: Client configuration
        """
        self.config = config

        # WebSocket connection
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._connected = False

        # Request/response tracking
        self._pending: Dict[str, asyncio.Future] = {}
        self._chat_waiters: Dict[str, asyncio.Future] = {}
        # Streaming queues carry (kind, value) items: ("delta", text),
        # ("end", final_text_or_None) or ("error", exception).
        self._stream_queues: Dict[str, asyncio.Queue] = {}
        # runId -> [(state, payload), ...] for chat events nobody has claimed
        # yet, in arrival order (dicts preserve insertion order).
        self._unclaimed_chat_events: Dict[str, list] = {}

        # Background listener task
        self._listener_task: Optional[asyncio.Task] = None

        # Reconnection lock
        self._reconnect_lock = asyncio.Lock()

        # Stats
        self.total_requests = 0
        self.total_failures = 0
        self.total_retries = 0
        self.total_latency = 0.0
        # Number of successful requests whose latency is in total_latency.
        self._latency_samples = 0

    @property
    def is_connected(self) -> bool:
        """Check if client is connected to Gateway."""
        return self._connected

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> None:
        """
        Establish WebSocket connection and complete the handshake.

        Protocol:
            1. Connect to WebSocket
            2. Wait for connect.challenge event
            3. Send connect request with auth
            4. Wait for the connect response (hello-ok)
            5. Start background listener

        Any listener/socket left from a previous connection is torn down
        first; requests still waiting on it fail with ConnectionError. If the
        handshake fails, the new socket is closed before the error propagates.

        Raises:
            ConnectionError: If handshake fails
            asyncio.TimeoutError: If a handshake frame does not arrive in time
        """
        await self._teardown_socket()
        self._abort_outstanding("Gateway connection replaced")

        url = self.config.base_url
        logger.info(f"Connecting to OpenClaw Gateway at {url}")

        ws = await websockets.connect(url, max_size=10 * 1024 * 1024)
        try:
            server_info = await self._handshake(ws)
        except BaseException:
            # Don't leak a half-open socket when the handshake fails.
            await ws.close()
            raise

        self._ws = ws
        logger.info(
            f"Connected to OpenClaw Gateway "
            f"(version={server_info.get('version', '?')}, "
            f"connId={server_info.get('connId', '?')})"
        )
        self._connected = True

        # Start background listener for subsequent messages
        self._listener_task = asyncio.create_task(self._listen())

    async def _handshake(self, ws) -> dict:
        """
        Run the challenge / ``connect`` exchange on a freshly opened socket.

        Args:
            ws: Newly opened WebSocket connection

        Returns:
            The ``server`` object from the connect response payload (may be empty)

        Raises:
            ConnectionError: On an unexpected first frame or a rejected connect
            asyncio.TimeoutError: If a handshake frame takes longer than 10s
        """
        challenge = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
        if challenge.get("event") != "connect.challenge":
            raise ConnectionError(
                f"Expected connect.challenge, got: {challenge.get('event')}"
            )

        # The nonce is only logged; tolerate its absence.
        nonce = (challenge.get("payload") or {}).get("nonce")
        logger.debug(f"Received challenge nonce: {nonce}")

        connect_params = {
            "minProtocol": 3,
            "maxProtocol": 5,
            "client": {
                "id": "gateway-client",
                "displayName": "OpenClaw Voice Bot",
                "version": "1.0.0",
                "platform": "custom",
                "mode": "backend",
            },
            "role": "operator",
            "caps": [],
            "commands": [],
            "permissions": {},
            "scopes": ["chat", "operator.read", "operator.write"],
            "auth": {},
        }
        if self.config.auth_token:
            connect_params["auth"] = {"token": self.config.auth_token}

        frame = {
            "type": "req",
            "id": self._new_id(),
            "method": "connect",
            "params": connect_params,
        }
        await ws.send(json.dumps(frame))

        # Read the connect response, skipping any event frames interleaved
        # before it, all within one overall 10s budget.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 10
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError(
                    "Timed out waiting for Gateway connect response"
                )
            resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=remaining))
            if resp.get("type") != "event":
                break

        if not resp.get("ok"):
            error = resp.get("error") or {}
            raise ConnectionError(
                f"Gateway connect failed: {error.get('message', 'unknown')}"
            )
        return (resp.get("payload") or {}).get("server") or {}

    async def disconnect(self) -> None:
        """
        Gracefully close the Gateway connection.

        In-flight RPCs and send_message() waiters are cancelled (their callers
        see asyncio.CancelledError), and active streams end with the same
        cancellation instead of hanging until their chunk timeout.
        """
        await self._teardown_socket()
        self._abort_outstanding(None)

    async def _teardown_socket(self) -> None:
        """Mark disconnected, cancel the listener task and close the socket, if any."""
        self._connected = False

        task, self._listener_task = self._listener_task, None
        if task is not None and not task.done():
            task.cancel()

        # Detach before closing so the (cancelled) listener cannot act on it.
        ws, self._ws = self._ws, None
        if ws is not None:
            await ws.close()

    def _abort_outstanding(self, reason: Optional[str]) -> None:
        """
        Resolve every in-flight RPC future, chat waiter and stream.

        Args:
            reason: ``None`` cancels everything (explicit disconnect);
                otherwise each waiter fails with ``ConnectionError(reason)``,
                which send_message() treats as retryable.
        """
        for fut in [*self._pending.values(), *self._chat_waiters.values()]:
            if fut.done():
                continue
            if reason is None:
                fut.cancel()
            else:
                fut.set_exception(ConnectionError(reason))

        for queue in self._stream_queues.values():
            error = (
                asyncio.CancelledError("Gateway disconnected")
                if reason is None
                else ConnectionError(reason)
            )
            queue.put_nowait(("error", error))

        self._pending.clear()
        self._chat_waiters.clear()
        self._stream_queues.clear()
        self._unclaimed_chat_events.clear()

    async def _ensure_connected(self) -> None:
        """Connect (or reconnect) unless a live connection already exists."""
        if self._connected and self._ws:
            return

        async with self._reconnect_lock:
            if self._connected and self._ws:
                return
            logger.warning("Gateway disconnected, reconnecting...")
            await self.connect()

    # ------------------------------------------------------------------
    # Public chat API
    # ------------------------------------------------------------------

    async def send_message(
        self,
        agent: str,
        message: str,
        context: str = "",
        speaker: Optional[str] = None,
        model: Optional[str] = None,
    ) -> str:
        """
        Send message to agent and get the complete response.

        The first attempt waits ``config.timeout`` seconds; up to
        ``config.max_retries`` further attempts wait ``config.retry_timeout``.
        Timeouts and lost connections are retried (reconnecting first); any
        other error fails immediately. Note that a retry sends the message
        again, so the gateway may see it twice if the first run completes late.

        Args:
            agent: Agent name ("jarvis" or "sage")
            message: User's message/utterance
            context: Recent conversation context (not used with Gateway)
            speaker: Speaker name/ID (used for session key)
            model: Optional model override (e.g., "claude-haiku-3.5", "claude-sonnet-4")

        Returns:
            Agent's response text

        Raises:
            RuntimeError: If request fails after retries
            ValueError: If agent is invalid
        """
        self._validate_agent(agent)

        self.total_requests += 1
        start_time = time.monotonic()
        session_key = self._build_session_key(speaker or "default")
        max_retries = max(0, self.config.max_retries)

        attempt = 0
        while True:
            timeout = self.config.timeout if attempt == 0 else self.config.retry_timeout
            try:
                await self._ensure_connected()
                response = await self._send_chat(
                    session_key, message, timeout=timeout, model=model
                )
            except Exception as e:
                retryable = isinstance(
                    e, (asyncio.TimeoutError, ConnectionError, ConnectionClosed)
                )
                if retryable and attempt < max_retries:
                    reason = (
                        f"timeout after {timeout}s"
                        if isinstance(e, asyncio.TimeoutError)
                        else f"connection error: {e}"
                    )
                    logger.warning(f"Attempt {attempt + 1} failed ({reason}), retrying...")
                    self.total_retries += 1
                    attempt += 1
                    continue

                self.total_failures += 1
                suffix = " after retry" if attempt > 0 else ""
                logger.error(f"OpenClaw request failed{suffix}: {e}")
                raise RuntimeError(
                    f"Failed to get response from {agent}{suffix}: {e}"
                ) from e

            latency = time.monotonic() - start_time
            self._record_latency(latency)
            if attempt == 0:
                logger.info(
                    f"Agent {agent} responded in {latency:.2f}s: "
                    f'"{response[:50]}..."'
                )
            else:
                logger.info(f"Agent {agent} responded on retry in {latency:.2f}s")
            return response

    async def send_message_streaming(
        self,
        agent: str,
        message: str,
        context: str = "",
        speaker: Optional[str] = None,
        model: Optional[str] = None,
    ) -> AsyncIterator[str]:
        """
        Send message and stream response chunks in real-time.

        Args:
            agent: Agent name ("jarvis" or "sage")
            message: User's message/utterance
            context: Recent conversation context (not used with Gateway)
            speaker: Speaker name/ID (used for session key)
            model: Optional model override

        Yields:
            Text chunks as they arrive from the LLM (or the whole final text
            as a single chunk if the gateway sent no deltas)

        Raises:
            RuntimeError: If request fails, the chat errors, or a chunk
                does not arrive in time
            ValueError: If agent is invalid (raised on first iteration)
        """
        self._validate_agent(agent)

        self.total_requests += 1
        start_time = time.monotonic()

        try:
            await self._ensure_connected()
            session_key = self._build_session_key(speaker or "default")
            async for chunk in self._send_chat_streaming(
                session_key, message, model=model
            ):
                yield chunk
        except Exception as e:
            self.total_failures += 1
            logger.error(f"OpenClaw streaming request failed: {e}")
            raise RuntimeError(
                f"Failed to get streaming response from {agent}: {e}"
            ) from e

        latency = time.monotonic() - start_time
        self._record_latency(latency)
        logger.info(f"Agent {agent} streaming response completed in {latency:.2f}s")

    async def abort_chat(self, session_key: str) -> None:
        """
        Ask the gateway to abort any in-flight chat for the session.

        Fire-and-forget: the ``chat.abort`` request is sent but its response
        is not awaited. If the gateway then reports the run as ``aborted``, a
        pending send_message() raises asyncio.CancelledError and an active
        stream simply ends.

        Args:
            session_key: OpenClaw session key
        """
        await self._ensure_connected()
        await self._send_request(
            self._new_id(), "chat.abort", {"sessionKey": session_key}
        )

    # ------------------------------------------------------------------
    # Chat internals
    # ------------------------------------------------------------------

    def _validate_agent(self, agent: str) -> None:
        """Raise ValueError unless ``agent`` (case-insensitive) is a known agent."""
        if agent.lower() not in self.AGENT_PERSONALITIES:
            raise ValueError(
                f"Invalid agent: {agent}. "
                f"Choose from: {list(self.AGENT_PERSONALITIES.keys())}"
            )

    async def _start_chat(
        self,
        session_key: str,
        message: str,
        timeout: float,
        model: Optional[str],
        key_prefix: str,
    ) -> str:
        """
        Send ``chat.send`` and return the run ID to wait on.

        Args:
            session_key: OpenClaw session key
            message: User's transcribed speech
            timeout: Seconds the gateway is allowed to spend (sent as timeoutMs)
            model: Optional model override
            key_prefix: Prefix for the generated idempotency key

        Returns:
            Server-assigned ``runId``, or the idempotency key if the
            acknowledgement carries none

        Raises:
            RuntimeError: If chat.send is rejected
            asyncio.TimeoutError: If the acknowledgement takes over 15s
        """
        idempotency_key = f"{key_prefix}-{uuid.uuid4().hex[:12]}"
        params = {
            "sessionKey": session_key,
            "message": message,
            "deliver": True,
            "idempotencyKey": idempotency_key,
            "timeoutMs": int(timeout * 1000),
        }
        if model:
            params["model"] = model

        resp = await self._request("chat.send", params, timeout=15)
        if not resp.get("ok"):
            error = resp.get("error") or {}
            raise RuntimeError(
                f"chat.send failed: {error.get('message', 'unknown')}"
            )
        return (resp.get("payload") or {}).get("runId") or idempotency_key

    async def _send_chat(
        self, session_key: str, message: str, timeout: float = 120, model: Optional[str] = None
    ) -> str:
        """
        Send a chat message and wait for the final response text.

        Args:
            session_key: OpenClaw session key (e.g. "agent:main:discord:dm:123")
            message: User's transcribed speech
            timeout: Max seconds to wait for AI response
            model: Optional model override (e.g., "claude-haiku-3.5")

        Returns:
            Agent's response text

        Raises:
            RuntimeError: If chat.send fails or the chat reports an error
            asyncio.TimeoutError: If response takes too long
        """
        run_id = await self._start_chat(session_key, message, timeout, model, "voice")

        waiter: asyncio.Future[str] = asyncio.get_running_loop().create_future()
        self._chat_waiters[run_id] = waiter
        # No await between registering and replaying, so nothing slips through.
        self._claim_run(run_id)
        try:
            return await asyncio.wait_for(waiter, timeout=timeout)
        finally:
            self._chat_waiters.pop(run_id, None)

    async def _send_chat_streaming(
        self, session_key: str, message: str, model: Optional[str] = None, timeout: float = 120
    ) -> AsyncIterator[str]:
        """
        Send a chat message and stream response chunks.

        Args:
            session_key: OpenClaw session key
            message: User's transcribed speech
            model: Optional model override
            timeout: Max seconds to wait for each chunk

        Yields:
            Text deltas as they arrive; if no delta arrived before the final
            event, the final text is yielded once instead

        Raises:
            RuntimeError: If chat.send fails or the chat reports an error
            ConnectionError: If the gateway connection is lost mid-stream
            asyncio.TimeoutError: If a chunk takes too long
        """
        run_id = await self._start_chat(
            session_key, message, timeout, model, "voice-stream"
        )

        stream_queue: asyncio.Queue = asyncio.Queue()
        self._stream_queues[run_id] = stream_queue
        self._claim_run(run_id)

        streamed_any = False
        try:
            while True:
                try:
                    kind, value = await asyncio.wait_for(
                        stream_queue.get(), timeout=timeout
                    )
                except asyncio.TimeoutError:
                    logger.warning(f"Stream timeout waiting for chunk (runId: {run_id})")
                    raise

                if kind == "delta":
                    streamed_any = True
                    yield value
                elif kind == "error":
                    raise value
                else:  # "end"
                    if value and not streamed_any:
                        yield value
                    return
        finally:
            self._stream_queues.pop(run_id, None)

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------

    async def _send_request(
        self, req_id: str, method: str, params: dict
    ) -> None:
        """
        Send a JSON-RPC request frame.

        Args:
            req_id: Request ID
            method: RPC method name
            params: Method parameters

        Raises:
            ConnectionError: If there is no socket
            ConnectionClosed: If the socket closed; the client is then marked
                disconnected so the next _ensure_connected() reconnects
        """
        ws = self._ws
        if ws is None:
            raise ConnectionError("Not connected to Gateway")

        frame = {
            "type": "req",
            "id": req_id,
            "method": method,
            "params": params,
        }
        try:
            await ws.send(json.dumps(frame))
        except ConnectionClosed:
            if self._ws is ws:
                self._connected = False
            raise

    async def _request(self, method: str, params: dict, timeout: float = 30) -> dict:
        """
        Send an RPC request and wait for its response frame.

        The response future is registered *before* the frame is sent so a
        reply that arrives while send() is still in progress is not lost.

        Args:
            method: RPC method name
            params: Method parameters
            timeout: Seconds to wait for the response

        Returns:
            The full response frame
        """
        req_id = self._new_id()
        fut: asyncio.Future[dict] = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        try:
            await self._send_request(req_id, method, params)
            return await asyncio.wait_for(fut, timeout=timeout)
        finally:
            self._pending.pop(req_id, None)
            if fut.done() and not fut.cancelled():
                # Mark any exception as retrieved (it may have been set while
                # send() was failing) to avoid "never retrieved" log noise.
                fut.exception()

    async def _wait_response(self, req_id: str, timeout: float = 30) -> dict:
        """
        Wait for a response matching the given request ID.

        Reuses a future already registered for ``req_id``, otherwise
        registers one. Prefer _request(), which registers before sending.

        Args:
            req_id: Request ID to wait for
            timeout: Timeout in seconds

        Returns:
            The full response frame
        """
        fut = self._pending.get(req_id)
        if fut is None:
            fut = asyncio.get_running_loop().create_future()
            self._pending[req_id] = fut
        try:
            return await asyncio.wait_for(fut, timeout=timeout)
        finally:
            self._pending.pop(req_id, None)

    async def _listen(self) -> None:
        """
        Background task that reads and routes all incoming WebSocket messages.

        However the socket ends (error, abnormal close, or a normal close,
        which just ends iteration), the client is marked disconnected and
        everything waiting on this socket fails with ConnectionError.
        """
        ws = self._ws
        if ws is None:
            return
        try:
            async for raw in ws:
                try:
                    self._route_frame(raw)
                except Exception:
                    # One malformed frame must not take the connection down.
                    logger.exception("Failed to handle Gateway frame")
        except asyncio.CancelledError:
            raise
        except ConnectionClosed:
            logger.warning("Gateway WebSocket closed")
        except Exception:
            logger.exception("Gateway listener error")
        else:
            logger.warning("Gateway WebSocket closed")
        self._on_connection_lost(ws)

    def _on_connection_lost(self, ws) -> None:
        """Mark disconnected and fail waiters, unless ``ws`` was already replaced."""
        if self._ws is not ws:
            return
        self._connected = False
        self._abort_outstanding("Gateway connection lost")

    def _route_frame(self, raw) -> None:
        """Dispatch one raw frame to its RPC future or the chat event handler."""
        try:
            msg = json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("Received non-JSON message from Gateway")
            return
        if not isinstance(msg, dict):
            return

        msg_type = msg.get("type")
        if msg_type == "res":
            fut = self._pending.get(msg.get("id"))
            if fut is not None and not fut.done():
                fut.set_result(msg)
        elif msg_type == "event" and msg.get("event") == "chat":
            self._handle_chat_event(msg.get("payload") or {})

    # ------------------------------------------------------------------
    # Chat event routing
    # ------------------------------------------------------------------

    def _handle_chat_event(self, payload: dict) -> None:
        """
        Process an incoming chat event.

        Events for a run that has a waiter or stream are delivered
        immediately; events for any other run are buffered (bounded) so a
        consumer registering shortly afterwards can replay them.

        Args:
            payload: Chat event payload
        """
        run_id = payload.get("runId", "")
        state = payload.get("state", "")

        if state == "error":
            error_msg = payload.get("errorMessage", "Unknown error")
            logger.error(f"Chat error for runId {run_id}: {error_msg}")

        if run_id in self._chat_waiters or run_id in self._stream_queues:
            self._deliver_chat_event(run_id, state, payload)
        elif run_id:
            self._buffer_unclaimed_chat_event(run_id, state, payload)

    def _deliver_chat_event(self, run_id: str, state: str, payload: dict) -> None:
        """Apply one chat event to the run's registered waiter and/or stream queue."""
        waiter = self._chat_waiters.get(run_id)
        if waiter is not None and waiter.done():
            waiter = None
        stream_queue = self._stream_queues.get(run_id)

        if state == "final":
            response_text = "\n".join(
                self._text_blocks(payload.get("message") or {})
            ).strip()
            if waiter is not None:
                waiter.set_result(response_text)
            if stream_queue is not None:
                stream_queue.put_nowait(("end", response_text))

        elif state == "error":
            error_msg = payload.get("errorMessage", "Unknown error")
            if waiter is not None:
                waiter.set_exception(RuntimeError(f"Chat error: {error_msg}"))
            if stream_queue is not None:
                stream_queue.put_nowait(
                    ("error", RuntimeError(f"Chat error: {error_msg}"))
                )

        elif state == "aborted":
            # Kept as CancelledError for compatibility with existing callers.
            if waiter is not None:
                waiter.set_exception(asyncio.CancelledError("Chat aborted"))
            if stream_queue is not None:
                stream_queue.put_nowait(("end", None))

        elif state == "delta":
            text_delta = "".join(self._text_blocks(payload.get("delta") or {}))
            if text_delta and stream_queue is not None:
                stream_queue.put_nowait(("delta", text_delta))

    def _buffer_unclaimed_chat_event(self, run_id: str, state: str, payload: dict) -> None:
        """Remember an event for a run nobody has claimed yet, evicting the oldest runs."""
        events = self._unclaimed_chat_events.setdefault(run_id, [])
        if state != "delta" or len(events) < self._MAX_UNCLAIMED_EVENTS_PER_RUN:
            events.append((state, payload))
        while len(self._unclaimed_chat_events) > self._MAX_UNCLAIMED_RUNS:
            oldest = next(iter(self._unclaimed_chat_events))
            del self._unclaimed_chat_events[oldest]

    def _claim_run(self, run_id: str) -> None:
        """Replay, in order, any events for ``run_id`` that arrived before its consumer."""
        for state, payload in self._unclaimed_chat_events.pop(run_id, ()):
            self._deliver_chat_event(run_id, state, payload)

    @staticmethod
    def _text_blocks(container: dict) -> list:
        """
        Collect text from ``container["content"]``.

        A plain-string ``content`` is returned as one block; otherwise the
        ``text`` of every ``{"type": "text"}`` block is returned in order.
        """
        content = container.get("content")
        if isinstance(content, str):
            return [content]
        return [
            block.get("text") or ""
            for block in content or []
            if isinstance(block, dict) and block.get("type") == "text"
        ]

    # ------------------------------------------------------------------
    # Helpers & stats
    # ------------------------------------------------------------------

    def _build_session_key(self, user_id: str) -> str:
        """
        Build OpenClaw session key for user.

        Format: agent:<agentId>:discord:dm:<userId> for the "per-peer" scope,
        otherwise the shared key agent:<agentId>:main.

        Args:
            user_id: Discord user ID (stripped and lower-cased)

        Returns:
            Session key
        """
        uid = str(user_id).strip().lower()

        if self.config.session_scope == "per-peer":
            return f"agent:{self.config.agent_id}:discord:dm:{uid}"
        else:
            return f"agent:{self.config.agent_id}:main"

    def format_context(self, transcript: str) -> str:
        """
        Format transcript for context.

        Note: OpenClaw Gateway maintains conversation history internally,
        so we don't need to send explicit context.

        Args:
            transcript: Raw transcript text

        Returns:
            Formatted context (empty for Gateway)
        """
        return ""

    def _record_latency(self, latency: float) -> None:
        """Accumulate the latency of one successful request for get_stats()."""
        self.total_latency += latency
        self._latency_samples += 1

    def get_stats(self) -> dict:
        """
        Get client statistics.

        ``avg_latency`` averages over successful requests only (the only ones
        whose latency is recorded).

        Returns:
            Dictionary with stats
        """
        total = self.total_requests
        return {
            "total_requests": total,
            "total_failures": self.total_failures,
            "total_retries": self.total_retries,
            "success_rate": (
                (total - self.total_failures) / total if total > 0 else 0.0
            ),
            "avg_latency": (
                self.total_latency / self._latency_samples
                if self._latency_samples > 0
                else 0.0
            ),
            "connected": self._connected,
        }

    @staticmethod
    def _new_id() -> str:
        """Generate unique request ID."""
        return str(uuid.uuid4())
class PerGuildOpenClawClient:
    """
    Manages one OpenClawClient (connection and stats) per Discord guild.

    Every guild client is built from the same shared config. Note that
    OpenClawClient session keys are derived from the speaker (and agent_id),
    not the guild, so the same speaker maps to the same gateway session in
    every guild.
    """

    def __init__(self, config: "OpenClawConfig"):
        """
        Initialize per-guild client manager.

        Args:
            config: Default client configuration, shared by all guild clients
        """
        self.config = config

        # Per-guild clients (for session management)
        self._clients: Dict[int, OpenClawClient] = {}

        # Strong references to fire-and-forget disconnect tasks started by
        # remove_guild(), so they are not garbage-collected mid-flight.
        self._closing_tasks: set = set()

    def get_or_create(self, guild_id: int) -> "OpenClawClient":
        """
        Get or create client for a guild.

        Args:
            guild_id: Discord guild ID

        Returns:
            OpenClawClient for this guild (created unconnected if new)
        """
        client = self._clients.get(guild_id)
        if client is None:
            client = OpenClawClient(config=self.config)
            self._clients[guild_id] = client
            logger.info(f"Created OpenClaw client for guild {guild_id}")
        return client

    async def send_message(
        self,
        guild_id: int,
        agent: str,
        message: str,
        context: str = "",
        speaker: Optional[str] = None,
        model: Optional[str] = None,
    ) -> str:
        """
        Send message for a guild.

        Args:
            guild_id: Discord guild ID
            agent: Agent name
            message: User's message
            context: Conversation context
            speaker: Speaker name
            model: Optional model override

        Returns:
            Agent's response
        """
        client = self.get_or_create(guild_id)
        return await client.send_message(agent, message, context, speaker, model)

    def remove_guild(self, guild_id: int) -> None:
        """
        Remove client for a guild and close its Gateway connection.

        The disconnect is scheduled on the running event loop, because this
        method is synchronous. If no loop is running, the client is only
        forgotten and its connection cannot be closed from here.

        Args:
            guild_id: Discord guild ID
        """
        client = self._clients.pop(guild_id, None)
        if client is None:
            return
        logger.info(f"Removed OpenClaw client for guild {guild_id}")

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return

        task = loop.create_task(client.disconnect())
        self._closing_tasks.add(task)
        task.add_done_callback(self._closing_tasks.discard)

    async def disconnect_all(self) -> None:
        """Disconnect and forget every guild client, and wait for pending removals."""
        clients = list(self._clients.values())
        self._clients.clear()
        await asyncio.gather(
            *(client.disconnect() for client in clients),
            *self._closing_tasks,
            return_exceptions=True,
        )

    def get_all_stats(self) -> Dict[int, dict]:
        """
        Get stats for all guilds.

        Returns:
            Dictionary mapping guild_id -> stats
        """
        return {
            guild_id: client.get_stats()
            for guild_id, client in self._clients.items()
        }
# Convenience factory
def create_client(
    base_url: str = "ws://192.168.50.9:18789",
    auth_token: Optional[str] = None,
    timeout: float = 8.0,
    agent_id: str = "main",
) -> OpenClawClient:
    """
    Build an OpenClawClient from the most commonly tuned settings.

    All other OpenClawConfig fields keep their dataclass defaults. The
    returned client is not connected yet.

    Args:
        base_url: OpenClaw Gateway WebSocket URL
        auth_token: Authentication token
        timeout: Request timeout (seconds)
        agent_id: Agent ID for session keys

    Returns:
        OpenClawClient instance
    """
    return OpenClawClient(
        config=OpenClawConfig(
            agent_id=agent_id,
            timeout=timeout,
            auth_token=auth_token,
            base_url=base_url,
        )
    )