fix(cron): release TERMINAL_CWD lock even when run_job body raises
Rework follow-up on the per-job TERMINAL_CWD readers-writer lock. The lock was acquired BEFORE the try: whose finally: is the only release site, with the env-override statements (os.environ[TERMINAL_CWD] = workdir; logger.info) sitting in the unprotected window between acquire and try. Any exception there — a raising log handler, an os.environ error, a thread interrupt — propagated out of run_job WITHOUT running the finally, leaking the lock. A leaked writer permanently deadlocks the whole scheduler (every future cron job blocks on acquire_*); a leaked reader blocks all writers. - Snapshot _prior_terminal_cwd before the acquire (so the finally can always restore env even if the body raises before the override). - Open the try: immediately after acquire and move the env-override lines inside it, so the existing finally always releases the lock. - Add a mutation-verified regression test: a workdir job whose in-window logger.info raises must still release the writer lock (a subsequent acquire_write must not block).
This commit is contained in:
parent
abc349bd79
commit
7f71a48a3a
2 changed files with 72 additions and 5 deletions
|
|
@ -2328,18 +2328,28 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
)
|
||||
_job_workdir = None
|
||||
|
||||
# Snapshot the current env value BEFORE acquiring the lock so the finally
|
||||
# below can always restore it, even if an exception fires before we set the
|
||||
# override inside the try. This read can't leak the lock (it precedes the
|
||||
# acquire) and is a no-op for workdir-less jobs (they never mutate the env).
|
||||
_prior_terminal_cwd = os.environ.get("TERMINAL_CWD", "_UNSET_")
|
||||
|
||||
_holds_cwd_write = _job_workdir is not None
|
||||
if _holds_cwd_write:
|
||||
_terminal_cwd_lock.acquire_write()
|
||||
else:
|
||||
_terminal_cwd_lock.acquire_read()
|
||||
|
||||
_prior_terminal_cwd = os.environ.get("TERMINAL_CWD", "_UNSET_")
|
||||
if _job_workdir:
|
||||
os.environ["TERMINAL_CWD"] = _job_workdir
|
||||
logger.info("Job '%s': using workdir %s", job_id, _job_workdir)
|
||||
|
||||
# Everything after the acquire MUST live inside this try, so the finally
|
||||
# below always releases the lock even if the env override or any later
|
||||
# statement raises. A leaked writer would deadlock the whole scheduler
|
||||
# (every future job blocks on acquire_*); a leaked reader blocks all
|
||||
# future writers. Acquire itself can't leak (it either blocks or returns).
|
||||
try:
|
||||
if _job_workdir:
|
||||
os.environ["TERMINAL_CWD"] = _job_workdir
|
||||
logger.info("Job '%s': using workdir %s", job_id, _job_workdir)
|
||||
|
||||
# Re-read .env and config.yaml fresh every run so provider/key
|
||||
# changes take effect without a gateway restart. Route through
|
||||
# load_hermes_dotenv (not a bare load_dotenv) and reset the secret-
|
||||
|
|
|
|||
|
|
@ -132,3 +132,60 @@ def test_reader_never_observes_writer_override():
|
|||
assert not wt.is_alive() and not rt.is_alive()
|
||||
# The reader saw the restored value, never the writer's /project/A override.
|
||||
assert observations == ["<scheduler>"]
|
||||
|
||||
|
||||
def test_run_job_releases_cwd_lock_when_body_raises(tmp_path):
|
||||
"""A workdir job whose run_job body raises must still RELEASE the writer lock.
|
||||
|
||||
Regression for the leak that made the fix "still broken": the acquire was
|
||||
placed before the try whose finally releases, so an exception in the
|
||||
unprotected window (or anywhere in the body) leaked the writer lock and
|
||||
deadlocked the whole scheduler. This asserts the lock is free again after a
|
||||
raising run — acquire_write() must not block.
|
||||
"""
|
||||
from unittest.mock import MagicMock, patch
|
||||
import cron.scheduler as sched
|
||||
|
||||
workdir = tmp_path / "proj"
|
||||
workdir.mkdir()
|
||||
job = {"id": "boom-job", "name": "boom", "prompt": "hi", "workdir": str(workdir)}
|
||||
|
||||
# Force a raise in the WINDOW BETWEEN acquire and the try body — the exact
|
||||
# spot the buggy placement left unprotected. With the fix these statements
|
||||
# are inside the try (finally releases); with the bug the lock leaks.
|
||||
# logger.info(...) fires right after os.environ["TERMINAL_CWD"] is set for a
|
||||
# workdir job, in that window, so making it raise exercises the leak path.
|
||||
real_info = sched.logger.info
|
||||
|
||||
def _raise_on_workdir_log(msg, *args, **kwargs):
|
||||
if isinstance(msg, str) and "using workdir" in msg:
|
||||
raise RuntimeError("boom")
|
||||
return real_info(msg, *args, **kwargs)
|
||||
|
||||
with patch("cron.scheduler._hermes_home", tmp_path), \
|
||||
patch("cron.scheduler._resolve_origin", return_value=None), \
|
||||
patch("hermes_cli.env_loader.load_hermes_dotenv"), \
|
||||
patch("hermes_cli.env_loader.reset_secret_source_cache"), \
|
||||
patch.object(sched.logger, "info", side_effect=_raise_on_workdir_log), \
|
||||
patch("hermes_state.SessionDB", return_value=MagicMock()):
|
||||
# run_job catches its own body exceptions and returns (False, ...);
|
||||
# it must not propagate, and it must release the lock either way.
|
||||
success, _out, _final, _err = sched.run_job(job)
|
||||
|
||||
assert success is False
|
||||
|
||||
# If the writer lock leaked, this acquire would block forever. Prove it's
|
||||
# free by acquiring as a writer from another thread under a short timeout.
|
||||
acquired = threading.Event()
|
||||
|
||||
def try_acquire():
|
||||
sched._terminal_cwd_lock.acquire_write()
|
||||
try:
|
||||
acquired.set()
|
||||
finally:
|
||||
sched._terminal_cwd_lock.release_write()
|
||||
|
||||
t = threading.Thread(target=try_acquire, daemon=True)
|
||||
t.start()
|
||||
assert acquired.wait(timeout=5), "writer lock was leaked by run_job on exception"
|
||||
t.join(timeout=5)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue