feat(gateway): expose active cron job state via /api/jobs enrichment
Adds is_running, current_session_id, and current_started_at fields to GET /api/jobs by cross-referencing job IDs against active cron sessions in state.db. Changes: - hermes_state.py: new SessionDB.get_active_cron_sessions() method that queries sessions WHERE id LIKE 'cron_%' AND ended_at IS NULL - api_server.py: enrich _handle_list_jobs() output using the active session data - tests: SessionDB unit tests + API endpoint integration tests Closes #33770
This commit is contained in:
parent
321ce94e25
commit
c8f7b496d0
4 changed files with 145 additions and 0 deletions
|
|
@ -3093,6 +3093,22 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
try:
|
||||
include_disabled = request.query.get("include_disabled", "").lower() in {"true", "1"}
|
||||
jobs = _cron_list(include_disabled=include_disabled)
|
||||
# Enrich with active cron session info so callers can tell
|
||||
# which jobs are currently running without reading state.db.
|
||||
try:
|
||||
session_db = self._ensure_session_db()
|
||||
if session_db:
|
||||
active = session_db.get_active_cron_sessions()
|
||||
for job in jobs:
|
||||
job_id = job.get("id")
|
||||
if job_id in active:
|
||||
job["is_running"] = True
|
||||
job["current_session_id"] = active[job_id]["session_id"]
|
||||
job["current_started_at"] = active[job_id]["started_at"]
|
||||
else:
|
||||
job["is_running"] = False
|
||||
except Exception:
|
||||
pass # enrichment is best-effort; list is still valid
|
||||
return web.json_response({"jobs": jobs})
|
||||
except Exception as e:
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
|
|
|||
|
|
@ -791,6 +791,37 @@ class SessionDB:
|
|||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
def get_active_cron_sessions(self) -> dict[str, dict[str, str | float]]:
|
||||
"""Return active cron sessions keyed by job_id.
|
||||
|
||||
Active cron sessions have IDs matching ``cron_{job_id}_{timestamp}``
|
||||
with a NULL ``ended_at``. Returns ``{job_id: {"session_id": str,
|
||||
"started_at": float}}``.
|
||||
"""
|
||||
import logging as _logging
|
||||
|
||||
result: dict[str, dict[str, str | float]] = {}
|
||||
try:
|
||||
with self._lock:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT id, started_at FROM sessions "
|
||||
"WHERE id LIKE 'cron_%' AND ended_at IS NULL"
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
except Exception as _exc:
|
||||
_logging.getLogger(__name__).debug(
|
||||
"Failed to query active cron sessions: %s", _exc
|
||||
)
|
||||
return result
|
||||
for row in rows:
|
||||
sid: str = row["id"]
|
||||
# Parse job_id from cron_{job_id}_{YYYYMMDD_HHMMSS}
|
||||
parts = sid.split("_")
|
||||
if len(parts) >= 3:
|
||||
job_id = parts[1]
|
||||
result[job_id] = {"session_id": sid, "started_at": row["started_at"]}
|
||||
return result
|
||||
|
||||
def update_system_prompt(self, session_id: str, system_prompt: str) -> None:
|
||||
"""Store the full assembled system prompt snapshot."""
|
||||
def _do(conn):
|
||||
|
|
|
|||
|
|
@ -131,6 +131,58 @@ class TestListJobs:
|
|||
assert resp.status == 200
|
||||
mock_list.assert_called_once_with(include_disabled=False)
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# 3-5. test_list_jobs_is_running enrichment
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_jobs_sets_is_running_false(self, adapter):
|
||||
"""Jobs not in active cron sessions get is_running=False."""
|
||||
app = _create_app(adapter)
|
||||
mock_session_db = MagicMock()
|
||||
mock_session_db.get_active_cron_sessions.return_value = {}
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
with patch(
|
||||
f"{_MOD}._CRON_AVAILABLE", True
|
||||
), patch(
|
||||
f"{_MOD}._cron_list", return_value=[SAMPLE_JOB]
|
||||
), patch.object(
|
||||
adapter, "_ensure_session_db", return_value=mock_session_db
|
||||
):
|
||||
resp = await cli.get("/api/jobs")
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
assert data["jobs"][0]["is_running"] is False
|
||||
assert "current_session_id" not in data["jobs"][0]
|
||||
assert "current_started_at" not in data["jobs"][0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_jobs_sets_is_running_true(self, adapter):
|
||||
"""Jobs with active cron sessions get is_running=True and metadata."""
|
||||
app = _create_app(adapter)
|
||||
mock_session_db = MagicMock()
|
||||
mock_session_db.get_active_cron_sessions.return_value = {
|
||||
"aabbccddeeff": {
|
||||
"session_id": "cron_aabbccddeeff_20260528_120000",
|
||||
"started_at": 1748443200.0,
|
||||
}
|
||||
}
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
with patch(
|
||||
f"{_MOD}._CRON_AVAILABLE", True
|
||||
), patch(
|
||||
f"{_MOD}._cron_list", return_value=[SAMPLE_JOB]
|
||||
), patch.object(
|
||||
adapter, "_ensure_session_db", return_value=mock_session_db
|
||||
):
|
||||
resp = await cli.get("/api/jobs")
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
job = data["jobs"][0]
|
||||
assert job["is_running"] is True
|
||||
assert job["current_session_id"] == "cron_aabbccddeeff_20260528_120000"
|
||||
assert job["current_started_at"] == 1748443200.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3-7. test_create_job and validation
|
||||
|
|
|
|||
|
|
@ -137,6 +137,52 @@ class TestSessionLifecycle:
|
|||
assert child["parent_session_id"] == "parent"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Active cron sessions
|
||||
# =========================================================================
|
||||
|
||||
class TestActiveCronSessions:
|
||||
def test_no_cron_sessions(self, db):
|
||||
"""No cron sessions -> empty dict."""
|
||||
assert db.get_active_cron_sessions() == {}
|
||||
|
||||
def test_active_cron_session_returned(self, db):
|
||||
"""An un-ended cron session shows up in the results."""
|
||||
db.create_session(session_id="cron_abc123def456_20260528_120000", source="cron")
|
||||
active = db.get_active_cron_sessions()
|
||||
assert "abc123def456" in active
|
||||
assert active["abc123def456"]["session_id"] == "cron_abc123def456_20260528_120000"
|
||||
assert isinstance(active["abc123def456"]["started_at"], float)
|
||||
|
||||
def test_ended_cron_session_not_included(self, db):
|
||||
"""Sessions with ended_at set are excluded."""
|
||||
sid = "cron_abc123def456_20260528_120000"
|
||||
db.create_session(session_id=sid, source="cron")
|
||||
db.end_session(sid, "cron_complete")
|
||||
assert db.get_active_cron_sessions() == {}
|
||||
|
||||
def test_non_cron_sessions_ignored(self, db):
|
||||
"""Non-cron sessions (no cron_ prefix) are not included."""
|
||||
db.create_session(session_id="cli_abc123_20260528", source="cli")
|
||||
assert db.get_active_cron_sessions() == {}
|
||||
|
||||
def test_multiple_active_cron_sessions(self, db):
|
||||
"""Multiple active cron jobs each appear keyed by job_id."""
|
||||
db.create_session(session_id="cron_job1_20260528_120000", source="cron")
|
||||
db.create_session(session_id="cron_job2_20260528_120100", source="cron")
|
||||
active = db.get_active_cron_sessions()
|
||||
assert set(active.keys()) == {"job1", "job2"}
|
||||
|
||||
def test_mixed_active_and_completed_cron_sessions(self, db):
|
||||
"""Only un-ended cron sessions are returned."""
|
||||
db.create_session(session_id="cron_job1_20260528_120000", source="cron")
|
||||
sid2 = "cron_job2_20260528_120100"
|
||||
db.create_session(session_id=sid2, source="cron")
|
||||
db.end_session(sid2, "cron_complete")
|
||||
active = db.get_active_cron_sessions()
|
||||
assert set(active.keys()) == {"job1"}
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Message storage
|
||||
# =========================================================================
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue