feat(moa): opt-in full-turn trace persistence to JSONL (#56101)

Adds moa.save_traces (default off). When on, every MoA turn that runs the
reference fan-out appends one JSON line to
<hermes_home>/moa-traces/<session_id>.jsonl capturing the TRUE FULL turn:
each reference model's exact input messages (system advisory prompt + full
advisory view, not the truncated display preview) + full output + usage +
per-advisor cost, and the aggregator's exact input (including the injected
reference-context guidance block) + output. Lets MoA runs be audited and
improved offline — what every model saw, said, and cost.

- agent/moa_trace.py: config-gated JSONL writer, profile-aware path via
  get_hermes_home(), best-effort (never breaks a turn), moa.trace_dir override.
- agent/moa_loop.py: _RefAccounting now carries full input/output/model/
  provider/temperature; create() stashes the full turn on a cache MISS
  (once per turn, never on the cache-HIT repeat iterations); non-streaming
  aggregator output captured inline, streaming marked + pointed at the
  session assistant message. consume_and_save_trace(session_id) flushes it.
- agent/conversation_loop.py: flushes the trace with the live session_id
  right after MoA usage consumption. No-op for non-MoA clients.
- hermes_cli/config.py: moa.save_traces + moa.trace_dir defaults.

Traces are a side channel — NOT the messages table, never in replay, safe
to delete. Off by default; only overhead when off is one config read on a
MoA cache-MISS turn.

Tests: full-trace-when-enabled (per-ref input+output+cost, aggregator
input-with-guidance + output), nothing-when-disabled. Live E2E through
run_conversation confirmed the loop wiring writes the file.
This commit is contained in:
Teknium 2026-07-01 00:09:42 -07:00 committed by GitHub
parent 5f7deeba84
commit 2e8748ed22
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 437 additions and 8 deletions

View file

