fix(cron): isolate per-job TERMINAL_CWD from concurrent cron jobs
A cron job with a per-job `workdir` overrides the process-global `os.environ["TERMINAL_CWD"]` for the entire duration of its agent run and restores it afterwards. The scheduler dispatches workdir jobs on a single-thread sequential pool and workdir-less jobs on a separate parallel pool, and the in-code comments claimed this made the override safe. That only prevents two workdir jobs from overlapping each other. The two pools run concurrently in the same process and share `os.environ`, so while a workdir job has `TERMINAL_CWD` pointed at its project directory, any workdir-less job firing in the same window reads that same global through the terminal, file, and code-exec tools and runs its commands in the wrong directory. The corruption window spans the whole workdir-job run, and a file write or delete can land in another job's tree. This serializes the override with a writer-preferring readers-writer lock. Workdir jobs acquire it as writers (exclusive for their whole run); workdir- less jobs acquire it as readers, so they still run in parallel with each other but never alongside a workdir job's override. The guarantee is based on run overlap rather than tick boundaries, so it also holds when a workdir job spans ticks. ## What does this PR do? Fixes a directory-isolation bug in the cron scheduler: a workdir cron job's process-global `TERMINAL_CWD` override could be observed by a concurrently running workdir-less cron job, causing that job's shell/file/code-exec commands to execute in the wrong directory. ## Related Issue N/A ## Type of Change - [x] 🐛 Bug fix (non-breaking change that fixes an issue) - [ ] ✨ New feature (non-breaking change that adds functionality) - [ ] 🔒 Security fix - [ ] 📝 Documentation update - [ ] ✅ Tests (adding or improving test coverage) - [ ] ♻️ Refactor (no behavior change) - [ ] 🎯 New skill (bundled or hub) ## Changes Made - `cron/scheduler.py`: add `_ReadWriteLock` (writer-preferring) and the module-global `_terminal_cwd_lock`. - `cron/scheduler.py`: in `run_job`, acquire the lock as a writer for workdir jobs and as a reader for workdir-less jobs, spanning the `TERMINAL_CWD` override and its restore in the `finally` block. - `cron/scheduler.py`: correct the stale comments in `run_job` and `tick` that claimed the sequential pool alone made the override safe. - `tests/cron/test_terminal_cwd_lock.py`: new tests for reader concurrency, writer exclusion, and the no-cross-observation regression. ## How to Test 1. `python -m pytest tests/cron/test_terminal_cwd_lock.py -q` — the regression test `test_reader_never_observes_writer_override` fails without the lock and passes with it. 2. `python -m pytest tests/cron/test_cron_workdir.py tests/cron/test_parallel_pool.py -q` — confirms the existing `TERMINAL_CWD` set/restore and pool behaviour are unchanged. ## Checklist ### Code - [x] I've read the Contributing Guide - [x] My commit messages follow Conventional Commits (`fix(scope):`, etc.) - [x] I searched for existing PRs to make sure this isn't a duplicate - [x] My PR contains only changes related to this fix - [x] I've run the affected `tests/cron/` suites and all tests pass - [x] I've added tests for my changes (required for bug fixes) - [x] I've tested on my platform: macOS 15 (Darwin 25.5) ### Documentation & Housekeeping - [x] I've updated relevant documentation (docstrings/comments) — or N/A - [x] I've updated `cli-config.yaml.example` if I added/changed config keys — N/A - [x] I've updated `CONTRIBUTING.md` or `AGENTS.md` if I changed architecture — N/A - [x] I've considered cross-platform impact (Windows, macOS) — uses stdlib `threading` only - [x] I've updated tool descriptions/schemas if I changed tool behavior — N/A
This commit is contained in:
parent
db0fd8f290
commit
abc349bd79
2 changed files with 217 additions and 6 deletions
|
|
@ -305,6 +305,62 @@ _running_lock = threading.Lock()
|
|||
_sequential_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None
|
||||
|
||||
|
||||
class _ReadWriteLock:
|
||||
"""Writer-preferring readers-writer lock.
|
||||
|
||||
Guards the process-global ``os.environ["TERMINAL_CWD"]`` override that a
|
||||
workdir cron job applies for the whole of its agent run. Workdir jobs are
|
||||
writers: they mutate the shared env and need exclusive access. Workdir-less
|
||||
jobs are readers: they only observe ``TERMINAL_CWD`` (indirectly, via the
|
||||
terminal / file / code-exec tools), so any number of them may run
|
||||
concurrently with each other, but none may run alongside a writer — that is
|
||||
exactly what stops a workdir-less job from picking up another job's workdir
|
||||
override and running its commands in the wrong directory.
|
||||
|
||||
Writer preference bounds the wait for a workdir job (dispatched on the
|
||||
single-thread sequential pool) so a stream of workdir-less readers cannot
|
||||
starve it.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._cond = threading.Condition(threading.Lock())
|
||||
self._readers = 0
|
||||
self._writer_active = False
|
||||
self._writers_waiting = 0
|
||||
|
||||
def acquire_read(self) -> None:
|
||||
with self._cond:
|
||||
while self._writer_active or self._writers_waiting > 0:
|
||||
self._cond.wait()
|
||||
self._readers += 1
|
||||
|
||||
def release_read(self) -> None:
|
||||
with self._cond:
|
||||
self._readers -= 1
|
||||
if self._readers == 0:
|
||||
self._cond.notify_all()
|
||||
|
||||
def acquire_write(self) -> None:
|
||||
with self._cond:
|
||||
self._writers_waiting += 1
|
||||
try:
|
||||
while self._writer_active or self._readers > 0:
|
||||
self._cond.wait()
|
||||
finally:
|
||||
self._writers_waiting -= 1
|
||||
self._writer_active = True
|
||||
|
||||
def release_write(self) -> None:
|
||||
with self._cond:
|
||||
self._writer_active = False
|
||||
self._cond.notify_all()
|
||||
|
||||
|
||||
# Serializes the per-job TERMINAL_CWD override against every other concurrently
|
||||
# running cron job. See _ReadWriteLock and run_job for the usage contract.
|
||||
_terminal_cwd_lock = _ReadWriteLock()
|
||||
|
||||
|
||||
def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadPoolExecutor:
|
||||
"""Return (or create) the persistent parallel pool."""
|
||||
global _parallel_pool, _parallel_pool_max_workers
|
||||
|
|
@ -2252,9 +2308,15 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
# .cursorrules from the job's project dir, AND
|
||||
# - the terminal, file, and code-exec tools run commands from there.
|
||||
#
|
||||
# tick() serializes workdir-jobs outside the parallel pool, so mutating
|
||||
# os.environ["TERMINAL_CWD"] here is safe for those jobs. For workdir-less
|
||||
# jobs we leave TERMINAL_CWD untouched — preserves the original behaviour
|
||||
# os.environ["TERMINAL_CWD"] is process-global, so this override is
|
||||
# serialized by _terminal_cwd_lock (acquired just below): a workdir job
|
||||
# holds it as a writer for its whole run, excluding every other job, while
|
||||
# workdir-less jobs hold it as readers and stay parallel with each other.
|
||||
# The sequential pool only keeps workdir jobs from overlapping EACH OTHER;
|
||||
# the lock is what additionally keeps a concurrently-firing workdir-less
|
||||
# parallel-pool job from observing this override and running its shell /
|
||||
# file / code-exec commands in the wrong directory. For workdir-less jobs
|
||||
# we leave TERMINAL_CWD untouched — preserves the original behaviour
|
||||
# (skip_context_files=True, tools use whatever cwd the scheduler has).
|
||||
_job_workdir = (job.get("workdir") or "").strip() or None
|
||||
if _job_workdir and not Path(_job_workdir).is_dir():
|
||||
|
|
@ -2265,6 +2327,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
job_id, _job_workdir,
|
||||
)
|
||||
_job_workdir = None
|
||||
|
||||
_holds_cwd_write = _job_workdir is not None
|
||||
if _holds_cwd_write:
|
||||
_terminal_cwd_lock.acquire_write()
|
||||
else:
|
||||
_terminal_cwd_lock.acquire_read()
|
||||
|
||||
_prior_terminal_cwd = os.environ.get("TERMINAL_CWD", "_UNSET_")
|
||||
if _job_workdir:
|
||||
os.environ["TERMINAL_CWD"] = _job_workdir
|
||||
|
|
@ -2773,6 +2842,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
os.environ.pop("TERMINAL_CWD", None)
|
||||
else:
|
||||
os.environ["TERMINAL_CWD"] = _prior_terminal_cwd
|
||||
# Release the cwd lock now that the env is restored, so a waiting
|
||||
# workdir job (or queued reader) can proceed without seeing the override.
|
||||
if _holds_cwd_write:
|
||||
_terminal_cwd_lock.release_write()
|
||||
else:
|
||||
_terminal_cwd_lock.release_read()
|
||||
# Clean up ContextVar session/delivery state for this job.
|
||||
clear_session_vars(_ctx_tokens)
|
||||
for _var_name in _cron_delivery_vars:
|
||||
|
|
@ -2999,9 +3074,11 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
|
|||
return run_one_job(job, adapters=adapters, loop=loop, verbose=verbose)
|
||||
|
||||
# Partition due jobs: those with a per-job workdir mutate
|
||||
# os.environ["TERMINAL_CWD"] inside run_job, which is process-global —
|
||||
# so they MUST run sequentially to avoid corrupting each other. Jobs
|
||||
# without a workdir leave env untouched and stay parallel-safe.
|
||||
# os.environ["TERMINAL_CWD"] inside run_job, which is process-global, so
|
||||
# they queue on the single-thread sequential pool to run one at a time.
|
||||
# That alone only keeps workdir jobs from overlapping EACH OTHER;
|
||||
# run_job's _terminal_cwd_lock is what additionally stops a concurrently
|
||||
# firing workdir-less parallel-pool job from observing the override.
|
||||
sequential_jobs = [j for j in due_jobs if (j.get("workdir") or "").strip()]
|
||||
parallel_jobs = [j for j in due_jobs if not (j.get("workdir") or "").strip()]
|
||||
|
||||
|
|
|
|||
134
tests/cron/test_terminal_cwd_lock.py
Normal file
134
tests/cron/test_terminal_cwd_lock.py
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
"""Tests for the TERMINAL_CWD readers-writer lock in cron/scheduler.py.
|
||||
|
||||
Workdir cron jobs override the process-global ``os.environ["TERMINAL_CWD"]``
|
||||
for their whole agent run. Workdir-less jobs run concurrently on a separate
|
||||
pool and read that same global (via the terminal / file / code-exec tools), so
|
||||
without serialization they execute commands in another job's workdir.
|
||||
|
||||
``_ReadWriteLock`` models workdir jobs as writers (exclusive) and workdir-less
|
||||
jobs as readers (concurrent with each other, excluded from a writer's run).
|
||||
These tests assert that contract.
|
||||
"""
|
||||
|
||||
import threading
|
||||
|
||||
|
||||
def _lock():
|
||||
import cron.scheduler as sched
|
||||
|
||||
return sched._ReadWriteLock()
|
||||
|
||||
|
||||
def test_multiple_readers_run_concurrently():
|
||||
"""Workdir-less jobs (readers) hold the lock simultaneously."""
|
||||
lock = _lock()
|
||||
# Barrier of 3 only releases if both reader threads hold the read lock at
|
||||
# the same time as the main thread waits — proving readers are concurrent.
|
||||
barrier = threading.Barrier(3, timeout=5)
|
||||
|
||||
def reader():
|
||||
lock.acquire_read()
|
||||
try:
|
||||
barrier.wait()
|
||||
finally:
|
||||
lock.release_read()
|
||||
|
||||
threads = [threading.Thread(target=reader) for _ in range(2)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
# Does not raise BrokenBarrierError -> both readers were holding at once.
|
||||
barrier.wait(timeout=5)
|
||||
for t in threads:
|
||||
t.join(timeout=5)
|
||||
assert not t.is_alive()
|
||||
|
||||
|
||||
def test_writer_waits_for_active_reader():
|
||||
"""A workdir job (writer) cannot acquire while a reader holds the lock."""
|
||||
lock = _lock()
|
||||
order = []
|
||||
reader_holding = threading.Event()
|
||||
let_reader_go = threading.Event()
|
||||
|
||||
def reader():
|
||||
lock.acquire_read()
|
||||
try:
|
||||
reader_holding.set()
|
||||
let_reader_go.wait(timeout=5)
|
||||
order.append("reader-release")
|
||||
finally:
|
||||
lock.release_read()
|
||||
|
||||
def writer():
|
||||
reader_holding.wait(timeout=5)
|
||||
lock.acquire_write()
|
||||
try:
|
||||
order.append("writer-acquire")
|
||||
finally:
|
||||
lock.release_write()
|
||||
|
||||
rt = threading.Thread(target=reader)
|
||||
wt = threading.Thread(target=writer)
|
||||
rt.start()
|
||||
wt.start()
|
||||
|
||||
# Give the writer time to try (and block) while the reader still holds.
|
||||
reader_holding.wait(timeout=5)
|
||||
let_reader_go.set()
|
||||
|
||||
rt.join(timeout=5)
|
||||
wt.join(timeout=5)
|
||||
assert not rt.is_alive() and not wt.is_alive()
|
||||
# The writer only ran after the reader released — never alongside it.
|
||||
assert order == ["reader-release", "writer-acquire"]
|
||||
|
||||
|
||||
def test_reader_never_observes_writer_override():
|
||||
"""Regression: the cross-pool TERMINAL_CWD corruption.
|
||||
|
||||
A workdir job (writer) overriding the shared cwd must never be observed by
|
||||
a concurrent workdir-less job (reader). ``shared["cwd"]`` stands in for
|
||||
``os.environ["TERMINAL_CWD"]``: the reader, even though it starts while the
|
||||
writer holds the override, must block until the writer restores the value.
|
||||
"""
|
||||
lock = _lock()
|
||||
shared = {"cwd": "<scheduler>"}
|
||||
observations = []
|
||||
writer_holding = threading.Event()
|
||||
release_writer = threading.Event()
|
||||
|
||||
def writer():
|
||||
lock.acquire_write()
|
||||
try:
|
||||
shared["cwd"] = "/project/A"
|
||||
writer_holding.set()
|
||||
release_writer.wait(timeout=5)
|
||||
finally:
|
||||
shared["cwd"] = "<scheduler>"
|
||||
lock.release_write()
|
||||
|
||||
def reader():
|
||||
# Start only once the writer holds the lock and has applied the
|
||||
# override — the exact window the old code corrupted.
|
||||
writer_holding.wait(timeout=5)
|
||||
lock.acquire_read()
|
||||
try:
|
||||
observations.append(shared["cwd"])
|
||||
finally:
|
||||
lock.release_read()
|
||||
|
||||
wt = threading.Thread(target=writer)
|
||||
rt = threading.Thread(target=reader)
|
||||
wt.start()
|
||||
rt.start()
|
||||
|
||||
# The reader is now blocked on the writer; let the writer finish.
|
||||
writer_holding.wait(timeout=5)
|
||||
release_writer.set()
|
||||
|
||||
wt.join(timeout=5)
|
||||
rt.join(timeout=5)
|
||||
assert not wt.is_alive() and not rt.is_alive()
|
||||
# The reader saw the restored value, never the writer's /project/A override.
|
||||
assert observations == ["<scheduler>"]
|
||||
Loading…
Add table
Add a link
Reference in a new issue