def claim_dispatch(job_id: str) -> bool:
    """Atomically claim a finite one-shot job dispatch BEFORE execution.

    Increments ``repeat.completed`` under the cross-process jobs lock and
    persists the claim immediately, so that if the tick dies mid-execution
    (gateway kill, OOM, segfault, hard-timeout) the dispatch is not lost.
    This converts finite one-shot jobs from *at-least-once* to *at-most-times*
    semantics — a job that self-destructs fires at most ``repeat.times`` times
    instead of infinitely (issue #38758).

    Only jobs with ``schedule.kind == "once"`` and ``repeat.times > 0`` are
    claimed. Recurring jobs (they use ``advance_next_run``) and
    infinite-repeat / no-repeat jobs are left unchanged and always allowed to
    proceed. Explicit ``null`` values for ``schedule`` or ``repeat.completed``
    in the store are treated like missing keys instead of raising.

    Args:
        job_id: ID of the job about to be dispatched.

    Returns:
        ``True`` if the caller may proceed to run the job (including when the
        job is not in the store, so there is nothing to claim against).
        ``False`` if the dispatch limit was already reached, in which case the
        stale job is removed from the store and the store is saved.
    """
    with _jobs_lock():
        jobs = load_jobs()
        for i, job in enumerate(jobs):
            if job["id"] != job_id:
                continue

            # ``or {}`` also covers a stored ``"schedule": null``; the default
            # argument of dict.get() only applies when the key is absent.
            schedule = job.get("schedule") or {}
            if schedule.get("kind") != "once":
                return True  # recurring jobs use advance_next_run(), not dispatch claims

            repeat = job.get("repeat")
            if not repeat:
                return True  # no repeat limit — always dispatch

            times = repeat.get("times")
            if times is None or times <= 0:
                return True  # infinite — always dispatch

            # Same reasoning as for ``schedule``: a null counter counts as 0
            # rather than failing the ``>=`` comparison below.
            completed = repeat.get("completed") or 0
            if completed >= times:
                # Already dispatched the max number of times (e.g. a prior
                # tick claimed then died before mark_job_run could remove it).
                # Clean up so it stops appearing as due on every tick.
                jobs.pop(i)
                save_jobs(jobs)
                logger.info(
                    "Job '%s': dispatch limit reached (%d/%d) — removing",
                    job.get("name", job["id"]),
                    completed,
                    times,
                )
                return False

            # Claim this dispatch before the side effect runs.
            repeat["completed"] = completed + 1
            save_jobs(jobs)
            logger.debug(
                "Job '%s': claimed dispatch %d/%d",
                job.get("name", job["id"]),
                repeat["completed"],
                times,
            )
            return True

    # No stored job matched: there is no record to persist a claim against,
    # so the caller is allowed to proceed unclaimed.
    logger.debug(
        "claim_dispatch: job_id %s not in store — proceeding without claim "
        "(handed-in job dict; nothing to persist a claim against)",
        job_id,
    )
    return True
