fix+test(codex): self-persist projected turns; keep agent_persisted=True

Follow-up correcting the salvaged fix's persistence approach to avoid a
duplicate user-message write (verified via E2E — the #860/#42039 bug class
the original diff aimed to avoid).

Root cause: in gateway mode the AIAgent is built WITH a session_db, so the
inbound user turn is already flushed at turn start (turn_context.
_persist_session). The original fix returned agent_persisted=False, making the
gateway re-write the whole new-message slice via append_to_transcript ->
append_message (a raw INSERT with no dedup), duplicating the already-flushed
user turn.

Corrected approach (single writer): run_codex_app_server_turn now flushes its
OWN projected assistant/tool messages via _flush_messages_to_session_db (which
dedups the already-persisted user turn through _DB_PERSISTED_MARKER) and
returns agent_persisted=True so the gateway skips its write. Net result:
session_search/distill see the full codex conversation, each message persisted
exactly once.

Adds regression coverage asserting exactly-once persistence on a real
SessionDB, agent_persisted=True, FTS visibility, and standard-runtime skip-db
behaviour preserved.

Co-authored-by: Lubos Buracinsky <lubos@komfi.health>
This commit is contained in:
kshitijk4poor 2026-07-01 16:46:48 +05:30 committed by kshitij
parent 5558382457
commit dc1ea005d9
3 changed files with 208 additions and 19 deletions

View file

