From b5267671f22ed9f3ee42fc469005c721a84d4618 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 30 Jun 2026 17:28:33 -0700 Subject: [PATCH] fix(bg-review): scope stdout/stderr silencing to the worker thread (#55966) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The background memory/skill review thread wrapped its whole body in process-global contextlib.redirect_stdout/stderr(devnull). Those rebind sys.stdout/sys.stderr for the ENTIRE process, so for the full duration of the review (tens of seconds) every other thread — including a gateway event-loop thread driving a Telegram long-poll — also wrote to devnull. Any bare print/sys.stderr.write from those threads during the window was silently lost (#55769 / #55925). Replace the global redirect with thread_scoped_silence(): a per-thread routing proxy installed once as sys.stdout/sys.stderr that sends only the registered (bg-review) thread's writes to devnull and passes every other thread through to the real stream. Depth-counted so nested use composes. Verified: a concurrent thread writing while the bg-review thread is inside the silence window keeps its output on the real stream. --- agent/background_review.py | 29 +++-- agent/thread_scoped_output.py | 147 +++++++++++++++++++++++ tests/agent/test_thread_scoped_output.py | 147 +++++++++++++++++++++++ 3 files changed, 311 insertions(+), 12 deletions(-) create mode 100644 agent/thread_scoped_output.py create mode 100644 tests/agent/test_thread_scoped_output.py diff --git a/agent/background_review.py b/agent/background_review.py index 71ae745d7..4b22b7038 100644 --- a/agent/background_review.py +++ b/agent/background_review.py @@ -18,12 +18,13 @@ for invariants and PR review criteria. 
from __future__ import annotations -import contextlib import json import logging import os from typing import Any, Dict, List, Optional +from agent.thread_scoped_output import thread_scoped_silence + logger = logging.getLogger(__name__) @@ -602,9 +603,15 @@ def _run_review_in_thread( review_agent = None review_messages: List[Dict] = [] try: - with open(os.devnull, "w", encoding="utf-8") as _devnull, \ - contextlib.redirect_stdout(_devnull), \ - contextlib.redirect_stderr(_devnull): + # Silence stdout/stderr for THIS worker thread only. A process-global + # ``contextlib.redirect_stdout(devnull)`` here would also blank + # ``sys.stdout``/``sys.stderr`` for every other thread — including a + # gateway event-loop thread driving a Telegram long-poll — for the full + # duration of the review (tens of seconds), swallowing their console + # output (#55769 / #55925). ``thread_scoped_silence`` routes only this + # thread's writes to devnull and leaves all other threads on the real + # streams. + with thread_scoped_silence(): # Inherit the parent agent's live runtime (provider, model, # base_url, api_key, api_mode) so the fork uses the exact # same credentials the main turn is using. Without this, @@ -822,16 +829,14 @@ def _run_review_in_thread( logger.warning("Background memory/skill review failed: %s", e) agent._emit_auxiliary_failure("background review", e) finally: - # Safety-net cleanup for the exception path. Normal - # completion already shut down inside redirect_stdout above. - # Re-open devnull here so any teardown output (Honcho flush, - # Hindsight sync, background thread joins) stays silent even - # on the exception path where redirect_stdout already exited. + # Safety-net cleanup for the exception path. Normal completion already + # shut down inside the thread-scoped silence above. 
Re-enter the + # thread-scoped silence here so teardown output (Honcho flush, Hindsight + # sync, background thread joins) stays quiet even on the exception path, + # without blanking other threads' streams. if review_agent is not None: try: - with open(os.devnull, "w", encoding="utf-8") as _fn, \ - contextlib.redirect_stdout(_fn), \ - contextlib.redirect_stderr(_fn): + with thread_scoped_silence(): try: review_agent.shutdown_memory_provider() except Exception: diff --git a/agent/thread_scoped_output.py b/agent/thread_scoped_output.py new file mode 100644 index 000000000..e9e494ab8 --- /dev/null +++ b/agent/thread_scoped_output.py @@ -0,0 +1,147 @@ +"""Thread-scoped stdout/stderr silencing for background worker threads. + +``contextlib.redirect_stdout``/``redirect_stderr`` reassign the *process-global* +``sys.stdout``/``sys.stderr``. When a daemon worker thread (e.g. the background +memory/skill review) wraps its whole body in those context managers, every other +thread in the process — including a gateway's asyncio event-loop thread driving a +Telegram long-poll — sees ``sys.stdout``/``sys.stderr`` pointing at ``devnull`` +for the full duration. Any bare ``print`` / ``sys.stderr.write`` from those other +threads is silently lost during that window (see issue #55769 / #55925). + +This module installs a thin proxy as ``sys.stdout``/``sys.stderr`` that routes +writes per-thread: threads registered as "silenced" go to a sink; every other +thread passes through to the *original* stream. The proxy is installed once, +idempotently, and is never uninstalled (uninstalling would race other threads +mid-write), so the only observable effect for unregistered threads is one extra +attribute lookup per write. 
import contextlib
import os
import sys
import threading
from typing import Dict, Iterator, Optional, TextIO

__all__ = ["thread_scoped_silence"]

# Serialises proxy installation and creation of the shared sink.
_install_lock = threading.Lock()
# Maps an attribute name ("stdout"/"stderr") to the proxy most recently
# installed for it.
_installed: Dict[str, "_ThreadRoutingStream"] = {}
# One devnull handle shared by every proxy and every silence window. It is
# opened lazily and deliberately never closed by this module: the proxies are
# never uninstalled, so a sink owned by a single ``thread_scoped_silence()``
# call would be closed while the proxy (and other silenced threads) still
# reference it.
_shared_sink: Optional[TextIO] = None

# Names of the proxy's own instance fields; ``__getattr__`` must not delegate
# these, otherwise a lookup on a half-initialised instance recurses forever.
_INTERNAL_FIELDS = frozenset({"_passthrough", "_sink", "_silenced", "_lock"})


class _ThreadRoutingStream:
    """A ``sys.stdout``/``sys.stderr`` stand-in that routes writes per-thread.

    Threads whose ident has a positive depth in ``_silenced`` write to
    ``_sink``; all other threads write to ``_passthrough`` (the stream that was
    bound at install time). Attributes that are not overridden here are
    delegated to the calling thread's current target.
    """

    def __init__(self, passthrough: TextIO, sink: TextIO) -> None:
        self._passthrough = passthrough
        self._sink = sink
        # ident -> nesting depth. A thread is silenced while depth > 0, so
        # nested ``thread_scoped_silence()`` on the same thread composes
        # (the inner exit decrements rather than fully clearing).
        self._silenced: Dict[int, int] = {}
        self._lock = threading.Lock()

    def _target(self) -> TextIO:
        """Return the stream the calling thread should currently write to."""
        if self._silenced.get(threading.get_ident(), 0) > 0:
            return self._sink
        return self._passthrough

    # --- registration -----------------------------------------------------
    def silence(self, ident: int) -> None:
        """Increase the silence depth of thread ``ident`` by one."""
        with self._lock:
            self._silenced[ident] = self._silenced.get(ident, 0) + 1

    def unsilence(self, ident: int) -> None:
        """Decrease the silence depth of ``ident``; drop the entry at zero."""
        with self._lock:
            depth = self._silenced.get(ident, 0) - 1
            if depth > 0:
                self._silenced[ident] = depth
            else:
                self._silenced.pop(ident, None)

    # --- file-like surface ------------------------------------------------
    def write(self, data):  # type: ignore[no-untyped-def]
        """Write ``data`` to the calling thread's target (best effort)."""
        try:
            return self._target().write(data)
        except Exception:
            # Best effort: report the data as consumed rather than raising.
            return len(data) if isinstance(data, str) else 0

    def flush(self):  # type: ignore[no-untyped-def]
        """Flush the calling thread's target, ignoring failures."""
        try:
            return self._target().flush()
        except Exception:
            return None

    def writelines(self, lines):  # type: ignore[no-untyped-def]
        """Write ``lines`` to the calling thread's target (best effort)."""
        target = self._target()
        try:
            return target.writelines(lines)
        except Exception:
            return None

    def isatty(self) -> bool:
        """Report whether the calling thread's target is a TTY."""
        try:
            return bool(self._target().isatty())
        except Exception:
            return False

    def fileno(self):  # type: ignore[no-untyped-def]
        """Return the file descriptor of the calling thread's target."""
        return self._target().fileno()

    def __getattr__(self, name):  # type: ignore[no-untyped-def]
        # Only reached when normal lookup fails. If one of our own fields is
        # missing (instance not initialised yet), fail fast instead of calling
        # ``_target()``, which would re-enter ``__getattr__`` without bound.
        if name in _INTERNAL_FIELDS:
            raise AttributeError(name)
        # Delegate everything we don't override (encoding, buffer, mode, ...)
        # to the calling thread's current target.
        return getattr(self._target(), name)


def _get_shared_sink() -> TextIO:
    """Return the process-wide devnull sink, opening it on first use.

    Must be called with ``_install_lock`` held.
    """
    global _shared_sink
    if _shared_sink is None or _shared_sink.closed:
        _shared_sink = open(os.devnull, "w", encoding="utf-8")
    return _shared_sink


def _ensure_installed(
    attr: str, sink: Optional[TextIO] = None
) -> "_ThreadRoutingStream":
    """Install (idempotently) a routing proxy as ``sys.<attr>`` and return it.

    Args:
        attr: ``"stdout"`` or ``"stderr"``.
        sink: Stream that receives silenced threads' output when a new proxy
            has to be created. Defaults to the shared devnull sink. Ignored
            when a routing proxy is already bound to ``sys.<attr>``.

    Returns:
        The routing proxy currently bound to ``sys.<attr>``.
    """
    with _install_lock:
        current = getattr(sys, attr, None)
        # Reuse ANY routing proxy that is currently bound, not just the one we
        # recorded last. If other code temporarily rebound ``sys.<attr>`` and
        # later restored an older proxy, wrapping it again would stack proxies.
        if isinstance(current, _ThreadRoutingStream):
            _installed[attr] = current
            return current
        if sink is None:
            sink = _get_shared_sink()
        # Capture whatever is currently bound as the passthrough. If a global
        # redirect_stdout is active, non-silenced threads keep writing to
        # *that* stream rather than to a guessed "real" one.
        passthrough = current if current is not None else sink
        proxy = _ThreadRoutingStream(passthrough, sink)
        setattr(sys, attr, proxy)
        _installed[attr] = proxy
        return proxy


@contextlib.contextmanager
def thread_scoped_silence() -> Iterator[None]:
    """Silence ``stdout``/``stderr`` for the *current thread only*.

    Other threads keep writing to the streams that were bound when the proxy
    was installed. Use this around a worker thread's body instead of
    ``contextlib.redirect_stdout(devnull)`` when the process is multi-threaded
    and another thread must keep its console output.

    Nested use on the same thread is depth-counted: the thread stays silenced
    until the outermost context exits.
    """
    ident = threading.get_ident()
    # The proxies outlive this context, so they route to a shared sink that
    # stays open; no per-call devnull handle is opened or closed here.
    proxies = (_ensure_installed("stdout"), _ensure_installed("stderr"))
    for proxy in proxies:
        proxy.silence(ident)
    try:
        yield
    finally:
        # Unsilence on the same proxy objects we registered with, even if
        # ``sys.stdout``/``sys.stderr`` were rebound in the meantime.
        for proxy in proxies:
            proxy.unsilence(ident)
import io
import sys
import threading
import time

from agent.thread_scoped_output import thread_scoped_silence


def _run_with_real_stream(fn):
    """Bind a StringIO as the real stdout, run fn, return what reached it."""
    real_out = io.StringIO()
    orig = sys.stdout
    sys.stdout = real_out
    try:
        fn()
    finally:
        sys.stdout = orig
    return real_out.getvalue()


def _join_all(threads, timeout=3.0):
    """Join every thread and fail loudly if one is still running.

    ``Thread.join(timeout=...)`` returns normally on timeout, so a hung worker
    would otherwise only surface as a confusing assertion on captured output.
    """
    for t in threads:
        t.join(timeout=timeout)
    stuck = [t.name for t in threads if t.is_alive()]
    assert not stuck, f"threads did not finish: {stuck}"


def test_current_thread_is_silenced():
    def body():
        with thread_scoped_silence():
            print("dropped")
        print("kept")

    captured = _run_with_real_stream(body)
    assert "dropped" not in captured
    assert "kept" in captured


def test_concurrent_thread_keeps_output_during_silence_window():
    """A loud thread writing WHILE another thread is silenced must survive."""
    inside_silence = threading.Event()
    loud_done = threading.Event()

    def silenced_worker():
        with thread_scoped_silence():
            print("SILENCED")
            inside_silence.set()
            # Hold the silence window until the loud thread has written.
            loud_done.wait(timeout=2.0)

    def loud_worker():
        inside_silence.wait(timeout=2.0)
        print("LOUD")
        loud_done.set()

    def body():
        workers = [
            threading.Thread(target=silenced_worker),
            threading.Thread(target=loud_worker),
        ]
        for t in workers:
            t.start()
        _join_all(workers)

    captured = _run_with_real_stream(body)
    assert "SILENCED" not in captured
    assert "LOUD" in captured


def test_stderr_is_also_routed_per_thread():
    real_err = io.StringIO()
    orig = sys.stderr
    sys.stderr = real_err
    try:
        with thread_scoped_silence():
            sys.stderr.write("err-dropped\n")
        sys.stderr.write("err-kept\n")
    finally:
        sys.stderr = orig
    out = real_err.getvalue()
    assert "err-dropped" not in out
    assert "err-kept" in out


def test_nested_silence_same_thread_composes():
    def body():
        with thread_scoped_silence():
            with thread_scoped_silence():
                print("inner")
            # Still inside the OUTER context — depth-counted, so this thread
            # remains silenced after the inner context exits.
            print("after-inner")
        print("after-outer")

    captured = _run_with_real_stream(body)
    assert "inner" not in captured
    assert "after-inner" not in captured
    assert "after-outer" in captured


def test_unsilence_cleans_up_after_exit():
    """After the context exits, the calling thread writes to the real stream."""
    seen = []

    def body():
        with thread_scoped_silence():
            pass
        print("post")
        seen.append("post")

    captured = _run_with_real_stream(body)
    assert "post" in captured
    assert seen == ["post"]


def test_many_concurrent_silenced_and_loud_threads():
    """Stress: interleaved silenced/loud threads keep their respective fates."""
    start = threading.Event()

    def silenced(i):
        start.wait(timeout=2.0)
        with thread_scoped_silence():
            print(f"S{i}")
            time.sleep(0.05)

    def loud(i):
        start.wait(timeout=2.0)
        time.sleep(0.02)
        print(f"L{i}")

    def body():
        threads = []
        for i in range(5):
            threads.append(threading.Thread(target=silenced, args=(i,)))
            threads.append(threading.Thread(target=loud, args=(i,)))
        for t in threads:
            t.start()
        start.set()
        _join_all(threads)

    captured = _run_with_real_stream(body)
    for i in range(5):
        assert f"S{i}" not in captured, f"silenced S{i} leaked"
        assert f"L{i}" in captured, f"loud L{i} swallowed"