openclaw-voice/openclaw_client/client.py
MCKRUZ 9fde3d31ba feat: Major performance optimizations and feature enhancements
## Performance Optimizations (3-10x faster responses)
- STT beam_size reduced to 1 (3-5x faster transcription, minimal quality loss)
- Smart query routing: Haiku (simple) → Sonnet (medium) → Opus (complex)
- TTS cache for common phrases (27 pre-generated responses)
- Sentence-level streaming TTS (start playing while generating)
- Sample-based VAD timing (30x improvement in silence detection)

## TTS Engine Upgrade
- Migrated from Chatterbox to Chatterbox-Turbo
- Zero-shot voice cloning (no fine-tuning required)
- Native paralinguistic tag support ([laugh], [sigh], [chuckle], etc.)
- Emotion presets with temperature control
- Improved marker conversion (*action*, (action), ~action~)

## Discord Bot Enhancements
- Multi-agent support (Jarvis, Sage)
- Improved voice receiving with discord-ext-voice-recv
- Enhanced /join, /leave, /status commands
- Per-agent personality configuration
- Better audio sink/receiver implementation

## OpenClaw Integration
- WebSocket support for Gateway communication
- Query complexity routing (auto-select model)
- Improved error handling and retries
- Session management per Discord guild
- Better latency tracking

## Pipeline Improvements
- Sentence splitter for streaming optimization
- Query router for intelligent model selection
- Enhanced VAD receiver with sample-based timing
- Improved audio buffering and format conversion
- Better transcript management

## Documentation
- Added QUICK_START.md (5-minute test guide)
- Added OPTIMIZATION_SUMMARY.md (performance analysis)
- Added DISCORD_OPTIMIZATION_TEST.md (testing guide)
- Added USAGE_GUIDE.md (comprehensive usage)
- Updated README.md with optimization details

## Utilities & Scripts
- Added get_invite_link.py (Discord bot invite)
- Added sync_commands.py, sync_to_guild.py (command sync)
- Added test_gateway.py, test_stt.py (testing utilities)
- Added openclaw_wrapper.py (wrapper script)
- Removed create_mock_turn_model.py (no longer needed)

## Configuration Updates
- STT model: medium → small (faster, acceptable quality)
- TTS engine: chatterbox → coqui (Turbo integration)
- Beam size: 5 → 1 (latency optimization)
- Added emotion_exaggeration per agent
- Updated .gitignore for project files

Total: ~2105 insertions, ~462 deletions across 35 files
Performance: ~5.5s total latency (down from 22-35s)
Target: ~3.5s (achieved in simple queries with cache)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-16 19:29:57 -05:00

870 lines
26 KiB
Python