@ -1941,6 +1941,15 @@ def run_conversation(
canonical_usage = canonical_usage + _ref_usage
except Exception as _moa_acct_exc: # pragma: no cover - defensive
logger.debug("MoA reference usage accounting failed: %s", _moa_acct_exc)
# Flush the full-turn MoA trace (references + aggregator I/O)
# to disk when moa.save_traces is on. No-op otherwise and
# for non-MoA clients. Uses the live session_id so traces
# land in the right per-session file.
if _moa_client is not None and hasattr(_moa_client, "consume_and_save_trace"):
try:
_moa_client.consume_and_save_trace(agent.session_id)
except Exception as _moa_trace_exc: # pragma: no cover - defensive
logger.debug("MoA trace flush failed: %s", _moa_trace_exc)
prompt_tokens = canonical_usage.prompt_tokens
completion_tokens = canonical_usage.output_tokens
total_tokens = canonical_usage.total_tokens

View file

@ -28,8 +28,8 @@ _MAX_REFERENCE_WORKERS = 8
class _RefAccounting:
"""Per-reference token usage + estimated cost, carried as the third slot
of a reference-output tuple.
"""Per-reference token usage + estimated cost + full trace, carried as the
third slot of a reference-output tuple.
Kept as a tiny object (not a bare CanonicalUsage) because an advisor may
run on a different model/provider than the aggregator, so its cost MUST be
@ -37,15 +37,48 @@ class _RefAccounting:
aggregator's usage and pricing the sum at the aggregator's rate would
misprice every advisor. ``usage`` feeds accurate token counts;
``cost_usd`` feeds accurate cost.
``messages`` / ``output`` / ``model`` / ``provider`` / ``temperature``
carry the FULL reference input and output for trace persistence (the
display ``text`` is a truncated preview and is not enough to audit what an
advisor actually saw). They are only populated when tracing is on; they add
negligible cost otherwise.
"""
__slots__ = ("usage", "cost_usd", "cost_status", "cost_source")
__slots__ = (
"usage",
"cost_usd",
"cost_status",
"cost_source",
"messages",
"output",
"model",
"provider",
"temperature",
)
def __init__(self, usage: Any, cost_usd: Any = None, cost_status: str | None = None, cost_source: str | None = None):
def __init__(
self,
usage: Any,
cost_usd: Any = None,
cost_status: str | None = None,
cost_source: str | None = None,
*,
messages: Any = None,
output: str | None = None,
model: str | None = None,
provider: str | None = None,
temperature: Any = None,
):
self.usage = usage
self.cost_usd = cost_usd
self.cost_status = cost_status
self.cost_source = cost_source
self.messages = messages
self.output = output
self.model = model
self.provider = provider
self.temperature = temperature
# Per-tool-result character budget for the advisory reference view. Tool
# results can be huge (a full diff, a 5000-line file dump); replaying them
@ -219,11 +252,29 @@ def _run_reference(
cost_source = cost.source
except Exception: # pragma: no cover - defensive
pass
acct = _RefAccounting(usage, cost_usd, cost_status, cost_source)
return label, _extract_text(response) or "(empty response)", acct
_output_text = _extract_text(response) or "(empty response)"
acct = _RefAccounting(
usage,
cost_usd,
cost_status,
cost_source,
messages=messages,
output=_output_text,
model=slot.get("model"),
provider=runtime.get("provider") or slot.get("provider"),
temperature=temperature,
)
return label, _output_text, acct
except Exception as exc:
logger.warning("MoA reference model %s failed: %s", label, exc)
return label, f"[failed: {exc}]", _RefAccounting(CanonicalUsage())
return label, f"[failed: {exc}]", _RefAccounting(
CanonicalUsage(),
messages=[{"role": "system", "content": _REFERENCE_SYSTEM_PROMPT}, *ref_messages],
output=f"[failed: {exc}]",
model=slot.get("model"),
provider=runtime.get("provider") or slot.get("provider"),
temperature=temperature,
)
def _run_references_parallel(
@ -545,6 +596,10 @@ class MoAChatCompletions:
self._pending_reference_usage: Any = CanonicalUsage()
self._pending_reference_cost: Any = None
# Full-turn trace parts stashed on a cache-MISS create(), awaiting the
# caller to stitch in the live session_id + resolved aggregator output
# and flush to the trace file (only when moa.save_traces is on).
self._pending_trace: Any = None
def consume_reference_usage(self) -> tuple[Any, Any]:
"""Pop pending reference-fan-out usage + cost, resetting both to empty.
@ -563,6 +618,37 @@ class MoAChatCompletions:
self._pending_reference_cost = None
return usage, cost
def consume_and_save_trace(self, session_id: Any = None) -> None:
"""Flush the pending full-turn trace to disk, if one is pending.
No-op when tracing is off (``save_moa_turn`` checks the config), when
there is no pending trace (a cache-HIT iteration ran no references), or
when the aggregator input was never recorded. Clears the pending trace
so a repeat consume cannot double-write. Best-effort never raises.
"""
pending = self._pending_trace
self._pending_trace = None
if not pending or "aggregator_input_messages" not in pending:
return
try:
from agent.moa_trace import save_moa_turn
agg_slot = pending.get("aggregator_slot") or {}
save_moa_turn(
session_id=session_id,
preset_name=pending.get("preset", ""),
reference_outputs=pending.get("reference_outputs", []),
aggregator_label=pending.get("aggregator_label", ""),
aggregator_model=agg_slot.get("model"),
aggregator_provider=agg_slot.get("provider"),
aggregator_temperature=pending.get("aggregator_temperature"),
aggregator_input_messages=pending.get("aggregator_input_messages"),
aggregator_output=pending.get("aggregator_output"),
aggregator_streamed=bool(pending.get("aggregator_streamed")),
)
except Exception as exc: # pragma: no cover - tracing must never break a turn
logger.debug("MoA trace flush failed: %s", exc)
def _emit(self, event: str, **kwargs: Any) -> None:
cb = self.reference_callback
if cb is None:
@ -618,6 +704,10 @@ class MoAChatCompletions:
# advisor spend by the tool-iteration count, so pending is zero.
self._pending_reference_usage = CanonicalUsage()
self._pending_reference_cost = None
# Likewise no trace on a cache HIT — the full turn was already
# traced on the MISS that ran the references. A repeat iteration is
# not a new MoA turn.
self._pending_trace = None
else:
reference_outputs = _run_references_parallel(
reference_models,
@ -645,6 +735,17 @@ class MoAChatCompletions:
_ref_cost = (_ref_cost or 0) + _acct.cost_usd
self._pending_reference_usage = _ref_usage
self._pending_reference_cost = _ref_cost
# Stash the full reference fan-out for trace persistence. The
# aggregator input/label are filled in below once agg_messages is
# built; the aggregator OUTPUT is stitched in by the caller
# (consume_and_save_trace) once the response resolves — the caller
# holds the live session_id and the resolved aggregator response.
self._pending_trace = {
"preset": self.preset_name,
"reference_outputs": list(reference_outputs),
"aggregator_slot": aggregator,
"aggregator_temperature": aggregator_temperature,
}
# Surface each reference model's answer to the display BEFORE the
# aggregator acts — once per turn (only on the iteration that
@ -694,6 +795,12 @@ class MoAChatCompletions:
raise RuntimeError("MoA aggregator cannot be another MoA preset")
agg_kwargs = dict(api_kwargs)
agg_kwargs["messages"] = agg_messages
# Record the exact aggregator INPUT (incl. the injected reference
# context) into the pending trace so a trace captures what the
# aggregator actually saw, not a reconstruction.
if self._pending_trace is not None:
self._pending_trace["aggregator_input_messages"] = agg_messages
self._pending_trace["aggregator_label"] = _slot_label(aggregator)
# The aggregator is the acting model. Resolve its slot to the provider's
# real runtime (base_url/api_key/api_mode) and call it through the same
# request-building path any model uses — so per-model wire-format
@ -720,7 +827,7 @@ class MoAChatCompletions:
# actually governs the aggregator stream, not just call_llm's default.
if api_kwargs.get("timeout") is not None:
stream_kwargs["timeout"] = api_kwargs["timeout"]
return call_llm(
_agg_response = call_llm(
task="moa_aggregator",
messages=agg_messages,
temperature=aggregator_temperature,
@ -730,6 +837,22 @@ class MoAChatCompletions:
**stream_kwargs,
**_slot_runtime(aggregator),
)
# Non-streaming path (quiet mode / eval / subagents): the aggregator
# output is available inline, so capture it into the pending trace now.
# Streaming path: the aggregator's raw token stream is returned to the
# consumer live and its acting output lands as the turn's assistant
# message; the trace marks it streamed and points there.
if self._pending_trace is not None:
if stream:
self._pending_trace["aggregator_streamed"] = True
self._pending_trace["aggregator_output"] = None
else:
self._pending_trace["aggregator_streamed"] = False
try:
self._pending_trace["aggregator_output"] = _extract_text(_agg_response)
except Exception: # pragma: no cover - defensive
self._pending_trace["aggregator_output"] = None
return _agg_response
class MoAClient:
@ -744,3 +867,10 @@ class MoAClient:
usage without reaching into ``.chat.completions`` internals.
"""
return self.chat.completions.consume_reference_usage()
def consume_and_save_trace(self, session_id: Any = None) -> None:
"""Flush the pending full-turn MoA trace via the completions facade.
No-op unless ``moa.save_traces`` is enabled and a turn is pending.
"""
return self.chat.completions.consume_and_save_trace(session_id)

153
agent/moa_trace.py Normal file
View file

@ -0,0 +1,153 @@
"""Full MoA turn trace persistence (opt-in via config ``moa.save_traces``).
When enabled, every Mixture-of-Agents turn that actually runs the reference
fan-out (a cache MISS in ``MoAChatCompletions.create``) appends one JSON line
to ``<hermes_home>/moa-traces/<session_id>.jsonl``. The record is the TRUE
FULL turn the exact messages array each reference model received (system
prompt + advisory view, not the truncated display preview), each reference's
full output, and the exact messages array the aggregator received (including
the injected reference-context guidance block) plus its output when available
so a run can be audited end-to-end offline: what every model saw, what every
model said, and what it cost.
This is a side-channel trace. It is NOT the conversation ``messages`` table and
never enters message history or replay MoA references are advisory side-calls
with their own system prompt, not conversation turns, so persisting them as
message rows would corrupt role alternation / replay. Traces live in their own
files, keyed by session id, and are safe to delete.
Cost model note: gated OFF by default. When off, the only overhead is the
``_traces_enabled()`` config read (cheap) no file I/O, no serialization.
"""
from __future__ import annotations
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
def _traces_enabled_and_dir() -> Optional[Path]:
"""Return the trace directory if ``moa.save_traces`` is on, else None.
Reads config lazily per call (config is cheap to load and this only runs on
a cache-MISS MoA turn, i.e. once per user turn, not per tool iteration).
``moa.trace_dir`` overrides the default ``<hermes_home>/moa-traces/``.
"""
try:
from hermes_cli.config import load_config
moa_cfg = (load_config() or {}).get("moa") or {}
except Exception: # pragma: no cover - defensive: never break a turn over tracing
return None
if not moa_cfg.get("save_traces"):
return None
override = moa_cfg.get("trace_dir")
if override:
base = Path(os.path.expandvars(os.path.expanduser(str(override))))
else:
base = get_hermes_home() / "moa-traces"
return base
def _sanitize_session_id(session_id: Optional[str]) -> str:
"""Make a session id safe as a filename component."""
if not session_id:
return "unknown-session"
return "".join(c if (c.isalnum() or c in "-_.") else "_" for c in str(session_id))
def _slot_trace(acct: Any, label: str) -> dict[str, Any]:
"""Render one reference's _RefAccounting into a full trace dict.
Includes the FULL input messages the reference received and its FULL
output not the truncated display preview.
"""
usage = getattr(acct, "usage", None)
usage_dict: dict[str, Any] = {}
if usage is not None:
usage_dict = {
"input_tokens": getattr(usage, "input_tokens", 0),
"output_tokens": getattr(usage, "output_tokens", 0),
"cache_read_tokens": getattr(usage, "cache_read_tokens", 0),
"cache_write_tokens": getattr(usage, "cache_write_tokens", 0),
"reasoning_tokens": getattr(usage, "reasoning_tokens", 0),
}
return {
"label": label,
"model": getattr(acct, "model", None),
"provider": getattr(acct, "provider", None),
"temperature": getattr(acct, "temperature", None),
"input_messages": getattr(acct, "messages", None),
"output": getattr(acct, "output", None),
"usage": usage_dict,
"cost_usd": getattr(acct, "cost_usd", None),
"cost_status": getattr(acct, "cost_status", None),
"cost_source": getattr(acct, "cost_source", None),
}
def save_moa_turn(
*,
session_id: Optional[str],
preset_name: str,
reference_outputs: list[tuple[str, str, Any]],
aggregator_label: str,
aggregator_model: Optional[str],
aggregator_provider: Optional[str],
aggregator_temperature: Any,
aggregator_input_messages: Any,
aggregator_output: Optional[str],
aggregator_streamed: bool,
) -> None:
"""Append one full MoA turn record to the session's trace JSONL, if enabled.
Best-effort: any failure is logged at debug and swallowed tracing must
never break a live turn. Called once per turn on a reference cache MISS.
``aggregator_output`` is the aggregator's synthesized text when it was
captured inline (non-streaming path the eval / quiet-mode path). When the
aggregator streamed to a live consumer, ``aggregator_streamed`` is True and
the output is delivered as the turn's assistant message in the session
store instead; the trace records the full aggregator INPUT either way.
"""
base = _traces_enabled_and_dir()
if base is None:
return
try:
base.mkdir(parents=True, exist_ok=True)
path = base / f"{_sanitize_session_id(session_id)}.jsonl"
record = {
"ts": time.time(),
"session_id": session_id,
"preset": preset_name,
"references": [
_slot_trace(acct, label)
for label, _text, acct in reference_outputs
],
"aggregator": {
"label": aggregator_label,
"model": aggregator_model,
"provider": aggregator_provider,
"temperature": aggregator_temperature,
"input_messages": aggregator_input_messages,
"output": aggregator_output,
"streamed": aggregator_streamed,
# When streamed, the aggregator's acting output is persisted as
# the turn's assistant message in state.db (see the session
# store); it is not duplicated here.
"output_location": "assistant_message_in_session_db"
if aggregator_streamed else "inline",
},
}
with path.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
except Exception as exc: # pragma: no cover - tracing must never break a turn
logger.debug("MoA trace write failed (session=%s): %s", session_id, exc)

View file

@ -2155,6 +2155,14 @@ DEFAULT_CONFIG = {
"moa": {
"default_preset": "default",
"active_preset": "",
# When true, every MoA turn that runs the reference fan-out writes the
# FULL turn (each reference's exact input messages + output + usage/cost,
# and the aggregator's exact input + output) to a JSONL file at
# <hermes_home>/moa-traces/<session_id>.jsonl. Off by default — turn it
# on to audit / improve MoA behavior from real runs. Set trace_dir to
# override the output directory.
"save_traces": False,
"trace_dir": "",
"presets": {
"default": {
"reference_models": [

View file

@ -884,3 +884,132 @@ def test_canonical_usage_add():
assert total.cache_read_tokens == 5
assert total.cache_write_tokens == 3
assert total.request_count == 2
def test_moa_full_trace_written_when_enabled(monkeypatch, tmp_path):
"""With moa.save_traces on, a full MoA turn is written to JSONL.
Asserts the record captures each reference's FULL input messages + output
and the aggregator's FULL input (incl. injected reference guidance) +
output the true full turn, auditable offline.
"""
import json
home = tmp_path / ".hermes"
home.mkdir()
(home / "config.yaml").write_text(
"""
moa:
save_traces: true
default_preset: review
presets:
review:
reference_models:
- provider: openrouter
model: adv-a
- provider: openrouter
model: adv-b
aggregator:
provider: openrouter
model: anthropic/claude-opus-4.8
""".strip(),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(home))
def fake_call_llm(**kwargs):
if kwargs["task"] == "moa_reference":
# Echo the model so we can prove per-reference output is captured.
model = kwargs.get("model", "?")
return _response_with_usage(content=f"advice from {model}", prompt=500, completion=80)
return _response("AGGREGATOR FINAL ANSWER")
monkeypatch.setattr("agent.moa_loop.call_llm", fake_call_llm)
monkeypatch.setattr(
"agent.moa_loop._slot_runtime",
lambda slot: {"provider": "openrouter", "model": slot.get("model")},
)
monkeypatch.setattr(
"agent.usage_pricing.estimate_usage_cost",
lambda *a, **k: SimpleNamespace(amount_usd=0.001, status="estimated", source="table"),
)
from agent.moa_loop import MoAChatCompletions
facade = MoAChatCompletions("review")
# Non-streaming create() → aggregator output captured inline.
facade.create(messages=[{"role": "user", "content": "please review the plan"}], tools=[])
facade.consume_and_save_trace(session_id="sess-xyz")
trace_file = home / "moa-traces" / "sess-xyz.jsonl"
assert trace_file.exists(), "trace file not written"
lines = trace_file.read_text(encoding="utf-8").strip().splitlines()
assert len(lines) == 1
rec = json.loads(lines[0])
# Turn framing.
assert rec["session_id"] == "sess-xyz"
assert rec["preset"] == "review"
# Both references captured, each with FULL input messages + output.
assert len(rec["references"]) == 2
for ref in rec["references"]:
assert ref["model"] in ("adv-a", "adv-b")
assert ref["provider"] == "openrouter"
# Full input messages present (system advisory prompt + advisory view).
assert isinstance(ref["input_messages"], list) and len(ref["input_messages"]) >= 2
assert ref["input_messages"][0]["role"] == "system"
# Full output present and model-specific.
assert ref["output"] == f"advice from {ref['model']}"
assert ref["usage"]["input_tokens"] == 500
assert ref["cost_usd"] == 0.001
# Aggregator: full input (with injected reference guidance) + inline output.
agg = rec["aggregator"]
assert agg["model"] == "anthropic/claude-opus-4.8"
assert agg["streamed"] is False
assert agg["output"] == "AGGREGATOR FINAL ANSWER"
agg_text = json.dumps(agg["input_messages"])
assert "Mixture of Agents reference context" in agg_text
assert "advice from adv-a" in agg_text and "advice from adv-b" in agg_text
def test_moa_trace_not_written_when_disabled(monkeypatch, tmp_path):
"""Default (save_traces off) writes nothing."""
home = tmp_path / ".hermes"
home.mkdir()
(home / "config.yaml").write_text(
"""
moa:
default_preset: review
presets:
review:
reference_models:
- provider: openrouter
model: adv-a
aggregator:
provider: openrouter
model: anthropic/claude-opus-4.8
""".strip(),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(home))
def fake_call_llm(**kwargs):
if kwargs["task"] == "moa_reference":
return _response_with_usage(content="advice")
return _response("acted")
monkeypatch.setattr("agent.moa_loop.call_llm", fake_call_llm)
monkeypatch.setattr(
"agent.moa_loop._slot_runtime",
lambda slot: {"provider": "openrouter", "model": slot.get("model")},
)
from agent.moa_loop import MoAChatCompletions
facade = MoAChatCompletions("review")
facade.create(messages=[{"role": "user", "content": "hi"}], tools=[])
facade.consume_and_save_trace(session_id="sess-off")
assert not (home / "moa-traces").exists()