fix(cron): commit one-shot dispatch before side effect to stop crash re-fire loop (#56177)

A finite one-shot cron job whose side effect kills the tick (gateway
suicide, OOM, segfault, hard-timeout) re-fired forever: mark_job_run —
which increments repeat.completed and removes the job — runs AFTER the
job, so an abrupt tick death never records completion and every
supervisor relaunch re-dispatches the job (#38758).

Commit the dispatch BEFORE the side effect:
- claim_dispatch() increments repeat.completed under the cross-process
  jobs lock and persists it before run_job(), converting finite
  one-shots from at-least-once to at-most-times.
- Called from run_one_job (the shared body used by BOTH the built-in
  ticker and the external Chronos fire_due path) before run_job.
- mark_job_run skips the increment for pre-claimed one-shots (no
  double-count) and still removes at the limit.
- get_due_jobs drops a stale one-shot already at its dispatch limit so
  a job claimed-but-not-cleaned-up after a crash stops appearing as due.
- No-op for recurring jobs (advance_next_run) and infinite/no-repeat
  one-shots; a handed-in job dict absent from the store proceeds.

Closes #38758
This commit is contained in:
Teknium 2026-07-01 01:30:36 -07:00 committed by GitHub
parent 80d0ff8da5
commit 84c724d692
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 262 additions and 6 deletions

View file

@ -1249,13 +1249,27 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
# be claimed again on its next fire (Phase 4C CAS).
job["fire_claim"] = None
# Increment completed count
# Increment completed count. Finite one-shot jobs are
# pre-claimed by claim_dispatch() BEFORE the side effect runs
# (issue #38758), which already incremented completed — do not
# double-count them here. Recurring jobs and direct callers
# with no pre-run claim still get the legacy increment.
if job.get("repeat"):
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
repeat = job["repeat"]
times = repeat.get("times")
completed = repeat.get("completed", 0)
kind = job.get("schedule", {}).get("kind")
preclaimed_oneshot = (
kind == "once"
and times is not None
and times > 0
and completed > 0
)
if not preclaimed_oneshot:
completed += 1
repeat["completed"] = completed
# Check if we've hit the repeat limit
times = job["repeat"].get("times")
completed = job["repeat"]["completed"]
if times is not None and times > 0 and completed >= times:
# Remove the job (limit reached)
jobs.pop(i)
@ -1300,6 +1314,69 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
logger.warning("mark_job_run: job_id %s not found, skipping save", job_id)
def claim_dispatch(job_id: str) -> bool:
"""Atomically claim a finite one-shot job dispatch BEFORE execution.
Increments ``repeat.completed`` under the cross-process jobs lock and
persists the claim immediately, so that if the tick dies mid-execution
(gateway kill, OOM, segfault, hard-timeout) the dispatch is not lost.
This converts finite one-shot jobs from *at-least-once* to *at-most-times*
semantics a job that self-destructs fires at most ``repeat.times`` times
instead of infinitely (issue #38758).
Returns ``True`` if the caller may proceed to run the job, ``False`` if the
dispatch limit is already reached (in which case the stale job is removed).
Only claims jobs with ``schedule.kind == "once"`` and ``repeat.times > 0``.
Recurring jobs (they use ``advance_next_run``) and infinite-repeat / no-repeat
jobs are left unchanged and always allowed to proceed.
"""
with _jobs_lock():
jobs = load_jobs()
for i, job in enumerate(jobs):
if job["id"] != job_id:
continue
if job.get("schedule", {}).get("kind") != "once":
return True # recurring jobs use advance_next_run(), not dispatch claims
repeat = job.get("repeat")
if not repeat:
return True # no repeat limit — always dispatch
times = repeat.get("times")
if times is None or times <= 0:
return True # infinite — always dispatch
completed = repeat.get("completed", 0)
if completed >= times:
# Already dispatched the max number of times (e.g. a prior
# tick claimed then died before mark_job_run could remove it).
# Clean up so it stops appearing as due on every tick.
jobs.pop(i)
save_jobs(jobs)
logger.info(
"Job '%s': dispatch limit reached (%d/%d) — removing",
job.get("name", job["id"]),
completed,
times,
)
return False
# Claim this dispatch before the side effect runs.
repeat["completed"] = completed + 1
save_jobs(jobs)
logger.debug(
"Job '%s': claimed dispatch %d/%d",
job.get("name", job["id"]),
repeat["completed"],
times,
)
return True
logger.debug(
"claim_dispatch: job_id %s not in store — proceeding without claim "
"(handed-in job dict; nothing to persist a claim against)",
job_id,
)
return True
def advance_next_run(job_id: str) -> bool:
"""Preemptively advance next_run_at for a recurring job before execution.
@ -1543,6 +1620,31 @@ def _get_due_jobs_locked() -> List[Dict[str, Any]]:
break
# Fall through to due.append(job) — execute once now
# One-shot dispatch-limit guard (issue #38758): a finite one-shot
# claimed via claim_dispatch() but whose tick died before
# mark_job_run could remove it will have completed >= times while
# still looking due (last_run_at was never written, so the
# recovery helper re-armed it). Remove it instead of re-firing.
if kind == "once":
repeat = job.get("repeat")
if repeat:
times = repeat.get("times")
completed = repeat.get("completed", 0)
if times is not None and times > 0 and completed >= times:
logger.info(
"Job '%s': one-shot dispatch limit reached (%d/%d) "
"— removing stale due entry",
job.get("name", job["id"]),
completed,
times,
)
for rj in raw_jobs:
if rj["id"] == job["id"]:
raw_jobs.remove(rj)
needs_save = True
break
continue
due.append(job)
if needs_save:

View file

@ -237,7 +237,7 @@ _LEGACY_HOME_TARGET_ENV_VARS = {
"QQBOT_HOME_CHANNEL": "QQ_HOME_CHANNEL",
}
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run, claim_dispatch
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
@ -2779,6 +2779,20 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -
failure is recorded via ``mark_job_run``), False only if processing raised.
"""
try:
# Pre-run dispatch claim (issue #38758): atomically commit a finite
# one-shot's dispatch BEFORE its side effect runs, so a tick that dies
# mid-execution (gateway kill, OOM, segfault, hard-timeout) cannot
# re-fire the job forever on restart. No-op for recurring jobs (they
# use advance_next_run) and infinite/no-repeat jobs. This lives here in
# the shared body so BOTH the built-in ticker and the external provider
# (Chronos fire_due) get at-most-times semantics.
if not claim_dispatch(job["id"]):
logger.info(
"Job '%s': one-shot dispatch limit reached — skipping",
job.get("name", job["id"]),
)
return True # not an error — already handled/removed
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)

View file

@ -19,6 +19,7 @@ from cron.jobs import (
remove_job,
mark_job_run,
advance_next_run,
claim_dispatch,
get_due_jobs,
save_job_output,
)
@ -1314,3 +1315,102 @@ class TestCronOutputRetention:
"hermes_cli.config.load_config", lambda: {"cron": {"output_retention": "oops"}}
)
assert jobs._cron_output_keep() == jobs._CRON_OUTPUT_DEFAULT_KEEP
# =========================================================================
# claim_dispatch — pre-run one-shot crash safety (issue #38758)
# =========================================================================
class TestClaimDispatch:
"""One-shot jobs must commit their dispatch BEFORE the side effect runs, so
a tick that dies mid-execution (gateway kill, OOM, hard-timeout) can re-fire
the job at most ``repeat.times`` times instead of infinitely."""
def _oneshot(self, times=1, completed=0):
return {
"id": "os1",
"name": "one-shot",
"enabled": True,
"schedule": {"kind": "once", "run_at": "2026-01-01T00:00:00+00:00"},
"repeat": {"times": times, "completed": completed},
}
def test_claim_increments_and_persists(self, tmp_cron_dir):
save_jobs([self._oneshot(times=1, completed=0)])
assert claim_dispatch("os1") is True
# Persisted BEFORE any side effect — survives a crash.
assert load_jobs()[0]["repeat"]["completed"] == 1
def test_already_dispatched_oneshot_is_removed(self, tmp_cron_dir):
# A prior tick claimed (completed==times) then died before mark_job_run
# could remove the job. The next claim must refuse AND clean up.
save_jobs([self._oneshot(times=1, completed=1)])
assert claim_dispatch("os1") is False
assert load_jobs() == [] # removed, will not re-fire
def test_recurring_job_is_not_claimed(self, tmp_cron_dir):
job = {
"id": "rec",
"schedule": {"kind": "interval", "minutes": 5},
"repeat": {"times": 3, "completed": 0},
}
save_jobs([job])
assert claim_dispatch("rec") is True
# Recurring jobs use advance_next_run(); claim must NOT touch completed.
assert load_jobs()[0]["repeat"]["completed"] == 0
def test_infinite_oneshot_not_claimed(self, tmp_cron_dir):
job = self._oneshot(times=0, completed=0) # times<=0 means infinite
save_jobs([job])
assert claim_dispatch("os1") is True
assert load_jobs()[0]["repeat"]["completed"] == 0
def test_no_repeat_block_not_claimed(self, tmp_cron_dir):
job = {"id": "os1", "schedule": {"kind": "once", "run_at": "2026-01-01T00:00:00+00:00"}}
save_jobs([job])
assert claim_dispatch("os1") is True
assert "repeat" not in load_jobs()[0]
def test_missing_job_proceeds(self, tmp_cron_dir):
# A handed-in job dict not persisted in the store (external provider /
# direct caller) can't be claimed — proceed rather than suppress it.
save_jobs([])
assert claim_dispatch("ghost") is True
def test_mark_job_run_does_not_double_count_preclaimed_oneshot(self, tmp_cron_dir):
# Full lifecycle: claim bumps completed to times, then mark_job_run must
# NOT increment again — it recognizes the pre-claim and removes the job.
save_jobs([self._oneshot(times=1, completed=0)])
assert claim_dispatch("os1") is True
assert load_jobs()[0]["repeat"]["completed"] == 1
mark_job_run("os1", success=True)
assert load_jobs() == [] # completed once, removed — not fired twice
def test_mark_job_run_still_increments_recurring(self, tmp_cron_dir):
# The double-count guard is one-shot-specific; recurring jobs keep the
# legacy post-run increment.
job = {
"id": "rec",
"schedule": {"kind": "interval", "minutes": 5},
"repeat": {"times": 3, "completed": 1},
}
save_jobs([job])
mark_job_run("rec", success=True)
assert load_jobs()[0]["repeat"]["completed"] == 2
def test_get_due_jobs_removes_stale_maxed_oneshot(self, tmp_cron_dir):
# A claimed one-shot whose tick died leaves completed>=times with
# last_run_at still unset, so the recovery helper re-arms it as due.
# get_due_jobs must drop it instead of returning it for another fire.
past = (datetime.now(timezone.utc) - timedelta(seconds=5)).isoformat()
save_jobs([{
"id": "os1",
"name": "one-shot",
"enabled": True,
"schedule": {"kind": "once", "run_at": past},
"repeat": {"times": 1, "completed": 1},
"next_run_at": None,
}])
due = get_due_jobs()
assert due == []
assert load_jobs() == [] # cleaned up

View file

@ -2572,6 +2572,46 @@ class TestSilentDelivery:
)
class TestOneShotDispatchClaim:
"""run_one_job must claim a finite one-shot's dispatch BEFORE run_job so a
tick that dies mid-execution can't re-fire it forever (issue #38758)."""
def _oneshot(self):
return {
"id": "monitor-job",
"name": "monitor",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
"schedule": {"kind": "once", "run_at": "2026-01-01T00:00:00+00:00"},
"repeat": {"times": 1, "completed": 0},
}
def test_claim_runs_before_run_job(self):
order = []
with patch("cron.scheduler.get_due_jobs", return_value=[self._oneshot()]), \
patch("cron.scheduler.claim_dispatch", side_effect=lambda _id: order.append("claim") or True), \
patch("cron.scheduler.run_job", side_effect=lambda _j: order.append("run") or (True, "# out", "ok", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result"), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
assert order == ["claim", "run"] # claim strictly before side effect
def test_refused_claim_skips_run_job(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._oneshot()]), \
patch("cron.scheduler.claim_dispatch", return_value=False), \
patch("cron.scheduler.run_job") as run_mock, \
patch("cron.scheduler.save_job_output"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run") as mark_mock:
from cron.scheduler import tick
tick(verbose=False)
run_mock.assert_not_called()
deliver_mock.assert_not_called()
mark_mock.assert_not_called()
class TestBuildJobPromptSilentHint:
"""Verify _build_job_prompt always injects [SILENT] guidance."""