fix(matrix): block unsafe image redirects per-hop

Matrix outbound image downloads validated only the final URL after
following redirects, so a public URL that 302-redirects to loopback /
private-network / cloud-metadata endpoints had already connected to the
unsafe hop before the check ran.

Re-validate every redirect hop before following it:
- aiohttp path resolves redirects manually with allow_redirects=False,
  validating each Location via is_safe_url (aiohttp can't use the httpx
  response event hook).
- httpx fallback installs the shared _ssrf_redirect_guard event hook.

Regression tests cover per-hop blocking of an unsafe redirect, following
a safe redirect chain, and httpx guard wiring.
This commit is contained in:
heathley 2026-07-01 02:22:29 -07:00 committed by Teknium
parent 868fa9566a
commit a8a97c358f
2 changed files with 185 additions and 36 deletions

View file

@ -57,7 +57,7 @@ import mimetypes
import os
import re
import time
from urllib.parse import urlsplit, urlunsplit
from urllib.parse import urljoin, urlsplit, urlunsplit
from dataclasses import dataclass, field
from html import escape as _html_escape
@ -128,6 +128,7 @@ from gateway.platforms.base import (
SendResult,
resolve_proxy_url,
proxy_kwargs_for_aiohttp,
_ssrf_redirect_guard,
)
from gateway.platforms.helpers import ThreadParticipationTracker
@ -1766,36 +1767,57 @@ class MatrixAdapter(BasePlatformAdapter):
fname = url.rsplit("/", 1)[-1].split("?")[0] or "image.png"
def _safe_redirect_target(current_url: str, location: str) -> str:
"""Resolve a redirect Location and re-validate it against SSRF policy.
A public-looking URL can 302-redirect the gateway toward loopback,
private-network, or cloud-metadata endpoints. Validating only the
final URL is insufficient because the connection to the unsafe hop
has already been made. Re-check every hop before following it.
"""
next_url = urljoin(current_url, location)
if not is_safe_url(next_url):
raise ValueError("blocked unsafe redirect URL")
return next_url
try:
import aiohttp as _aiohttp
_sess_kw, _req_kw = proxy_kwargs_for_aiohttp(self._proxy_url)
async with _aiohttp.ClientSession(**_sess_kw) as http:
async with http.get(
url,
timeout=_aiohttp.ClientTimeout(total=30),
allow_redirects=True,
**_req_kw,
) as resp:
resp.raise_for_status()
if not is_safe_url(str(resp.url)):
raise ValueError("blocked unsafe redirect URL")
_check_content_length(resp.headers)
parts: list[bytes] = []
total = 0
async for chunk in resp.content.iter_chunked(65536):
total = _append_chunk(parts, total, bytes(chunk))
ct = _check_image_content_type(
getattr(resp, "content_type", None)
or resp.headers.get("content-type", "application/octet-stream")
)
return b"".join(parts), ct, fname
fetch_url = url
for _ in range(20):
async with http.get(
fetch_url,
timeout=_aiohttp.ClientTimeout(total=30),
allow_redirects=False,
**_req_kw,
) as resp:
if resp.status in {301, 302, 303, 307, 308}:
location = resp.headers.get("Location")
if not location:
raise ValueError("redirect missing Location")
fetch_url = _safe_redirect_target(fetch_url, location)
continue
resp.raise_for_status()
_check_content_length(resp.headers)
parts: list[bytes] = []
total = 0
async for chunk in resp.content.iter_chunked(65536):
total = _append_chunk(parts, total, bytes(chunk))
ct = _check_image_content_type(
getattr(resp, "content_type", None)
or resp.headers.get("content-type", "application/octet-stream")
)
return b"".join(parts), ct, fname
raise ValueError("too many redirects")
except ImportError:
import httpx
_httpx_kw: dict = {}
if self._proxy_url:
_httpx_kw["proxy"] = self._proxy_url
_httpx_kw["event_hooks"] = {"response": [_ssrf_redirect_guard]}
async with httpx.AsyncClient(**_httpx_kw) as http:
async with http.stream(
"GET",
@ -1804,8 +1826,6 @@ class MatrixAdapter(BasePlatformAdapter):
timeout=30,
) as resp:
resp.raise_for_status()
if not is_safe_url(str(resp.url)):
raise ValueError("blocked unsafe redirect URL")
_check_content_length(resp.headers)
parts: list[bytes] = []
total = 0

View file

@ -3385,6 +3385,7 @@ class TestMatrixImageOnlyMediaNormalization:
class _Response:
url = "https://example.com/image.png"
status = 200
headers = {"Content-Length": "11"}
content_type = "image/png"
content = _Content()
@ -3434,6 +3435,7 @@ class TestMatrixImageOnlyMediaNormalization:
class _Response:
url = "https://example.com/image.png"
status = 200
headers = {}
content_type = "image/png"
content = _Content()
@ -3472,15 +3474,82 @@ class TestMatrixImageOnlyMediaNormalization:
@pytest.mark.asyncio
async def test_external_media_download_rejects_unsafe_redirect(self, monkeypatch):
"""A 302 to a private/loopback target must be blocked per-hop, before
the redirect is followed (not only re-checked on the final URL)."""
import aiohttp
import tools.url_safety as url_safety
class _RedirectResponse:
status = 302
headers = {"Location": "http://127.0.0.1/private.png"}
content_type = "image/png"
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
class _Session:
def __init__(self):
self.requested = []
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def get(self, url, *_args, **_kwargs):
self.requested.append(url)
return _RedirectResponse()
session = _Session()
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: session)
monkeypatch.setattr(
url_safety,
"is_safe_url",
lambda candidate, **_kwargs: str(candidate) == "https://example.com/image.png",
)
with pytest.raises(ValueError, match="unsafe redirect"):
await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
# Only the initial public URL was fetched — the loopback hop was never
# followed because it was rejected before the next GET.
assert session.requested == ["https://example.com/image.png"]
@pytest.mark.asyncio
async def test_external_media_download_follows_safe_redirect(self, monkeypatch):
"""A redirect to another allowed URL is followed and its body returned."""
import aiohttp
import tools.url_safety as url_safety
class _Content:
async def iter_chunked(self, _size):
yield b"ok"
yield b"imgbytes"
class _Response:
url = "http://127.0.0.1/private.png"
class _RedirectResponse:
status = 302
headers = {"Location": "https://cdn.example.com/final.png"}
content_type = "image/png"
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
class _OkResponse:
status = 200
headers = {}
content_type = "image/png"
content = _Content()
@ -3495,26 +3564,85 @@ class TestMatrixImageOnlyMediaNormalization:
return None
class _Session:
def __init__(self):
self.requested = []
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def get(self, *_args, **_kwargs):
return _Response()
def get(self, url, *_args, **_kwargs):
self.requested.append(url)
return _RedirectResponse() if len(self.requested) == 1 else _OkResponse()
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: _Session())
monkeypatch.setattr(
url_safety,
"is_safe_url",
lambda candidate, **_kwargs: str(candidate) == "https://example.com/image.png",
session = _Session()
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: session)
monkeypatch.setattr(url_safety, "is_safe_url", lambda *_args, **_kwargs: True)
data, ct, _fname = await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
with pytest.raises(ValueError, match="unsafe redirect"):
await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
assert data == b"imgbytes"
assert ct == "image/png"
assert session.requested == [
"https://example.com/image.png",
"https://cdn.example.com/final.png",
]
@pytest.mark.asyncio
async def test_external_media_download_httpx_installs_redirect_guard(self, monkeypatch):
"""The httpx fallback re-checks redirect targets via the shared guard."""
import tools.url_safety as url_safety
from gateway.platforms.base import _ssrf_redirect_guard
clients = []
class _Content:
async def iter_chunked(self, _size):
yield b"ok"
class _Response:
headers = {"content-type": "image/png"}
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
async def aiter_bytes(self):
yield b"ok"
class _Client:
def __init__(self, **kwargs):
self.kwargs = kwargs
clients.append(self)
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def stream(self, *_args, **_kwargs):
return _Response()
monkeypatch.setattr(url_safety, "is_safe_url", lambda *_args, **_kwargs: True)
with patch.dict(sys.modules, {"aiohttp": None}):
with patch("httpx.AsyncClient", _Client):
data, ct, _fname = await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
assert data == b"ok"
assert ct == "image/png"
assert clients[0].kwargs["event_hooks"]["response"] == [_ssrf_redirect_guard]
@pytest.mark.asyncio
async def test_external_media_download_rejects_unsafe_initial_url(self):
@ -3534,6 +3662,7 @@ class TestMatrixImageOnlyMediaNormalization:
class _Response:
url = "https://example.com/image.png"
status = 200
headers = {}
content_type = "text/html"
content = _Content()