"""WebSocket voice endpoint for browser-based speech-to-text and text-to-speech. Accepts binary PCM audio from browser, transcribes via Deepgram, sends to OpenClaw Gateway, and streams TTS audio back to browser. """ import asyncio import json import logging import os import random import string from pathlib import Path from typing import Optional import numpy as np from fastapi import WebSocket, WebSocketDisconnect from pydantic import BaseModel from server.stt import DeepgramSTT from server.tts import VeniceKokoroTTS from openclaw_client.client import OpenClawClient, OpenClawConfig # Simple energy-based VAD to avoid sending silence to Deepgram def _is_speech(audio: np.ndarray, threshold: float = 0.01) -> bool: """Check if audio buffer contains speech (above energy threshold).""" if len(audio) == 0: return False energy = float(np.sqrt(np.mean(audio ** 2))) return energy > threshold logger = logging.getLogger(__name__) class VoiceSession: """Manages a single voice session.""" def __init__(self, session_id: str): self.session_id = session_id self.transcript_file = Path("logs/voice") / f"{session_id}.jsonl" self.transcript_file.parent.mkdir(parents=True, exist_ok=True) # Audio buffering self.audio_buffer = bytearray() self.buffer_duration = 0.0 # Seconds self._buffer_lock = asyncio.Lock() # Audio processing self.sample_rate = 16000 self.channel_count = 1 self.bits_per_sample = 32 # Concurrency self.audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=100) # WebSocket connection self.websocket: Optional[WebSocket] = None # Engines (self-contained, don't share with run.py) self.stt = None self.tts = None self.openclaw = None # Session state self.connected = False self.transcript = [] # Consumer task self.consumer_task: Optional[asyncio.Task] = None logger.info(f"Created voice session {session_id}") async def initialize(self): """Initialize STT, TTS, and OpenClaw client.""" # Load env vars deepgram_key = os.getenv("DEEPGRAM_API_KEY") venice_key = os.getenv("VENICE_API_KEY") openclaw_url = os.getenv("OPENCLAW_BASE_URL", "ws://192.168.50.9:18789") openclaw_token = os.getenv("OPENCLAW_AUTH_TOKEN") if not deepgram_key or not venice_key: raise ValueError("Missing required API keys") # Initialize STT self.stt = DeepgramSTT( api_key=deepgram_key, model="nova-3", language="en", sample_rate=self.sample_rate, ) # Initialize TTS self.tts = VeniceKokoroTTS( api_key=venice_key, voice="am_liam", base_url="https://api.venice.ai/api/v1", ) # Initialize OpenClaw client self.openclaw = OpenClawClient( config=OpenClawConfig( base_url=openclaw_url, auth_token=openclaw_token, timeout=30.0, agent_id="main", ) ) await self.openclaw.connect() logger.info(f"Voice session {self.session_id} initialized") async def close(self): """Clean up resources.""" self.connected = False if self.consumer_task and not self.consumer_task.done(): self.consumer_task.cancel() try: await self.consumer_task except asyncio.CancelledError: pass if self.openclaw: await self.openclaw.disconnect() logger.info(f"Voice session {self.session_id} closed") def _new_id(self) -> str: """Generate random session ID.""" return "".join(random.choices(string.ascii_letters + string.digits, k=8)) async def _consumer_task(self): """Consumer task that processes audio from queue.""" start_time = asyncio.get_event_loop().time() while self.connected: try: # Wait for audio chunk (with timeout) try: data = await asyncio.wait_for(self.audio_queue.get(), timeout=0.1) except asyncio.TimeoutError: # Check if enough time has passed for buffer to accumulate elapsed = asyncio.get_event_loop().time() - start_time if elapsed > 1.0 and len(self.audio_buffer) == 0: # No audio received for 1 second, reset start_time = asyncio.get_event_loop().time() continue # Accumulate audio (no lock needed — only consumer touches buffer) self.audio_buffer.extend(data) # Calculate duration chunk_size = len(data) chunk_duration = chunk_size / (self.sample_rate * self.channel_count * 4) self.buffer_duration += chunk_duration # Buffer until ~0.8 seconds if self.buffer_duration >= 0.8: await self._transcribe_buffered_audio() start_time = asyncio.get_event_loop().time() except asyncio.CancelledError: logger.info(f"Consumer task cancelled for session {self.session_id}") break except Exception as e: logger.error(f"Consumer task error: {e}", exc_info=True) logger.info(f"Consumer task exited for session {self.session_id}") async def _transcribe_buffered_audio(self): """Transcribe accumulated audio and send to OpenClaw.""" if not self.audio_buffer: return # Copy and clear buffer immediately (only consumer touches it) audio_bytes = bytes(self.audio_buffer) self.audio_buffer.clear() self.buffer_duration = 0.0 # Convert bytearray to numpy array audio_data = np.frombuffer(audio_bytes, dtype=np.float32) # Skip silence — don't waste Deepgram credits on empty audio if not _is_speech(audio_data): logger.debug(f"Session {self.session_id}: silence detected, skipping STT") return try: # Transcribe result = await self.stt.transcribe_async(audio_data) if result.text.strip(): # Send intermediate transcript status if self.connected: await self._send_status("transcript", result.text) # Send to OpenClaw response = await self.openclaw.send_message( agent="main", message=result.text, speaker="voice_user", ) # Send intermediate response status if self.connected: await self._send_status("response", response) # Log transcript timestamp = asyncio.get_event_loop().time() entry = { "timestamp": timestamp, "session_id": self.session_id, "transcript": result.text, "response": response, } self.transcript.append(entry) # Write to file with open(self.transcript_file, "a") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") logger.info( f"Session {self.session_id}: " f'"{result.text[:50]}..." -> "{response[:50]}..."' ) # Generate TTS audio audio = await self._synthesize_response(response) # Send TTS audio back to browser if audio and self.connected: await self._send_tts_audio(audio) except Exception as e: logger.error(f"Transcription error: {e}", exc_info=True) async def _synthesize_response(self, text: str): """Synthesize TTS audio from response text.""" try: audio = await self.tts.generate_async( text=text, voice_ref_path=None, emotion_exaggeration=0.8, ) return audio except Exception as e: logger.error(f"TTS synthesis error: {e}", exc_info=True) return None async def _send_status(self, status_type: str, text: str): """Send status message to WebSocket.""" try: message = { "type": status_type, "text": text, } await self._send_json(message) except Exception as e: logger.error(f"Failed to send {status_type} status: {e}") async def _send_tts_audio(self, audio: np.ndarray): """Send TTS audio back to browser as binary PCM with JSON header.""" try: # Convert to 16-bit PCM pcm_data = (audio * 32767).astype(np.int16).tobytes() # Create JSON header header = { "type": "tts_audio", "samples": len(pcm_data) // 2, # 2 bytes per sample "sample_rate": self.sample_rate, } # Send header as JSON text await self._send_json(header) # Send PCM audio as binary await self._send_bytes(pcm_data) logger.info(f"Sent TTS audio: {len(pcm_data)} bytes, {header['samples']} samples") except Exception as e: logger.error(f"Failed to send TTS audio: {e}", exc_info=True) async def _send_json(self, data: dict): """Send JSON message to WebSocket.""" await self._send_bytes(json.dumps(data).encode("utf-8")) async def _send_bytes(self, data: bytes): """Send bytes to WebSocket.""" await self._send_json({"type": "websocket.send", "bytes": len(data)}) if self.websocket: await self.websocket.send_bytes(data) async def handle_voice_websocket(websocket: WebSocket, session_id: str): """Handle WebSocket connection for voice session.""" session = VoiceSession(session_id) session.websocket = websocket await websocket.accept() session.connected = True logger.info(f"WebSocket connected for session {session_id}") # Initialize session try: await session.initialize() # Send welcome message await websocket.send_json({ "type": "welcome", "message": "Connected to voice portal", }) # Background task: send periodic pings to keep connection alive through Caddy async def keepalive(): while session.connected: try: await asyncio.sleep(15) if session.connected: await websocket.send_json({"type": "ping"}) except Exception: break keepalive_task = asyncio.create_task(keepalive()) # Start consumer task session.consumer_task = asyncio.create_task(session._consumer_task()) # Receive and process audio (non-blocking) chunk_count = 0 while session.connected: try: msg = await websocket.receive() msg_type = msg.get("type", "unknown") if msg_type == "websocket.disconnect": session.connected = False logger.info(f"WebSocket disconnected for session {session_id}") break elif msg_type == "websocket.receive": if "bytes" in msg: chunk_count += 1 if chunk_count <= 5 or chunk_count % 100 == 0: logger.info(f"Audio chunk #{chunk_count}: {len(msg['bytes'])} bytes") # Put audio chunk into queue (non-blocking) try: session.audio_queue.put_nowait(msg["bytes"]) except asyncio.QueueFull: logger.warning(f"Audio queue full for session {session_id}, dropping chunk") elif "text" in msg: pass else: logger.warning(f"Unknown receive msg: {msg}") else: logger.warning(f"Unknown WebSocket msg type: {msg_type}: {msg}") except WebSocketDisconnect: session.connected = False logger.info(f"WebSocket disconnected for session {session_id}") break except Exception as e: logger.error(f"WebSocket error in receive loop: {e}", exc_info=True) session.connected = False break keepalive_task.cancel() except Exception as e: logger.error(f"Session error: {e}", exc_info=True) try: await websocket.close(code=1011, reason=str(e)) except Exception: pass finally: await session.close() def create_session_id() -> str: """Generate a random session ID.""" return "".join(random.choices(string.ascii_letters + string.digits, k=8))