"""OpenClaw Gateway WebSocket JSON-RPC client.
Implements the OpenClaw Gateway protocol for agent response generation.
Connects via WebSocket to OpenClaw Gateway running on Synology NAS.
"""
import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass
from typing import AsyncIterator, Dict, Optional
import websockets
from websockets.exceptions import ConnectionClosed
logger = logging.getLogger(__name__)
@dataclass
class OpenClawConfig:
"""Configuration for OpenClaw Gateway client."""
# WebSocket URL for OpenClaw Gateway
base_url: str = "ws://192.168.50.9:18789"
# Authentication token (from OPENCLAW_AUTH_TOKEN env var)
auth_token: Optional[str] = None
# Request timeout (seconds)
timeout: float = 8.0
# Retry timeout for second attempt
retry_timeout: float = 15.0
# Maximum number of retries
max_retries: int = 1
# Agent ID for session keys
agent_id: str = "main"
# Session scope: "per-peer" or "shared"
session_scope: str = "per-peer"
class OpenClawClient:
"""
WebSocket client for OpenClaw Gateway JSON-RPC protocol.
Manages connection, handshake, and chat message exchange with
OpenClaw Gateway running on Synology NAS.
"""
# Agent personalities (for system context)
AGENT_PERSONALITIES = {
"main": (
"You are an intelligent and helpful AI assistant "
"participating in a Discord voice conversation. You are knowledgeable, "
"professional, and provide thoughtful, concise responses. "
"You speak naturally in conversation, avoiding overly formal language."
),
"jarvis": (
"You are Jarvis, an intelligent and helpful AI assistant "
"participating in a Discord voice conversation. You are knowledgeable, "
"professional, and provide thoughtful, concise responses. "
"You speak naturally in conversation, avoiding overly formal language."
),
"sage": (
"You are Sage, a wise and insightful AI assistant "
"participating in a Discord voice conversation. You offer deep insights "
"and thoughtful perspectives. You are calm, measured, and speak with "
"clarity and wisdom."
),
}
def __init__(self, config: OpenClawConfig):
"""
Initialize OpenClaw Gateway client.
Args:
config: Client configuration
"""
self.config = config
# WebSocket connection
self._ws: Optional[websockets.WebSocketClientProtocol] = None
self._connected = False
# Request/response tracking
self._pending: Dict[str, asyncio.Future] = {}
self._chat_waiters: Dict[str, asyncio.Future] = {}
self._stream_queues: Dict[str, asyncio.Queue] = {} # For streaming responses
# Background listener task
self._listener_task: Optional[asyncio.Task] = None
# Reconnection lock
self._reconnect_lock = asyncio.Lock()
# Stats
self.total_requests = 0
self.total_failures = 0
self.total_retries = 0
self.total_latency = 0.0
@property
def is_connected(self) -> bool:
"""Check if client is connected to Gateway."""
return self._connected
async def connect(self) -> None:
"""
Establish WebSocket connection and complete the handshake.
Protocol:
1. Connect to WebSocket
2. Wait for connect.challenge event
3. Send connect request with auth
4. Wait for hello-ok response
5. Start background listener
Raises:
ConnectionError: If handshake fails
"""
url = self.config.base_url
logger.info(f"Connecting to OpenClaw Gateway at {url}")
# Connect WebSocket
self._ws = await websockets.connect(url, max_size=10 * 1024 * 1024)
# Wait for connect.challenge
challenge_msg = await asyncio.wait_for(self._ws.recv(), timeout=10)
challenge = json.loads(challenge_msg)
if challenge.get("event") != "connect.challenge":
raise ConnectionError(
f"Expected connect.challenge, got: {challenge.get('event')}"
)
nonce = challenge["payload"]["nonce"]
logger.debug(f"Received challenge nonce: {nonce}")
# Send connect request
connect_params = {
"minProtocol": 3,
"maxProtocol": 5,
"client": {
"id": "gateway-client",
"displayName": "OpenClaw Voice Bot",
"version": "1.0.0",
"platform": "custom",
"mode": "backend",
},
"role": "operator",
"caps": [],
"commands": [],
"permissions": {},
"scopes": ["chat", "operator.read", "operator.write"],
"auth": {},
}
if self.config.auth_token:
connect_params["auth"] = {"token": self.config.auth_token}
connect_id = self._new_id()
frame = {
"type": "req",
"id": connect_id,
"method": "connect",
"params": connect_params,
}
await self._ws.send(json.dumps(frame))
# Read hello response
resp_msg = await asyncio.wait_for(self._ws.recv(), timeout=10)
resp = json.loads(resp_msg)
if not resp.get("ok"):
error = resp.get("error", {})
raise ConnectionError(
f"Gateway connect failed: {error.get('message', 'unknown')}"
)
server_info = resp.get("payload", {}).get("server", {})
logger.info(
f"Connected to OpenClaw Gateway "
f"(version={server_info.get('version', '?')}, "
f"connId={server_info.get('connId', '?')})"
)
self._connected = True
# Start background listener for subsequent messages
self._listener_task = asyncio.create_task(self._listen())
async def disconnect(self) -> None:
"""Gracefully close the Gateway connection."""
self._connected = False
if self._listener_task:
self._listener_task.cancel()
self._listener_task = None
if self._ws:
await self._ws.close()
self._ws = None
# Cancel all pending requests
for fut in self._pending.values():
if not fut.done():
fut.cancel()
for fut in self._chat_waiters.values():
if not fut.done():
fut.cancel()
self._pending.clear()
self._chat_waiters.clear()
self._stream_queues.clear()
async def send_message(
self,
agent: str,
message: str,
context: str = "",
speaker: Optional[str] = None,
model: Optional[str] = None,
) -> str:
"""
Send message to agent and get response.
Args:
agent: Agent name ("jarvis" or "sage")
message: User's message/utterance
context: Recent conversation context (not used with Gateway)
speaker: Speaker name/ID (used for session key)
model: Optional model override (e.g., "claude-haiku-3.5", "claude-sonnet-4")
Returns:
Agent's response text
Raises:
RuntimeError: If request fails after retries
ValueError: If agent is invalid
"""
agent_lower = agent.lower()
if agent_lower not in self.AGENT_PERSONALITIES:
raise ValueError(
f"Invalid agent: {agent}. "
f"Choose from: {list(self.AGENT_PERSONALITIES.keys())}"
)
self.total_requests += 1
start_time = time.time()
try:
# Ensure connected
await self._ensure_connected()
# Build session key
session_key = self._build_session_key(speaker or "default")
# Try with normal timeout
response = await self._send_chat(
session_key, message, timeout=self.config.timeout, model=model
)
latency = time.time() - start_time
self.total_latency += latency
logger.info(
f"Agent {agent} responded in {latency:.2f}s: "
f'"{response[:50]}..."'
)
return response
except asyncio.TimeoutError:
logger.warning(
f"First attempt timeout ({self.config.timeout}s), retrying..."
)
self.total_retries += 1
try:
# Retry with extended timeout
await self._ensure_connected()
session_key = self._build_session_key(speaker or "default")
response = await self._send_chat(
session_key, message, timeout=self.config.retry_timeout, model=model
)
latency = time.time() - start_time
self.total_latency += latency
logger.info(
f"Agent {agent} responded on retry in {latency:.2f}s"
)
return response
except Exception as e:
self.total_failures += 1
logger.error(f"OpenClaw request failed after retry: {e}")
raise RuntimeError(
f"Failed to get response from {agent} after retry: {e}"
)
except Exception as e:
self.total_failures += 1
logger.error(f"OpenClaw request failed: {e}")
raise RuntimeError(f"Failed to get response from {agent}: {e}")
async def _send_chat(
self, session_key: str, message: str, timeout: float = 120, model: Optional[str] = None
) -> str:
"""
Send a chat message and wait for the final response text.
Args:
session_key: OpenClaw session key (e.g. "agent:main:discord:dm:123")
message: User's transcribed speech
timeout: Max seconds to wait for AI response
model: Optional model override (e.g., "claude-haiku-3.5")
Returns:
Agent's response text
Raises:
RuntimeError: If chat.send fails
asyncio.TimeoutError: If response takes too long
"""
idempotency_key = f"voice-{uuid.uuid4().hex[:12]}"
req_id = self._new_id()
try:
# Build chat.send params
params = {
"sessionKey": session_key,
"message": message,
"deliver": True,
"idempotencyKey": idempotency_key,
"timeoutMs": int(timeout * 1000),
}
# Add model override if specified
if model:
params["model"] = model
# Send chat.send request
await self._send_request(
req_id,
"chat.send",
params,
)
# Wait for RPC acknowledgement to get server-assigned runId
resp = await self._wait_response(req_id, timeout=15)
if not resp.get("ok"):
error = resp.get("error", {})
raise RuntimeError(
f"chat.send failed: {error.get('message', 'unknown')}"
)
# Use server-assigned runId as waiter key
run_id = resp.get("payload", {}).get("runId", idempotency_key)
# Create waiter for final response
waiter: asyncio.Future[str] = asyncio.get_running_loop().create_future()
self._chat_waiters[run_id] = waiter
try:
result = await asyncio.wait_for(waiter, timeout=timeout)
return result
finally:
self._chat_waiters.pop(run_id, None)
except Exception:
# Clean up any waiter that might have been registered
self._chat_waiters.pop(idempotency_key, None)
raise
async def send_message_streaming(
self,
agent: str,
message: str,
context: str = "",
speaker: Optional[str] = None,
model: Optional[str] = None,
) -> AsyncIterator[str]:
"""
Send message and stream response chunks in real-time.
Args:
agent: Agent name ("jarvis" or "sage")
message: User's message/utterance
context: Recent conversation context (not used with Gateway)
speaker: Speaker name/ID (used for session key)
model: Optional model override
Yields:
Text chunks as they arrive from the LLM
Raises:
RuntimeError: If request fails
ValueError: If agent is invalid
"""
agent_lower = agent.lower()
if agent_lower not in self.AGENT_PERSONALITIES:
raise ValueError(
f"Invalid agent: {agent}. "
f"Choose from: {list(self.AGENT_PERSONALITIES.keys())}"
)
self.total_requests += 1
start_time = time.time()
try:
# Ensure connected
await self._ensure_connected()
# Build session key
session_key = self._build_session_key(speaker or "default")
# Stream the chat response
async for chunk in self._send_chat_streaming(
session_key, message, model=model
):
yield chunk
latency = time.time() - start_time
self.total_latency += latency
logger.info(
f"Agent {agent} streaming response completed in {latency:.2f}s"
)
except Exception as e:
self.total_failures += 1
logger.error(f"OpenClaw streaming request failed: {e}")
raise RuntimeError(f"Failed to get streaming response from {agent}: {e}")
async def _send_chat_streaming(
self, session_key: str, message: str, model: Optional[str] = None, timeout: float = 120
) -> AsyncIterator[str]:
"""
Send a chat message and stream response chunks.
Args:
session_key: OpenClaw session key
message: User's transcribed speech
model: Optional model override
timeout: Max seconds to wait for response
Yields:
Text deltas as they arrive
Raises:
RuntimeError: If chat.send fails
asyncio.TimeoutError: If response takes too long
"""
idempotency_key = f"voice-stream-{uuid.uuid4().hex[:12]}"
req_id = self._new_id()
try:
# Build chat.send params
params = {
"sessionKey": session_key,
"message": message,
"deliver": True,
"idempotencyKey": idempotency_key,
"timeoutMs": int(timeout * 1000),
}
if model:
params["model"] = model
# Send chat.send request
await self._send_request(req_id, "chat.send", params)
# Wait for RPC acknowledgement
resp = await self._wait_response(req_id, timeout=15)
if not resp.get("ok"):
error = resp.get("error", {})
raise RuntimeError(
f"chat.send failed: {error.get('message', 'unknown')}"
)
# Use server-assigned runId as stream key
run_id = resp.get("payload", {}).get("runId", idempotency_key)
# Create queue for streaming chunks
stream_queue: asyncio.Queue[Optional[str]] = asyncio.Queue()
self._stream_queues[run_id] = stream_queue
try:
# Stream chunks from queue
while True:
try:
chunk = await asyncio.wait_for(
stream_queue.get(), timeout=timeout
)
if chunk is None:
# End of stream sentinel
break
yield chunk
except asyncio.TimeoutError:
logger.warning(f"Stream timeout waiting for chunk (runId: {run_id})")
break
finally:
self._stream_queues.pop(run_id, None)
except Exception:
self._stream_queues.pop(idempotency_key, None)
raise
async def abort_chat(self, session_key: str) -> None:
"""
Abort any in-flight chat for the session.
Args:
session_key: OpenClaw session key
"""
await self._ensure_connected()
req_id = self._new_id()
await self._send_request(
req_id, "chat.abort", {"sessionKey": session_key}
)
async def _ensure_connected(self) -> None:
"""Reconnect if disconnected."""
if self._connected and self._ws:
return
async with self._reconnect_lock:
if self._connected and self._ws:
return
logger.warning("Gateway disconnected, reconnecting...")
await self.connect()
async def _send_request(
self, req_id: str, method: str, params: dict
) -> None:
"""
Send a JSON-RPC request frame.
Args:
req_id: Request ID
method: RPC method name
params: Method parameters
"""
frame = {
"type": "req",
"id": req_id,
"method": method,
"params": params,
}
if not self._ws:
raise ConnectionError("Not connected to Gateway")
await self._ws.send(json.dumps(frame))
async def _wait_response(self, req_id: str, timeout: float = 30) -> dict:
"""
Wait for a response matching the given request ID.
Args:
req_id: Request ID to wait for
timeout: Timeout in seconds
Returns:
Response payload
"""
fut: asyncio.Future[dict] = asyncio.get_running_loop().create_future()
self._pending[req_id] = fut
try:
return await asyncio.wait_for(fut, timeout=timeout)
finally:
self._pending.pop(req_id, None)
async def _listen(self) -> None:
"""Background task that reads all incoming WebSocket messages."""
try:
async for raw in self._ws:
try:
msg = json.loads(raw)
except json.JSONDecodeError:
logger.warning("Received non-JSON message from Gateway")
continue
msg_type = msg.get("type")
if msg_type == "res":
# RPC response
req_id = msg.get("id")
fut = self._pending.get(req_id)
if fut and not fut.done():
fut.set_result(msg)
elif msg_type == "event":
# Event notification
event_name = msg.get("event")
if event_name == "chat":
self._handle_chat_event(msg.get("payload", {}))
except ConnectionClosed:
logger.warning("Gateway WebSocket closed")
self._connected = False
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Gateway listener error")
self._connected = False
def _handle_chat_event(self, payload: dict) -> None:
"""
Process incoming chat events, resolve waiters on 'final'.
Args:
payload: Chat event payload
"""
run_id = payload.get("runId", "")
state = payload.get("state", "")
if state == "final":
# Extract text content from final message
message = payload.get("message", {})
content = message.get("content", [])
text_parts = [
block.get("text", "")
for block in content
if block.get("type") == "text"
]
response_text = "\n".join(text_parts).strip()
# Resolve waiting future (non-streaming)
fut = self._chat_waiters.get(run_id)
if fut and not fut.done():
fut.set_result(response_text)
# Signal end of stream (streaming)
stream_queue = self._stream_queues.get(run_id)
if stream_queue:
# Send None sentinel to indicate stream end
stream_queue.put_nowait(None)
elif state == "error":
# Chat error
error_msg = payload.get("errorMessage", "Unknown error")
logger.error(f"Chat error for runId {run_id}: {error_msg}")
fut = self._chat_waiters.get(run_id)
if fut and not fut.done():
fut.set_exception(RuntimeError(f"Chat error: {error_msg}"))
stream_queue = self._stream_queues.get(run_id)
if stream_queue:
stream_queue.put_nowait(None)
elif state == "aborted":
# Chat aborted
fut = self._chat_waiters.get(run_id)
if fut and not fut.done():
fut.set_exception(asyncio.CancelledError("Chat aborted"))
stream_queue = self._stream_queues.get(run_id)
if stream_queue:
stream_queue.put_nowait(None)
elif state == "delta":
# Streaming delta - extract text and send to stream queue
delta = payload.get("delta", {})
text_delta = ""
# Extract text from delta content blocks
if "content" in delta:
for block in delta.get("content", []):
if block.get("type") == "text":
text_delta += block.get("text", "")
# Send delta to stream queue if we have one
if text_delta:
stream_queue = self._stream_queues.get(run_id)
if stream_queue:
stream_queue.put_nowait(text_delta)
def _build_session_key(self, user_id: str) -> str:
"""
Build OpenClaw session key for user.
Format: agent:<agentId>:discord:dm:<userId>
Args:
user_id: Discord user ID
Returns:
Session key
"""
uid = str(user_id).strip().lower()
if self.config.session_scope == "per-peer":
return f"agent:{self.config.agent_id}:discord:dm:{uid}"
else:
return f"agent:{self.config.agent_id}:main"
def format_context(self, transcript: str) -> str:
"""
Format transcript for context.
Note: OpenClaw Gateway maintains conversation history internally,
so we don't need to send explicit context.
Args:
transcript: Raw transcript text
Returns:
Formatted context (empty for Gateway)
"""
return ""
def get_stats(self) -> dict:
"""
Get client statistics.
Returns:
Dictionary with stats
"""
avg_latency = (
self.total_latency / self.total_requests
if self.total_requests > 0
else 0.0
)
return {
"total_requests": self.total_requests,
"total_failures": self.total_failures,
"total_retries": self.total_retries,
"success_rate": (
(self.total_requests - self.total_failures) / self.total_requests
if self.total_requests > 0
else 0.0
),
"avg_latency": avg_latency,
"connected": self._connected,
}
@staticmethod
def _new_id() -> str:
"""Generate unique request ID."""
return str(uuid.uuid4())
class PerGuildOpenClawClient:
"""
Manages separate OpenClaw sessions for multiple Discord guilds.
Each guild can maintain independent conversation state.
"""
def __init__(self, config: OpenClawConfig):
"""
Initialize per-guild client manager.
Args:
config: Default client configuration
"""
self.config = config
# Per-guild clients (for session management)
self._clients: Dict[int, OpenClawClient] = {}
def get_or_create(self, guild_id: int) -> OpenClawClient:
"""
Get or create client for a guild.
Args:
guild_id: Discord guild ID
Returns:
OpenClawClient for this guild
"""
if guild_id not in self._clients:
self._clients[guild_id] = OpenClawClient(config=self.config)
logger.info(f"Created OpenClaw client for guild {guild_id}")
return self._clients[guild_id]
async def send_message(
self,
guild_id: int,
agent: str,
message: str,
context: str = "",
speaker: Optional[str] = None,
model: Optional[str] = None,
) -> str:
"""
Send message for a guild.
Args:
guild_id: Discord guild ID
agent: Agent name
message: User's message
context: Conversation context
speaker: Speaker name
model: Optional model override
Returns:
Agent's response
"""
client = self.get_or_create(guild_id)
return await client.send_message(agent, message, context, speaker, model)
def remove_guild(self, guild_id: int) -> None:
"""
Remove client for a guild.
Args:
guild_id: Discord guild ID
"""
if guild_id in self._clients:
del self._clients[guild_id]
logger.info(f"Removed OpenClaw client for guild {guild_id}")
def get_all_stats(self) -> Dict[int, dict]:
"""
Get stats for all guilds.
Returns:
Dictionary mapping guild_id -> stats
"""
return {
guild_id: client.get_stats()
for guild_id, client in self._clients.items()
}
# Convenience function
def create_client(
base_url: str = "ws://192.168.50.9:18789",
auth_token: Optional[str] = None,
timeout: float = 8.0,
agent_id: str = "main",
) -> OpenClawClient:
"""
Create OpenClaw Gateway client with default settings.
Args:
base_url: OpenClaw Gateway WebSocket URL
auth_token: Authentication token
timeout: Request timeout (seconds)
agent_id: Agent ID for session keys
Returns:
OpenClawClient instance
"""
config = OpenClawConfig(
base_url=base_url,
auth_token=auth_token,
timeout=timeout,
agent_id=agent_id,
)
return OpenClawClient(config=config)