@ -333,6 +333,28 @@ def run_codex_app_server_turn(
if turn.projected_messages:
messages.extend(turn.projected_messages)
# Persist the newly-projected assistant/tool messages ourselves.
# This path is an early return that bypasses conversation_loop, whose
# normal per-step _persist_session() calls would otherwise flush them.
# The inbound user turn was already flushed at turn start
# (turn_context.py _persist_session), and _flush_messages_to_session_db
# is idempotent via the intrinsic _DB_PERSISTED_MARKER — so this writes
# ONLY the new codex projected rows and does NOT re-write the user turn.
# Keeping the agent as the sole persister lets us return
# agent_persisted=True below, so the gateway skips its own DB write and
# we avoid the #860/#42039 duplicate user-message write (append_message
# is a raw INSERT with no dedup, so a gateway re-write would duplicate
# the already-flushed user turn). See gateway/run.py agent_persisted.
if getattr(agent, "_session_db", None) is not None:
try:
agent._flush_messages_to_session_db(messages)
except Exception:
logger.debug(
"codex app-server projected-message flush failed",
exc_info=True,
)
# Counter ticks for the agent-improvement loop.
# _turns_since_memory and _user_turn_count are ALREADY incremented
# in the run_conversation() pre-loop block (lines ~11793-11817) so we
@ -394,16 +416,18 @@ def run_codex_app_server_turn(
"completed": not turn.interrupted and turn.error is None,
"partial": turn.interrupted or turn.error is not None,
"error": turn.error,
# Signal that the codex app-server runtime did NOT self-persist
# its turn messages to the session DB. The standard conversation_loop
# path flushes messages via _flush_messages_to_session_db(), but
# run_codex_app_server_turn is an early-return that bypasses that
# loop entirely. Without this flag, gateway/run.py assumes
# self._session_db is not None → skip_db=True on every
# append_to_transcript call, leaving codex turns persisted nowhere
# (state.db gets only session_meta rows, so session_search and
# conversation-distill are blind to real gateway conversations).
"agent_persisted": False,
# The codex app-server runtime IS an early-return path that bypasses
# conversation_loop, but we flush the projected assistant/tool messages
# ourselves above (see the _flush_messages_to_session_db call after
# messages.extend). The inbound user turn was already flushed at turn
# start (turn_context._persist_session) and the flush dedups via
# _DB_PERSISTED_MARKER, so state.db ends up with each real message
# exactly once and session_search / conversation-distill see the full
# gateway conversation. Report agent_persisted=True so the gateway
# skips its own append_to_transcript DB write — writing again there
# would re-INSERT the already-flushed user turn (append_message has no
# dedup), reintroducing the #860 / #42039 duplicate-write bug.
"agent_persisted": True,
"codex_thread_id": turn.thread_id,
"codex_turn_id": turn.turn_id,
**usage_result,

View file

@ -11096,15 +11096,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# The agent already persisted these messages to SQLite via
# _flush_messages_to_session_db(), so skip the DB write here
# to prevent the duplicate-write bug (#860 / #42039).
# Exception: the codex app-server runtime is an early-return path
# that bypasses conversation_loop and never calls
# _flush_messages_to_session_db(). It signals this by returning
# agent_persisted=False; the gateway then writes the new turn's
# messages to state.db so session_search (FTS) and
# conversation-distill can see real gateway conversations.
# Default True preserves the existing behaviour for the standard
# runtime (no duplicate-write regression, #860 / #42039).
# to prevent the duplicate-write bug (#860 / #42039). This holds
# for the codex app-server runtime too: although it early-returns
# and bypasses conversation_loop's per-step flushes, it flushes its
# own projected assistant/tool messages before returning and
# reports agent_persisted=True (see agent/codex_runtime.py). Reading
# the flag (default = self._session_db is not None) keeps the
# persistence contract explicit and lets any future non-persisting
# runtime opt into a gateway-side write by returning False.
agent_persisted = agent_result.get("agent_persisted", self._session_db is not None)
# Find only the NEW messages from this turn (skip history we loaded).

View file

@ -0,0 +1,166 @@
"""Regression for #49225 — codex app-server turns must reach the session DB
exactly once.
The codex app-server runtime (``run_codex_app_server_turn``) is an early-return
path that bypasses ``conversation_loop`` and therefore never runs the loop's
per-step ``_persist_session()`` flushes. Before the fix, the projected
assistant/tool messages were persisted *nowhere* (state.db got only
session_meta rows), leaving ``session_search`` (FTS) and conversation-distill
blind to real gateway conversations.
The fix has the codex runtime flush its own projected messages via
``_flush_messages_to_session_db()`` (idempotent through the intrinsic
``_DB_PERSISTED_MARKER``) and return ``agent_persisted=True`` so the gateway
skips its own ``append_to_transcript`` DB write. This is critical: the inbound
user turn is already flushed at turn start (``turn_context._persist_session``),
and ``append_message`` is a raw INSERT with no dedup a gateway re-write would
duplicate the user turn (#860 / #42039). This test locks in:
1. ``run_codex_app_server_turn`` flushes projected messages and returns
``agent_persisted=True``.
2. Exactly-once persistence: the already-flushed user turn is NOT re-written,
and the new projected assistant message lands once.
3. The gateway resolution expression preserves standard-runtime behaviour.
"""
import tempfile
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock
from agent.codex_runtime import run_codex_app_server_turn
from hermes_state import SessionDB
from run_agent import AIAgent
def _make_turn():
return SimpleNamespace(
interrupted=False,
error=None,
thread_id="thread-1",
turn_id="turn-1",
projected_messages=[{"role": "assistant", "content": "CODEX_ASSISTANT"}],
tool_iterations=0,
final_text="CODEX_ASSISTANT",
should_retire=False,
)
def _make_agent(session_db=None, session_id="sess-codex"):
agent = MagicMock()
# Pre-seed the session so run_codex_app_server_turn skips the spawn block.
agent._codex_session = MagicMock()
agent._codex_session.run_turn.return_value = _make_turn()
agent.tool_progress_callback = None
agent._iters_since_skill = 0
agent._skill_nudge_interval = 0
agent.valid_tool_names = set()
agent._session_db = session_db
agent._session_db_created = True
agent.session_id = session_id
return agent
def test_codex_success_flushes_and_reports_persisted():
"""Codex success turn must self-persist and return agent_persisted=True."""
agent = _make_agent(session_db=None) # no DB -> flush is a no-op, still True
result = run_codex_app_server_turn(
agent,
user_message="hello",
original_user_message="hello",
messages=[{"role": "user", "content": "hello"}],
effective_task_id="task-1",
)
assert result["completed"] is True
# With the agent as sole persister, the gateway must SKIP its DB write.
assert result["agent_persisted"] is True
def test_codex_turn_persists_each_message_exactly_once():
"""The user turn (flushed at turn start) must not be duplicated; the
projected assistant message must land once. Uses a real SessionDB and the
real AIAgent._flush_messages_to_session_db to prove no #860/#42039
duplicate-write regression on the codex path."""
tmp = tempfile.mkdtemp(prefix="codex_persist_")
try:
db = SessionDB(Path(tmp) / "state.db")
sid = "sess-codex-once"
db.create_session(session_id=sid, source="telegram", model="codex")
# Real agent bound to this DB/session, minimal construction.
agent = AIAgent(
api_key="test-key",
base_url="https://openrouter.ai/api/v1",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
session_db=db,
session_id=sid,
)
agent._session_db_created = True
agent._codex_session = MagicMock()
agent._codex_session.run_turn.return_value = _make_turn()
agent.tool_progress_callback = None
# Model the real flow: the inbound user turn is flushed at turn start
# (turn_context._persist_session) on the SAME `messages` list the codex
# path later reuses. That flush stamps _DB_PERSISTED_MARKER on the user
# dict, so the codex-path flush skips it — no duplicate.
user_msg = {"role": "user", "content": "USER_TURN"}
messages = [user_msg]
agent._flush_messages_to_session_db(messages) # turn-start flush
result = run_codex_app_server_turn(
agent,
user_message="USER_TURN",
original_user_message="USER_TURN",
messages=messages,
effective_task_id="task-1",
)
assert result["agent_persisted"] is True
rows = db.get_messages(sid, include_inactive=True)
contents = [r["content"] for r in rows]
# Exactly one user turn, exactly one assistant turn — no duplicates.
assert contents.count("USER_TURN") == 1, contents
assert contents.count("CODEX_ASSISTANT") == 1, contents
# session_search can now see the codex conversation.
hits = {r["session_id"] for r in db.search_messages("CODEX_ASSISTANT")}
assert sid in hits
finally:
import shutil
shutil.rmtree(tmp)
class TestGatewayPersistedResolution:
"""The gateway default must preserve standard-runtime skip-db behaviour."""
@staticmethod
def _resolve_persistence_block(agent_result, session_db_present):
# gateway/run.py persistence block:
# agent_persisted = agent_result.get("agent_persisted", self._session_db is not None)
return agent_result.get("agent_persisted", session_db_present)
@staticmethod
def _resolve_passthrough(result_holder0):
# gateway/run.py result_holder passthrough:
# result_holder[0].get("agent_persisted", True) if result_holder[0] else True
return result_holder0.get("agent_persisted", True) if result_holder0 else True
def test_codex_result_keeps_gateway_skip(self):
# Codex now self-persists → gateway must SKIP (agent_persisted True).
codex = {"agent_persisted": True}
assert self._resolve_persistence_block(codex, True) is True
assert self._resolve_persistence_block(codex, False) is True
assert self._resolve_passthrough(codex) is True
def test_standard_runtime_preserves_skip_db(self):
# Standard runtime omits the key → old behaviour: skip iff DB present.
standard = {"final_response": "ok"}
assert self._resolve_persistence_block(standard, True) is True
assert self._resolve_persistence_block(standard, False) is False
assert self._resolve_passthrough(standard) is True
def test_missing_result_holder_defaults_persisted(self):
assert self._resolve_passthrough(None) is True