- Rewrote voice_ws.py: the receive loop now uses queue.put_nowait(), and a separate consumer task handles the STT->LLM->TTS pipeline (no more blocking the WebSocket)
- Updated voice.html: TTS audio playback, transcript display, thinking indicator
- Added energy-based silence detection (skip STT on silent buffers)
- Fixed sample rate mismatch (16kHz throughout, not 24kHz)
- Added AUDIT.md: full pipeline audit confirming that the STT, TTS, and OpenClaw clients work

Known blocker: the OpenClaw gateway's chat.send requires the operator.write scope, but the gateway password token doesn't grant scopes. Needs a device-pairing fix.
394 lines
13 KiB
Python
394 lines
13 KiB
Python
"""WebSocket voice endpoint for browser-based speech-to-text and text-to-speech.
|
|
|
|
Accepts binary PCM audio from browser, transcribes via Deepgram, sends to OpenClaw Gateway,
|
|
and streams TTS audio back to browser.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import string
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
from fastapi import WebSocket, WebSocketDisconnect
|
|
from pydantic import BaseModel
|
|
|
|
from server.stt import DeepgramSTT
|
|
from server.tts import VeniceKokoroTTS
|
|
from openclaw_client.client import OpenClawClient, OpenClawConfig
|
|
|
|
# Simple energy-based VAD to avoid sending silence to Deepgram
|
|
def _is_speech(audio: np.ndarray, threshold: float = 0.01) -> bool:
|
|
"""Check if audio buffer contains speech (above energy threshold)."""
|
|
if len(audio) == 0:
|
|
return False
|
|
energy = float(np.sqrt(np.mean(audio ** 2)))
|
|
return energy > threshold
|
|
|
|
# Module-level logger, named after this module, used by the session class and
# the WebSocket handler below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VoiceSession:
    """State and processing pipeline for one browser voice session.

    A session owns the audio queue filled by the WebSocket receive loop, a
    consumer coroutine that drains that queue, and the STT / TTS / OpenClaw
    clients used to turn buffered audio into a spoken reply. Each completed
    exchange is appended to ``logs/voice/<session_id>.jsonl``.
    """

    def __init__(self, session_id: str):
        """Create the session and make sure its transcript directory exists.

        Args:
            session_id: Identifier used in log messages and as the stem of
                the transcript file name.
        """
        self.session_id = session_id
        # NOTE(review): session_id is interpolated into the file name as-is;
        # confirm callers never pass client-controlled path segments.
        self.transcript_file = Path("logs/voice") / f"{session_id}.jsonl"
        self.transcript_file.parent.mkdir(parents=True, exist_ok=True)

        # Audio buffering
        self.audio_buffer = bytearray()
        self.buffer_duration = 0.0  # Seconds
        self._buffer_lock = asyncio.Lock()

        # Audio processing
        self.sample_rate = 16000
        self.channel_count = 1
        self.bits_per_sample = 32

        # Concurrency: the receive loop produces, _consumer_task consumes.
        self.audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=100)

        # WebSocket connection
        self.websocket: Optional[WebSocket] = None

        # Engines (self-contained, don't share with run.py)
        self.stt = None
        self.tts = None
        self.openclaw = None

        # Session state
        self.connected = False
        self.transcript = []

        # Consumer task
        self.consumer_task: Optional[asyncio.Task] = None

        logger.info(f"Created voice session {session_id}")

    async def initialize(self):
        """Build the STT, TTS and OpenClaw clients and connect to OpenClaw.

        Configuration is read from ``DEEPGRAM_API_KEY``, ``VENICE_API_KEY``,
        ``OPENCLAW_BASE_URL`` and ``OPENCLAW_AUTH_TOKEN``.

        Raises:
            ValueError: If the Deepgram or Venice API key is not set.
        """
        deepgram_key = os.getenv("DEEPGRAM_API_KEY")
        venice_key = os.getenv("VENICE_API_KEY")
        openclaw_url = os.getenv("OPENCLAW_BASE_URL", "ws://192.168.50.9:18789")
        openclaw_token = os.getenv("OPENCLAW_AUTH_TOKEN")

        if not deepgram_key or not venice_key:
            raise ValueError("Missing required API keys")

        self.stt = DeepgramSTT(
            api_key=deepgram_key,
            model="nova-3",
            language="en",
            sample_rate=self.sample_rate,
        )

        self.tts = VeniceKokoroTTS(
            api_key=venice_key,
            voice="am_liam",
            base_url="https://api.venice.ai/api/v1",
        )

        self.openclaw = OpenClawClient(
            config=OpenClawConfig(
                base_url=openclaw_url,
                auth_token=openclaw_token,
                timeout=30.0,
                agent_id="main",
            )
        )

        await self.openclaw.connect()

        logger.info(f"Voice session {self.session_id} initialized")

    async def close(self):
        """Stop the consumer task and disconnect from OpenClaw."""
        self.connected = False

        if self.consumer_task and not self.consumer_task.done():
            self.consumer_task.cancel()
            try:
                await self.consumer_task
            except asyncio.CancelledError:
                pass

        if self.openclaw:
            await self.openclaw.disconnect()

        logger.info(f"Voice session {self.session_id} closed")

    def _new_id(self) -> str:
        """Return a random 8-character ID of ASCII letters and digits."""
        return "".join(random.choices(string.ascii_letters + string.digits, k=8))

    async def _consumer_task(self):
        """Drain the audio queue and run the pipeline on ~0.8 s windows.

        Runs until ``self.connected`` becomes false or the task is cancelled.
        Unexpected errors are logged and the loop keeps going so one bad
        chunk does not end the session.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        while self.connected:
            try:
                # Poll with a short timeout so a cleared ``connected`` flag is
                # noticed even when no audio is arriving.
                try:
                    data = await asyncio.wait_for(self.audio_queue.get(), timeout=0.1)
                except asyncio.TimeoutError:
                    elapsed = loop.time() - start_time
                    if elapsed > 1.0 and len(self.audio_buffer) == 0:
                        # No audio received for 1 second, reset
                        start_time = loop.time()
                    continue

                # Accumulate audio (no lock needed: only the consumer touches
                # the buffer).
                self.audio_buffer.extend(data)

                bytes_per_second = (
                    self.sample_rate * self.channel_count * (self.bits_per_sample // 8)
                )
                self.buffer_duration += len(data) / bytes_per_second

                # Buffer until ~0.8 seconds
                if self.buffer_duration >= 0.8:
                    await self._transcribe_buffered_audio()
                    start_time = loop.time()

            except asyncio.CancelledError:
                logger.info(f"Consumer task cancelled for session {self.session_id}")
                break

            except Exception as e:
                logger.error(f"Consumer task error: {e}", exc_info=True)

        logger.info(f"Consumer task exited for session {self.session_id}")

    async def _transcribe_buffered_audio(self):
        """Run STT -> OpenClaw -> TTS on the buffered audio.

        The buffer is emptied up front. Silent windows are dropped before any
        STT call. For non-empty transcripts the text and the agent response
        are pushed to the browser as status messages, the exchange is logged,
        and synthesized audio is sent back. Errors in the pipeline are logged
        and swallowed so the consumer loop continues.
        """
        if not self.audio_buffer:
            return

        # Only whole float32 samples can be decoded; keep any trailing partial
        # sample in the buffer instead of letting np.frombuffer raise.
        sample_width = np.dtype(np.float32).itemsize
        usable = len(self.audio_buffer) - (len(self.audio_buffer) % sample_width)
        audio_bytes = bytes(self.audio_buffer[:usable])
        del self.audio_buffer[:usable]
        self.buffer_duration = 0.0

        if not audio_bytes:
            return

        audio_data = np.frombuffer(audio_bytes, dtype=np.float32)

        # Skip silence: don't waste Deepgram credits on empty audio
        if not _is_speech(audio_data):
            logger.debug(f"Session {self.session_id}: silence detected, skipping STT")
            return

        try:
            result = await self.stt.transcribe_async(audio_data)

            if result.text.strip():
                # Send intermediate transcript status
                if self.connected:
                    await self._send_status("transcript", result.text)

                response = await self.openclaw.send_message(
                    agent="main",
                    message=result.text,
                    speaker="voice_user",
                )

                # Send intermediate response status
                if self.connected:
                    await self._send_status("response", response)

                # NOTE(review): this is the event loop's monotonic clock, not
                # wall-clock time; confirm that is what log consumers expect.
                timestamp = asyncio.get_running_loop().time()
                entry = {
                    "timestamp": timestamp,
                    "session_id": self.session_id,
                    "transcript": result.text,
                    "response": response,
                }

                self.transcript.append(entry)

                # ensure_ascii=False emits raw non-ASCII characters, so the
                # file encoding must be pinned rather than left to the locale.
                with open(self.transcript_file, "a", encoding="utf-8") as f:
                    f.write(json.dumps(entry, ensure_ascii=False) + "\n")

                logger.info(
                    f"Session {self.session_id}: "
                    f'"{result.text[:50]}..." -> "{response[:50]}..."'
                )

                audio = await self._synthesize_response(response)

                # Explicit None/length checks: the truth value of a
                # multi-element numpy array is ambiguous and raises ValueError.
                if audio is not None and len(audio) > 0 and self.connected:
                    await self._send_tts_audio(audio)

        except Exception as e:
            logger.error(f"Transcription error: {e}", exc_info=True)

    async def _synthesize_response(self, text: str):
        """Synthesize speech for ``text``.

        Returns:
            Whatever the TTS engine's ``generate_async`` returns, or ``None``
            if synthesis raised (the error is logged).
        """
        try:
            audio = await self.tts.generate_async(
                text=text,
                voice_ref_path=None,
                emotion_exaggeration=0.8,
            )

            return audio

        except Exception as e:
            logger.error(f"TTS synthesis error: {e}", exc_info=True)
            return None

    async def _send_status(self, status_type: str, text: str):
        """Send a ``{"type": status_type, "text": text}`` message; log on failure."""
        try:
            message = {
                "type": status_type,
                "text": text,
            }
            await self._send_json(message)
        except Exception as e:
            logger.error(f"Failed to send {status_type} status: {e}")

    async def _send_tts_audio(self, audio: np.ndarray):
        """Send TTS audio to the browser as a JSON header plus 16-bit PCM.

        Args:
            audio: Samples to send; scaled by 32767, so values are assumed to
                be floats nominally in [-1, 1].
        """
        try:
            # Clip before scaling: without it, samples outside [-1, 1] wrap
            # around when cast to int16 and produce loud clicks.
            pcm_data = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16).tobytes()

            # NOTE(review): the header advertises the session's input sample
            # rate; confirm the TTS engine really outputs at that rate.
            header = {
                "type": "tts_audio",
                "samples": len(pcm_data) // 2,  # 2 bytes per sample
                "sample_rate": self.sample_rate,
            }

            # Header goes first as JSON text, followed by the binary payload.
            await self._send_json(header)
            await self._send_bytes(pcm_data)

            logger.info(f"Sent TTS audio: {len(pcm_data)} bytes, {header['samples']} samples")

        except Exception as e:
            logger.error(f"Failed to send TTS audio: {e}", exc_info=True)

    async def _send_json(self, data: dict):
        """Send ``data`` to the browser as a JSON text frame.

        Does nothing when no WebSocket is attached.
        """
        # Talk to the socket directly. Routing this through _send_bytes (which
        # in turn announced itself through _send_json) recursed without end.
        if self.websocket:
            await self.websocket.send_json(data)

    async def _send_bytes(self, data: bytes):
        """Send ``data`` to the browser as a binary frame.

        Does nothing when no WebSocket is attached.
        """
        if self.websocket:
            await self.websocket.send_bytes(data)
