fix(gateway): pairing store cannot bypass configured allowlist

A user who tapped Always on an approval button gets a pairing-store entry.
_is_user_authorized() checked the pairing store BEFORE the allowlist and
returned True unconditionally, so a paired-but-not-allowed user permanently
bypassed TELEGRAM_ALLOWED_USERS (or equivalent) even after being removed from
the allowlist (#23778).

Record pairing membership but only honor it in the no-allowlist branch. When
an allowlist IS configured, the paired user must appear in the canonical
allowed_ids set (the same set that resolves WhatsApp aliases, SimpleX names,
group allowlists, and the '*' wildcard), so pairing grants no extra access.

Cherry-picked/rebased from #47736 (#23805) by ygd58; membership check rewritten
to reuse the existing allowlist logic. Adds regression tests.
This commit is contained in:
ygd58 2026-07-01 04:28:27 -07:00 committed by Teknium
parent 03bbd37dd7
commit 50aaa426c1
2 changed files with 123 additions and 3 deletions

View file

@ -417,10 +417,17 @@ class GatewayAuthorizationMixin:
if getattr(source, "role_authorized", False) is True:
return True
# Check pairing store (always checked, regardless of allowlists)
# Pairing store membership. Recorded here but NOT returned yet: if an
# allowlist is configured, a previously-paired user must STILL be in
# that allowlist to be honored. Otherwise a user who tapped "Always" on
# an approval button could permanently bypass TELEGRAM_ALLOWED_USERS (or
# equivalent) via the pairing store even after being removed from the
# allowlist (issue #23778). When no allowlist is configured, pairing is
# the intended access path and is honored in the no-allowlist branch
# below; when an allowlist IS configured, the pairing entry only counts
# if the user is also in it (folded into the final membership check).
platform_name = source.platform.value if source.platform else ""
if self.pairing_store.is_approved(platform_name, user_id):
return True
is_paired = self.pairing_store.is_approved(platform_name, user_id)
# Check platform-specific and global allowlists
platform_allowlist = os.getenv(platform_env_map.get(source.platform, ""), "").strip()
@ -432,6 +439,11 @@ class GatewayAuthorizationMixin:
global_allowlist = os.getenv("GATEWAY_ALLOWED_USERS", "").strip()
if not platform_allowlist and not group_user_allowlist and not group_chat_allowlist and not global_allowlist:
# No env allowlist configured. A pairing-store entry is the intended
# access path here (the user completed the pairing handshake), so
# honor it — there is no allowlist to re-validate against.
if is_paired:
return True
# No env allowlist configured. Adapters that own their own
# config-driven access policy (dm_policy / group_policy /
# allow_from / group_allow_from) gate access at intake, so for those

View file

@ -0,0 +1,108 @@
"""Regression guard: pairing store must not bypass a configured allowlist (#23778).
A user who tapped "Always" on an approval button gets a pairing-store entry.
Before the fix, ``_is_user_authorized()`` returned True from the pairing store
BEFORE the allowlist was ever consulted, so a paired-but-not-allowed user
permanently bypassed ``TELEGRAM_ALLOWED_USERS`` (or equivalent) even after being
removed from the allowlist. The fix records pairing membership but only honors
it when no allowlist is configured; when an allowlist IS configured, the paired
user must still appear in it.
"""
from types import SimpleNamespace
import pytest
from gateway.session import Platform, SessionSource
@pytest.fixture(autouse=True)
def _isolate_env(monkeypatch):
for var in (
"TELEGRAM_ALLOWED_USERS",
"TELEGRAM_ALLOW_ALL_USERS",
"TELEGRAM_GROUP_ALLOWED_USERS",
"TELEGRAM_GROUP_ALLOWED_CHATS",
"GATEWAY_ALLOW_ALL_USERS",
"GATEWAY_ALLOWED_USERS",
):
monkeypatch.delenv(var, raising=False)
def _make_runner(*, paired: bool):
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner.pairing_store = SimpleNamespace(is_approved=lambda *_a, **_kw: paired)
return runner
def _make_source(user_id: str = "attacker42", chat_type: str = "dm"):
return SessionSource(
platform=Platform.TELEGRAM,
chat_id="123",
chat_type=chat_type,
user_id=user_id,
user_name="SomeHuman",
is_bot=False,
)
def test_paired_user_denied_when_not_in_platform_allowlist(monkeypatch):
"""The core bypass: paired but absent from TELEGRAM_ALLOWED_USERS → denied."""
runner = _make_runner(paired=True)
monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "owner1,owner2")
assert runner._is_user_authorized(_make_source("attacker42")) is False
def test_paired_user_denied_when_not_in_global_allowlist(monkeypatch):
runner = _make_runner(paired=True)
monkeypatch.setenv("GATEWAY_ALLOWED_USERS", "owner1,owner2")
assert runner._is_user_authorized(_make_source("attacker42")) is False
def test_paired_user_allowed_when_still_in_platform_allowlist(monkeypatch):
"""A paired user who is also in the allowlist stays authorized."""
runner = _make_runner(paired=True)
monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "owner1,attacker42")
assert runner._is_user_authorized(_make_source("attacker42")) is True
def test_paired_user_allowed_when_still_in_global_allowlist(monkeypatch):
runner = _make_runner(paired=True)
monkeypatch.setenv("GATEWAY_ALLOWED_USERS", "owner1,attacker42")
assert runner._is_user_authorized(_make_source("attacker42")) is True
def test_paired_user_allowed_with_wildcard_allowlist(monkeypatch):
"""A "*" allowlist means everyone; a paired user is honored."""
runner = _make_runner(paired=True)
monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "*")
assert runner._is_user_authorized(_make_source("attacker42")) is True
def test_paired_user_allowed_when_no_allowlist_configured(monkeypatch):
"""No allowlist configured → pairing is the intended access path, honored."""
runner = _make_runner(paired=True)
assert runner._is_user_authorized(_make_source("attacker42")) is True
def test_unpaired_user_denied_when_no_allowlist_configured(monkeypatch):
"""No allowlist and not paired → default-deny (no fail-open)."""
runner = _make_runner(paired=False)
assert runner._is_user_authorized(_make_source("randouser")) is False
def test_unpaired_user_in_allowlist_still_allowed(monkeypatch):
"""Pairing changes did not regress the plain allowlist path."""
runner = _make_runner(paired=False)
monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "owner1")
assert runner._is_user_authorized(_make_source("owner1")) is True