fix(skills): require review forks to read before writing skills

This commit is contained in:
kyssta-exe 2026-06-30 12:52:41 +00:00 committed by Teknium
parent e55e9fad2c
commit 20871c1d94
4 changed files with 201 additions and 2 deletions

View file

@ -746,6 +746,13 @@ def _run_review_in_thread(
"{tool_name}. Only memory/skill tools are allowed."
),
)
try:
from tools.skill_manager_tool import _reset_background_review_read_marks
_reset_background_review_read_marks()
except Exception:
pass
try:
# Routed to a different model -> replay a digest (cache is cold
# on that model anyway, so minimise cold-written tokens). Same

View file

@ -957,6 +957,9 @@ class TestExternalSkillMutations:
_create_skill("my-skill", VALID_SKILL_CONTENT)
token = set_current_write_origin(BACKGROUND_REVIEW)
try:
from tools.skill_manager_tool import mark_background_review_skill_read
mark_background_review_skill_read(tmp_path / "my-skill" / "SKILL.md")
with patch(
"tools.skill_usage.get_record",
side_effect=lambda n: {"pinned": False},
@ -1173,6 +1176,7 @@ def _curator_pass(tmp_path, *, monkeypatch):
skills_root.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
with patch("tools.skill_manager_tool.SKILLS_DIR", skills_root), \
patch("tools.skills_tool.SKILLS_DIR", skills_root), \
patch("agent.skill_utils.get_all_skills_dirs", return_value=[skills_root]), \
patch("tools.skill_provenance.is_background_review", return_value=True):
yield skills_root
@ -1281,3 +1285,66 @@ class TestCuratorConsolidationDeleteGuard:
rec = skill_usage.get_record("narrow")
# Record kept (not forgotten) and marked archived.
assert rec.get("state") == skill_usage.STATE_ARCHIVED
def test_background_review_patch_requires_skill_view_first(self, tmp_path, monkeypatch):
from tools.skills_tool import skill_view
from tools.skill_manager_tool import _reset_background_review_read_marks
_reset_background_review_read_marks()
with _curator_pass(tmp_path, monkeypatch=monkeypatch):
_create_skill("reviewed", _skill_content("reviewed"))
blocked = json.loads(skill_manage(
action="patch",
name="reviewed",
old_string="Step 1: Do the thing.",
new_string="Step 1: Do the thing safely.",
))
assert blocked["success"] is False
assert blocked.get("_read_before_write_required") is True
viewed = json.loads(skill_view("reviewed"))
assert viewed["success"] is True
allowed = json.loads(skill_manage(
action="patch",
name="reviewed",
old_string="Step 1: Do the thing.",
new_string="Step 1: Do the thing safely.",
))
assert allowed["success"] is True, allowed
_reset_background_review_read_marks()
def test_background_review_support_file_overwrite_requires_that_file_read(self, tmp_path, monkeypatch):
from tools.skills_tool import skill_view
from tools.skill_manager_tool import _reset_background_review_read_marks
_reset_background_review_read_marks()
with _curator_pass(tmp_path, monkeypatch=monkeypatch):
_create_skill("reviewed", _skill_content("reviewed"))
ref = tmp_path / ".hermes" / "skills" / "reviewed" / "references"
ref.mkdir()
(ref / "workflow.md").write_text("old workflow\n", encoding="utf-8")
# Reading SKILL.md does not authorize overwriting a linked file.
assert json.loads(skill_view("reviewed"))["success"] is True
blocked = json.loads(skill_manage(
action="write_file",
name="reviewed",
file_path="references/workflow.md",
file_content="new workflow\n",
))
assert blocked["success"] is False
assert blocked.get("_read_before_write_required") is True
assert json.loads(skill_view("reviewed", "references/workflow.md"))["success"] is True
allowed = json.loads(skill_manage(
action="write_file",
name="reviewed",
file_path="references/workflow.md",
file_content="new workflow\n",
))
assert allowed["success"] is True, allowed
_reset_background_review_read_marks()

View file

@ -38,15 +38,58 @@ import os
import re
import shutil
import tempfile
import contextvars as _ctxvars
from pathlib import Path
from hermes_constants import get_hermes_home, display_hermes_home
from typing import Dict, Any, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home, display_hermes_home
from utils import atomic_replace, is_truthy_value
from hermes_cli.config import cfg_get
logger = logging.getLogger(__name__)
_background_review_read_paths: "_ctxvars.ContextVar[frozenset[str]]" = _ctxvars.ContextVar(
"background_review_read_paths", default=frozenset()
)
def mark_background_review_skill_read(path: Path) -> None:
"""Record that the active background-review fork has read a skill file.
The autonomous review fork is allowed to evolve skills, but it must not
patch or rewrite content it has only inferred from the transcript. The
skill_view tool calls this after returning file content to the model; write
paths below require the corresponding target path to be present when the
current origin is ``background_review``.
"""
try:
from tools.skill_provenance import is_background_review
if not is_background_review():
return
except Exception:
return
try:
resolved = str(path.resolve())
except Exception:
resolved = str(path)
current = set(_background_review_read_paths.get())
current.add(resolved)
_background_review_read_paths.set(frozenset(current))
def _background_review_has_read(path: Path) -> bool:
try:
resolved = str(path.resolve())
except Exception:
resolved = str(path)
return resolved in _background_review_read_paths.get()
def _reset_background_review_read_marks() -> None:
"""Test helper: clear read-before-write marks for the current context."""
_background_review_read_paths.set(frozenset())
# Import security scanner — external hub installs always get scanned;
# agent-created skills only get scanned when skills.guard_agent_created is on.
try:
@ -320,6 +363,36 @@ def _background_review_write_guard(
return None
def _background_review_read_before_write_guard(
name: str,
target: Path,
action: str,
file_label: str,
) -> Optional[Dict[str, Any]]:
"""Require review forks to load the exact target before mutating it."""
try:
from tools.skill_provenance import is_background_review
if not is_background_review():
return None
except Exception:
return None
if _background_review_has_read(target):
return None
return {
"success": False,
"error": (
f"Refusing background curator {action} for skill '{name}': "
f"the current {file_label} content has not been loaded in this "
"review turn. Call skill_view(name) for SKILL.md, or "
"skill_view(name, file_path=...) for a supporting file, then "
"retry the write using the content just returned."
),
"_read_before_write_required": True,
}
def _background_review_preflight(action: str, name: str) -> Optional[Dict[str, Any]]:
if action not in {"edit", "patch", "delete", "write_file", "remove_file"}:
return None
@ -786,6 +859,12 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
return guard
skill_md = existing["path"] / "SKILL.md"
read_guard = _background_review_read_before_write_guard(
name, skill_md, "edit", "SKILL.md"
)
if read_guard:
return read_guard
# Back up original content for rollback
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
_atomic_write_text(skill_md, content)
@ -849,6 +928,7 @@ def _patch_skill(
target, err = _resolve_skill_target(skill_dir, file_path)
if err:
return {"success": False, "error": err}
assert target is not None
else:
# Patching SKILL.md
target = skill_dir / "SKILL.md"
@ -856,6 +936,15 @@ def _patch_skill(
if not target.exists():
return {"success": False, "error": f"File not found: {target.relative_to(skill_dir)}"}
read_guard = _background_review_read_before_write_guard(
name,
target,
"patch",
"SKILL.md" if not file_path else file_path,
)
if read_guard:
return read_guard
content = target.read_text(encoding="utf-8")
# Use the same fuzzy matching engine as the file patch tool.
@ -1057,6 +1146,13 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
target, err = _resolve_skill_target(existing["path"], file_path)
if err:
return {"success": False, "error": err}
assert target is not None
if target.exists():
read_guard = _background_review_read_before_write_guard(
name, target, "write_file", file_path
)
if read_guard:
return read_guard
target.parent.mkdir(parents=True, exist_ok=True)
# Back up for rollback
original_content = target.read_text(encoding="utf-8") if target.exists() else None
@ -1096,6 +1192,7 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
target, err = _resolve_skill_target(skill_dir, file_path)
if err:
return {"success": False, "error": err}
assert target is not None
if not target.exists():
# List what's actually there for the model to see
available = []
@ -1111,6 +1208,12 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
"available_files": available if available else None,
}
read_guard = _background_review_read_before_write_guard(
name, target, "remove_file", file_path
)
if read_guard:
return read_guard
target.unlink()
# Clean up empty subdirectories

View file

@ -1281,6 +1281,17 @@ def skill_view(
ensure_ascii=False,
)
try:
from tools.skill_manager_tool import mark_background_review_skill_read
mark_background_review_skill_read(target_file)
except Exception:
logger.debug(
"Could not record background-review skill read for %s",
target_file,
exc_info=True,
)
return json.dumps(
{
"success": True,
@ -1484,6 +1495,17 @@ def skill_view(
if capture_result["gateway_setup_hint"]:
result["gateway_setup_hint"] = capture_result["gateway_setup_hint"]
try:
from tools.skill_manager_tool import mark_background_review_skill_read
mark_background_review_skill_read(skill_md)
except Exception:
logger.debug(
"Could not record background-review skill read for %s",
skill_md,
exc_info=True,
)
if setup_needed:
missing_items = [
f"env ${env_name}" for env_name in remaining_missing_required_envs