|
|
|
|
|
|
async def handle_voice_websocket(websocket: WebSocket, session_id: str):
    """Serve one voice session over an accepted WebSocket connection.

    Accepts the socket, initializes a ``VoiceSession``, then forwards every
    binary frame into the session's audio queue until the client disconnects
    or an error occurs. A keepalive coroutine sends a ``ping`` message every
    15 seconds while the session is connected. The session is always closed
    on exit.

    Args:
        websocket: The incoming connection; accepted here.
        session_id: Identifier passed to ``VoiceSession`` and used in logs.
    """
    session = VoiceSession(session_id)
    session.websocket = websocket

    await websocket.accept()
    session.connected = True

    logger.info(f"WebSocket connected for session {session_id}")

    # Tracked outside the try block so the finally clause can always cancel it.
    keepalive_task: Optional[asyncio.Task] = None

    # Background task: send periodic pings to keep connection alive through Caddy
    async def keepalive():
        while session.connected:
            try:
                await asyncio.sleep(15)
                if session.connected:
                    await websocket.send_json({"type": "ping"})
            except Exception:
                # Best effort: a failed ping means the socket is going away,
                # and the receive loop will observe that on its own.
                break

    try:
        await session.initialize()

        # Send welcome message
        await websocket.send_json({
            "type": "welcome",
            "message": "Connected to voice portal",
        })

        keepalive_task = asyncio.create_task(keepalive())

        # Start consumer task
        session.consumer_task = asyncio.create_task(session._consumer_task())

        # Receive loop: only enqueues, so slow STT/LLM/TTS work never blocks it.
        chunk_count = 0
        while session.connected:
            try:
                msg = await websocket.receive()
                msg_type = msg.get("type", "unknown")

                if msg_type == "websocket.disconnect":
                    session.connected = False
                    logger.info(f"WebSocket disconnected for session {session_id}")
                    break

                elif msg_type == "websocket.receive":
                    # ASGI servers may include both keys with the unused one
                    # set to None, so test the value rather than key presence.
                    payload = msg.get("bytes")

                    if payload is not None:
                        chunk_count += 1
                        if chunk_count <= 5 or chunk_count % 100 == 0:
                            logger.info(f"Audio chunk #{chunk_count}: {len(payload)} bytes")

                        # Put audio chunk into queue (non-blocking)
                        try:
                            session.audio_queue.put_nowait(payload)
                        except asyncio.QueueFull:
                            logger.warning(f"Audio queue full for session {session_id}, dropping chunk")

                    elif msg.get("text") is not None:
                        # Text frames from the client are currently ignored.
                        pass

                    else:
                        logger.warning(f"Unknown receive msg: {msg}")

                else:
                    logger.warning(f"Unknown WebSocket msg type: {msg_type}: {msg}")

            except WebSocketDisconnect:
                session.connected = False
                logger.info(f"WebSocket disconnected for session {session_id}")
                break

            except Exception as e:
                logger.error(f"WebSocket error in receive loop: {e}", exc_info=True)
                session.connected = False
                break

    except Exception as e:
        logger.error(f"Session error: {e}", exc_info=True)
        try:
            # A close frame's reason is limited to 123 bytes of UTF-8; trim so
            # a long error message cannot make the close itself fail.
            reason = str(e).encode("utf-8")[:123].decode("utf-8", errors="ignore")
            await websocket.close(code=1011, reason=reason)
        except Exception:
            # The socket may already be gone; nothing more to do.
            pass

    finally:
        # Cancel on every exit path so the ping loop never outlives the session.
        if keepalive_task is not None:
            keepalive_task.cancel()
        await session.close()
|
|
|
|
|
|
def create_session_id() -> str:
    """Build a random session ID.

    Returns:
        An 8-character string drawn (with replacement) from ASCII letters
        and digits.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=8)
    return "".join(picked)
|