fix(gateway): complete on_session_end coverage across all eviction paths
Follow-up to the cherry-picked #31856 fix. The contributor's guard defers idle-TTL eviction until the session store reports the session expired, so the expiry watcher can tear the agent down and fire MemoryProvider.on_session_end() with the live transcript. Two gaps remained: 1. Memory-leak regression for mode='none' sessions. _is_session_expired() returns False forever for the 'none' reset policy, so the naive guard would never idle-evict those agents — reopening the unbounded-cache leak the idle sweep (#11565) exists to relieve. Added SessionStore.is_session_finalizable() (a public predicate: will the expiry watcher EVER finalize this session?) and gate the deferral on it. mode='none' agents fall through to soft eviction as before. 2. on_session_end still dropped on the LRU-cap path. Both cache-pressure paths (_enforce_agent_cache_cap and _sweep_idle_cached_agents) soft-evict via _release_evicted_agent_soft, which by design does NOT fire on_session_end. If cache pressure evicts a finalizable-but-not-yet-expired agent before it expires, the watcher later finds no cached agent and the hook is skipped. Added _commit_memory_before_soft_evict(): at LRU eviction, if the session is finalizable and not yet expired, commit end-of-session extraction via the live agent's own (fully-scoped) memory manager using commit_memory_session() — extraction WITHOUT provider teardown, so the eviction stays soft and a resumed turn keeps working. Skipped for mode='none' (no missed boundary to compensate) and expired sessions (the watcher tears those down directly). This closes #11205 for ALL eviction paths and reset policies, not just the idle-sweep + finite-policy case, while preserving the soft-eviction resumability contract (never calls close() on a live session). Tests: 5 new cases in test_agent_cache.py (mode='none' still reaped, LRU-cap commits for finalizable / skips for none, real is_session_finalizable predicate); all mutation-checked. Contributor's original 2 tests updated to assert the finalizable path explicitly.
This commit is contained in:
parent
90b618f48a
commit
201b646d67
3 changed files with 292 additions and 8 deletions
104
gateway/run.py
104
gateway/run.py
|
|
@ -15589,6 +15589,72 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
agent._last_flushed_db_idx = 0
|
||||
agent._api_call_count = 0
|
||||
|
||||
def _commit_memory_before_soft_evict(self, agent: Any, key: str) -> None:
|
||||
"""Fire on_session_end extraction before soft-evicting a live agent.
|
||||
|
||||
Soft eviction (``_release_evicted_agent_soft``) deliberately keeps the
|
||||
session resumable and does NOT fire ``on_session_end`` — that hook is
|
||||
reserved for the true session boundary, tear-down done by
|
||||
``_session_expiry_watcher`` when the session finally expires.
|
||||
|
||||
But the watcher tears down whatever agent it finds in ``_agent_cache``
|
||||
at expiry time. If cache pressure (the LRU cap) soft-evicts a
|
||||
finalizable session's agent BEFORE it expires, the watcher later finds
|
||||
no cached agent and ``on_session_end`` is silently skipped — memory
|
||||
providers never see the transcript (#11205, LRU-cap variant).
|
||||
|
||||
We hold the live, fully-scoped agent right now, so commit its
|
||||
end-of-session memory extraction here using the agent's own memory
|
||||
manager (correct per-user/chat scoping, no reconstruction). This uses
|
||||
``commit_memory_session`` — extraction WITHOUT provider teardown — so
|
||||
the eviction stays soft and a resumed turn keeps working.
|
||||
|
||||
Only fires for sessions the expiry watcher will eventually finalize
|
||||
(finite reset policy). For ``mode == "none"`` sessions the watcher
|
||||
never runs, so there is no missed-boundary to compensate for and we
|
||||
skip the commit (the agent is simply released). Best-effort: any
|
||||
failure is swallowed so eviction still proceeds.
|
||||
"""
|
||||
if agent is None or not hasattr(agent, "commit_memory_session"):
|
||||
return
|
||||
if getattr(agent, "_memory_manager", None) is None:
|
||||
return # no external memory provider — nothing to commit
|
||||
try:
|
||||
_store = getattr(self, "session_store", None)
|
||||
if _store is None:
|
||||
return
|
||||
_store._ensure_loaded()
|
||||
entry = _store._entries.get(key)
|
||||
if entry is None:
|
||||
return
|
||||
# Only compensate when the watcher would otherwise expect to find
|
||||
# this agent at expiry (finite policy, not yet expired). Expired
|
||||
# sessions are torn down by the watcher directly; mode="none"
|
||||
# sessions are never finalized.
|
||||
if not _store.is_session_finalizable(entry):
|
||||
return
|
||||
if _store._is_session_expired(entry):
|
||||
return
|
||||
messages = getattr(agent, "_session_messages", None)
|
||||
agent.commit_memory_session(messages if isinstance(messages, list) else None)
|
||||
logger.debug(
|
||||
"Committed on_session_end extraction before soft-evicting "
|
||||
"finalizable session=%s (cache pressure, pre-expiry)", key,
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("Pre-evict memory commit failed for %s: %s", key, _e)
|
||||
|
||||
def _commit_then_release_soft(self, agent: Any, key: str) -> None:
|
||||
"""Commit end-of-session memory (if warranted), then soft-release.
|
||||
|
||||
Runs on the daemon eviction thread so the memory-provider call and the
|
||||
client teardown never block the caller's held cache lock. Order matters:
|
||||
commit uses the live agent's memory manager before ``release_clients``
|
||||
drops the message buffer.
|
||||
"""
|
||||
self._commit_memory_before_soft_evict(agent, key)
|
||||
self._release_evicted_agent_soft(agent)
|
||||
|
||||
def _release_evicted_agent_soft(self, agent: Any) -> None:
|
||||
"""Soft cleanup for cache-evicted agents — preserves session tool state.
|
||||
|
||||
|
|
@ -15687,9 +15753,15 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
key, len(_cache),
|
||||
)
|
||||
if agent is not None:
|
||||
# Commit end-of-session memory extraction, then soft-release,
|
||||
# both on the daemon thread so the (possibly network-bound)
|
||||
# provider call never blocks the held cache lock. The commit
|
||||
# only fires for finalizable-not-yet-expired sessions whose
|
||||
# agent would otherwise vanish before the expiry watcher can
|
||||
# fire on_session_end (#11205, LRU-cap variant).
|
||||
threading.Thread(
|
||||
target=self._release_evicted_agent_soft,
|
||||
args=(agent,),
|
||||
target=self._commit_then_release_soft,
|
||||
args=(agent, key),
|
||||
daemon=True,
|
||||
name=f"agent-cache-evict-{key[:24]}",
|
||||
).start()
|
||||
|
|
@ -15737,17 +15809,35 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# session genuinely expires, at which point the watcher
|
||||
# (gateway/run.py _session_expiry_watcher) tears it down
|
||||
# properly. (#11205 follow-up)
|
||||
#
|
||||
# BUT only defer when the watcher will EVER finalize this
|
||||
# session. For a mode == "none" session the watcher never
|
||||
# fires (is_session_finalizable() is False), so deferring
|
||||
# would pin the agent in cache for the gateway's entire
|
||||
# lifetime — the exact leak this idle sweep exists to
|
||||
# relieve. Those sessions fall through to soft eviction
|
||||
# WITHOUT on_session_end, and that is correct: a mode=="none"
|
||||
# session never reaches a session-end boundary, so there is
|
||||
# no missed on_session_end to compensate for. (The finite
|
||||
# case — a session evicted under LRU-cap pressure before it
|
||||
# expires — is instead covered by _commit_memory_before_soft_
|
||||
# evict on the cap path, which fires on_session_end via the
|
||||
# live agent's memory manager before releasing it.)
|
||||
session_entry = None
|
||||
_store = getattr(self, "session_store", None)
|
||||
try:
|
||||
_store = getattr(self, "session_store", None)
|
||||
if _store is not None:
|
||||
_store._ensure_loaded()
|
||||
session_entry = _store._entries.get(key)
|
||||
except Exception:
|
||||
pass
|
||||
if session_entry is not None:
|
||||
if not _store._is_session_expired(session_entry):
|
||||
continue # keep agent — session hasn't expired
|
||||
session_entry = None
|
||||
if (
|
||||
session_entry is not None
|
||||
and _store is not None
|
||||
and _store.is_session_finalizable(session_entry)
|
||||
and not _store._is_session_expired(session_entry)
|
||||
):
|
||||
continue # keep agent — finite session hasn't expired
|
||||
to_evict.append((key, agent))
|
||||
for key, _ in to_evict:
|
||||
_cache.pop(key, None)
|
||||
|
|
|
|||
|
|
@ -1264,6 +1264,37 @@ class SessionStore:
|
|||
|
||||
return False
|
||||
|
||||
def is_session_finalizable(self, entry: SessionEntry) -> bool:
|
||||
"""Return True if the expiry watcher will *ever* finalize this session.
|
||||
|
||||
The expiry watcher (``GatewayRunner._session_expiry_watcher``) only
|
||||
tears an agent down — and only then fires ``on_session_end`` — for
|
||||
sessions whose reset policy eventually expires. A ``mode == "none"``
|
||||
session never expires (``_is_session_expired`` returns ``False``
|
||||
forever), so the watcher will never finalize it.
|
||||
|
||||
This distinction matters for the agent-cache idle sweep: deferring
|
||||
idle eviction to "let the watcher finalize it later" is only correct
|
||||
when the watcher WILL run for this session. For a ``mode == "none"``
|
||||
session, deferring pins the cached agent in memory for the gateway's
|
||||
entire lifetime with no finalization ever coming — the exact leak the
|
||||
idle sweep exists to relieve. Callers use this predicate to decide
|
||||
whether the session store owns the eviction boundary (finalizable) or
|
||||
the idle sweep must still reap the agent itself (not finalizable).
|
||||
|
||||
Public wrapper so callers don't reach into policy internals. Errors
|
||||
resolving the policy are treated as "not finalizable" (safe: the idle
|
||||
sweep falls back to reaping the agent rather than pinning it).
|
||||
"""
|
||||
try:
|
||||
policy = self.config.get_reset_policy(
|
||||
platform=entry.platform,
|
||||
session_type=entry.chat_type,
|
||||
)
|
||||
return policy.mode != "none"
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _is_session_ended_in_db(self, session_id: str) -> bool:
|
||||
"""Return True iff state.db has this session with a non-null end_reason.
|
||||
|
||||
|
|
|
|||
|
|
@ -675,6 +675,89 @@ class TestAgentCacheBoundedGrowth:
|
|||
# Hard-cleanup path must NOT have fired — that's for session expiry only.
|
||||
assert cleanup_calls == []
|
||||
|
||||
def test_cap_commits_memory_before_evicting_finalizable(self, monkeypatch):
|
||||
"""LRU-cap eviction of a finalizable, not-yet-expired agent commits
|
||||
on_session_end extraction before releasing.
|
||||
|
||||
The agent would otherwise vanish from _agent_cache before the expiry
|
||||
watcher runs, so the watcher would never fire on_session_end() and
|
||||
memory providers would miss the transcript (#11205, LRU-cap variant).
|
||||
We hold the live agent at eviction time, so commit its memory then.
|
||||
"""
|
||||
from gateway import run as gw_run
|
||||
|
||||
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
|
||||
runner = self._bounded_runner()
|
||||
|
||||
commit_calls: list = []
|
||||
release_calls: list = []
|
||||
runner._release_evicted_agent_soft = lambda agent: release_calls.append(agent)
|
||||
|
||||
# Finalizable (finite policy), not yet expired.
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store._entries = {"old": MagicMock(), "new": MagicMock()}
|
||||
runner.session_store.is_session_finalizable.return_value = True
|
||||
runner.session_store._is_session_expired.return_value = False
|
||||
|
||||
old_agent = self._fake_agent()
|
||||
old_agent._memory_manager = MagicMock() # has an external provider
|
||||
old_agent._session_messages = [{"role": "user", "content": "hi"}]
|
||||
old_agent.commit_memory_session = lambda msgs=None: commit_calls.append(msgs)
|
||||
new_agent = self._fake_agent()
|
||||
|
||||
with runner._agent_cache_lock:
|
||||
runner._agent_cache["old"] = (old_agent, "sig_old")
|
||||
runner._agent_cache["new"] = (new_agent, "sig_new")
|
||||
runner._enforce_agent_cache_cap()
|
||||
|
||||
import time as _t
|
||||
deadline = _t.time() + 2.0
|
||||
while _t.time() < deadline and not release_calls:
|
||||
_t.sleep(0.02)
|
||||
# Memory committed with the live transcript, THEN client released.
|
||||
assert commit_calls == [[{"role": "user", "content": "hi"}]]
|
||||
assert old_agent in release_calls
|
||||
|
||||
def test_cap_skips_memory_commit_for_non_finalizable(self, monkeypatch):
|
||||
"""LRU-cap eviction of a mode='none' agent does NOT commit memory.
|
||||
|
||||
The expiry watcher never finalizes a mode='none' session, so there is
|
||||
no missed on_session_end boundary to compensate for. Committing here
|
||||
would fire premature/repeat extraction for a session that simply keeps
|
||||
living. The agent is released without a commit.
|
||||
"""
|
||||
from gateway import run as gw_run
|
||||
|
||||
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
|
||||
runner = self._bounded_runner()
|
||||
|
||||
commit_calls: list = []
|
||||
release_calls: list = []
|
||||
runner._release_evicted_agent_soft = lambda agent: release_calls.append(agent)
|
||||
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store._entries = {"old": MagicMock(), "new": MagicMock()}
|
||||
runner.session_store.is_session_finalizable.return_value = False # mode='none'
|
||||
runner.session_store._is_session_expired.return_value = False
|
||||
|
||||
old_agent = self._fake_agent()
|
||||
old_agent._memory_manager = MagicMock()
|
||||
old_agent._session_messages = [{"role": "user", "content": "hi"}]
|
||||
old_agent.commit_memory_session = lambda msgs=None: commit_calls.append(msgs)
|
||||
new_agent = self._fake_agent()
|
||||
|
||||
with runner._agent_cache_lock:
|
||||
runner._agent_cache["old"] = (old_agent, "sig_old")
|
||||
runner._agent_cache["new"] = (new_agent, "sig_new")
|
||||
runner._enforce_agent_cache_cap()
|
||||
|
||||
import time as _t
|
||||
deadline = _t.time() + 2.0
|
||||
while _t.time() < deadline and not release_calls:
|
||||
_t.sleep(0.02)
|
||||
assert commit_calls == [] # no premature extraction
|
||||
assert old_agent in release_calls # still released
|
||||
|
||||
def test_idle_ttl_sweep_evicts_stale_agents(self, monkeypatch):
|
||||
"""_sweep_idle_cached_agents removes agents idle past the TTL."""
|
||||
from gateway import run as gw_run
|
||||
|
|
@ -725,10 +808,13 @@ class TestAgentCacheBoundedGrowth:
|
|||
import time as _t
|
||||
stale = self._fake_agent(last_activity=_t.time() - 10.0)
|
||||
|
||||
# Session store says the session is still alive.
|
||||
# Session store says the session is still alive AND is finalizable
|
||||
# (finite reset policy) — so deferring eviction is correct: the expiry
|
||||
# watcher will find this agent later and fire on_session_end().
|
||||
session_entry = MagicMock()
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store._entries = {"stale-session": session_entry}
|
||||
runner.session_store.is_session_finalizable.return_value = True
|
||||
runner.session_store._is_session_expired.return_value = False
|
||||
|
||||
runner._agent_cache["stale-session"] = (stale, "sig")
|
||||
|
|
@ -752,6 +838,7 @@ class TestAgentCacheBoundedGrowth:
|
|||
session_entry = MagicMock()
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store._entries = {"stale-session": session_entry}
|
||||
runner.session_store.is_session_finalizable.return_value = True
|
||||
runner.session_store._is_session_expired.return_value = True
|
||||
|
||||
runner._agent_cache["stale-session"] = (stale, "sig")
|
||||
|
|
@ -760,6 +847,82 @@ class TestAgentCacheBoundedGrowth:
|
|||
assert evicted == 1
|
||||
assert "stale-session" not in runner._agent_cache
|
||||
|
||||
def test_idle_sweep_evicts_non_finalizable_session(self, monkeypatch):
|
||||
"""A mode='none' session's idle agent IS still evicted.
|
||||
|
||||
is_session_finalizable() is False for reset-policy 'none': the expiry
|
||||
watcher never finalizes such a session, so deferring eviction would
|
||||
pin the cached agent for the gateway's whole lifetime — the exact
|
||||
leak the idle sweep exists to relieve. The sweep must reap it even
|
||||
though _is_session_expired() is (and stays) False.
|
||||
"""
|
||||
from gateway import run as gw_run
|
||||
|
||||
monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
|
||||
runner = self._bounded_runner()
|
||||
runner._cleanup_agent_resources = MagicMock()
|
||||
|
||||
import time as _t
|
||||
stale = self._fake_agent(last_activity=_t.time() - 10.0)
|
||||
|
||||
session_entry = MagicMock()
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store._entries = {"never-session": session_entry}
|
||||
# mode='none' → never finalizable, never expired.
|
||||
runner.session_store.is_session_finalizable.return_value = False
|
||||
runner.session_store._is_session_expired.return_value = False
|
||||
|
||||
runner._agent_cache["never-session"] = (stale, "sig")
|
||||
|
||||
evicted = runner._sweep_idle_cached_agents()
|
||||
assert evicted == 1
|
||||
assert "never-session" not in runner._agent_cache
|
||||
|
||||
def test_is_session_finalizable_real_predicate(self, tmp_path):
|
||||
"""is_session_finalizable() reflects the real reset policy.
|
||||
|
||||
Uses a real SessionStore + GatewayConfig (no mocks) so the predicate
|
||||
is exercised against actual get_reset_policy() output: True for finite
|
||||
policies (idle/daily/both), False only for mode='none'.
|
||||
"""
|
||||
from datetime import datetime
|
||||
from unittest.mock import patch as _patch
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, SessionResetPolicy
|
||||
from gateway.session import (
|
||||
SessionEntry, SessionSource, SessionStore, build_session_key,
|
||||
)
|
||||
|
||||
def _entry_for(platform: Platform) -> SessionEntry:
|
||||
src = SessionSource(
|
||||
platform=platform, user_id="u1", chat_id="c1",
|
||||
user_name="t", chat_type="dm",
|
||||
)
|
||||
return SessionEntry(
|
||||
session_key=build_session_key(src),
|
||||
session_id="s1",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
origin=src,
|
||||
platform=src.platform,
|
||||
chat_type=src.chat_type,
|
||||
)
|
||||
|
||||
config = GatewayConfig()
|
||||
# Give Telegram a 'none' policy via the per-platform override; leave the
|
||||
# default policy finite ('both') for the Discord case.
|
||||
config.default_reset_policy = SessionResetPolicy(mode="both")
|
||||
config.reset_by_platform[Platform.TELEGRAM] = SessionResetPolicy(mode="none")
|
||||
|
||||
with _patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
store._db = None
|
||||
|
||||
# mode='none' → never finalized by the watcher.
|
||||
assert store.is_session_finalizable(_entry_for(Platform.TELEGRAM)) is False
|
||||
# default 'both' → finite, will eventually expire.
|
||||
assert store.is_session_finalizable(_entry_for(Platform.DISCORD)) is True
|
||||
|
||||
def test_plain_dict_cache_is_tolerated(self):
|
||||
"""Test fixtures using plain {} don't crash _enforce_agent_cache_cap."""
|
||||
from gateway.run import GatewayRunner
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue