fix(bg-review): scope stdout/stderr silencing to the worker thread (#55966)
The background memory/skill review thread wrapped its whole body in process-global contextlib.redirect_stdout/stderr(devnull). Those rebind sys.stdout/sys.stderr for the ENTIRE process, so for the full duration of the review (tens of seconds) every other thread — including a gateway event-loop thread driving a Telegram long-poll — also wrote to devnull. Any bare print/sys.stderr.write from those threads during the window was silently lost (#55769 / #55925). Replace the global redirect with thread_scoped_silence(): a per-thread routing proxy installed once as sys.stdout/sys.stderr that sends only the registered (bg-review) thread's writes to devnull and passes every other thread through to the real stream. Depth-counted so nested use composes. Verified: a concurrent thread writing while the bg-review thread is inside the silence window keeps its output on the real stream.
This commit is contained in:
parent
972aa33d37
commit
b5267671f2
3 changed files with 311 additions and 12 deletions
|
|
@ -18,12 +18,13 @@ for invariants and PR review criteria.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from agent.thread_scoped_output import thread_scoped_silence
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
|
@ -602,9 +603,15 @@ def _run_review_in_thread(
|
|||
review_agent = None
|
||||
review_messages: List[Dict] = []
|
||||
try:
|
||||
with open(os.devnull, "w", encoding="utf-8") as _devnull, \
|
||||
contextlib.redirect_stdout(_devnull), \
|
||||
contextlib.redirect_stderr(_devnull):
|
||||
# Silence stdout/stderr for THIS worker thread only. A process-global
|
||||
# ``contextlib.redirect_stdout(devnull)`` here would also blank
|
||||
# ``sys.stdout``/``sys.stderr`` for every other thread — including a
|
||||
# gateway event-loop thread driving a Telegram long-poll — for the full
|
||||
# duration of the review (tens of seconds), swallowing their console
|
||||
# output (#55769 / #55925). ``thread_scoped_silence`` routes only this
|
||||
# thread's writes to devnull and leaves all other threads on the real
|
||||
# streams.
|
||||
with thread_scoped_silence():
|
||||
# Inherit the parent agent's live runtime (provider, model,
|
||||
# base_url, api_key, api_mode) so the fork uses the exact
|
||||
# same credentials the main turn is using. Without this,
|
||||
|
|
@ -822,16 +829,14 @@ def _run_review_in_thread(
|
|||
logger.warning("Background memory/skill review failed: %s", e)
|
||||
agent._emit_auxiliary_failure("background review", e)
|
||||
finally:
|
||||
# Safety-net cleanup for the exception path. Normal
|
||||
# completion already shut down inside redirect_stdout above.
|
||||
# Re-open devnull here so any teardown output (Honcho flush,
|
||||
# Hindsight sync, background thread joins) stays silent even
|
||||
# on the exception path where redirect_stdout already exited.
|
||||
# Safety-net cleanup for the exception path. Normal completion already
|
||||
# shut down inside the thread-scoped silence above. Re-enter the
|
||||
# thread-scoped silence here so teardown output (Honcho flush, Hindsight
|
||||
# sync, background thread joins) stays quiet even on the exception path,
|
||||
# without blanking other threads' streams.
|
||||
if review_agent is not None:
|
||||
try:
|
||||
with open(os.devnull, "w", encoding="utf-8") as _fn, \
|
||||
contextlib.redirect_stdout(_fn), \
|
||||
contextlib.redirect_stderr(_fn):
|
||||
with thread_scoped_silence():
|
||||
try:
|
||||
review_agent.shutdown_memory_provider()
|
||||
except Exception:
|
||||
|
|
|
|||
147
agent/thread_scoped_output.py
Normal file
147
agent/thread_scoped_output.py
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
"""Thread-scoped stdout/stderr silencing for background worker threads.
|
||||
|
||||
``contextlib.redirect_stdout``/``redirect_stderr`` reassign the *process-global*
|
||||
``sys.stdout``/``sys.stderr``. When a daemon worker thread (e.g. the background
|
||||
memory/skill review) wraps its whole body in those context managers, every other
|
||||
thread in the process — including a gateway's asyncio event-loop thread driving a
|
||||
Telegram long-poll — sees ``sys.stdout``/``sys.stderr`` pointing at ``devnull``
|
||||
for the full duration. Any bare ``print`` / ``sys.stderr.write`` from those other
|
||||
threads is silently lost during that window (see issue #55769 / #55925).
|
||||
|
||||
This module installs a thin proxy as ``sys.stdout``/``sys.stderr`` that routes
|
||||
writes per-thread: threads registered as "silenced" go to a sink; every other
|
||||
thread passes through to the *original* stream. The proxy is installed once,
|
||||
idempotently, and is never uninstalled (uninstalling would race other threads
|
||||
mid-write), so the only observable effect for unregistered threads is one extra
|
||||
attribute lookup per write.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from typing import Iterator, TextIO
|
||||
|
||||
__all__ = ["thread_scoped_silence"]
|
||||
|
||||
_install_lock = threading.Lock()
|
||||
# Maps the proxy we installed for a given attribute ("stdout"/"stderr") so we
|
||||
# never double-wrap and so we can recover the original stream.
|
||||
_installed: dict[str, "_ThreadRoutingStream"] = {}
|
||||
|
||||
|
||||
class _ThreadRoutingStream:
    """A ``sys.stdout``/``sys.stderr`` stand-in that routes writes per-thread.

    Threads whose ident has a positive depth in ``_silenced`` write to
    ``_sink``; every other thread writes to ``_passthrough`` (the stream that
    was bound when the proxy was created). Anything not overridden here is
    delegated to the calling thread's *current* target, so ``.encoding`` /
    ``.buffer`` / ``.fileno()`` behave like the underlying stream.

    Error policy: failures of the *sink* are swallowed, because a silenced
    thread must never crash on output that is being discarded anyway.
    Failures of the *passthrough* stream propagate unchanged, so threads that
    are not silenced see exactly the errors (closed stream, broken pipe, wrong
    argument type) they would have seen without the proxy.
    """

    def __init__(self, passthrough: TextIO, sink: TextIO) -> None:
        """Create a proxy.

        Args:
            passthrough: Stream used by threads that are not silenced.
            sink: Stream used by silenced threads (normally ``os.devnull``).
        """
        self._passthrough = passthrough
        self._sink = sink
        # ident -> nesting depth. A thread is silenced while depth > 0, so
        # nested silencing on the same thread composes: the inner exit
        # decrements instead of clearing the registration.
        self._silenced: dict[int, int] = {}
        self._lock = threading.Lock()

    def _target(self) -> TextIO:
        """Return the stream the calling thread should currently use."""
        if self._silenced.get(threading.get_ident(), 0) > 0:
            return self._sink
        return self._passthrough

    # --- registration -----------------------------------------------------
    def silence(self, ident: int) -> None:
        """Increase the silence depth of thread *ident* by one."""
        with self._lock:
            self._silenced[ident] = self._silenced.get(ident, 0) + 1

    def unsilence(self, ident: int) -> None:
        """Decrease the silence depth of *ident*; drop the entry at zero.

        Calling this for a thread that is not registered is a no-op.
        """
        with self._lock:
            depth = self._silenced.get(ident, 0) - 1
            if depth > 0:
                self._silenced[ident] = depth
            else:
                self._silenced.pop(ident, None)

    # --- file-like surface ------------------------------------------------
    def write(self, data):  # type: ignore[no-untyped-def]
        """Write *data* to the calling thread's target stream.

        Returns the target's return value. If the sink fails, the text is
        treated as discarded and its length is reported (``0`` for non-str).
        """
        target = self._target()
        # Identity check rather than "is passthrough": when no stream was
        # bound at install time the sink doubles as passthrough and must stay
        # best-effort.
        if target is not self._sink:
            return target.write(data)
        try:
            return target.write(data)
        except Exception:
            return len(data) if isinstance(data, str) else 0

    def flush(self):  # type: ignore[no-untyped-def]
        """Flush the calling thread's target; sink failures are ignored."""
        target = self._target()
        if target is not self._sink:
            return target.flush()
        try:
            return target.flush()
        except Exception:
            return None

    def writelines(self, lines):  # type: ignore[no-untyped-def]
        """Write *lines* to the calling thread's target; sink failures are ignored."""
        target = self._target()
        if target is not self._sink:
            return target.writelines(lines)
        try:
            return target.writelines(lines)
        except Exception:
            return None

    def isatty(self) -> bool:
        """Report whether the current target is a TTY (``False`` on any error)."""
        try:
            return bool(self._target().isatty())
        except Exception:
            return False

    def fileno(self):  # type: ignore[no-untyped-def]
        """Return the file descriptor of the calling thread's target."""
        return self._target().fileno()

    def __getattr__(self, name):  # type: ignore[no-untyped-def]
        # Only reached when normal lookup fails. If one of our own attributes
        # is missing (instance created without __init__, e.g. by copy/pickle)
        # _target() would re-enter __getattr__ forever, so fail fast instead.
        if name in ("_passthrough", "_sink", "_silenced", "_lock"):
            raise AttributeError(name)
        # Delegate everything we don't override (encoding, buffer, mode, ...)
        # to the calling thread's current target.
        return getattr(self._target(), name)
|
||||
|
||||
|
||||
def _ensure_installed(attr: str, sink: TextIO) -> "_ThreadRoutingStream":
    """Install (idempotently) a routing proxy as ``sys.<attr>`` and return it.

    Args:
        attr: Name of the ``sys`` attribute to proxy (``"stdout"`` or
            ``"stderr"``).
        sink: Stream silenced threads are routed to. It is only used when a
            new proxy has to be created; a proxy that is already bound keeps
            the sink it was created with.

    Returns:
        The routing proxy that is bound as ``sys.<attr>`` on return.
    """
    with _install_lock:
        current = getattr(sys, attr, None)
        # Reuse ANY routing proxy that is currently bound, not just the one
        # recorded last. ``sys.<attr>`` can be rebound and later restored to
        # an older proxy (e.g. by an exiting ``redirect_stdout``); wrapping
        # that proxy again would stack one more layer on every such cycle.
        if isinstance(current, _ThreadRoutingStream):
            _installed[attr] = current
            return current
        # Capture whatever is currently bound as the passthrough. If a prior
        # global redirect_stdout is active we deliberately route non-silenced
        # threads to *that* (matching prior behaviour) rather than guessing at
        # the "real" stream. With nothing bound at all, fall back to the sink.
        passthrough = current if current is not None else sink
        proxy = _ThreadRoutingStream(passthrough, sink)
        setattr(sys, attr, proxy)
        _installed[attr] = proxy
        return proxy
|
||||
|
||||
|
||||
@contextlib.contextmanager
def thread_scoped_silence() -> Iterator[None]:
    """Silence ``stdout``/``stderr`` for the *current thread only*.

    Other threads keep writing to the real streams. Use this around a worker
    thread's body instead of ``contextlib.redirect_stdout(devnull)`` when the
    process is multi-threaded and another thread must keep its console output.

    Nested use on the same thread composes: the thread stays silenced until
    the outermost context exits.
    """
    sink = open(os.devnull, "w", encoding="utf-8")
    try:
        out_proxy = _ensure_installed("stdout", sink)
        err_proxy = _ensure_installed("stderr", sink)
    except BaseException:
        sink.close()
        raise
    # A proxy keeps the sink it was created with for as long as it stays
    # installed, and other threads (or later calls) are routed to that same
    # object. So the sink must NOT be closed when this context exits once a
    # proxy has adopted it; it is intentionally left open for the proxy's
    # lifetime. If both proxies already existed, this handle is unused and can
    # be released right away.
    if out_proxy._sink is not sink and err_proxy._sink is not sink:
        sink.close()

    ident = threading.get_ident()
    out_proxy.silence(ident)
    err_proxy.silence(ident)
    try:
        yield
    finally:
        out_proxy.unsilence(ident)
        err_proxy.unsilence(ident)
|
||||
147
tests/agent/test_thread_scoped_output.py
Normal file
147
tests/agent/test_thread_scoped_output.py
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
"""Tests for agent.thread_scoped_output.thread_scoped_silence.
|
||||
|
||||
Behaviour contract: a thread inside ``thread_scoped_silence()`` has its
|
||||
stdout/stderr routed to devnull, while every OTHER thread keeps writing to the
|
||||
real stream — even concurrently, while the first thread is still inside the
|
||||
context. This is the property the old process-global
|
||||
``contextlib.redirect_stdout(devnull)`` violated (issue #55769 / #55925).
|
||||
"""
|
||||
|
||||
import io
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from agent.thread_scoped_output import thread_scoped_silence
|
||||
|
||||
|
||||
def _run_with_real_stream(fn):
    """Run *fn* with a fresh ``StringIO`` bound as ``sys.stdout``.

    The previous ``sys.stdout`` is restored afterwards, even if *fn* raises.
    Returns everything that reached the temporary buffer.
    """
    buffer = io.StringIO()
    previous, sys.stdout = sys.stdout, buffer
    try:
        fn()
    finally:
        sys.stdout = previous
    return buffer.getvalue()
|
||||
|
||||
|
||||
def test_current_thread_is_silenced():
    """Output inside the context is dropped; output after it is kept."""

    def scenario():
        with thread_scoped_silence():
            print("dropped")
        print("kept")

    output = _run_with_real_stream(scenario)
    assert "dropped" not in output
    assert "kept" in output
|
||||
|
||||
|
||||
def test_concurrent_thread_keeps_output_during_silence_window():
    """A loud thread writing WHILE another thread is silenced must survive."""
    inside_silence = threading.Event()
    loud_done = threading.Event()
    # Handshake results, checked below so the test cannot pass vacuously
    # (e.g. LOUD printed only after a wait timed out, outside the window).
    observed = {}

    def silenced_worker():
        with thread_scoped_silence():
            print("SILENCED")
            inside_silence.set()
            # Hold the silence window until the loud thread has written.
            observed["loud_wrote_in_window"] = loud_done.wait(timeout=2.0)

    def loud_worker():
        observed["window_was_open"] = inside_silence.wait(timeout=2.0)
        print("LOUD")
        loud_done.set()

    def body():
        workers = [
            threading.Thread(target=silenced_worker),
            threading.Thread(target=loud_worker),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join(timeout=3.0)
        observed["all_joined"] = not any(w.is_alive() for w in workers)

    captured = _run_with_real_stream(body)
    assert observed.get("window_was_open") is True
    assert observed.get("loud_wrote_in_window") is True
    assert observed.get("all_joined") is True
    assert "SILENCED" not in captured
    assert "LOUD" in captured
|
||||
|
||||
|
||||
def test_stderr_is_also_routed_per_thread():
    """``sys.stderr`` gets the same per-thread routing as ``sys.stdout``."""
    buffer = io.StringIO()
    saved = sys.stderr
    sys.stderr = buffer
    try:
        with thread_scoped_silence():
            sys.stderr.write("err-dropped\n")
        sys.stderr.write("err-kept\n")
    finally:
        sys.stderr = saved

    text = buffer.getvalue()
    assert "err-dropped" not in text
    assert "err-kept" in text
|
||||
|
||||
|
||||
def test_nested_silence_same_thread_composes():
    """Leaving an inner context must not un-silence the outer one."""

    def scenario():
        with thread_scoped_silence():
            with thread_scoped_silence():
                print("inner")
            # Depth-counted: the outer context is still active here, so the
            # thread stays silenced after the inner context has exited.
            print("after-inner")
        print("after-outer")

    output = _run_with_real_stream(scenario)
    assert "inner" not in output
    assert "after-inner" not in output
    assert "after-outer" in output
|
||||
|
||||
|
||||
def test_unsilence_cleans_up_after_exit():
    """After the context exits, the calling thread writes to the real stream."""
    seen = []

    def scenario():
        with thread_scoped_silence():
            pass
        print("post")
        seen.append("post")

    output = _run_with_real_stream(scenario)
    assert "post" in output
    assert seen == ["post"]
|
||||
|
||||
|
||||
def test_many_concurrent_silenced_and_loud_threads():
    """Stress: interleaved silenced/loud threads keep their respective fates."""
    start = threading.Event()
    # Filled in by body(); checked below so a hung worker fails the test
    # instead of being hidden by the join timeout.
    stragglers = []

    def silenced(i):
        start.wait(timeout=2.0)
        with thread_scoped_silence():
            print(f"S{i}")
            time.sleep(0.05)

    def loud(i):
        start.wait(timeout=2.0)
        time.sleep(0.02)
        print(f"L{i}")

    def body():
        threads = []
        for i in range(5):
            threads.append(threading.Thread(target=silenced, args=(i,)))
            threads.append(threading.Thread(target=loud, args=(i,)))
        for t in threads:
            t.start()
        start.set()
        for t in threads:
            t.join(timeout=3.0)
        stragglers.extend(t for t in threads if t.is_alive())

    captured = _run_with_real_stream(body)
    assert not stragglers, "worker threads did not finish in time"
    for i in range(5):
        assert f"S{i}" not in captured, f"silenced S{i} leaked"
        assert f"L{i}" in captured, f"loud L{i} swallowed"
|
||||
Loading…
Add table
Add a link
Reference in a new issue