# hermes-agent/plugins/mascot/mascot_state.py
# (file-viewer header: 185 lines, no EOL, 5.7 KiB, Python)

"""
Mascot state manager: a process-wide singleton that tracks the agent's state.

Valid status values: idle, thinking, working, waiting_input, error.
Tracked fields: status, task (description), mood (optional),
last_update (Unix timestamp).
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
log = logging.getLogger(__name__)

# Status values the mascot can report.
STATE_IDLE = "idle"
STATE_THINKING = "thinking"
STATE_WORKING = "working"
STATE_WAITING_INPUT = "waiting_input"
STATE_ERROR = "error"

# Every recognised status, in declaration order.
VALID_STATES = (
    STATE_IDLE,
    STATE_THINKING,
    STATE_WORKING,
    STATE_WAITING_INPUT,
    STATE_ERROR,
)
@dataclass
class MascotState:
    """Snapshot of what the mascot is currently showing."""

    # One of the STATE_* status strings; a fresh state starts idle.
    status: str = STATE_IDLE
    # Free-form description of the current task, or None when there is none.
    task: Optional[str] = None
    # Optional mood label, or None when unset.
    mood: Optional[str] = None
    # time.time() value captured when the instance is created unless given.
    last_update: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
class MascotStateManager:
    """
    Singleton state manager for the mascot.

    Every ``MascotStateManager()`` call returns the same instance.  The
    current state is mutated only while holding an internal lock, each
    change is written to a JSON file under the user's home directory so it
    survives restarts, and registered subscriber callbacks are invoked with
    a copy of the new state (used for WebSocket broadcasting).

    Persistence is best-effort: failures to create, read, or write the
    state file are logged as warnings and never raised to the caller.
    """

    _instance: Optional["MascotStateManager"] = None
    _lock = threading.Lock()

    def __new__(cls) -> "MascotStateManager":
        """Create the shared instance on first use and return it afterwards."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise the shared instance exactly once.

        Python calls ``__init__`` on every ``MascotStateManager()`` call,
        so repeated calls must be no-ops.
        """
        # Fast path: the flag is set only after initialisation completes,
        # so seeing it means the instance is fully usable.
        if hasattr(self, "_initialized"):
            return
        with type(self)._lock:
            if hasattr(self, "_initialized"):
                return
            self._state = MascotState()
            self._subscribers: List[Callable[[MascotState], None]] = []
            # Guards self._state. Non-reentrant: never call a method that
            # acquires it while already holding it.
            self._state_lock = threading.Lock()
            # Serialises writers of the state file. Always acquired before
            # _state_lock, never after it.
            self._persist_lock = threading.Lock()
            # Persistence path
            self._state_path = (
                Path.home() / ".hermes" / "plugins" / "mascot" / "state.json"
            )
            try:
                self._state_path.parent.mkdir(parents=True, exist_ok=True)
            except OSError as e:
                # Persistence is best-effort; keep running in memory only.
                log.warning("Failed to create mascot state directory: %s", e)
            # Load persisted state
            self._load_state()
            # Set last so a concurrent caller never sees a half-built object.
            self._initialized = True

    def _load_state(self) -> None:
        """Load state from disk if available.

        Malformed content is sanitised rather than trusted: an unknown
        status falls back to idle, non-string task/mood become None, and a
        missing or non-numeric ``last_update`` is replaced with the current
        time.  Any failure is logged and leaves the in-memory state as is.
        """
        try:
            raw = self._state_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # Nothing persisted yet; keep the default state.
            return
        except Exception as e:
            log.warning("Failed to load mascot state: %s", e)
            return

        try:
            data = json.loads(raw)
        except Exception as e:
            log.warning("Failed to load mascot state: %s", e)
            return
        if not isinstance(data, dict):
            log.warning(
                "Failed to load mascot state: %s",
                "expected a JSON object, got %s" % type(data).__name__,
            )
            return

        status = data.get("status", STATE_IDLE)
        # Unknown statuses are discarded, and "thinking" is transient: it
        # does not survive a restart.
        if status not in VALID_STATES or status == STATE_THINKING:
            status = STATE_IDLE

        task = data.get("task")
        if not isinstance(task, str):
            task = None
        mood = data.get("mood")
        if not isinstance(mood, str):
            mood = None

        last_update = data.get("last_update")
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(last_update, bool) or not isinstance(last_update, (int, float)):
            last_update = time.time()

        with self._state_lock:
            self._state = MascotState(
                status=status,
                task=task,
                mood=mood,
                last_update=last_update,
            )
        log.debug("Loaded mascot state from %s", self._state_path)

    def _save_state(self) -> None:
        """Persist the current state to disk.

        The JSON is written to a sibling temporary file and then moved over
        the real file with ``os.replace``, so a crash mid-write cannot leave
        a truncated state file behind.  Failures are logged, not raised.
        """
        tmp_path = self._state_path.with_name(self._state_path.name + ".tmp")
        try:
            with self._persist_lock:
                # Serialise under the state lock, but do the file I/O
                # outside it so readers are not blocked on the disk.
                with self._state_lock:
                    payload = json.dumps(self._state.to_dict(), indent=2)
                tmp_path.write_text(payload, encoding="utf-8")
                os.replace(tmp_path, self._state_path)
            log.debug("Saved mascot state to %s", self._state_path)
        except Exception as e:
            # Best-effort: a persistence failure must not break the caller.
            log.warning("Failed to save mascot state: %s", e)

    def _copy_state_locked(self) -> MascotState:
        """Return a copy of the current state; caller must hold _state_lock."""
        return MascotState(
            status=self._state.status,
            task=self._state.task,
            mood=self._state.mood,
            last_update=self._state.last_update,
        )

    def _publish(self, new_state: MascotState) -> None:
        """Persist the current state and hand ``new_state`` to subscribers.

        Must be called without holding _state_lock, so callbacks are free
        to call back into the manager.
        """
        # Persist
        self._save_state()
        # Broadcast to subscribers; iterate a copy so callbacks may
        # subscribe or unsubscribe while the loop runs.
        for callback in list(self._subscribers):
            try:
                callback(new_state)
            except Exception as e:
                # One failing subscriber must not stop the others.
                log.warning("Subscriber callback failed: %s", e)

    def get_state(self) -> MascotState:
        """Get current state (thread-safe copy)."""
        with self._state_lock:
            return self._copy_state_locked()

    def set_state(
        self,
        status: Optional[str] = None,
        task: Optional[str] = None,
        mood: Optional[str] = None,
    ) -> MascotState:
        """
        Update mascot state.

        Thread-safe. Persists to disk. Broadcasts to subscribers.

        Args:
            status: New status (idle/thinking/working/waiting_input/error).
                None leaves the status unchanged.
            task: Task description. None leaves the task unchanged; an
                empty string clears it.
            mood: Mood override. None leaves the mood unchanged; any other
                value is stored as given.

        Returns:
            The new state (copy).

        Raises:
            ValueError: If ``status`` is not one of VALID_STATES.
        """
        if status is not None and status not in VALID_STATES:
            raise ValueError(f"Invalid status: {status}. Must be one of {VALID_STATES}")
        with self._state_lock:
            if status is not None:
                self._state.status = status
            if task is not None:
                self._state.task = task or None
            if mood is not None:
                self._state.mood = mood
            self._state.last_update = time.time()
            # Copy while still holding the lock so the returned snapshot is
            # exactly this update. get_state() must not be used here: it
            # would try to re-acquire the non-reentrant lock.
            new_state = self._copy_state_locked()
        self._publish(new_state)
        return new_state

    def reset(self) -> MascotState:
        """Reset to the default idle state, clearing task and mood.

        This cannot go through set_state(), where None means "leave
        unchanged" and would keep the old task and mood.

        Returns:
            The new state (copy).
        """
        with self._state_lock:
            self._state = MascotState()
            new_state = self._copy_state_locked()
        self._publish(new_state)
        return new_state

    def subscribe(self, callback: Callable[[MascotState], None]) -> None:
        """Register a callback for state changes."""
        self._subscribers.append(callback)

    def unsubscribe(self, callback: Callable[[MascotState], None]) -> None:
        """Unregister a callback; a callback that is not registered is ignored."""
        try:
            self._subscribers.remove(callback)
        except ValueError:
            # Not registered (or already removed by another thread).
            pass
# Module-level cache for the shared manager; filled on first access.
_manager: Optional[MascotStateManager] = None


def get_manager() -> MascotStateManager:
    """Return the shared MascotStateManager, creating it on the first call."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = MascotStateManager()
    return _manager