fix(memory): degrade gracefully after repeated at-capacity consolidation failures (#42405)

Builds on the zero-match feedback fix (previous commit) to close the silent-hang
symptom: when memory is at capacity, a failed `add`/`replace`/`remove`
consolidation could loop the whole turn to iteration-budget exhaustion and
deliver no user-facing reply.

#41755 turned the at-capacity overflow error into a *commanded* in-turn retry
("...then retry this add — all in this turn"); combined with the fragile
substring-only `replace`/`remove` matching (LLMs can't reliably re-quote a long
entry verbatim), the model loops add↔replace on inexact guesses until the turn
dies. The existing tool_guardrails halt would catch this, but hard_stop_enabled
is opt-in (off by default), so a default install still hangs.

This fixes it at the memory layer without changing global guardrail behavior:
- MemoryStore tracks per-turn consolidation failures; after a cap (3) it drops
  the "retry in this turn" instruction and returns a terminal "leave memory
  unchanged, continue your reply" result, so a failed memory side effect can
  never block the turn's reply.
- The counter resets on any successful write (progress) and at each turn
  boundary (turn_context.reset_consolidation_failures, guarded via getattr so
  plugin memory stores without the method are a no-op).

Co-authored-by: liuhao1024 <sunsky.lau@gmail.com>
This commit is contained in:
kshitijk4poor 2026-06-30 13:56:28 +05:30 committed by kshitij
parent 62a1bf4c55
commit a5e8cd4d40
3 changed files with 173 additions and 22 deletions

View file

@ -223,6 +223,9 @@ def build_turn_context(
agent._unicode_sanitization_passes = 0
agent._tool_guardrails.reset_for_turn()
agent._tool_guardrail_halt_decision = None
_reset_consol = getattr(agent._memory_store, "reset_consolidation_failures", None)
if callable(_reset_consol):
_reset_consol()
agent._vision_supported = True
# Pre-turn connection health check: clean up dead TCP connections.

View file

@ -330,6 +330,10 @@ class TestMemoryStoreReplace:
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is False
assert "No entry matched" in result["error"]
# Zero-match must return current entries so the agent can self-correct
# instead of looping blindly (#42405, co-author #42417).
assert result["current_entries"] == ["fact A"]
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@ -361,14 +365,119 @@ class TestMemoryStoreRemove:
assert len(store.memory_entries) == 0
def test_remove_no_match(self, store):
store.add("memory", "fact A")
result = store.remove("memory", "nonexistent")
assert result["success"] is False
assert "No entry matched" in result["error"]
# Zero-match must return current entries (#42405, co-author #42417).
assert result["current_entries"] == ["fact A"]
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")
assert result["success"] is False
class TestMemoryConsolidationGracefulDegrade:
"""Fix #3 for #42405: a failed at-capacity consolidation must never loop the
turn to budget exhaustion after a per-turn cap of failures, memory ops
return a terminal 'stop, continue your reply' result instead of the
'retry — all in this turn' instruction."""
def test_zero_match_failures_degrade_after_cap(self, store):
store.add("memory", "fact A")
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
# First `cap` failures still hand back previews + the self-correct hint.
for _ in range(cap):
r = store.replace("memory", "nonexistent", "new")
assert r["success"] is False
assert "current_entries" in r # actionable feedback, keep trying
assert "retry with the exact text" in r["error"]
# The next failure degrades: terminal, no retry instruction.
r = store.replace("memory", "nonexistent", "new")
assert r["success"] is False
assert r["done"] is True
assert "current_entries" not in r
assert "continue with your reply" in r["error"]
def test_add_overflow_degrades_after_cap(self, store):
# Fill near the 500-char user/memory limit so add() overflows.
store.add("memory", "x" * 200)
store.add("memory", "y" * 200)
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
big = "z" * 200
for _ in range(cap):
r = store.add("memory", big)
assert r["success"] is False
assert "retry this add" in r["error"] # still instructs in-turn retry
r = store.add("memory", big)
assert r["success"] is False
assert r["done"] is True
assert "continue with your reply" in r["error"]
def test_failures_mix_across_actions_share_one_budget(self, store):
store.add("memory", "fact A")
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
# Interleave replace + remove failures — they share the per-turn counter.
actions = [lambda: store.replace("memory", "nope", "x"),
lambda: store.remove("memory", "nope")]
for i in range(cap):
assert actions[i % 2]()["success"] is False
# cap+1th failure (any action) degrades.
r = store.remove("memory", "nope")
assert "continue with your reply" in r["error"]
def test_success_resets_failure_budget(self, store):
store.add("memory", "real entry")
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
for _ in range(cap):
store.replace("memory", "nonexistent", "new")
# A successful op resets the counter — progress was made.
ok = store.replace("memory", "real entry", "updated entry")
assert ok["success"] is True
# Now a fresh failure is treated as the first again (still actionable).
r = store.replace("memory", "nonexistent", "new")
assert "current_entries" in r
assert "continue with your reply" not in r["error"]
def test_reset_consolidation_failures_clears_budget(self, store):
store.add("memory", "fact A")
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
for _ in range(cap + 1):
store.replace("memory", "nonexistent", "new")
# New turn boundary resets the budget.
store.reset_consolidation_failures()
r = store.replace("memory", "nonexistent", "new")
assert "current_entries" in r # actionable again, not degraded
assert "continue with your reply" not in r["error"]
def test_apply_batch_failures_count_toward_budget(self, store):
"""apply_batch is the primary at-capacity consolidation path; its
failures must also degrade so a looping batch can't exhaust the turn
(#42405 whole-bug-class — sibling call path)."""
store.add("memory", "fact A")
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
bad_batch = [{"action": "replace", "old_text": "nope", "content": "x"}]
for _ in range(cap):
r = store.apply_batch("memory", bad_batch)
assert r["success"] is False
assert "current_entries" in r # still actionable under cap
r = store.apply_batch("memory", bad_batch)
assert r["success"] is False
assert r["done"] is True
assert "continue with your reply" in r["error"]
def test_apply_batch_and_single_op_share_budget(self, store):
"""A batch failure followed by single-op failures shares one counter."""
store.add("memory", "fact A")
cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN
store.apply_batch("memory", [{"action": "remove", "old_text": "nope"}])
for _ in range(cap - 1):
store.replace("memory", "nope", "x")
# cap reached across batch + single ops → next degrades.
r = store.replace("memory", "nope", "x")
assert "continue with your reply" in r["error"]
class TestMemoryStorePersistence:
def test_save_and_load_roundtrip(self, tmp_path, monkeypatch):
monkeypatch.setattr("tools.memory_tool.get_memory_dir", lambda: tmp_path)

View file

@ -121,6 +121,12 @@ class MemoryStore:
Tool responses always reflect this live state.
"""
# After this many failed consolidation attempts (overflow / zero-match) in
# ONE turn, stop instructing the model to "retry in this turn" and return a
# terminal "save skipped" result so a fragile replace/add can't loop the
# turn to budget exhaustion and suppress the user's reply (issue #42405).
_MAX_CONSOLIDATION_FAILURES_PER_TURN = 3
def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375):
self.memory_entries: List[str] = []
self.user_entries: List[str] = []
@ -128,6 +134,36 @@ class MemoryStore:
self.user_char_limit = user_char_limit
# Frozen snapshot for system prompt -- set once at load_from_disk()
self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""}
# Per-turn counter of failed at-capacity consolidation attempts; reset
# at each turn boundary by reset_consolidation_failures() (#42405).
self._consolidation_failures = 0
def reset_consolidation_failures(self) -> None:
"""Reset the per-turn consolidation-failure counter (call at turn start)."""
self._consolidation_failures = 0
def _consolidation_failure(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""Count an at-capacity consolidation failure and degrade gracefully.
Under the per-turn cap, return ``response`` unchanged (it already tells
the model how to self-correct + retry in this turn). Once the cap is
exceeded, drop the retry instruction and return a TERMINAL result so the
model stops looping memory calls and proceeds to answer the user a
failed memory side effect must never block the turn's reply (#42405).
"""
self._consolidation_failures += 1
if self._consolidation_failures <= self._MAX_CONSOLIDATION_FAILURES_PER_TURN:
return response
return {
"success": False,
"done": True,
"error": (
f"Memory consolidation failed {self._consolidation_failures} times "
"this turn. Stop retrying memory calls — leave memory unchanged for "
"now and continue with your reply to the user. The fact can be saved "
"in a later turn."
),
}
def load_from_disk(self):
"""Load entries from MEMORY.md and USER.md, capture system prompt snapshot.
@ -330,8 +366,7 @@ class MemoryStore:
if new_total > limit:
current = self._char_count(target)
previews = [e[:80] + ("..." if len(e) > 80 else "") for e in entries]
return {
return self._consolidation_failure({
"success": False,
"error": (
f"Memory at {current:,}/{limit:,} chars. "
@ -340,10 +375,9 @@ class MemoryStore:
f"shorter ones or 'remove' stale or less important entries (see "
f"current_entries below), then retry this add — all in this turn."
),
"previews": previews,
"current_entries": entries,
"usage": f"{current:,}/{limit:,}",
}
})
entries.append(content)
self._set_entries(target, entries)
@ -374,19 +408,17 @@ class MemoryStore:
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
previews = [e[:80] + ("..." if len(e) > 80 else "") for e in entries]
return {
return self._consolidation_failure({
"success": False,
"error": f"No entry matched '{old_text}'. Check the previews below and retry with the exact text of the entry you want to replace.",
"previews": previews,
"error": f"No entry matched '{old_text}'. Check current_entries below and retry with the exact text of the entry you want to replace.",
"current_entries": entries,
}
})
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
unique_texts = {e for _, e in matches}
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
previews = self._previews([e for _, e in matches])
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
@ -404,7 +436,7 @@ class MemoryStore:
if new_total > limit:
current = self._char_count(target)
return {
return self._consolidation_failure({
"success": False,
"error": (
f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
@ -414,7 +446,7 @@ class MemoryStore:
),
"current_entries": entries,
"usage": f"{current:,}/{limit:,}",
}
})
entries[idx] = new_content
self._set_entries(target, entries)
@ -437,19 +469,17 @@ class MemoryStore:
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
previews = [e[:80] + ("..." if len(e) > 80 else "") for e in entries]
return {
return self._consolidation_failure({
"success": False,
"error": f"No entry matched '{old_text}'. Check the previews below and retry with the exact text of the entry you want to remove.",
"previews": previews,
"error": f"No entry matched '{old_text}'. Check current_entries below and retry with the exact text of the entry you want to remove.",
"current_entries": entries,
}
})
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
unique_texts = {e for _, e in matches}
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
previews = self._previews([e for _, e in matches])
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
@ -554,7 +584,7 @@ class MemoryStore:
new_total = len(ENTRY_DELIMITER.join(working)) if working else 0
if new_total > limit:
current = self._char_count(target)
return {
return self._consolidation_failure({
"success": False,
"error": (
f"After applying all {len(operations)} operations, memory would be at "
@ -563,7 +593,7 @@ class MemoryStore:
),
"current_entries": self._entries_for(target),
"usage": f"{current:,}/{limit:,}",
}
})
# Commit.
self._set_entries(target, working)
@ -575,12 +605,12 @@ class MemoryStore:
"""Build a batch-abort error that reports live (uncommitted) state."""
current = self._char_count(target)
limit = self._char_limit(target)
return {
return self._consolidation_failure({
"success": False,
"error": message + " No operations were applied (batch is all-or-nothing).",
"current_entries": self._entries_for(target),
"usage": f"{current:,}/{limit:,}",
}
})
def format_for_system_prompt(self, target: str) -> Optional[str]:
"""
@ -597,7 +627,16 @@ class MemoryStore:
# -- Internal helpers --
@staticmethod
def _previews(entries: List[str], width: int = 80) -> List[str]:
"""Truncated one-line previews of entries for error feedback."""
return [e[:width] + ("..." if len(e) > width else "") for e in entries]
def _success_response(self, target: str, message: str = None) -> Dict[str, Any]:
# A successful write means the consolidation loop made progress, so the
# per-turn failure budget resets (the cap counts consecutive failures,
# not lifetime ones within a turn) (#42405).
self._consolidation_failures = 0
entries = self._entries_for(target)
current = self._char_count(target)
limit = self._char_limit(target)