From dc1ea005d9dbf7cbe18380755bfc4d4c08df9553 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Wed, 1 Jul 2026 16:46:48 +0530 Subject: [PATCH] fix+test(codex): self-persist projected turns; keep agent_persisted=True MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up correcting the salvaged fix's persistence approach to avoid a duplicate user-message write (verified via E2E — the #860/#42039 bug class the original diff aimed to avoid). Root cause: in gateway mode the AIAgent is built WITH a session_db, so the inbound user turn is already flushed at turn start (turn_context. _persist_session). The original fix returned agent_persisted=False, making the gateway re-write the whole new-message slice via append_to_transcript -> append_message (a raw INSERT with no dedup), duplicating the already-flushed user turn. Corrected approach (single writer): run_codex_app_server_turn now flushes its OWN projected assistant/tool messages via _flush_messages_to_session_db (which dedups the already-persisted user turn through _DB_PERSISTED_MARKER) and returns agent_persisted=True so the gateway skips its write. Net result: session_search/distill see the full codex conversation, each message persisted exactly once. Adds regression coverage asserting exactly-once persistence on a real SessionDB, agent_persisted=True, FTS visibility, and standard-runtime skip-db behaviour preserved. Co-authored-by: Lubos Buracinsky --- agent/codex_runtime.py | 44 +++-- gateway/run.py | 17 +- tests/agent/test_codex_app_server_persist.py | 166 +++++++++++++++++++ 3 files changed, 208 insertions(+), 19 deletions(-) create mode 100644 tests/agent/test_codex_app_server_persist.py diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 0a63273ef..d8f3f83e7 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -333,6 +333,28 @@ def run_codex_app_server_turn( if turn.projected_messages: messages.extend(turn.projected_messages) + # Persist the newly-projected assistant/tool messages ourselves. + # This path is an early return that bypasses conversation_loop, whose + # normal per-step _persist_session() calls would otherwise flush them. + # The inbound user turn was already flushed at turn start + # (turn_context.py _persist_session), and _flush_messages_to_session_db + # is idempotent via the intrinsic _DB_PERSISTED_MARKER — so this writes + # ONLY the new codex projected rows and does NOT re-write the user turn. + # Keeping the agent as the sole persister lets us return + # agent_persisted=True below, so the gateway skips its own DB write and + # we avoid the #860/#42039 duplicate user-message write (append_message + # is a raw INSERT with no dedup, so a gateway re-write would duplicate + # the already-flushed user turn). See gateway/run.py agent_persisted. + if getattr(agent, "_session_db", None) is not None: + try: + agent._flush_messages_to_session_db(messages) + except Exception: + logger.debug( + "codex app-server projected-message flush failed", + exc_info=True, + ) + + # Counter ticks for the agent-improvement loop. # _turns_since_memory and _user_turn_count are ALREADY incremented # in the run_conversation() pre-loop block (lines ~11793-11817) so we @@ -394,16 +416,18 @@ def run_codex_app_server_turn( "completed": not turn.interrupted and turn.error is None, "partial": turn.interrupted or turn.error is not None, "error": turn.error, - # Signal that the codex app-server runtime did NOT self-persist - # its turn messages to the session DB. The standard conversation_loop - # path flushes messages via _flush_messages_to_session_db(), but - # run_codex_app_server_turn is an early-return that bypasses that - # loop entirely. Without this flag, gateway/run.py assumes - # self._session_db is not None → skip_db=True on every - # append_to_transcript call, leaving codex turns persisted nowhere - # (state.db gets only session_meta rows, so session_search and - # conversation-distill are blind to real gateway conversations). - "agent_persisted": False, + # The codex app-server runtime IS an early-return path that bypasses + # conversation_loop, but we flush the projected assistant/tool messages + # ourselves above (see the _flush_messages_to_session_db call after + # messages.extend). The inbound user turn was already flushed at turn + # start (turn_context._persist_session) and the flush dedups via + # _DB_PERSISTED_MARKER, so state.db ends up with each real message + # exactly once and session_search / conversation-distill see the full + # gateway conversation. Report agent_persisted=True so the gateway + # skips its own append_to_transcript DB write — writing again there + # would re-INSERT the already-flushed user turn (append_message has no + # dedup), reintroducing the #860 / #42039 duplicate-write bug. + "agent_persisted": True, "codex_thread_id": turn.thread_id, "codex_turn_id": turn.turn_id, **usage_result, diff --git a/gateway/run.py b/gateway/run.py index 677cb9223..06e1e411e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -11096,15 +11096,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # The agent already persisted these messages to SQLite via # _flush_messages_to_session_db(), so skip the DB write here - # to prevent the duplicate-write bug (#860 / #42039). - # Exception: the codex app-server runtime is an early-return path - # that bypasses conversation_loop and never calls - # _flush_messages_to_session_db(). It signals this by returning - # agent_persisted=False; the gateway then writes the new turn's - # messages to state.db so session_search (FTS) and - # conversation-distill can see real gateway conversations. - # Default True preserves the existing behaviour for the standard - # runtime (no duplicate-write regression, #860 / #42039). + # to prevent the duplicate-write bug (#860 / #42039). This holds + # for the codex app-server runtime too: although it early-returns + # and bypasses conversation_loop's per-step flushes, it flushes its + # own projected assistant/tool messages before returning and + # reports agent_persisted=True (see agent/codex_runtime.py). Reading + # the flag (default = self._session_db is not None) keeps the + # persistence contract explicit and lets any future non-persisting + # runtime opt into a gateway-side write by returning False. agent_persisted = agent_result.get("agent_persisted", self._session_db is not None) # Find only the NEW messages from this turn (skip history we loaded). diff --git a/tests/agent/test_codex_app_server_persist.py b/tests/agent/test_codex_app_server_persist.py new file mode 100644 index 000000000..001082e3f --- /dev/null +++ b/tests/agent/test_codex_app_server_persist.py @@ -0,0 +1,166 @@ +"""Regression for #49225 — codex app-server turns must reach the session DB +exactly once. + +The codex app-server runtime (``run_codex_app_server_turn``) is an early-return +path that bypasses ``conversation_loop`` and therefore never runs the loop's +per-step ``_persist_session()`` flushes. Before the fix, the projected +assistant/tool messages were persisted *nowhere* (state.db got only +session_meta rows), leaving ``session_search`` (FTS) and conversation-distill +blind to real gateway conversations. + +The fix has the codex runtime flush its own projected messages via +``_flush_messages_to_session_db()`` (idempotent through the intrinsic +``_DB_PERSISTED_MARKER``) and return ``agent_persisted=True`` so the gateway +skips its own ``append_to_transcript`` DB write. This is critical: the inbound +user turn is already flushed at turn start (``turn_context._persist_session``), +and ``append_message`` is a raw INSERT with no dedup — a gateway re-write would +duplicate the user turn (#860 / #42039). This test locks in: + +1. ``run_codex_app_server_turn`` flushes projected messages and returns + ``agent_persisted=True``. +2. Exactly-once persistence: the already-flushed user turn is NOT re-written, + and the new projected assistant message lands once. +3. The gateway resolution expression preserves standard-runtime behaviour. +""" + +import tempfile +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +from agent.codex_runtime import run_codex_app_server_turn +from hermes_state import SessionDB +from run_agent import AIAgent + + +def _make_turn(): + return SimpleNamespace( + interrupted=False, + error=None, + thread_id="thread-1", + turn_id="turn-1", + projected_messages=[{"role": "assistant", "content": "CODEX_ASSISTANT"}], + tool_iterations=0, + final_text="CODEX_ASSISTANT", + should_retire=False, + ) + + +def _make_agent(session_db=None, session_id="sess-codex"): + agent = MagicMock() + # Pre-seed the session so run_codex_app_server_turn skips the spawn block. + agent._codex_session = MagicMock() + agent._codex_session.run_turn.return_value = _make_turn() + agent.tool_progress_callback = None + agent._iters_since_skill = 0 + agent._skill_nudge_interval = 0 + agent.valid_tool_names = set() + agent._session_db = session_db + agent._session_db_created = True + agent.session_id = session_id + return agent + + +def test_codex_success_flushes_and_reports_persisted(): + """Codex success turn must self-persist and return agent_persisted=True.""" + agent = _make_agent(session_db=None) # no DB -> flush is a no-op, still True + result = run_codex_app_server_turn( + agent, + user_message="hello", + original_user_message="hello", + messages=[{"role": "user", "content": "hello"}], + effective_task_id="task-1", + ) + assert result["completed"] is True + # With the agent as sole persister, the gateway must SKIP its DB write. + assert result["agent_persisted"] is True + + +def test_codex_turn_persists_each_message_exactly_once(): + """The user turn (flushed at turn start) must not be duplicated; the + projected assistant message must land once. Uses a real SessionDB and the + real AIAgent._flush_messages_to_session_db to prove no #860/#42039 + duplicate-write regression on the codex path.""" + tmp = tempfile.mkdtemp(prefix="codex_persist_") + try: + db = SessionDB(Path(tmp) / "state.db") + sid = "sess-codex-once" + db.create_session(session_id=sid, source="telegram", model="codex") + + # Real agent bound to this DB/session, minimal construction. + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + session_db=db, + session_id=sid, + ) + agent._session_db_created = True + agent._codex_session = MagicMock() + agent._codex_session.run_turn.return_value = _make_turn() + agent.tool_progress_callback = None + + # Model the real flow: the inbound user turn is flushed at turn start + # (turn_context._persist_session) on the SAME `messages` list the codex + # path later reuses. That flush stamps _DB_PERSISTED_MARKER on the user + # dict, so the codex-path flush skips it — no duplicate. + user_msg = {"role": "user", "content": "USER_TURN"} + messages = [user_msg] + agent._flush_messages_to_session_db(messages) # turn-start flush + + result = run_codex_app_server_turn( + agent, + user_message="USER_TURN", + original_user_message="USER_TURN", + messages=messages, + effective_task_id="task-1", + ) + assert result["agent_persisted"] is True + + rows = db.get_messages(sid, include_inactive=True) + contents = [r["content"] for r in rows] + # Exactly one user turn, exactly one assistant turn — no duplicates. + assert contents.count("USER_TURN") == 1, contents + assert contents.count("CODEX_ASSISTANT") == 1, contents + # session_search can now see the codex conversation. + hits = {r["session_id"] for r in db.search_messages("CODEX_ASSISTANT")} + assert sid in hits + finally: + import shutil + + shutil.rmtree(tmp) + + +class TestGatewayPersistedResolution: + """The gateway default must preserve standard-runtime skip-db behaviour.""" + + @staticmethod + def _resolve_persistence_block(agent_result, session_db_present): + # gateway/run.py persistence block: + # agent_persisted = agent_result.get("agent_persisted", self._session_db is not None) + return agent_result.get("agent_persisted", session_db_present) + + @staticmethod + def _resolve_passthrough(result_holder0): + # gateway/run.py result_holder passthrough: + # result_holder[0].get("agent_persisted", True) if result_holder[0] else True + return result_holder0.get("agent_persisted", True) if result_holder0 else True + + def test_codex_result_keeps_gateway_skip(self): + # Codex now self-persists → gateway must SKIP (agent_persisted True). + codex = {"agent_persisted": True} + assert self._resolve_persistence_block(codex, True) is True + assert self._resolve_persistence_block(codex, False) is True + assert self._resolve_passthrough(codex) is True + + def test_standard_runtime_preserves_skip_db(self): + # Standard runtime omits the key → old behaviour: skip iff DB present. + standard = {"final_response": "ok"} + assert self._resolve_persistence_block(standard, True) is True + assert self._resolve_persistence_block(standard, False) is False + assert self._resolve_passthrough(standard) is True + + def test_missing_result_holder_defaults_persisted(self): + assert self._resolve_passthrough(None) is True