diff --git a/cron/scheduler.py b/cron/scheduler.py index 998af72d7..2e0b204b7 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -305,6 +305,62 @@ _running_lock = threading.Lock() _sequential_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None +class _ReadWriteLock: + """Writer-preferring readers-writer lock. + + Guards the process-global ``os.environ["TERMINAL_CWD"]`` override that a + workdir cron job applies for the whole of its agent run. Workdir jobs are + writers: they mutate the shared env and need exclusive access. Workdir-less + jobs are readers: they only observe ``TERMINAL_CWD`` (indirectly, via the + terminal / file / code-exec tools), so any number of them may run + concurrently with each other, but none may run alongside a writer — that is + exactly what stops a workdir-less job from picking up another job's workdir + override and running its commands in the wrong directory. + + Writer preference bounds the wait for a workdir job (dispatched on the + single-thread sequential pool) so a stream of workdir-less readers cannot + starve it. + """ + + def __init__(self) -> None: + self._cond = threading.Condition(threading.Lock()) + self._readers = 0 + self._writer_active = False + self._writers_waiting = 0 + + def acquire_read(self) -> None: + with self._cond: + while self._writer_active or self._writers_waiting > 0: + self._cond.wait() + self._readers += 1 + + def release_read(self) -> None: + with self._cond: + self._readers -= 1 + if self._readers == 0: + self._cond.notify_all() + + def acquire_write(self) -> None: + with self._cond: + self._writers_waiting += 1 + try: + while self._writer_active or self._readers > 0: + self._cond.wait() + finally: + self._writers_waiting -= 1 + self._writer_active = True + + def release_write(self) -> None: + with self._cond: + self._writer_active = False + self._cond.notify_all() + + +# Serializes the per-job TERMINAL_CWD override against every other concurrently +# running cron job. See _ReadWriteLock and run_job for the usage contract. +_terminal_cwd_lock = _ReadWriteLock() + + def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadPoolExecutor: """Return (or create) the persistent parallel pool.""" global _parallel_pool, _parallel_pool_max_workers @@ -2252,9 +2308,15 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: # .cursorrules from the job's project dir, AND # - the terminal, file, and code-exec tools run commands from there. # - # tick() serializes workdir-jobs outside the parallel pool, so mutating - # os.environ["TERMINAL_CWD"] here is safe for those jobs. For workdir-less - # jobs we leave TERMINAL_CWD untouched — preserves the original behaviour + # os.environ["TERMINAL_CWD"] is process-global, so this override is + # serialized by _terminal_cwd_lock (acquired just below): a workdir job + # holds it as a writer for its whole run, excluding every other job, while + # workdir-less jobs hold it as readers and stay parallel with each other. + # The sequential pool only keeps workdir jobs from overlapping EACH OTHER; + # the lock is what additionally keeps a concurrently-firing workdir-less + # parallel-pool job from observing this override and running its shell / + # file / code-exec commands in the wrong directory. For workdir-less jobs + # we leave TERMINAL_CWD untouched — preserves the original behaviour # (skip_context_files=True, tools use whatever cwd the scheduler has). _job_workdir = (job.get("workdir") or "").strip() or None if _job_workdir and not Path(_job_workdir).is_dir(): @@ -2265,6 +2327,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: job_id, _job_workdir, ) _job_workdir = None + + _holds_cwd_write = _job_workdir is not None + if _holds_cwd_write: + _terminal_cwd_lock.acquire_write() + else: + _terminal_cwd_lock.acquire_read() + _prior_terminal_cwd = os.environ.get("TERMINAL_CWD", "_UNSET_") if _job_workdir: os.environ["TERMINAL_CWD"] = _job_workdir @@ -2773,6 +2842,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: os.environ.pop("TERMINAL_CWD", None) else: os.environ["TERMINAL_CWD"] = _prior_terminal_cwd + # Release the cwd lock now that the env is restored, so a waiting + # workdir job (or queued reader) can proceed without seeing the override. + if _holds_cwd_write: + _terminal_cwd_lock.release_write() + else: + _terminal_cwd_lock.release_read() # Clean up ContextVar session/delivery state for this job. clear_session_vars(_ctx_tokens) for _var_name in _cron_delivery_vars: @@ -2999,9 +3074,11 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i return run_one_job(job, adapters=adapters, loop=loop, verbose=verbose) # Partition due jobs: those with a per-job workdir mutate - # os.environ["TERMINAL_CWD"] inside run_job, which is process-global — - # so they MUST run sequentially to avoid corrupting each other. Jobs - # without a workdir leave env untouched and stay parallel-safe. + # os.environ["TERMINAL_CWD"] inside run_job, which is process-global, so + # they queue on the single-thread sequential pool to run one at a time. + # That alone only keeps workdir jobs from overlapping EACH OTHER; + # run_job's _terminal_cwd_lock is what additionally stops a concurrently + # firing workdir-less parallel-pool job from observing the override. sequential_jobs = [j for j in due_jobs if (j.get("workdir") or "").strip()] parallel_jobs = [j for j in due_jobs if not (j.get("workdir") or "").strip()] diff --git a/tests/cron/test_terminal_cwd_lock.py b/tests/cron/test_terminal_cwd_lock.py new file mode 100644 index 000000000..946e24bb1 --- /dev/null +++ b/tests/cron/test_terminal_cwd_lock.py @@ -0,0 +1,134 @@ +"""Tests for the TERMINAL_CWD readers-writer lock in cron/scheduler.py. + +Workdir cron jobs override the process-global ``os.environ["TERMINAL_CWD"]`` +for their whole agent run. Workdir-less jobs run concurrently on a separate +pool and read that same global (via the terminal / file / code-exec tools), so +without serialization they execute commands in another job's workdir. + +``_ReadWriteLock`` models workdir jobs as writers (exclusive) and workdir-less +jobs as readers (concurrent with each other, excluded from a writer's run). +These tests assert that contract. +""" + +import threading + + +def _lock(): + import cron.scheduler as sched + + return sched._ReadWriteLock() + + +def test_multiple_readers_run_concurrently(): + """Workdir-less jobs (readers) hold the lock simultaneously.""" + lock = _lock() + # Barrier of 3 only releases if both reader threads hold the read lock at + # the same time as the main thread waits — proving readers are concurrent. + barrier = threading.Barrier(3, timeout=5) + + def reader(): + lock.acquire_read() + try: + barrier.wait() + finally: + lock.release_read() + + threads = [threading.Thread(target=reader) for _ in range(2)] + for t in threads: + t.start() + + # Does not raise BrokenBarrierError -> both readers were holding at once. + barrier.wait(timeout=5) + for t in threads: + t.join(timeout=5) + assert not t.is_alive() + + +def test_writer_waits_for_active_reader(): + """A workdir job (writer) cannot acquire while a reader holds the lock.""" + lock = _lock() + order = [] + reader_holding = threading.Event() + let_reader_go = threading.Event() + + def reader(): + lock.acquire_read() + try: + reader_holding.set() + let_reader_go.wait(timeout=5) + order.append("reader-release") + finally: + lock.release_read() + + def writer(): + reader_holding.wait(timeout=5) + lock.acquire_write() + try: + order.append("writer-acquire") + finally: + lock.release_write() + + rt = threading.Thread(target=reader) + wt = threading.Thread(target=writer) + rt.start() + wt.start() + + # Give the writer time to try (and block) while the reader still holds. + reader_holding.wait(timeout=5) + let_reader_go.set() + + rt.join(timeout=5) + wt.join(timeout=5) + assert not rt.is_alive() and not wt.is_alive() + # The writer only ran after the reader released — never alongside it. + assert order == ["reader-release", "writer-acquire"] + + +def test_reader_never_observes_writer_override(): + """Regression: the cross-pool TERMINAL_CWD corruption. + + A workdir job (writer) overriding the shared cwd must never be observed by + a concurrent workdir-less job (reader). ``shared["cwd"]`` stands in for + ``os.environ["TERMINAL_CWD"]``: the reader, even though it starts while the + writer holds the override, must block until the writer restores the value. + """ + lock = _lock() + shared = {"cwd": ""} + observations = [] + writer_holding = threading.Event() + release_writer = threading.Event() + + def writer(): + lock.acquire_write() + try: + shared["cwd"] = "/project/A" + writer_holding.set() + release_writer.wait(timeout=5) + finally: + shared["cwd"] = "" + lock.release_write() + + def reader(): + # Start only once the writer holds the lock and has applied the + # override — the exact window the old code corrupted. + writer_holding.wait(timeout=5) + lock.acquire_read() + try: + observations.append(shared["cwd"]) + finally: + lock.release_read() + + wt = threading.Thread(target=writer) + rt = threading.Thread(target=reader) + wt.start() + rt.start() + + # The reader is now blocked on the writer; let the writer finish. + writer_holding.wait(timeout=5) + release_writer.set() + + wt.join(timeout=5) + rt.join(timeout=5) + assert not wt.is_alive() and not rt.is_alive() + # The reader saw the restored value, never the writer's /project/A override. + assert observations == [""]