@@ -1543,6 +1620,31 @@ def _get_due_jobs_locked() -> List[Dict[str, Any]]: break # Fall through to due.append(job) — execute once now + # One-shot dispatch-limit guard (issue #38758): a finite one-shot + # claimed via claim_dispatch() but whose tick died before + # mark_job_run could remove it will have completed >= times while + # still looking due (last_run_at was never written, so the + # recovery helper re-armed it). Remove it instead of re-firing. + if kind == "once": + repeat = job.get("repeat") + if repeat: + times = repeat.get("times") + completed = repeat.get("completed", 0) + if times is not None and times > 0 and completed >= times: + logger.info( + "Job '%s': one-shot dispatch limit reached (%d/%d) " + "— removing stale due entry", + job.get("name", job["id"]), + completed, + times, + ) + for rj in raw_jobs: + if rj["id"] == job["id"]: + raw_jobs.remove(rj) + needs_save = True + break + continue + due.append(job) if needs_save: diff --git a/cron/scheduler.py b/cron/scheduler.py index 13944c69d..82f10ee94 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -237,7 +237,7 @@ _LEGACY_HOME_TARGET_ENV_VARS = { "QQBOT_HOME_CHANNEL": "QQ_HOME_CHANNEL", } -from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run +from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run, claim_dispatch # Sentinel: when a cron agent has nothing new to report, it can start its # response with this marker to suppress delivery. Output is still saved @@ -2779,6 +2779,20 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) - failure is recorded via ``mark_job_run``), False only if processing raised. """ try: + # Pre-run dispatch claim (issue #38758): atomically commit a finite + # one-shot's dispatch BEFORE its side effect runs, so a tick that dies + # mid-execution (gateway kill, OOM, segfault, hard-timeout) cannot + # re-fire the job forever on restart. 
class TestClaimDispatch:
    """Pre-run dispatch claims for finite one-shot jobs (issue #38758).

    The claim must be persisted BEFORE the job's side effect runs, so that a
    tick dying mid-execution (gateway kill, OOM, hard-timeout) can re-fire the
    job at most ``repeat.times`` times instead of infinitely.
    """

    def _oneshot(self, times=1, completed=0):
        """Build a ``kind == "once"`` job record with the given repeat counters."""
        schedule = {"kind": "once", "run_at": "2026-01-01T00:00:00+00:00"}
        counters = {"times": times, "completed": completed}
        return {
            "id": "os1",
            "name": "one-shot",
            "enabled": True,
            "schedule": schedule,
            "repeat": counters,
        }

    def test_claim_increments_and_persists(self, tmp_cron_dir):
        record = self._oneshot(times=1, completed=0)
        save_jobs([record])

        claimed = claim_dispatch("os1")

        assert claimed is True
        # The bump is already in the store, i.e. it would survive a crash
        # that happens before any side effect.
        stored = load_jobs()
        assert stored[0]["repeat"]["completed"] == 1

    def test_already_dispatched_oneshot_is_removed(self, tmp_cron_dir):
        # Simulates a prior tick that claimed (completed == times) and then
        # died before mark_job_run could remove the job: the next claim has
        # to refuse AND clean up.
        record = self._oneshot(times=1, completed=1)
        save_jobs([record])

        claimed = claim_dispatch("os1")

        assert claimed is False
        assert load_jobs() == []  # removed, will not re-fire

    def test_recurring_job_is_not_claimed(self, tmp_cron_dir):
        recurring = {
            "id": "rec",
            "schedule": {"kind": "interval", "minutes": 5},
            "repeat": {"times": 3, "completed": 0},
        }
        save_jobs([recurring])

        claimed = claim_dispatch("rec")

        assert claimed is True
        # Recurring jobs use advance_next_run(); the claim must leave
        # ``completed`` untouched.
        stored = load_jobs()
        assert stored[0]["repeat"]["completed"] == 0

    def test_infinite_oneshot_not_claimed(self, tmp_cron_dir):
        # times <= 0 means infinite
        unbounded = self._oneshot(times=0, completed=0)
        save_jobs([unbounded])

        claimed = claim_dispatch("os1")

        assert claimed is True
        stored = load_jobs()
        assert stored[0]["repeat"]["completed"] == 0

    def test_no_repeat_block_not_claimed(self, tmp_cron_dir):
        bare = {
            "id": "os1",
            "schedule": {"kind": "once", "run_at": "2026-01-01T00:00:00+00:00"},
        }
        save_jobs([bare])

        claimed = claim_dispatch("os1")

        assert claimed is True
        stored = load_jobs()
        assert "repeat" not in stored[0]

    def test_missing_job_proceeds(self, tmp_cron_dir):
        # A job dict that was handed in but never persisted (external provider
        # / direct caller) cannot be claimed — proceed rather than suppress it.
        save_jobs([])

        claimed = claim_dispatch("ghost")

        assert claimed is True

    def test_mark_job_run_does_not_double_count_preclaimed_oneshot(self, tmp_cron_dir):
        # Full lifecycle: the claim bumps completed up to times, after which
        # mark_job_run must NOT increment again — it removes the job.
        record = self._oneshot(times=1, completed=0)
        save_jobs([record])

        claimed = claim_dispatch("os1")
        assert claimed is True
        after_claim = load_jobs()
        assert after_claim[0]["repeat"]["completed"] == 1

        mark_job_run("os1", success=True)

        assert load_jobs() == []  # completed once, removed — not fired twice

    def test_mark_job_run_still_increments_recurring(self, tmp_cron_dir):
        # The double-count guard only concerns one-shots; recurring jobs keep
        # the post-run increment.
        recurring = {
            "id": "rec",
            "schedule": {"kind": "interval", "minutes": 5},
            "repeat": {"times": 3, "completed": 1},
        }
        save_jobs([recurring])

        mark_job_run("rec", success=True)

        stored = load_jobs()
        assert stored[0]["repeat"]["completed"] == 2

    def test_get_due_jobs_removes_stale_maxed_oneshot(self, tmp_cron_dir):
        # A claimed one-shot whose tick died is left with completed >= times
        # and no last_run_at. get_due_jobs must drop such an entry instead of
        # returning it for another fire.
        now = datetime.now(timezone.utc)
        past = (now - timedelta(seconds=5)).isoformat()
        stale = {
            "id": "os1",
            "name": "one-shot",
            "enabled": True,
            "schedule": {"kind": "once", "run_at": past},
            "repeat": {"times": 1, "completed": 1},
            "next_run_at": None,
        }
        save_jobs([stale])

        due = get_due_jobs()

        assert due == []
        assert load_jobs() == []  # cleaned up


class TestOneShotDispatchClaim:
    """Scheduler-side checks for the pre-run claim (issue #38758).

    The claim has to happen BEFORE ``run_job`` so a tick that dies
    mid-execution cannot re-fire the job forever, and a refused claim has to
    skip the run entirely.
    """

    def _oneshot(self):
        """Return a finite one-shot job dict as handed to the scheduler."""
        origin = {"platform": "telegram", "chat_id": "123"}
        schedule = {"kind": "once", "run_at": "2026-01-01T00:00:00+00:00"}
        return {
            "id": "monitor-job",
            "name": "monitor",
            "deliver": "origin",
            "origin": origin,
            "schedule": schedule,
            "repeat": {"times": 1, "completed": 0},
        }

    def test_claim_runs_before_run_job(self):
        order = []

        def record_claim(_id):
            # Stand-in for claim_dispatch: note the call, allow the dispatch.
            order.append("claim")
            return True

        def record_run(_j):
            # Stand-in for run_job: note the call, report a successful run.
            order.append("run")
            return (True, "# out", "ok", None)

        with patch("cron.scheduler.get_due_jobs", return_value=[self._oneshot()]), \
                patch("cron.scheduler.claim_dispatch", side_effect=record_claim), \
                patch("cron.scheduler.run_job", side_effect=record_run), \
                patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
                patch("cron.scheduler._deliver_result"), \
                patch("cron.scheduler.mark_job_run"):
            from cron.scheduler import tick
            tick(verbose=False)

        # claim strictly before side effect
        assert order == ["claim", "run"]

    def test_refused_claim_skips_run_job(self):
        with patch("cron.scheduler.get_due_jobs", return_value=[self._oneshot()]), \
                patch("cron.scheduler.claim_dispatch", return_value=False), \
                patch("cron.scheduler.run_job") as run_mock, \
                patch("cron.scheduler.save_job_output"), \
                patch("cron.scheduler._deliver_result") as deliver_mock, \
                patch("cron.scheduler.mark_job_run") as mark_mock:
            from cron.scheduler import tick
            tick(verbose=False)

        # A refused claim means nothing downstream of the claim may happen.
        run_mock.assert_not_called()
        deliver_mock.assert_not_called()
        mark_mock.assert_not_called()