diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 502fda1c5..e451c43cb 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -1941,6 +1941,15 @@ def run_conversation( canonical_usage = canonical_usage + _ref_usage except Exception as _moa_acct_exc: # pragma: no cover - defensive logger.debug("MoA reference usage accounting failed: %s", _moa_acct_exc) + # Flush the full-turn MoA trace (references + aggregator I/O) + # to disk when moa.save_traces is on. No-op otherwise and + # for non-MoA clients. Uses the live session_id so traces + # land in the right per-session file. + if _moa_client is not None and hasattr(_moa_client, "consume_and_save_trace"): + try: + _moa_client.consume_and_save_trace(agent.session_id) + except Exception as _moa_trace_exc: # pragma: no cover - defensive + logger.debug("MoA trace flush failed: %s", _moa_trace_exc) prompt_tokens = canonical_usage.prompt_tokens completion_tokens = canonical_usage.output_tokens total_tokens = canonical_usage.total_tokens diff --git a/agent/moa_loop.py b/agent/moa_loop.py index 022aafe7d..149142503 100644 --- a/agent/moa_loop.py +++ b/agent/moa_loop.py @@ -28,8 +28,8 @@ _MAX_REFERENCE_WORKERS = 8 class _RefAccounting: - """Per-reference token usage + estimated cost, carried as the third slot - of a reference-output tuple. + """Per-reference token usage + estimated cost + full trace, carried as the + third slot of a reference-output tuple. Kept as a tiny object (not a bare CanonicalUsage) because an advisor may run on a different model/provider than the aggregator, so its cost MUST be @@ -37,15 +37,48 @@ class _RefAccounting: aggregator's usage and pricing the sum at the aggregator's rate would misprice every advisor. ``usage`` feeds accurate token counts; ``cost_usd`` feeds accurate cost. + + ``messages`` / ``output`` / ``model`` / ``provider`` / ``temperature`` + carry the FULL reference input and output for trace persistence (the + display ``text`` is a truncated preview and is not enough to audit what an + advisor actually saw). They are only populated when tracing is on; they add + negligible cost otherwise. """ - __slots__ = ("usage", "cost_usd", "cost_status", "cost_source") + __slots__ = ( + "usage", + "cost_usd", + "cost_status", + "cost_source", + "messages", + "output", + "model", + "provider", + "temperature", + ) - def __init__(self, usage: Any, cost_usd: Any = None, cost_status: str | None = None, cost_source: str | None = None): + def __init__( + self, + usage: Any, + cost_usd: Any = None, + cost_status: str | None = None, + cost_source: str | None = None, + *, + messages: Any = None, + output: str | None = None, + model: str | None = None, + provider: str | None = None, + temperature: Any = None, + ): self.usage = usage self.cost_usd = cost_usd self.cost_status = cost_status self.cost_source = cost_source + self.messages = messages + self.output = output + self.model = model + self.provider = provider + self.temperature = temperature # Per-tool-result character budget for the advisory reference view. Tool # results can be huge (a full diff, a 5000-line file dump); replaying them @@ -219,11 +252,29 @@ def _run_reference( cost_source = cost.source except Exception: # pragma: no cover - defensive pass - acct = _RefAccounting(usage, cost_usd, cost_status, cost_source) - return label, _extract_text(response) or "(empty response)", acct + _output_text = _extract_text(response) or "(empty response)" + acct = _RefAccounting( + usage, + cost_usd, + cost_status, + cost_source, + messages=messages, + output=_output_text, + model=slot.get("model"), + provider=runtime.get("provider") or slot.get("provider"), + temperature=temperature, + ) + return label, _output_text, acct except Exception as exc: logger.warning("MoA reference model %s failed: %s", label, exc) - return label, f"[failed: {exc}]", _RefAccounting(CanonicalUsage()) + return label, f"[failed: {exc}]", _RefAccounting( + CanonicalUsage(), + messages=[{"role": "system", "content": _REFERENCE_SYSTEM_PROMPT}, *ref_messages], + output=f"[failed: {exc}]", + model=slot.get("model"), + provider=runtime.get("provider") or slot.get("provider"), + temperature=temperature, + ) def _run_references_parallel( @@ -545,6 +596,10 @@ class MoAChatCompletions: self._pending_reference_usage: Any = CanonicalUsage() self._pending_reference_cost: Any = None + # Full-turn trace parts stashed on a cache-MISS create(), awaiting the + # caller to stitch in the live session_id + resolved aggregator output + # and flush to the trace file (only when moa.save_traces is on). + self._pending_trace: Any = None def consume_reference_usage(self) -> tuple[Any, Any]: """Pop pending reference-fan-out usage + cost, resetting both to empty. @@ -563,6 +618,37 @@ class MoAChatCompletions: self._pending_reference_cost = None return usage, cost + def consume_and_save_trace(self, session_id: Any = None) -> None: + """Flush the pending full-turn trace to disk, if one is pending. + + No-op when tracing is off (``save_moa_turn`` checks the config), when + there is no pending trace (a cache-HIT iteration ran no references), or + when the aggregator input was never recorded. Clears the pending trace + so a repeat consume cannot double-write. Best-effort — never raises. + """ + pending = self._pending_trace + self._pending_trace = None + if not pending or "aggregator_input_messages" not in pending: + return + try: + from agent.moa_trace import save_moa_turn + + agg_slot = pending.get("aggregator_slot") or {} + save_moa_turn( + session_id=session_id, + preset_name=pending.get("preset", ""), + reference_outputs=pending.get("reference_outputs", []), + aggregator_label=pending.get("aggregator_label", ""), + aggregator_model=agg_slot.get("model"), + aggregator_provider=agg_slot.get("provider"), + aggregator_temperature=pending.get("aggregator_temperature"), + aggregator_input_messages=pending.get("aggregator_input_messages"), + aggregator_output=pending.get("aggregator_output"), + aggregator_streamed=bool(pending.get("aggregator_streamed")), + ) + except Exception as exc: # pragma: no cover - tracing must never break a turn + logger.debug("MoA trace flush failed: %s", exc) + def _emit(self, event: str, **kwargs: Any) -> None: cb = self.reference_callback if cb is None: @@ -618,6 +704,10 @@ class MoAChatCompletions: # advisor spend by the tool-iteration count, so pending is zero. self._pending_reference_usage = CanonicalUsage() self._pending_reference_cost = None + # Likewise no trace on a cache HIT — the full turn was already + # traced on the MISS that ran the references. A repeat iteration is + # not a new MoA turn. + self._pending_trace = None else: reference_outputs = _run_references_parallel( reference_models, @@ -645,6 +735,17 @@ class MoAChatCompletions: _ref_cost = (_ref_cost or 0) + _acct.cost_usd self._pending_reference_usage = _ref_usage self._pending_reference_cost = _ref_cost + # Stash the full reference fan-out for trace persistence. The + # aggregator input/label are filled in below once agg_messages is + # built; the aggregator OUTPUT is stitched in by the caller + # (consume_and_save_trace) once the response resolves — the caller + # holds the live session_id and the resolved aggregator response. + self._pending_trace = { + "preset": self.preset_name, + "reference_outputs": list(reference_outputs), + "aggregator_slot": aggregator, + "aggregator_temperature": aggregator_temperature, + } # Surface each reference model's answer to the display BEFORE the # aggregator acts — once per turn (only on the iteration that @@ -694,6 +795,12 @@ class MoAChatCompletions: raise RuntimeError("MoA aggregator cannot be another MoA preset") agg_kwargs = dict(api_kwargs) agg_kwargs["messages"] = agg_messages + # Record the exact aggregator INPUT (incl. the injected reference + # context) into the pending trace so a trace captures what the + # aggregator actually saw, not a reconstruction. + if self._pending_trace is not None: + self._pending_trace["aggregator_input_messages"] = agg_messages + self._pending_trace["aggregator_label"] = _slot_label(aggregator) # The aggregator is the acting model. Resolve its slot to the provider's # real runtime (base_url/api_key/api_mode) and call it through the same # request-building path any model uses — so per-model wire-format @@ -720,7 +827,7 @@ class MoAChatCompletions: # actually governs the aggregator stream, not just call_llm's default. if api_kwargs.get("timeout") is not None: stream_kwargs["timeout"] = api_kwargs["timeout"] - return call_llm( + _agg_response = call_llm( task="moa_aggregator", messages=agg_messages, temperature=aggregator_temperature, @@ -730,6 +837,22 @@ class MoAChatCompletions: **stream_kwargs, **_slot_runtime(aggregator), ) + # Non-streaming path (quiet mode / eval / subagents): the aggregator + # output is available inline, so capture it into the pending trace now. + # Streaming path: the aggregator's raw token stream is returned to the + # consumer live and its acting output lands as the turn's assistant + # message; the trace marks it streamed and points there. + if self._pending_trace is not None: + if stream: + self._pending_trace["aggregator_streamed"] = True + self._pending_trace["aggregator_output"] = None + else: + self._pending_trace["aggregator_streamed"] = False + try: + self._pending_trace["aggregator_output"] = _extract_text(_agg_response) + except Exception: # pragma: no cover - defensive + self._pending_trace["aggregator_output"] = None + return _agg_response class MoAClient: @@ -744,3 +867,10 @@ class MoAClient: usage without reaching into ``.chat.completions`` internals. """ return self.chat.completions.consume_reference_usage() + + def consume_and_save_trace(self, session_id: Any = None) -> None: + """Flush the pending full-turn MoA trace via the completions facade. + + No-op unless ``moa.save_traces`` is enabled and a turn is pending. + """ + return self.chat.completions.consume_and_save_trace(session_id) diff --git a/agent/moa_trace.py b/agent/moa_trace.py new file mode 100644 index 000000000..a18a26df8 --- /dev/null +++ b/agent/moa_trace.py @@ -0,0 +1,153 @@ +"""Full MoA turn trace persistence (opt-in via config ``moa.save_traces``). + +When enabled, every Mixture-of-Agents turn that actually runs the reference +fan-out (a cache MISS in ``MoAChatCompletions.create``) appends one JSON line +to ``/moa-traces/.jsonl``. The record is the TRUE +FULL turn — the exact messages array each reference model received (system +prompt + advisory view, not the truncated display preview), each reference's +full output, and the exact messages array the aggregator received (including +the injected reference-context guidance block) plus its output when available +— so a run can be audited end-to-end offline: what every model saw, what every +model said, and what it cost. + +This is a side-channel trace. It is NOT the conversation ``messages`` table and +never enters message history or replay — MoA references are advisory side-calls +with their own system prompt, not conversation turns, so persisting them as +message rows would corrupt role alternation / replay. Traces live in their own +files, keyed by session id, and are safe to delete. + +Cost model note: gated OFF by default. When off, the only overhead is the +``_traces_enabled()`` config read (cheap) — no file I/O, no serialization. +""" + +from __future__ import annotations + +import json +import logging +import os +import time +from pathlib import Path +from typing import Any, Optional + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + + +def _traces_enabled_and_dir() -> Optional[Path]: + """Return the trace directory if ``moa.save_traces`` is on, else None. + + Reads config lazily per call (config is cheap to load and this only runs on + a cache-MISS MoA turn, i.e. once per user turn, not per tool iteration). + ``moa.trace_dir`` overrides the default ``/moa-traces/``. + """ + try: + from hermes_cli.config import load_config + + moa_cfg = (load_config() or {}).get("moa") or {} + except Exception: # pragma: no cover - defensive: never break a turn over tracing + return None + if not moa_cfg.get("save_traces"): + return None + override = moa_cfg.get("trace_dir") + if override: + base = Path(os.path.expandvars(os.path.expanduser(str(override)))) + else: + base = get_hermes_home() / "moa-traces" + return base + + +def _sanitize_session_id(session_id: Optional[str]) -> str: + """Make a session id safe as a filename component.""" + if not session_id: + return "unknown-session" + return "".join(c if (c.isalnum() or c in "-_.") else "_" for c in str(session_id)) + + +def _slot_trace(acct: Any, label: str) -> dict[str, Any]: + """Render one reference's _RefAccounting into a full trace dict. + + Includes the FULL input messages the reference received and its FULL + output — not the truncated display preview. + """ + usage = getattr(acct, "usage", None) + usage_dict: dict[str, Any] = {} + if usage is not None: + usage_dict = { + "input_tokens": getattr(usage, "input_tokens", 0), + "output_tokens": getattr(usage, "output_tokens", 0), + "cache_read_tokens": getattr(usage, "cache_read_tokens", 0), + "cache_write_tokens": getattr(usage, "cache_write_tokens", 0), + "reasoning_tokens": getattr(usage, "reasoning_tokens", 0), + } + return { + "label": label, + "model": getattr(acct, "model", None), + "provider": getattr(acct, "provider", None), + "temperature": getattr(acct, "temperature", None), + "input_messages": getattr(acct, "messages", None), + "output": getattr(acct, "output", None), + "usage": usage_dict, + "cost_usd": getattr(acct, "cost_usd", None), + "cost_status": getattr(acct, "cost_status", None), + "cost_source": getattr(acct, "cost_source", None), + } + + +def save_moa_turn( + *, + session_id: Optional[str], + preset_name: str, + reference_outputs: list[tuple[str, str, Any]], + aggregator_label: str, + aggregator_model: Optional[str], + aggregator_provider: Optional[str], + aggregator_temperature: Any, + aggregator_input_messages: Any, + aggregator_output: Optional[str], + aggregator_streamed: bool, +) -> None: + """Append one full MoA turn record to the session's trace JSONL, if enabled. + + Best-effort: any failure is logged at debug and swallowed — tracing must + never break a live turn. Called once per turn on a reference cache MISS. + + ``aggregator_output`` is the aggregator's synthesized text when it was + captured inline (non-streaming path — the eval / quiet-mode path). When the + aggregator streamed to a live consumer, ``aggregator_streamed`` is True and + the output is delivered as the turn's assistant message in the session + store instead; the trace records the full aggregator INPUT either way. + """ + base = _traces_enabled_and_dir() + if base is None: + return + try: + base.mkdir(parents=True, exist_ok=True) + path = base / f"{_sanitize_session_id(session_id)}.jsonl" + record = { + "ts": time.time(), + "session_id": session_id, + "preset": preset_name, + "references": [ + _slot_trace(acct, label) + for label, _text, acct in reference_outputs + ], + "aggregator": { + "label": aggregator_label, + "model": aggregator_model, + "provider": aggregator_provider, + "temperature": aggregator_temperature, + "input_messages": aggregator_input_messages, + "output": aggregator_output, + "streamed": aggregator_streamed, + # When streamed, the aggregator's acting output is persisted as + # the turn's assistant message in state.db (see the session + # store); it is not duplicated here. + "output_location": "assistant_message_in_session_db" + if aggregator_streamed else "inline", + }, + } + with path.open("a", encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n") + except Exception as exc: # pragma: no cover - tracing must never break a turn + logger.debug("MoA trace write failed (session=%s): %s", session_id, exc) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 5b2ad0fd9..b19ef5479 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2155,6 +2155,14 @@ DEFAULT_CONFIG = { "moa": { "default_preset": "default", "active_preset": "", + # When true, every MoA turn that runs the reference fan-out writes the + # FULL turn (each reference's exact input messages + output + usage/cost, + # and the aggregator's exact input + output) to a JSONL file at + # /moa-traces/.jsonl. Off by default — turn it + # on to audit / improve MoA behavior from real runs. Set trace_dir to + # override the output directory. + "save_traces": False, + "trace_dir": "", "presets": { "default": { "reference_models": [ diff --git a/tests/run_agent/test_moa_loop_mode.py b/tests/run_agent/test_moa_loop_mode.py index 46976c77a..33103c5ff 100644 --- a/tests/run_agent/test_moa_loop_mode.py +++ b/tests/run_agent/test_moa_loop_mode.py @@ -884,3 +884,132 @@ def test_canonical_usage_add(): assert total.cache_read_tokens == 5 assert total.cache_write_tokens == 3 assert total.request_count == 2 + + +def test_moa_full_trace_written_when_enabled(monkeypatch, tmp_path): + """With moa.save_traces on, a full MoA turn is written to JSONL. + + Asserts the record captures each reference's FULL input messages + output + and the aggregator's FULL input (incl. injected reference guidance) + + output — the true full turn, auditable offline. + """ + import json + + home = tmp_path / ".hermes" + home.mkdir() + (home / "config.yaml").write_text( + """ +moa: + save_traces: true + default_preset: review + presets: + review: + reference_models: + - provider: openrouter + model: adv-a + - provider: openrouter + model: adv-b + aggregator: + provider: openrouter + model: anthropic/claude-opus-4.8 +""".strip(), + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_HOME", str(home)) + + def fake_call_llm(**kwargs): + if kwargs["task"] == "moa_reference": + # Echo the model so we can prove per-reference output is captured. + model = kwargs.get("model", "?") + return _response_with_usage(content=f"advice from {model}", prompt=500, completion=80) + return _response("AGGREGATOR FINAL ANSWER") + + monkeypatch.setattr("agent.moa_loop.call_llm", fake_call_llm) + monkeypatch.setattr( + "agent.moa_loop._slot_runtime", + lambda slot: {"provider": "openrouter", "model": slot.get("model")}, + ) + monkeypatch.setattr( + "agent.usage_pricing.estimate_usage_cost", + lambda *a, **k: SimpleNamespace(amount_usd=0.001, status="estimated", source="table"), + ) + + from agent.moa_loop import MoAChatCompletions + + facade = MoAChatCompletions("review") + # Non-streaming create() → aggregator output captured inline. + facade.create(messages=[{"role": "user", "content": "please review the plan"}], tools=[]) + facade.consume_and_save_trace(session_id="sess-xyz") + + trace_file = home / "moa-traces" / "sess-xyz.jsonl" + assert trace_file.exists(), "trace file not written" + lines = trace_file.read_text(encoding="utf-8").strip().splitlines() + assert len(lines) == 1 + rec = json.loads(lines[0]) + + # Turn framing. + assert rec["session_id"] == "sess-xyz" + assert rec["preset"] == "review" + + # Both references captured, each with FULL input messages + output. + assert len(rec["references"]) == 2 + for ref in rec["references"]: + assert ref["model"] in ("adv-a", "adv-b") + assert ref["provider"] == "openrouter" + # Full input messages present (system advisory prompt + advisory view). + assert isinstance(ref["input_messages"], list) and len(ref["input_messages"]) >= 2 + assert ref["input_messages"][0]["role"] == "system" + # Full output present and model-specific. + assert ref["output"] == f"advice from {ref['model']}" + assert ref["usage"]["input_tokens"] == 500 + assert ref["cost_usd"] == 0.001 + + # Aggregator: full input (with injected reference guidance) + inline output. + agg = rec["aggregator"] + assert agg["model"] == "anthropic/claude-opus-4.8" + assert agg["streamed"] is False + assert agg["output"] == "AGGREGATOR FINAL ANSWER" + agg_text = json.dumps(agg["input_messages"]) + assert "Mixture of Agents reference context" in agg_text + assert "advice from adv-a" in agg_text and "advice from adv-b" in agg_text + + +def test_moa_trace_not_written_when_disabled(monkeypatch, tmp_path): + """Default (save_traces off) writes nothing.""" + home = tmp_path / ".hermes" + home.mkdir() + (home / "config.yaml").write_text( + """ +moa: + default_preset: review + presets: + review: + reference_models: + - provider: openrouter + model: adv-a + aggregator: + provider: openrouter + model: anthropic/claude-opus-4.8 +""".strip(), + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_HOME", str(home)) + + def fake_call_llm(**kwargs): + if kwargs["task"] == "moa_reference": + return _response_with_usage(content="advice") + return _response("acted") + + monkeypatch.setattr("agent.moa_loop.call_llm", fake_call_llm) + monkeypatch.setattr( + "agent.moa_loop._slot_runtime", + lambda slot: {"provider": "openrouter", "model": slot.get("model")}, + ) + + from agent.moa_loop import MoAChatCompletions + + facade = MoAChatCompletions("review") + facade.create(messages=[{"role": "user", "content": "hi"}], tools=[]) + facade.consume_and_save_trace(session_id="sess-off") + + assert not (home / "moa-traces").exists()