diff --git a/agent/turn_context.py b/agent/turn_context.py index 5ec0a0a90..88980b4ad 100644 --- a/agent/turn_context.py +++ b/agent/turn_context.py @@ -223,6 +223,9 @@ def build_turn_context( agent._unicode_sanitization_passes = 0 agent._tool_guardrails.reset_for_turn() agent._tool_guardrail_halt_decision = None + _reset_consol = getattr(agent._memory_store, "reset_consolidation_failures", None) + if callable(_reset_consol): + _reset_consol() agent._vision_supported = True # Pre-turn connection health check: clean up dead TCP connections. diff --git a/tests/tools/test_memory_tool.py b/tests/tools/test_memory_tool.py index 06e8cf0c6..a7c822cb1 100644 --- a/tests/tools/test_memory_tool.py +++ b/tests/tools/test_memory_tool.py @@ -330,6 +330,10 @@ class TestMemoryStoreReplace: store.add("memory", "fact A") result = store.replace("memory", "nonexistent", "new") assert result["success"] is False + assert "No entry matched" in result["error"] + # Zero-match must return current entries so the agent can self-correct + # instead of looping blindly (#42405, co-author #42417). + assert result["current_entries"] == ["fact A"] def test_replace_ambiguous_match(self, store): store.add("memory", "server A runs nginx") @@ -361,14 +365,119 @@ class TestMemoryStoreRemove: assert len(store.memory_entries) == 0 def test_remove_no_match(self, store): + store.add("memory", "fact A") result = store.remove("memory", "nonexistent") assert result["success"] is False + assert "No entry matched" in result["error"] + # Zero-match must return current entries (#42405, co-author #42417). + assert result["current_entries"] == ["fact A"] def test_remove_empty_old_text(self, store): result = store.remove("memory", " ") assert result["success"] is False +class TestMemoryConsolidationGracefulDegrade: + """Fix #3 for #42405: a failed at-capacity consolidation must never loop the + turn to budget exhaustion — after a per-turn cap of failures, memory ops + return a terminal 'stop, continue your reply' result instead of the + 'retry — all in this turn' instruction.""" + + def test_zero_match_failures_degrade_after_cap(self, store): + store.add("memory", "fact A") + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + # First `cap` failures still hand back previews + the self-correct hint. + for _ in range(cap): + r = store.replace("memory", "nonexistent", "new") + assert r["success"] is False + assert "current_entries" in r # actionable feedback, keep trying + assert "retry with the exact text" in r["error"] + # The next failure degrades: terminal, no retry instruction. + r = store.replace("memory", "nonexistent", "new") + assert r["success"] is False + assert r["done"] is True + assert "current_entries" not in r + assert "continue with your reply" in r["error"] + + def test_add_overflow_degrades_after_cap(self, store): + # Fill near the 500-char user/memory limit so add() overflows. + store.add("memory", "x" * 200) + store.add("memory", "y" * 200) + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + big = "z" * 200 + for _ in range(cap): + r = store.add("memory", big) + assert r["success"] is False + assert "retry this add" in r["error"] # still instructs in-turn retry + r = store.add("memory", big) + assert r["success"] is False + assert r["done"] is True + assert "continue with your reply" in r["error"] + + def test_failures_mix_across_actions_share_one_budget(self, store): + store.add("memory", "fact A") + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + # Interleave replace + remove failures — they share the per-turn counter. + actions = [lambda: store.replace("memory", "nope", "x"), + lambda: store.remove("memory", "nope")] + for i in range(cap): + assert actions[i % 2]()["success"] is False + # cap+1th failure (any action) degrades. + r = store.remove("memory", "nope") + assert "continue with your reply" in r["error"] + + def test_success_resets_failure_budget(self, store): + store.add("memory", "real entry") + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + for _ in range(cap): + store.replace("memory", "nonexistent", "new") + # A successful op resets the counter — progress was made. + ok = store.replace("memory", "real entry", "updated entry") + assert ok["success"] is True + # Now a fresh failure is treated as the first again (still actionable). + r = store.replace("memory", "nonexistent", "new") + assert "current_entries" in r + assert "continue with your reply" not in r["error"] + + def test_reset_consolidation_failures_clears_budget(self, store): + store.add("memory", "fact A") + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + for _ in range(cap + 1): + store.replace("memory", "nonexistent", "new") + # New turn boundary resets the budget. + store.reset_consolidation_failures() + r = store.replace("memory", "nonexistent", "new") + assert "current_entries" in r # actionable again, not degraded + assert "continue with your reply" not in r["error"] + + def test_apply_batch_failures_count_toward_budget(self, store): + """apply_batch is the primary at-capacity consolidation path; its + failures must also degrade so a looping batch can't exhaust the turn + (#42405 whole-bug-class — sibling call path).""" + store.add("memory", "fact A") + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + bad_batch = [{"action": "replace", "old_text": "nope", "content": "x"}] + for _ in range(cap): + r = store.apply_batch("memory", bad_batch) + assert r["success"] is False + assert "current_entries" in r # still actionable under cap + r = store.apply_batch("memory", bad_batch) + assert r["success"] is False + assert r["done"] is True + assert "continue with your reply" in r["error"] + + def test_apply_batch_and_single_op_share_budget(self, store): + """A batch failure followed by single-op failures shares one counter.""" + store.add("memory", "fact A") + cap = store._MAX_CONSOLIDATION_FAILURES_PER_TURN + store.apply_batch("memory", [{"action": "remove", "old_text": "nope"}]) + for _ in range(cap - 1): + store.replace("memory", "nope", "x") + # cap reached across batch + single ops → next degrades. + r = store.replace("memory", "nope", "x") + assert "continue with your reply" in r["error"] + + class TestMemoryStorePersistence: def test_save_and_load_roundtrip(self, tmp_path, monkeypatch): monkeypatch.setattr("tools.memory_tool.get_memory_dir", lambda: tmp_path) diff --git a/tools/memory_tool.py b/tools/memory_tool.py index 17b52e3c3..02315bd07 100644 --- a/tools/memory_tool.py +++ b/tools/memory_tool.py @@ -121,6 +121,12 @@ class MemoryStore: Tool responses always reflect this live state. """ + # After this many failed consolidation attempts (overflow / zero-match) in + # ONE turn, stop instructing the model to "retry in this turn" and return a + # terminal "save skipped" result so a fragile replace/add can't loop the + # turn to budget exhaustion and suppress the user's reply (issue #42405). + _MAX_CONSOLIDATION_FAILURES_PER_TURN = 3 + def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375): self.memory_entries: List[str] = [] self.user_entries: List[str] = [] @@ -128,6 +134,36 @@ class MemoryStore: self.user_char_limit = user_char_limit # Frozen snapshot for system prompt -- set once at load_from_disk() self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""} + # Per-turn counter of failed at-capacity consolidation attempts; reset + # at each turn boundary by reset_consolidation_failures() (#42405). + self._consolidation_failures = 0 + + def reset_consolidation_failures(self) -> None: + """Reset the per-turn consolidation-failure counter (call at turn start).""" + self._consolidation_failures = 0 + + def _consolidation_failure(self, response: Dict[str, Any]) -> Dict[str, Any]: + """Count an at-capacity consolidation failure and degrade gracefully. + + Under the per-turn cap, return ``response`` unchanged (it already tells + the model how to self-correct + retry in this turn). Once the cap is + exceeded, drop the retry instruction and return a TERMINAL result so the + model stops looping memory calls and proceeds to answer the user — a + failed memory side effect must never block the turn's reply (#42405). + """ + self._consolidation_failures += 1 + if self._consolidation_failures <= self._MAX_CONSOLIDATION_FAILURES_PER_TURN: + return response + return { + "success": False, + "done": True, + "error": ( + f"Memory consolidation failed {self._consolidation_failures} times " + "this turn. Stop retrying memory calls — leave memory unchanged for " + "now and continue with your reply to the user. The fact can be saved " + "in a later turn." + ), + } def load_from_disk(self): """Load entries from MEMORY.md and USER.md, capture system prompt snapshot. @@ -330,8 +366,7 @@ class MemoryStore: if new_total > limit: current = self._char_count(target) - previews = [e[:80] + ("..." if len(e) > 80 else "") for e in entries] - return { + return self._consolidation_failure({ "success": False, "error": ( f"Memory at {current:,}/{limit:,} chars. " @@ -340,10 +375,9 @@ class MemoryStore: f"shorter ones or 'remove' stale or less important entries (see " f"current_entries below), then retry this add — all in this turn." ), - "previews": previews, "current_entries": entries, "usage": f"{current:,}/{limit:,}", - } + }) entries.append(content) self._set_entries(target, entries) @@ -374,19 +408,17 @@ class MemoryStore: matches = [(i, e) for i, e in enumerate(entries) if old_text in e] if not matches: - previews = [e[:80] + ("..." if len(e) > 80 else "") for e in entries] - return { + return self._consolidation_failure({ "success": False, - "error": f"No entry matched '{old_text}'. Check the previews below and retry with the exact text of the entry you want to replace.", - "previews": previews, + "error": f"No entry matched '{old_text}'. Check current_entries below and retry with the exact text of the entry you want to replace.", "current_entries": entries, - } + }) if len(matches) > 1: # If all matches are identical (exact duplicates), operate on the first one unique_texts = {e for _, e in matches} if len(unique_texts) > 1: - previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches] + previews = self._previews([e for _, e in matches]) return { "success": False, "error": f"Multiple entries matched '{old_text}'. Be more specific.", @@ -404,7 +436,7 @@ class MemoryStore: if new_total > limit: current = self._char_count(target) - return { + return self._consolidation_failure({ "success": False, "error": ( f"Replacement would put memory at {new_total:,}/{limit:,} chars. " @@ -414,7 +446,7 @@ class MemoryStore: ), "current_entries": entries, "usage": f"{current:,}/{limit:,}", - } + }) entries[idx] = new_content self._set_entries(target, entries) @@ -437,19 +469,17 @@ class MemoryStore: matches = [(i, e) for i, e in enumerate(entries) if old_text in e] if not matches: - previews = [e[:80] + ("..." if len(e) > 80 else "") for e in entries] - return { + return self._consolidation_failure({ "success": False, - "error": f"No entry matched '{old_text}'. Check the previews below and retry with the exact text of the entry you want to remove.", - "previews": previews, + "error": f"No entry matched '{old_text}'. Check current_entries below and retry with the exact text of the entry you want to remove.", "current_entries": entries, - } + }) if len(matches) > 1: # If all matches are identical (exact duplicates), remove the first one unique_texts = {e for _, e in matches} if len(unique_texts) > 1: - previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches] + previews = self._previews([e for _, e in matches]) return { "success": False, "error": f"Multiple entries matched '{old_text}'. Be more specific.", @@ -554,7 +584,7 @@ class MemoryStore: new_total = len(ENTRY_DELIMITER.join(working)) if working else 0 if new_total > limit: current = self._char_count(target) - return { + return self._consolidation_failure({ "success": False, "error": ( f"After applying all {len(operations)} operations, memory would be at " @@ -563,7 +593,7 @@ class MemoryStore: ), "current_entries": self._entries_for(target), "usage": f"{current:,}/{limit:,}", - } + }) # Commit. self._set_entries(target, working) @@ -575,12 +605,12 @@ class MemoryStore: """Build a batch-abort error that reports live (uncommitted) state.""" current = self._char_count(target) limit = self._char_limit(target) - return { + return self._consolidation_failure({ "success": False, "error": message + " No operations were applied (batch is all-or-nothing).", "current_entries": self._entries_for(target), "usage": f"{current:,}/{limit:,}", - } + }) def format_for_system_prompt(self, target: str) -> Optional[str]: """ @@ -597,7 +627,16 @@ class MemoryStore: # -- Internal helpers -- + @staticmethod + def _previews(entries: List[str], width: int = 80) -> List[str]: + """Truncated one-line previews of entries for error feedback.""" + return [e[:width] + ("..." if len(e) > width else "") for e in entries] + def _success_response(self, target: str, message: str = None) -> Dict[str, Any]: + # A successful write means the consolidation loop made progress, so the + # per-turn failure budget resets (the cap counts consecutive failures, + # not lifetime ones within a turn) (#42405). + self._consolidation_failures = 0 entries = self._entries_for(target) current = self._char_count(target) limit = self._char_limit(target)