From 201b646d672733f75fc8d213f7b5b6c6efbc97de Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 3 Jul 2026 03:31:27 +0530 Subject: [PATCH] fix(gateway): complete on_session_end coverage across all eviction paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the cherry-picked #31856 fix. The contributor's guard defers idle-TTL eviction until the session store reports the session expired, so the expiry watcher can tear the agent down and fire MemoryProvider.on_session_end() with the live transcript. Two gaps remained: 1. Memory-leak regression for mode='none' sessions. _is_session_expired() returns False forever for the 'none' reset policy, so the naive guard would never idle-evict those agents — reopening the unbounded-cache leak the idle sweep (#11565) exists to relieve. Added SessionStore.is_session_finalizable() (a public predicate: will the expiry watcher EVER finalize this session?) and gated the deferral on it. mode='none' agents fall through to soft eviction as before. 2. on_session_end still dropped on the LRU-cap path. Both cache-pressure paths (_enforce_agent_cache_cap and _sweep_idle_cached_agents) soft-evict via _release_evicted_agent_soft, which by design does NOT fire on_session_end. If cache pressure evicts a finalizable-but-not-yet-expired agent before it expires, the watcher later finds no cached agent and the hook is skipped. Added _commit_memory_before_soft_evict(): at LRU eviction, if the session is finalizable and not yet expired, commit end-of-session extraction via the live agent's own (fully-scoped) memory manager using commit_memory_session() — extraction WITHOUT provider teardown, so the eviction stays soft and a resumed turn keeps working. Skipped for mode='none' (no missed boundary to compensate) and expired sessions (the watcher tears those down directly).
This closes #11205 for ALL eviction paths and reset policies, not just the idle-sweep + finite-policy case, while preserving the soft-eviction resumability contract (never calls close() on a live session). Tests: 4 new cases in test_agent_cache.py (mode='none' still reaped, LRU-cap commits for finalizable / skips for none, real is_session_finalizable predicate); all mutation-checked. Contributor's original 2 tests updated to assert the finalizable path explicitly. --- gateway/run.py | 104 +++++++++++++++++-- gateway/session.py | 31 ++++++ tests/gateway/test_agent_cache.py | 165 +++++++++++++++++++++++++++++- 3 files changed, 292 insertions(+), 8 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 3826b383e..d3992a760 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15589,6 +15589,72 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew agent._last_flushed_db_idx = 0 agent._api_call_count = 0 + def _commit_memory_before_soft_evict(self, agent: Any, key: str) -> None: + """Fire on_session_end extraction before soft-evicting a live agent. + + Soft eviction (``_release_evicted_agent_soft``) deliberately keeps the + session resumable and does NOT fire ``on_session_end`` — that hook is + reserved for the true session boundary, tear-down done by + ``_session_expiry_watcher`` when the session finally expires. + + But the watcher tears down whatever agent it finds in ``_agent_cache`` + at expiry time. If cache pressure (the LRU cap) soft-evicts a + finalizable session's agent BEFORE it expires, the watcher later finds + no cached agent and ``on_session_end`` is silently skipped — memory + providers never see the transcript (#11205, LRU-cap variant). + + We hold the live, fully-scoped agent right now, so commit its + end-of-session memory extraction here using the agent's own memory + manager (correct per-user/chat scoping, no reconstruction).
This uses + ``commit_memory_session`` — extraction WITHOUT provider teardown — so + the eviction stays soft and a resumed turn keeps working. + + Only fires for sessions the expiry watcher will eventually finalize + (finite reset policy). For ``mode == "none"`` sessions the watcher + never runs, so there is no missed-boundary to compensate for and we + skip the commit (the agent is simply released). Best-effort: any + failure is swallowed so eviction still proceeds. + """ + if agent is None or not hasattr(agent, "commit_memory_session"): + return + if getattr(agent, "_memory_manager", None) is None: + return # no external memory provider — nothing to commit + try: + _store = getattr(self, "session_store", None) + if _store is None: + return + _store._ensure_loaded() + entry = _store._entries.get(key) + if entry is None: + return + # Only compensate when the watcher would otherwise expect to find + # this agent at expiry (finite policy, not yet expired). Expired + # sessions are torn down by the watcher directly; mode="none" + # sessions are never finalized. + if not _store.is_session_finalizable(entry): + return + if _store._is_session_expired(entry): + return + messages = getattr(agent, "_session_messages", None) + agent.commit_memory_session(messages if isinstance(messages, list) else None) + logger.debug( + "Committed on_session_end extraction before soft-evicting " + "finalizable session=%s (cache pressure, pre-expiry)", key, + ) + except Exception as _e: + logger.debug("Pre-evict memory commit failed for %s: %s", key, _e) + + def _commit_then_release_soft(self, agent: Any, key: str) -> None: + """Commit end-of-session memory (if warranted), then soft-release. + + Runs on the daemon eviction thread so the memory-provider call and the + client teardown never block the caller's held cache lock. Order matters: + commit uses the live agent's memory manager before ``release_clients`` + drops the message buffer. 
+ """ + self._commit_memory_before_soft_evict(agent, key) + self._release_evicted_agent_soft(agent) + def _release_evicted_agent_soft(self, agent: Any) -> None: """Soft cleanup for cache-evicted agents — preserves session tool state. @@ -15687,9 +15753,15 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew key, len(_cache), ) if agent is not None: + # Commit end-of-session memory extraction, then soft-release, + # both on the daemon thread so the (possibly network-bound) + # provider call never blocks the held cache lock. The commit + # only fires for finalizable-not-yet-expired sessions whose + # agent would otherwise vanish before the expiry watcher can + # fire on_session_end (#11205, LRU-cap variant). threading.Thread( - target=self._release_evicted_agent_soft, - args=(agent,), + target=self._commit_then_release_soft, + args=(agent, key), daemon=True, name=f"agent-cache-evict-{key[:24]}", ).start() @@ -15737,17 +15809,35 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # session genuinely expires, at which point the watcher # (gateway/run.py _session_expiry_watcher) tears it down # properly. (#11205 follow-up) + # + # BUT only defer when the watcher will EVER finalize this + # session. For a mode == "none" session the watcher never + # fires (is_session_finalizable() is False), so deferring + # would pin the agent in cache for the gateway's entire + # lifetime — the exact leak this idle sweep exists to + # relieve. Those sessions fall through to soft eviction + # WITHOUT on_session_end, and that is correct: a mode=="none" + # session never reaches a session-end boundary, so there is + # no missed on_session_end to compensate for. (The finite + # case — a session evicted under LRU-cap pressure before it + # expires — is instead covered by _commit_memory_before_soft_ + # evict on the cap path, which fires on_session_end via the + # live agent's memory manager before releasing it.) 
session_entry = None + _store = getattr(self, "session_store", None) try: - _store = getattr(self, "session_store", None) if _store is not None: _store._ensure_loaded() session_entry = _store._entries.get(key) except Exception: - pass - if session_entry is not None: - if not _store._is_session_expired(session_entry): - continue # keep agent — session hasn't expired + session_entry = None + if ( + session_entry is not None + and _store is not None + and _store.is_session_finalizable(session_entry) + and not _store._is_session_expired(session_entry) + ): + continue # keep agent — finite session hasn't expired to_evict.append((key, agent)) for key, _ in to_evict: _cache.pop(key, None) diff --git a/gateway/session.py b/gateway/session.py index 2a75aa16d..67bd84aaa 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -1264,6 +1264,37 @@ class SessionStore: return False + def is_session_finalizable(self, entry: SessionEntry) -> bool: + """Return True if the expiry watcher will *ever* finalize this session. + + The expiry watcher (``GatewayRunner._session_expiry_watcher``) only + tears an agent down — and only then fires ``on_session_end`` — for + sessions whose reset policy eventually expires. A ``mode == "none"`` + session never expires (``_is_session_expired`` returns ``False`` + forever), so the watcher will never finalize it. + + This distinction matters for the agent-cache idle sweep: deferring + idle eviction to "let the watcher finalize it later" is only correct + when the watcher WILL run for this session. For a ``mode == "none"`` + session, deferring pins the cached agent in memory for the gateway's + entire lifetime with no finalization ever coming — the exact leak the + idle sweep exists to relieve. Callers use this predicate to decide + whether the session store owns the eviction boundary (finalizable) or + the idle sweep must still reap the agent itself (not finalizable). + + Public wrapper so callers don't reach into policy internals. 
Errors + resolving the policy are treated as "not finalizable" (safe: the idle + sweep falls back to reaping the agent rather than pinning it). + """ + try: + policy = self.config.get_reset_policy( + platform=entry.platform, + session_type=entry.chat_type, + ) + return policy.mode != "none" + except Exception: + return False + def _is_session_ended_in_db(self, session_id: str) -> bool: """Return True iff state.db has this session with a non-null end_reason. diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py index 6efec75d2..73a4941db 100644 --- a/tests/gateway/test_agent_cache.py +++ b/tests/gateway/test_agent_cache.py @@ -675,6 +675,89 @@ class TestAgentCacheBoundedGrowth: # Hard-cleanup path must NOT have fired — that's for session expiry only. assert cleanup_calls == [] + def test_cap_commits_memory_before_evicting_finalizable(self, monkeypatch): + """LRU-cap eviction of a finalizable, not-yet-expired agent commits + on_session_end extraction before releasing. + + The agent would otherwise vanish from _agent_cache before the expiry + watcher runs, so the watcher would never fire on_session_end() and + memory providers would miss the transcript (#11205, LRU-cap variant). + We hold the live agent at eviction time, so commit its memory then. + """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1) + runner = self._bounded_runner() + + commit_calls: list = [] + release_calls: list = [] + runner._release_evicted_agent_soft = lambda agent: release_calls.append(agent) + + # Finalizable (finite policy), not yet expired. 
+ runner.session_store = MagicMock() + runner.session_store._entries = {"old": MagicMock(), "new": MagicMock()} + runner.session_store.is_session_finalizable.return_value = True + runner.session_store._is_session_expired.return_value = False + + old_agent = self._fake_agent() + old_agent._memory_manager = MagicMock() # has an external provider + old_agent._session_messages = [{"role": "user", "content": "hi"}] + old_agent.commit_memory_session = lambda msgs=None: commit_calls.append(msgs) + new_agent = self._fake_agent() + + with runner._agent_cache_lock: + runner._agent_cache["old"] = (old_agent, "sig_old") + runner._agent_cache["new"] = (new_agent, "sig_new") + runner._enforce_agent_cache_cap() + + import time as _t + deadline = _t.time() + 2.0 + while _t.time() < deadline and not release_calls: + _t.sleep(0.02) + # Memory committed with the live transcript, THEN client released. + assert commit_calls == [[{"role": "user", "content": "hi"}]] + assert old_agent in release_calls + + def test_cap_skips_memory_commit_for_non_finalizable(self, monkeypatch): + """LRU-cap eviction of a mode='none' agent does NOT commit memory. + + The expiry watcher never finalizes a mode='none' session, so there is + no missed on_session_end boundary to compensate for. Committing here + would fire premature/repeat extraction for a session that simply keeps + living. The agent is released without a commit. 
+ """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1) + runner = self._bounded_runner() + + commit_calls: list = [] + release_calls: list = [] + runner._release_evicted_agent_soft = lambda agent: release_calls.append(agent) + + runner.session_store = MagicMock() + runner.session_store._entries = {"old": MagicMock(), "new": MagicMock()} + runner.session_store.is_session_finalizable.return_value = False # mode='none' + runner.session_store._is_session_expired.return_value = False + + old_agent = self._fake_agent() + old_agent._memory_manager = MagicMock() + old_agent._session_messages = [{"role": "user", "content": "hi"}] + old_agent.commit_memory_session = lambda msgs=None: commit_calls.append(msgs) + new_agent = self._fake_agent() + + with runner._agent_cache_lock: + runner._agent_cache["old"] = (old_agent, "sig_old") + runner._agent_cache["new"] = (new_agent, "sig_new") + runner._enforce_agent_cache_cap() + + import time as _t + deadline = _t.time() + 2.0 + while _t.time() < deadline and not release_calls: + _t.sleep(0.02) + assert commit_calls == [] # no premature extraction + assert old_agent in release_calls # still released + def test_idle_ttl_sweep_evicts_stale_agents(self, monkeypatch): """_sweep_idle_cached_agents removes agents idle past the TTL.""" from gateway import run as gw_run @@ -725,10 +808,13 @@ class TestAgentCacheBoundedGrowth: import time as _t stale = self._fake_agent(last_activity=_t.time() - 10.0) - # Session store says the session is still alive. + # Session store says the session is still alive AND is finalizable + # (finite reset policy) — so deferring eviction is correct: the expiry + # watcher will find this agent later and fire on_session_end(). 
session_entry = MagicMock() runner.session_store = MagicMock() runner.session_store._entries = {"stale-session": session_entry} + runner.session_store.is_session_finalizable.return_value = True runner.session_store._is_session_expired.return_value = False runner._agent_cache["stale-session"] = (stale, "sig") @@ -752,6 +838,7 @@ class TestAgentCacheBoundedGrowth: session_entry = MagicMock() runner.session_store = MagicMock() runner.session_store._entries = {"stale-session": session_entry} + runner.session_store.is_session_finalizable.return_value = True runner.session_store._is_session_expired.return_value = True runner._agent_cache["stale-session"] = (stale, "sig") @@ -760,6 +847,82 @@ class TestAgentCacheBoundedGrowth: assert evicted == 1 assert "stale-session" not in runner._agent_cache + def test_idle_sweep_evicts_non_finalizable_session(self, monkeypatch): + """A mode='none' session's idle agent IS still evicted. + + is_session_finalizable() is False for reset-policy 'none': the expiry + watcher never finalizes such a session, so deferring eviction would + pin the cached agent for the gateway's whole lifetime — the exact + leak the idle sweep exists to relieve. The sweep must reap it even + though _is_session_expired() is (and stays) False. + """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01) + runner = self._bounded_runner() + runner._cleanup_agent_resources = MagicMock() + + import time as _t + stale = self._fake_agent(last_activity=_t.time() - 10.0) + + session_entry = MagicMock() + runner.session_store = MagicMock() + runner.session_store._entries = {"never-session": session_entry} + # mode='none' → never finalizable, never expired. 
+ runner.session_store.is_session_finalizable.return_value = False + runner.session_store._is_session_expired.return_value = False + + runner._agent_cache["never-session"] = (stale, "sig") + + evicted = runner._sweep_idle_cached_agents() + assert evicted == 1 + assert "never-session" not in runner._agent_cache + + def test_is_session_finalizable_real_predicate(self, tmp_path): + """is_session_finalizable() reflects the real reset policy. + + Uses a real SessionStore + GatewayConfig (no mocks) so the predicate + is exercised against actual get_reset_policy() output: True for finite + policies (idle/daily/both), False only for mode='none'. + """ + from datetime import datetime + from unittest.mock import patch as _patch + + from gateway.config import GatewayConfig, Platform, SessionResetPolicy + from gateway.session import ( + SessionEntry, SessionSource, SessionStore, build_session_key, + ) + + def _entry_for(platform: Platform) -> SessionEntry: + src = SessionSource( + platform=platform, user_id="u1", chat_id="c1", + user_name="t", chat_type="dm", + ) + return SessionEntry( + session_key=build_session_key(src), + session_id="s1", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=src, + platform=src.platform, + chat_type=src.chat_type, + ) + + config = GatewayConfig() + # Give Telegram a 'none' policy via the per-platform override; leave the + # default policy finite ('both') for the Discord case. + config.default_reset_policy = SessionResetPolicy(mode="both") + config.reset_by_platform[Platform.TELEGRAM] = SessionResetPolicy(mode="none") + + with _patch("gateway.session.SessionStore._ensure_loaded"): + store = SessionStore(sessions_dir=tmp_path, config=config) + store._db = None + + # mode='none' → never finalized by the watcher. + assert store.is_session_finalizable(_entry_for(Platform.TELEGRAM)) is False + # default 'both' → finite, will eventually expire. 
+ assert store.is_session_finalizable(_entry_for(Platform.DISCORD)) is True + def test_plain_dict_cache_is_tolerated(self): """Test fixtures using plain {} don't crash _enforce_agent_cache_cap.""" from gateway.run import GatewayRunner