From c8f7b496d0f89f24d438053f5dbe1dd690b35991 Mon Sep 17 00:00:00 2001 From: BarnacleBoy Date: Thu, 28 May 2026 20:26:47 +0000 Subject: [PATCH] feat(gateway): expose active cron job state via /api/jobs enrichment Adds is_running, current_session_id, and current_started_at fields to GET /api/jobs by cross-referencing job IDs against active cron sessions in state.db. Changes: - hermes_state.py: new SessionDB.get_active_cron_sessions() method that queries sessions WHERE id LIKE 'cron_%' AND ended_at IS NULL - api_server.py: enrich _handle_list_jobs() output using the active session data - tests: SessionDB unit tests + API endpoint integration tests Closes #33770 --- gateway/platforms/api_server.py | 16 +++++++++ hermes_state.py | 31 ++++++++++++++++ tests/gateway/test_api_server_jobs.py | 52 +++++++++++++++++++++++++++ tests/test_hermes_state.py | 46 ++++++++++++++++++++++++ 4 files changed, 145 insertions(+) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index a56be5573..6db29a784 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -3093,6 +3093,22 @@ class APIServerAdapter(BasePlatformAdapter): try: include_disabled = request.query.get("include_disabled", "").lower() in {"true", "1"} jobs = _cron_list(include_disabled=include_disabled) + # Enrich with active cron session info so callers can tell + # which jobs are currently running without reading state.db. + try: + session_db = self._ensure_session_db() + if session_db: + active = session_db.get_active_cron_sessions() + for job in jobs: + job_id = job.get("id") + if job_id in active: + job["is_running"] = True + job["current_session_id"] = active[job_id]["session_id"] + job["current_started_at"] = active[job_id]["started_at"] + else: + job["is_running"] = False + except Exception: + pass # enrichment is best-effort; list is still valid return web.json_response({"jobs": jobs}) except Exception as e: return web.json_response({"error": str(e)}, status=500) diff --git a/hermes_state.py b/hermes_state.py index ba33598b9..37feb5844 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -791,6 +791,37 @@ class SessionDB: ) self._execute_write(_do) + def get_active_cron_sessions(self) -> dict[str, dict[str, str | float]]: + """Return active cron sessions keyed by job_id. + + Active cron sessions have IDs matching ``cron_{job_id}_{timestamp}`` + with a NULL ``ended_at``. Returns ``{job_id: {"session_id": str, + "started_at": float}}``. + """ + import logging as _logging + + result: dict[str, dict[str, str | float]] = {} + try: + with self._lock: + cursor = self._conn.execute( + "SELECT id, started_at FROM sessions " + "WHERE id LIKE 'cron_%' AND ended_at IS NULL" + ) + rows = cursor.fetchall() + except Exception as _exc: + _logging.getLogger(__name__).debug( + "Failed to query active cron sessions: %s", _exc + ) + return result + for row in rows: + sid: str = row["id"] + # Parse job_id from cron_{job_id}_{YYYYMMDD_HHMMSS} + parts = sid.split("_") + if len(parts) >= 3: + job_id = parts[1] + result[job_id] = {"session_id": sid, "started_at": row["started_at"]} + return result + def update_system_prompt(self, session_id: str, system_prompt: str) -> None: """Store the full assembled system prompt snapshot.""" def _do(conn): diff --git a/tests/gateway/test_api_server_jobs.py b/tests/gateway/test_api_server_jobs.py index 087bfc5b4..90ac5359b 100644 --- a/tests/gateway/test_api_server_jobs.py +++ b/tests/gateway/test_api_server_jobs.py @@ -131,6 +131,58 @@ class TestListJobs: assert resp.status == 200 mock_list.assert_called_once_with(include_disabled=False) + # ------------------------------------------------------------------- + # 3-5. test_list_jobs_is_running enrichment + # ------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_list_jobs_sets_is_running_false(self, adapter): + """Jobs not in active cron sessions get is_running=False.""" + app = _create_app(adapter) + mock_session_db = MagicMock() + mock_session_db.get_active_cron_sessions.return_value = {} + async with TestClient(TestServer(app)) as cli: + with patch( + f"{_MOD}._CRON_AVAILABLE", True + ), patch( + f"{_MOD}._cron_list", return_value=[SAMPLE_JOB] + ), patch.object( + adapter, "_ensure_session_db", return_value=mock_session_db + ): + resp = await cli.get("/api/jobs") + assert resp.status == 200 + data = await resp.json() + assert data["jobs"][0]["is_running"] is False + assert "current_session_id" not in data["jobs"][0] + assert "current_started_at" not in data["jobs"][0] + + @pytest.mark.asyncio + async def test_list_jobs_sets_is_running_true(self, adapter): + """Jobs with active cron sessions get is_running=True and metadata.""" + app = _create_app(adapter) + mock_session_db = MagicMock() + mock_session_db.get_active_cron_sessions.return_value = { + "aabbccddeeff": { + "session_id": "cron_aabbccddeeff_20260528_120000", + "started_at": 1748443200.0, + } + } + async with TestClient(TestServer(app)) as cli: + with patch( + f"{_MOD}._CRON_AVAILABLE", True + ), patch( + f"{_MOD}._cron_list", return_value=[SAMPLE_JOB] + ), patch.object( + adapter, "_ensure_session_db", return_value=mock_session_db + ): + resp = await cli.get("/api/jobs") + assert resp.status == 200 + data = await resp.json() + job = data["jobs"][0] + assert job["is_running"] is True + assert job["current_session_id"] == "cron_aabbccddeeff_20260528_120000" + assert job["current_started_at"] == 1748443200.0 + # --------------------------------------------------------------------------- # 3-7. test_create_job and validation diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index d08157621..0930d2a6e 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -137,6 +137,52 @@ class TestSessionLifecycle: assert child["parent_session_id"] == "parent" +# ========================================================================= +# Active cron sessions +# ========================================================================= + +class TestActiveCronSessions: + def test_no_cron_sessions(self, db): + """No cron sessions -> empty dict.""" + assert db.get_active_cron_sessions() == {} + + def test_active_cron_session_returned(self, db): + """An un-ended cron session shows up in the results.""" + db.create_session(session_id="cron_abc123def456_20260528_120000", source="cron") + active = db.get_active_cron_sessions() + assert "abc123def456" in active + assert active["abc123def456"]["session_id"] == "cron_abc123def456_20260528_120000" + assert isinstance(active["abc123def456"]["started_at"], float) + + def test_ended_cron_session_not_included(self, db): + """Sessions with ended_at set are excluded.""" + sid = "cron_abc123def456_20260528_120000" + db.create_session(session_id=sid, source="cron") + db.end_session(sid, "cron_complete") + assert db.get_active_cron_sessions() == {} + + def test_non_cron_sessions_ignored(self, db): + """Non-cron sessions (no cron_ prefix) are not included.""" + db.create_session(session_id="cli_abc123_20260528", source="cli") + assert db.get_active_cron_sessions() == {} + + def test_multiple_active_cron_sessions(self, db): + """Multiple active cron jobs each appear keyed by job_id.""" + db.create_session(session_id="cron_job1_20260528_120000", source="cron") + db.create_session(session_id="cron_job2_20260528_120100", source="cron") + active = db.get_active_cron_sessions() + assert set(active.keys()) == {"job1", "job2"} + + def test_mixed_active_and_completed_cron_sessions(self, db): + """Only un-ended cron sessions are returned.""" + db.create_session(session_id="cron_job1_20260528_120000", source="cron") + sid2 = "cron_job2_20260528_120100" + db.create_session(session_id=sid2, source="cron") + db.end_session(sid2, "cron_complete") + active = db.get_active_cron_sessions() + assert set(active.keys()) == {"job1"} + + # ========================================================================= # Message storage # =========================================================================