From 4580c03e7d5ee6f50861dc50c1bfafcdfbb7d06e Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Wed, 1 Jul 2026 01:57:55 -0700 Subject: [PATCH] test(gateway): align salvaged #54947-cluster tests with async cache helper The three salvaged PRs (#46647, #54583, #55013) were authored against a tree where _refresh_agent_cache_message_count was sync and _session_db was the raw SessionDB. On current main the helper is async and awaits the AsyncSessionDB facade, and _run_agent was split into _run_agent_inner. - Wrap test _session_db in AsyncSessionDB so the awaited get_session works - Make refresh-calling tests async + await the helper - Point the placement-guard test at _run_agent_inner (recursion lives there post-mixin-extraction) - Relocated production call sites now correctly await the async helper --- tests/gateway/test_agent_cache.py | 21 ++++---- ...test_first_turn_session_meta_rebaseline.py | 8 ++- .../test_session_id_cache_coherence.py | 54 +++++++++++-------- 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py index e7974bd07..2750cd004 100644 --- a/tests/gateway/test_agent_cache.py +++ b/tests/gateway/test_agent_cache.py @@ -1715,7 +1715,8 @@ class TestAgentCacheMessageCountRebaseline: with runner._agent_cache_lock: assert runner._agent_cache["telegram:s1"][2] == 5 - def test_in_band_followup_reuses_cached_agent(self, tmp_path): + @pytest.mark.asyncio + async def test_in_band_followup_reuses_cached_agent(self, tmp_path): """Behavioral regression for the in-band queued (/queue) follow-up. #46237 re-baselines the snapshot only on the EXTERNAL-turn boundary @@ -1755,7 +1756,7 @@ class TestAgentCacheMessageCountRebaseline: assert self._guard_would_reuse(runner, "telegram:s1", "s1") is False # The fix: re-baseline at the follow-up boundary. - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") # The in-band follow-up now REUSES the cached, warm-prefix agent. assert self._guard_would_reuse(runner, "telegram:s1", "s1") is True @@ -1768,18 +1769,20 @@ class TestAgentCacheMessageCountRebaseline: The behavioral test above proves the re-baseline makes the in-band follow-up reuse the cached agent, but it calls the helper directly — it would still pass if the production call were deleted. This guards - the actual call site: inside ``_run_agent`` the queued (/queue) - follow-up recurses via ``followup_result = await self._run_agent(...)`` - and the re-baseline MUST run BEFORE that recursion (running it only - after, like the external-turn site at 8888, is too late for the - in-band path — the follow-up would already have rebuilt). + the actual call site: the queued (/queue) follow-up recurses via + ``followup_result = await self._run_agent(...)`` inside + ``_run_agent_inner`` and the re-baseline MUST run BEFORE that + recursion (running it only after, like the external-turn site, is too + late for the in-band path — the follow-up would already have rebuilt). """ import inspect from gateway.run import GatewayRunner - src = inspect.getsource(GatewayRunner._run_agent) + # The recursion + pre-recursion re-baseline live in the extracted + # ``_run_agent_inner`` (older trees had them inline in ``_run_agent``). + src = inspect.getsource(GatewayRunner._run_agent_inner) marker = "followup_result = await self._run_agent(" - assert marker in src, "in-band queued follow-up recursion not found in _run_agent" + assert marker in src, "in-band queued follow-up recursion not found in _run_agent_inner" before_recursion = src[: src.index(marker)] assert "_refresh_agent_cache_message_count" in before_recursion, ( "the in-band queued follow-up recursion must be preceded by a " diff --git a/tests/gateway/test_first_turn_session_meta_rebaseline.py b/tests/gateway/test_first_turn_session_meta_rebaseline.py index d64546380..1a5e5891b 100644 --- a/tests/gateway/test_first_turn_session_meta_rebaseline.py +++ b/tests/gateway/test_first_turn_session_meta_rebaseline.py @@ -68,8 +68,12 @@ def _bootstrap(monkeypatch, tmp_path, db): runner._is_user_authorized = lambda _source: True runner._set_session_env = lambda _context: None runner._handle_active_session_busy_message = AsyncMock(return_value=False) - # REAL SessionDB so the guard's get_session(...).message_count is live. - runner._session_db = db + # REAL SessionDB behind the async facade the gateway holds — the + # production re-baseline does ``await self._session_db.get_session(...)``, + # so it must be the AsyncSessionDB wrapper, not the raw sync DB. + from hermes_state import AsyncSessionDB + + runner._session_db = AsyncSessionDB(db) runner._recover_telegram_topic_thread_id = lambda _source: None runner._cache_session_source = lambda _key, _source: None runner._is_session_run_current = lambda _key, _gen: True diff --git a/tests/gateway/test_session_id_cache_coherence.py b/tests/gateway/test_session_id_cache_coherence.py index 8013b85fe..07ca78374 100644 --- a/tests/gateway/test_session_id_cache_coherence.py +++ b/tests/gateway/test_session_id_cache_coherence.py @@ -28,6 +28,10 @@ REAL cache lock, mirroring the structure used by import threading +import pytest + +from hermes_state import AsyncSessionDB + def _make_runner(): """Create a minimal GatewayRunner with just the cache infrastructure.""" @@ -60,7 +64,9 @@ def _guard_would_reuse(runner, session_key, session_id): session — guard fires, agent rebuilds). """ try: - row = runner._session_db.get_session(session_id) + # Mirror the production guard, which reads the sync underlying DB + # (``self._session_db._db.get_session``) off the async facade. + row = runner._session_db._db.get_session(session_id) live = row.get("message_count", 0) if row else None except Exception: live = None @@ -111,7 +117,7 @@ class TestSessionIdCacheCoherence: db.append_message("sA", role="user", content="another from A") # sA count = 3, sB count = 0 runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) agent = object() # Build cache from session A (mc=3, sid=sA). @@ -128,7 +134,8 @@ class TestSessionIdCacheCoherence: with runner._agent_cache_lock: assert runner._agent_cache["telegram:USER1"][0] is agent - def test_same_session_id_turns_still_reuse(self, tmp_path): + @pytest.mark.asyncio + async def test_same_session_id_turns_still_reuse(self, tmp_path): """#46237 / #45966 invariant: consecutive same-session turns must REUSE the cached agent (prompt cache preserved).""" from hermes_state import SessionDB @@ -136,7 +143,7 @@ class TestSessionIdCacheCoherence: db = SessionDB(db_path=tmp_path / "sessions.db") db.create_session("s1", source="telegram") runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) agent = object() _row = db.get_session("s1") @@ -149,7 +156,7 @@ class TestSessionIdCacheCoherence: db.append_message("s1", role="user", content="u") db.append_message("s1", role="assistant", content="a") # Post-turn re-baseline (the #46237 fix). - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") if _guard_would_reuse(runner, "telegram:s1", "s1"): reuses += 1 @@ -157,7 +164,8 @@ class TestSessionIdCacheCoherence: with runner._agent_cache_lock: assert runner._agent_cache["telegram:s1"][0] is agent - def test_cross_process_write_still_invalidates(self, tmp_path): + @pytest.mark.asyncio + async def test_cross_process_write_still_invalidates(self, tmp_path): """The original #45966 invariant must hold: a DIFFERENT process appending to the same session in the shared DB invalidates the cache (genuine cross-process write).""" @@ -166,7 +174,7 @@ class TestSessionIdCacheCoherence: db = SessionDB(db_path=tmp_path / "sessions.db") db.create_session("s1", source="telegram") runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) agent = object() with runner._agent_cache_lock: @@ -178,7 +186,7 @@ class TestSessionIdCacheCoherence: # Our own turn + re-baseline → reuse next turn. db.append_message("s1", role="user", content="u") db.append_message("s1", role="assistant", content="a") - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") assert _guard_would_reuse(runner, "telegram:s1", "s1") is True # ANOTHER process (e.g. the desktop dashboard backend) appends a @@ -188,7 +196,8 @@ class TestSessionIdCacheCoherence: # Guard must invalidate. assert _guard_would_reuse(runner, "telegram:s1", "s1") is False - def test_refresh_skips_when_session_id_differs(self, tmp_path): + @pytest.mark.asyncio + async def test_refresh_skips_when_session_id_differs(self, tmp_path): """_refresh_agent_cache_message_count must NOT refresh the cached snapshot when the current session_id differs from the one the snapshot belongs to. Otherwise the snapshot gets overwritten with @@ -201,7 +210,7 @@ class TestSessionIdCacheCoherence: db.create_session("sB", source="telegram") db.append_message("sA", role="user", content="x") runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) agent = object() # Cache built from session A: (agent, sig, mc=1, sid=sA). @@ -211,7 +220,7 @@ class TestSessionIdCacheCoherence: # Someone (the call site at line 9540) calls the re-baseline with # the CURRENT session_id — which is sB after a switch. The # snapshot is from sA → must NOT be touched. - runner._refresh_agent_cache_message_count("telegram:USER1", "sB") + await runner._refresh_agent_cache_message_count("telegram:USER1", "sB") with runner._agent_cache_lock: cached = runner._agent_cache["telegram:USER1"] @@ -223,7 +232,8 @@ class TestSessionIdCacheCoherence: ) assert cached[0] is agent - def test_refresh_refreshes_when_session_id_matches(self, tmp_path): + @pytest.mark.asyncio + async def test_refresh_refreshes_when_session_id_matches(self, tmp_path): """Sanity: when the snapshot's session_id matches the current one, the re-baseline still runs and updates the count to the live value.""" from hermes_state import SessionDB @@ -231,7 +241,7 @@ class TestSessionIdCacheCoherence: db = SessionDB(db_path=tmp_path / "sessions.db") db.create_session("s1", source="telegram") runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) agent = object() with runner._agent_cache_lock: @@ -240,12 +250,13 @@ class TestSessionIdCacheCoherence: # s1's own turn flushes two rows. db.append_message("s1", role="user", content="u") db.append_message("s1", role="assistant", content="a") - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") with runner._agent_cache_lock: assert runner._agent_cache["telegram:s1"][2] == 2 - def test_legacy_2tuple_and_pending_sentinel_untouched(self, tmp_path): + @pytest.mark.asyncio + async def test_legacy_2tuple_and_pending_sentinel_untouched(self, tmp_path): """Backward-compat: legacy 2-tuples and pending-sentinel 3-tuples are not affected by the fix. The 2-tuple opts out of the guard; the sentinel is left as-is by the re-baseline.""" @@ -256,24 +267,25 @@ class TestSessionIdCacheCoherence: db.create_session("s1", source="telegram") db.append_message("s1", role="user", content="hi") runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) # Legacy 2-tuple — untouched. with runner._agent_cache_lock: runner._agent_cache["telegram:s1"] = (object(), "sig") - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") with runner._agent_cache_lock: assert len(runner._agent_cache["telegram:s1"]) == 2 # Pending sentinel — untouched. with runner._agent_cache_lock: runner._agent_cache["telegram:s1"] = (_AGENT_PENDING_SENTINEL, "sig", 0) - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") with runner._agent_cache_lock: assert runner._agent_cache["telegram:s1"][0] is _AGENT_PENDING_SENTINEL assert runner._agent_cache["telegram:s1"][2] == 0 - def test_legacy_3tuple_session_id_unknown_still_guarded(self, tmp_path): + @pytest.mark.asyncio + async def test_legacy_3tuple_session_id_unknown_still_guarded(self, tmp_path): """An entry in the OLD 3-tuple shape (agent, sig, mc) with no session_id — entries already in the cache from BEFORE the fix — must STILL be guarded by the cross-process check. The fix only @@ -288,7 +300,7 @@ class TestSessionIdCacheCoherence: db.create_session("s1", source="telegram") db.append_message("s1", role="user", content="x") runner = _make_runner() - runner._session_db = db + runner._session_db = AsyncSessionDB(db) # Existing entry in old 3-tuple shape, no session_id recorded. # Snapshot is mc=0; live is mc=1 — guard must fire (invalidate). @@ -300,7 +312,7 @@ class TestSessionIdCacheCoherence: assert _guard_would_reuse(runner, "telegram:s1", "s1") is False # After the re-baseline (same session_id) snapshot matches live. - runner._refresh_agent_cache_message_count("telegram:s1", "s1") + await runner._refresh_agent_cache_message_count("telegram:s1", "s1") with runner._agent_cache_lock: assert runner._agent_cache["telegram:s1"][2] == 1 assert _guard_would_reuse(runner, "telegram:s1", "s1") is True