feat(relay): Phase 5 Unit C — wake primitive (gateway side) (#51595)
Register a per-instance wakeUrl and forward it to the connector at self-provision so a suspended gateway can be poked awake when buffered work arrives (pairs with the connector-side WakePoker).

- relay_wake_url() resolver (env GATEWAY_RELAY_WAKE_URL, then gateway.relay_wake_url in config.yaml), mirroring relay_instance_id()
- thread wake_url through _post_provision (adds wakeUrl to the body only when set) + self_provision_relay (resolve, forward, log)
- hermes gateway enroll --wake-url <url> persists GATEWAY_RELAY_WAKE_URL
- document the §5.2 wake poke in relay-connector-contract.md §3.3
- tests: relay_wake_url resolution (env/config/absent), provision forwarding, body-only-when-set (6 new; 130 relay tests pass)

The actual reconnect+drain on wake is Unit B's loop; this unit only wires the wake SIGNAL. Opt-in: absent wakeUrl => connector never pokes.
parent 6ef679420e
commit 6e88f7b6f7
5 changed files with 191 additions and 1 deletion
@@ -225,6 +225,40 @@ only flip/drain ITS OWN instance.
 > primitive is "when the gateway drains, relay flips to buffered + replays on
 > reconnect, with no loss/dup"; WHAT triggers the drain is out of scope.
 
+### 3.3 Wake poke (§5.2)
+
+The other half of the sleep/wake loop: how a SUSPENDED gateway finds out it has
+buffered work waiting. A PRIMITIVE — nothing here suspends a machine; it wires
+the wake SIGNAL so a future scale-to-zero behaviour layer can rely on "buffered
+⇒ wake poked."
+
+- **Registration.** The gateway registers a **wake URL** at enroll/provision —
+  any reachable URL the connector can GET to wake it (a Fly autostart hostname,
+  a dashboard host). Self-hosted: `hermes gateway enroll --wake-url <url>` (or
+  `GATEWAY_RELAY_WAKE_URL` / `gateway.relay_wake_url`). Managed/NAS: stamped into
+  the container env beside `GATEWAY_RELAY_URL`. Forwarded in the
+  `/relay/provision` body as `wakeUrl` and stored per-instance on the connector's
+  secret record (gateway-asserted but safely scoped — same posture as
+  `instanceId`; the org/tenant stays token-verified, so a gateway can only
+  register a wake target for ITS OWN instance). DISTINCT from the retired
+  `gatewayEndpoint`: a **poke target**, not a delivery target.
+- **The poke.** When a buffered-only (going-idle) destination receives its FIRST
+  buffered event, the connector issues a **payload-free, unsigned GET** to that
+  instance's registered `wakeUrl`, **directly** (NOT NAS-mediated — relay stays
+  NAS-independent). It carries no tenant data and no inbound: it only says "you
+  have buffered work, reconnect." Tenant authority is re-established the normal
+  way when the gateway re-dials (the authenticated WS upgrade), so a leaked/
+  guessed wake URL can at worst cause a spurious reconnect of ITS OWN instance.
+  Rate-limited per instance (one poke per cooldown window, not per event), and
+  best-effort — a failed poke is swallowed; the gateway still drains whenever it
+  next reconnects on its own. No new frame: the wake is an out-of-band HTTP GET,
+  not a relay-WS message (the socket is down — that's the whole point).
+
+> NOT in scope (deferred behaviour): the actual machine suspend (Fly
+> `autostop:"suspend"`) and the autonomous idle timer that decides to sleep. The
+> primitive is "buffered event for a sleeping instance ⇒ its wakeUrl gets poked";
+> WHAT makes the instance sleep (and wake-to-serve) is the behaviour layer.
+
 ---
 
 ## 4. Outbound: action set
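The poke itself lives on the connector side (the WakePoker mentioned in the commit message) and is not part of this diff. As a rough illustration of the contract text in §3.3, the following is a minimal sketch of a rate-limited, best-effort, payload-free GET. Everything here is hypothetical: the class name `WakePokerSketch`, the 30-second default cooldown, and the injectable `fetch` and `clock` parameters are assumptions made for the example, and the sketch folds the "first buffered event" trigger into the cooldown check rather than tracking the buffered-only state.

```python
import time
import urllib.request
from typing import Callable, Dict, Optional


def _http_get(url: str, timeout: float = 5.0) -> None:
    """Issue a bare GET: no body, no auth or signature headers."""
    req = urllib.request.Request(url, method="GET")
    with urllib.request.urlopen(req, timeout=timeout):
        pass  # the response body is irrelevant; the request is the signal


class WakePokerSketch:
    """Hypothetical connector-side poker: one poke per instance per cooldown."""

    def __init__(
        self,
        cooldown_s: float = 30.0,  # assumed value, not taken from the source
        fetch: Callable[[str], None] = _http_get,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._cooldown_s = cooldown_s
        self._fetch = fetch
        self._clock = clock
        self._last_poke: Dict[str, float] = {}

    def on_buffered_event(self, instance_id: str, wake_url: Optional[str]) -> bool:
        """Return True if a poke was attempted for this buffered event."""
        if not wake_url:
            return False  # opt-in: no registered wakeUrl means never poke
        now = self._clock()
        last = self._last_poke.get(instance_id)
        if last is not None and now - last < self._cooldown_s:
            return False  # still inside the cooldown window for this instance
        self._last_poke[instance_id] = now
        try:
            self._fetch(wake_url)
        except Exception:
            pass  # best-effort: a failed poke is swallowed
        return True
```

Injecting `fetch` and `clock` keeps the sketch testable without a network or real sleeps; a real connector would presumably also clear the per-instance state once the gateway reconnects.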
@@ -158,6 +158,37 @@ def relay_instance_id() -> Optional[str]:
     return value or None
 
 
+def relay_wake_url() -> Optional[str]:
+    """The gateway's WAKE URL, forwarded at provision (Phase 5 §5.2 wake PRIMITIVE).
+
+    A poke target the connector issues a payload-free GET to when a buffered-only
+    (going-idle) destination for this instance receives its first buffered event,
+    so a suspended gateway wakes, reconnects its relay WS, and drains its
+    delivery-leg backlog. The value's *source* differs by deployment but the code
+    path is uniform: a managed/NAS container has ``GATEWAY_RELAY_WAKE_URL`` stamped
+    in (NAS knows the Fly autostart / dashboard hostname); a self-hosted operator
+    sets it explicitly (or passes ``--wake-url`` to ``hermes gateway enroll``).
+
+    Gateway-asserted but safely scoped: the org/tenant stays token-verified, so a
+    dishonest gateway can only register a wake target for ITS OWN instance — the
+    same posture as ``relay_instance_id()`` / the retired ``relay_endpoint()``.
+    Absent -> the connector stores null and simply can't wake this instance
+    (buffering still works; the gateway drains whenever it next reconnects).
+
+    Env first (Docker/NAS), then ``gateway.relay_wake_url`` in config.yaml.
+    """
+    value = os.environ.get("GATEWAY_RELAY_WAKE_URL", "").strip()
+    if not value:
+        try:
+            from gateway.run import _load_gateway_config  # late import to avoid cycle
+
+            cfg = (_load_gateway_config().get("gateway") or {})
+            value = str(cfg.get("relay_wake_url", "") or "").strip()
+        except Exception:  # noqa: BLE001 - config absence/parse must never crash boot
+            value = ""
+    return value.rstrip("/") or None
+
+
 def _provision_url(relay_dial_url: str) -> str:
     """Map the ``ws(s)://…/relay`` dial URL to the ``http(s)://…/relay/provision`` POST URL."""
     raw = relay_dial_url.rstrip("/")
@@ -274,6 +305,7 @@ def _post_provision(
     gateway_endpoint: Optional[str],
     route_keys: list[str],
     instance_id: Optional[str] = None,
+    wake_url: Optional[str] = None,
     timeout: float = 15.0,
 ) -> dict:
     """POST to the connector's ``/relay/provision`` and return the JSON body.
@@ -299,6 +331,10 @@ def _post_provision(
     # connector store null (back-compat) rather than binding an empty string.
     if instance_id:
         body["instanceId"] = instance_id
+    # Same for the wake URL (Phase 5 §5.2): omit when absent so the connector
+    # stores null and simply can't wake this instance (buffering still works).
+    if wake_url:
+        body["wakeUrl"] = wake_url
     data = json.dumps(body).encode("utf-8")
     req = urllib.request.Request(
         provision_url,
@@ -404,6 +440,7 @@ def self_provision_relay() -> bool:
     endpoint = relay_endpoint()
     route_keys = relay_route_keys()
     instance_id = relay_instance_id()
+    wake_url = relay_wake_url()
 
     try:
         result = _post_provision(
@@ -415,6 +452,7 @@ def self_provision_relay() -> bool:
             gateway_endpoint=endpoint,
             route_keys=route_keys,
             instance_id=instance_id,
+            wake_url=wake_url,
         )
     except RuntimeError as exc:
         logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc)
@@ -430,12 +468,13 @@ def self_provision_relay() -> bool:
     os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "")
     tenant = str(result.get("tenant") or "")
     logger.info(
-        "relay self-provisioned (gateway_id=%s tenant=%s routes=%d inbound=%s instance=%s)",
+        "relay self-provisioned (gateway_id=%s tenant=%s routes=%d inbound=%s instance=%s wake=%s)",
         os.environ["GATEWAY_RELAY_ID"],
         tenant or "?",
         len(route_keys),
         "yes" if endpoint else "outbound-only",
         instance_id or "unbound",
+        "yes" if wake_url else "none",
     )
     return True
 
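The resolution order and the "omit when absent" rule from the hunks above can be exercised in isolation. The following is a minimal sketch, not the module's code: `resolve_wake_url` and `build_provision_body` are hypothetical stand-ins that take the environment and parsed config as plain mappings instead of reading `os.environ` and `config.yaml`, and the `gatewayId` base field is only an illustrative assumption.

```python
from typing import Any, Dict, Mapping, Optional


def resolve_wake_url(env: Mapping[str, str], config: Mapping[str, Any]) -> Optional[str]:
    """Env first, then gateway.relay_wake_url from config; None when unset."""
    value = env.get("GATEWAY_RELAY_WAKE_URL", "").strip()
    if not value:
        gateway_cfg = config.get("gateway") or {}
        value = str(gateway_cfg.get("relay_wake_url", "") or "").strip()
    # Normalise a trailing slash; an empty result collapses to None.
    return value.rstrip("/") or None


def build_provision_body(base: Dict[str, Any], wake_url: Optional[str]) -> Dict[str, Any]:
    """Copy the base body and add wakeUrl only when a value is present."""
    body = dict(base)
    if wake_url:
        body["wakeUrl"] = wake_url  # key omitted entirely otherwise, never ""
    return body


if __name__ == "__main__":
    url = resolve_wake_url({"GATEWAY_RELAY_WAKE_URL": " https://wake.example/poke/ "}, {})
    print(build_provision_body({"gatewayId": "gw-1"}, url))
    print(build_provision_body({"gatewayId": "gw-1"}, resolve_wake_url({}, {})))
```

Leaving the key out rather than sending an empty string lets a connector that predates `wakeUrl` treat the request exactly as before, which matches the back-compat note in the diff's comment.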
@@ -223,6 +223,14 @@ def cmd_gateway_enroll(args) -> None:
     if explicit_url:
         to_write["GATEWAY_RELAY_URL"] = explicit_url.rstrip("/")
 
+    # Phase 5 §5.2: persist the wake URL so self_provision_relay forwards it to
+    # the connector (which pokes it to wake this gateway when buffered work
+    # arrives while it's idle). Optional — omitted ⇒ the connector can't wake it,
+    # but the gateway still drains on its next reconnect.
+    explicit_wake_url = (getattr(args, "wake_url", None) or "").strip()
+    if explicit_wake_url:
+        to_write["GATEWAY_RELAY_WAKE_URL"] = explicit_wake_url.rstrip("/")
+
     for key, value in to_write.items():
         if not value:
             continue
@@ -242,6 +250,8 @@ def cmd_gateway_enroll(args) -> None:
     print(" GATEWAY_RELAY_DELIVERY_KEY=<hidden>")
     if explicit_url:
         print(f" GATEWAY_RELAY_URL={explicit_url.rstrip('/')}")
+    if explicit_wake_url:
+        print(f" GATEWAY_RELAY_WAKE_URL={explicit_wake_url.rstrip('/')}")
     print()
     print(
         " The gateway now authenticates its relay WS upgrade with the per-gateway\n"
@@ -282,6 +282,19 @@ def build_gateway_parser(
             "Defaults to gw-<hostname>."
         ),
     )
+    gateway_enroll.add_argument(
+        "--wake-url",
+        dest="wake_url",
+        default=None,
+        help=(
+            "Phase 5 §5.2 wake URL: a reachable URL the connector pokes "
+            "(payload-free GET) to wake this gateway when buffered work arrives "
+            "while it's idle/suspended, so it reconnects and drains. Persisted as "
+            "GATEWAY_RELAY_WAKE_URL in ~/.hermes/.env and forwarded at provision. "
+            "Optional — without it the gateway still drains whenever it next "
+            "reconnects on its own."
+        ),
+    )
     gateway_enroll.set_defaults(func=cmd_gateway_enroll)
 
     # =========================================================================
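Taken together, the two CLI hunks above parse an optional `--wake-url` flag and turn it into one extra key to persist. The following is a minimal, self-contained sketch of that flow using only `argparse`; `build_enroll_parser` and `env_updates` are hypothetical names for illustration, and the sketch returns the would-be `.env` entries instead of writing a file.

```python
import argparse
from typing import Dict, List


def build_enroll_parser() -> argparse.ArgumentParser:
    """Hypothetical stand-in for the enroll subparser, with only --wake-url."""
    parser = argparse.ArgumentParser(prog="enroll-sketch")
    parser.add_argument(
        "--wake-url",
        dest="wake_url",
        default=None,
        help="Optional URL the connector may GET to wake this gateway.",
    )
    return parser


def env_updates(argv: List[str]) -> Dict[str, str]:
    """Return the env entries an enroll run would persist for these args."""
    args = build_enroll_parser().parse_args(argv)
    updates: Dict[str, str] = {}
    wake_url = (getattr(args, "wake_url", None) or "").strip()
    if wake_url:
        # Same normalisation as the diff: drop any trailing slash.
        updates["GATEWAY_RELAY_WAKE_URL"] = wake_url.rstrip("/")
    return updates


if __name__ == "__main__":
    print(env_updates(["--wake-url", "https://wake.example/poke/"]))
    print(env_updates([]))
```

Because the flag defaults to `None` and blank values are dropped, an operator who never passes `--wake-url` gets no `GATEWAY_RELAY_WAKE_URL` entry at all, which keeps the feature opt-in.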
@@ -31,6 +31,7 @@ def _clean_env(monkeypatch):
         "GATEWAY_RELAY_PLATFORM",
         "GATEWAY_RELAY_BOT_ID",
         "GATEWAY_RELAY_INSTANCE_ID",
+        "GATEWAY_RELAY_WAKE_URL",
     ):
         monkeypatch.delenv(k, raising=False)
     # Never read config.yaml off disk in these tests.
@@ -255,6 +256,99 @@ def test_post_provision_body_includes_instanceId_only_when_set(monkeypatch):
     assert "instanceId" not in sent["body"]
 
 
+# ─────────────────── wake-url forwarding (Phase 5 Unit C) ───────────────────
+
+def test_relay_wake_url_from_env(monkeypatch):
+    monkeypatch.setenv("GATEWAY_RELAY_WAKE_URL", " https://wake.example/poke ")
+    assert relay.relay_wake_url() == "https://wake.example/poke"
+
+
+def test_relay_wake_url_absent_is_none():
+    assert relay.relay_wake_url() is None
+
+
+def test_relay_wake_url_from_config(monkeypatch):
+    monkeypatch.setattr(
+        "gateway.run._load_gateway_config",
+        lambda: {"gateway": {"relay_wake_url": "https://wake.from-config/poke"}},
+        raising=False,
+    )
+    assert relay.relay_wake_url() == "https://wake.from-config/poke"
+
+
+def test_forwards_wake_url_to_provision(monkeypatch):
+    """A suspendable agent stamped with GATEWAY_RELAY_WAKE_URL forwards it to the
+    connector so the connector can poke it awake when the first buffered event
+    lands on a flipped destination (Unit C wake primitive)."""
+    _arm(monkeypatch)
+    monkeypatch.setenv("GATEWAY_RELAY_WAKE_URL", "https://wake.example/poke")
+    captured: dict = {}
+    monkeypatch.setattr(relay, "_post_provision", _stub_post(captured))
+
+    assert relay.self_provision_relay() is True
+    assert captured["wake_url"] == "https://wake.example/poke"
+
+
+def test_wake_url_absent_forwards_none(monkeypatch):
+    """No stamp (self-hosted / non-suspendable) -> wake_url None; the connector
+    stores null and simply never pokes (it can't wake what it can't reach)."""
+    _arm(monkeypatch)
+    captured: dict = {}
+    monkeypatch.setattr(relay, "_post_provision", _stub_post(captured))
+
+    assert relay.self_provision_relay() is True
+    assert captured["wake_url"] is None
+
+
+def test_post_provision_body_includes_wakeUrl_only_when_set(monkeypatch):
+    """The real _post_provision adds `wakeUrl` to the JSON body ONLY when a value
+    is supplied — omitting it lets the connector store null (back-compat)."""
+    import json
+
+    sent: dict = {}
+
+    class _Resp:
+        def __enter__(self):
+            return self
+
+        def __exit__(self, *a):
+            return False
+
+        def read(self):
+            return json.dumps({"secret": "a" * 64, "deliveryKey": "b" * 64, "tenant": "t", "gatewayId": "gw-1"}).encode()
+
+    def _fake_urlopen(req, timeout=None):  # noqa: ANN001
+        sent["body"] = json.loads(req.data.decode())
+        return _Resp()
+
+    monkeypatch.setattr("urllib.request.urlopen", _fake_urlopen)
+
+    # With a wake url -> present in the body.
+    relay._post_provision(
+        provision_url="https://c.example/relay/provision",
+        access_token="tok",
+        gateway_id="gw-1",
+        platform="discord",
+        bot_id="app",
+        gateway_endpoint=None,
+        route_keys=[],
+        wake_url="https://wake.example/poke",
+    )
+    assert sent["body"]["wakeUrl"] == "https://wake.example/poke"
+
+    # Without one -> the key is absent entirely (not "").
+    relay._post_provision(
+        provision_url="https://c.example/relay/provision",
+        access_token="tok",
+        gateway_id="gw-1",
+        platform="discord",
+        bot_id="app",
+        gateway_endpoint=None,
+        route_keys=[],
+    )
+    assert "wakeUrl" not in sent["body"]
+
+
 # ─────────────────────────── fail-soft ───────────────────────────
 
 def test_no_nas_token_is_non_fatal(monkeypatch):