fix(bedrock): fall back to non-streaming InvokeModel when IAM denies InvokeModelWithResponseStream (#44293)

IAM policies scoped to bedrock:InvokeModel only (a common least-privilege
setup) reject converse_stream() with AccessDeniedException. The agent loop
hard-prefers streaming and the denial never matched the 'stream not
supported' auto-fallback, so InvokeModel-only users looped on AccessDenied
forever.

- agent/bedrock_adapter.py: new is_streaming_access_denied_error()
  detector (ClientError code check + wrapped-SDK message match);
  call_converse_stream() falls back to converse() on denial.
- agent/chat_completion_helpers.py: bedrock_converse streaming branch
  retries inline via converse() and sets _disable_streaming so later
  turns skip the doomed stream attempt; the chat-completions retry
  block also recognizes the denial for the AnthropicBedrock SDK path
  (message pre-check avoids importing bedrock_adapter — and its lazy
  boto3 install — for unrelated providers).

Both paths print a one-line notice telling the user which IAM action
restores streaming.
This commit is contained in:
Teknium 2026-06-11 07:15:30 -07:00 committed by GitHub
parent b1af653bf6
commit e24c935cf3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 304 additions and 1 deletions

View file

@ -208,6 +208,41 @@ def is_stale_connection_error(exc: BaseException) -> bool:
return False
def is_streaming_access_denied_error(exc: BaseException) -> bool:
"""Return True when AWS denied the ``bedrock:InvokeModelWithResponseStream`` action.
IAM policies scoped to ``bedrock:InvokeModel`` only (a common least-privilege
setup) reject ``converse_stream()`` with an ``AccessDeniedException`` whose
message names the streaming action, e.g.::
User: arn:aws:iam::123456789012:user/x is not authorized to perform:
bedrock:InvokeModelWithResponseStream on resource: ...
This is permanent for the session retrying the stream can never succeed
so callers should flip to the non-streaming ``converse()`` path (which maps
to ``bedrock:InvokeModel``) instead of burning retries.
Detection is deliberately message-based: boto3 surfaces this as a
``ClientError`` with ``Error.Code == "AccessDeniedException"``, and the
AnthropicBedrock SDK wraps the same AWS response in its own exception
types, but both preserve the action name in the message.
"""
msg = str(exc).lower()
if "invokemodelwithresponsestream" not in msg:
return False
# ClientError with an explicit access-denied code is the canonical form.
try:
from botocore.exceptions import ClientError
except ImportError: # pragma: no cover — botocore always present with boto3
ClientError = None # type: ignore[assignment]
if ClientError is not None and isinstance(exc, ClientError):
code = (getattr(exc, "response", None) or {}).get("Error", {}).get("Code", "")
return code in ("AccessDeniedException", "UnauthorizedException")
# Wrapped forms (e.g. AnthropicBedrock SDK PermissionDeniedError) — match
# on the authorization-failure phrasing AWS uses.
return "not authorized" in msg or "accessdenied" in msg
# ---------------------------------------------------------------------------
# AWS credential detection
# ---------------------------------------------------------------------------
@ -1003,6 +1038,16 @@ def call_converse_stream(
try:
response = client.converse_stream(**kwargs)
except Exception as exc:
if is_streaming_access_denied_error(exc):
# IAM allows bedrock:InvokeModel but not
# InvokeModelWithResponseStream — permanent for this session.
# Fall back to the non-streaming converse() path.
logger.info(
"bedrock: converse_stream denied by IAM on (region=%s, model=%s) — "
"falling back to non-streaming converse().",
region, model,
)
return normalize_converse_response(client.converse(**kwargs))
if is_stale_connection_error(exc):
logger.warning(
"bedrock: stale-connection error on converse_stream(region=%s, "

View file

@ -1615,6 +1615,8 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
_get_bedrock_runtime_client,
invalidate_runtime_client,
is_stale_connection_error,
is_streaming_access_denied_error,
normalize_converse_response,
stream_converse_with_callbacks,
)
region = api_kwargs.pop("__bedrock_region__", "us-east-1")
@ -1623,6 +1625,29 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
try:
raw_response = client.converse_stream(**api_kwargs)
except Exception as _bedrock_exc:
# IAM policies scoped to bedrock:InvokeModel only (no
# InvokeModelWithResponseStream) reject converse_stream()
# with AccessDeniedException. That denial is permanent for
# the session — fall back to the non-streaming converse()
# inline (it maps to bedrock:InvokeModel) and disable
# streaming for subsequent calls so we don't re-fail every
# turn.
if is_streaming_access_denied_error(_bedrock_exc):
agent._disable_streaming = True
agent._safe_print(
"\n⚠ AWS IAM denied bedrock:InvokeModelWithResponseStream — "
"falling back to non-streaming InvokeModel.\n"
" Grant that action to restore streaming output.\n"
)
logger.info(
"bedrock: converse_stream denied by IAM (%s) — "
"using non-streaming converse() for this session.",
type(_bedrock_exc).__name__,
)
result["response"] = normalize_converse_response(
client.converse(**api_kwargs)
)
return
# Evict the cached client on stale-connection failures
# so the outer retry loop builds a fresh client/pool.
if is_stale_connection_error(_bedrock_exc):
@ -2424,9 +2449,34 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
"stream" in _err_lower
and "not supported" in _err_lower
)
if _is_stream_unsupported:
# AWS Bedrock (AnthropicBedrock SDK path): IAM policies
# with bedrock:InvokeModel but not
# InvokeModelWithResponseStream reject messages.stream()
# with a permission error naming the streaming action.
# Permanent for the session — flip to non-streaming
# (messages.create() maps to bedrock:InvokeModel).
_is_bedrock_stream_denied = False
if (
not _is_stream_unsupported
and "invokemodelwithresponsestream" in _err_lower
):
# Cheap message pre-check before importing the
# adapter — bedrock_adapter triggers a lazy boto3
# install at import time, which must not run for
# unrelated providers' stream errors.
from agent.bedrock_adapter import (
is_streaming_access_denied_error,
)
_is_bedrock_stream_denied = (
is_streaming_access_denied_error(e)
)
if _is_stream_unsupported or _is_bedrock_stream_denied:
agent._disable_streaming = True
agent._safe_print(
"\n⚠ AWS IAM denied bedrock:InvokeModelWithResponseStream. "
"Switching to non-streaming.\n"
" Grant that action to restore streaming output.\n"
if _is_bedrock_stream_denied else
"\n⚠ Streaming is not supported for this "
"model/provider. Switching to non-streaming.\n"
" To avoid this delay, set display.streaming: false "

View file

@ -1471,3 +1471,127 @@ class TestCallConverseInvalidatesOnStaleError:
)
assert _bedrock_runtime_client_cache.get("us-east-1") is live_client
class TestStreamingAccessDeniedDetection:
"""is_streaming_access_denied_error() recognizes IAM denials of
bedrock:InvokeModelWithResponseStream (InvokeModel-only policies)."""
def _denied_client_error(self):
from botocore.exceptions import ClientError
return ClientError(
error_response={
"Error": {
"Code": "AccessDeniedException",
"Message": (
"User: arn:aws:iam::123456789012:user/x is not "
"authorized to perform: "
"bedrock:InvokeModelWithResponseStream on resource: "
"arn:aws:bedrock:us-east-1::foundation-model/"
"anthropic.claude-3-sonnet-20240229-v1:0"
),
}
},
operation_name="ConverseStream",
)
def test_matches_access_denied_client_error(self):
pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests")
from agent.bedrock_adapter import is_streaming_access_denied_error
assert is_streaming_access_denied_error(self._denied_client_error()) is True
def test_ignores_access_denied_for_other_actions(self):
"""AccessDenied on InvokeModel itself is NOT a streaming-only denial."""
pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests")
from agent.bedrock_adapter import is_streaming_access_denied_error
from botocore.exceptions import ClientError
exc = ClientError(
error_response={
"Error": {
"Code": "AccessDeniedException",
"Message": (
"User is not authorized to perform: bedrock:InvokeModel"
),
}
},
operation_name="Converse",
)
assert is_streaming_access_denied_error(exc) is False
def test_ignores_validation_error_mentioning_action(self):
"""Non-authz ClientErrors don't match even if the action name appears."""
pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests")
from agent.bedrock_adapter import is_streaming_access_denied_error
from botocore.exceptions import ClientError
exc = ClientError(
error_response={
"Error": {
"Code": "ValidationException",
"Message": "InvokeModelWithResponseStream input malformed",
}
},
operation_name="ConverseStream",
)
assert is_streaming_access_denied_error(exc) is False
def test_matches_wrapped_sdk_permission_error(self):
"""Non-ClientError wrappers (AnthropicBedrock SDK) match on message."""
from agent.bedrock_adapter import is_streaming_access_denied_error
exc = RuntimeError(
"PermissionDeniedError: user is not authorized to perform: "
"bedrock:InvokeModelWithResponseStream"
)
assert is_streaming_access_denied_error(exc) is True
def test_ignores_unrelated_errors(self):
from agent.bedrock_adapter import is_streaming_access_denied_error
assert is_streaming_access_denied_error(ValueError("boom")) is False
assert is_streaming_access_denied_error(
RuntimeError("stream not supported")
) is False
class TestCallConverseStreamIamFallback:
"""call_converse_stream() falls back to converse() when IAM denies the
streaming action InvokeModel-only policies keep working."""
def test_falls_back_to_converse_on_streaming_denial(self):
pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests")
from agent.bedrock_adapter import (
_bedrock_runtime_client_cache,
call_converse_stream,
reset_client_cache,
)
from botocore.exceptions import ClientError
reset_client_cache()
client = MagicMock()
client.converse_stream.side_effect = ClientError(
error_response={
"Error": {
"Code": "AccessDeniedException",
"Message": (
"User is not authorized to perform: "
"bedrock:InvokeModelWithResponseStream"
),
}
},
operation_name="ConverseStream",
)
client.converse.return_value = {
"output": {"message": {"role": "assistant", "content": [{"text": "hi"}]}},
"stopReason": "end_turn",
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 2},
}
_bedrock_runtime_client_cache["us-east-1"] = client
result = call_converse_stream(
region="us-east-1",
model="anthropic.claude-3-sonnet-20240229-v1:0",
messages=[{"role": "user", "content": "hi"}],
)
client.converse.assert_called_once()
assert result.choices[0].message.content == "hi"
# Not a stale connection — client stays cached.
assert _bedrock_runtime_client_cache.get("us-east-1") is client

View file

@ -1573,3 +1573,87 @@ class TestCopilotACPStreamingDecision:
_use_streaming = False
assert _use_streaming is True
class TestBedrockIamStreamingFallback:
"""bedrock_converse streaming branch: IAM denial of
InvokeModelWithResponseStream falls back to converse() inline and sets
_disable_streaming for the rest of the session."""
def _make_bedrock_agent(self):
from run_agent import AIAgent
agent = AIAgent(
api_key="test-key",
base_url="https://openrouter.ai/api/v1",
model="anthropic.claude-3-sonnet-20240229-v1:0",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
agent.api_mode = "bedrock_converse"
agent._interrupt_requested = False
return agent
def test_iam_denial_falls_back_inline_and_disables_streaming(self):
pytest.importorskip("botocore", reason="botocore required for Bedrock tests")
from botocore.exceptions import ClientError
agent = self._make_bedrock_agent()
client = MagicMock()
client.converse_stream.side_effect = ClientError(
error_response={
"Error": {
"Code": "AccessDeniedException",
"Message": (
"User is not authorized to perform: "
"bedrock:InvokeModelWithResponseStream"
),
}
},
operation_name="ConverseStream",
)
client.converse.return_value = {
"output": {"message": {"role": "assistant", "content": [{"text": "hi"}]}},
"stopReason": "end_turn",
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 2},
}
with patch(
"agent.bedrock_adapter._get_bedrock_runtime_client",
return_value=client,
):
response = agent._interruptible_streaming_api_call(
{"modelId": agent.model, "messages": []}
)
client.converse.assert_called_once()
assert response.choices[0].message.content == "hi"
assert getattr(agent, "_disable_streaming", False) is True
def test_other_bedrock_errors_still_propagate(self):
pytest.importorskip("botocore", reason="botocore required for Bedrock tests")
from botocore.exceptions import ClientError
agent = self._make_bedrock_agent()
client = MagicMock()
client.converse_stream.side_effect = ClientError(
error_response={
"Error": {"Code": "ThrottlingException", "Message": "slow down"}
},
operation_name="ConverseStream",
)
with patch(
"agent.bedrock_adapter._get_bedrock_runtime_client",
return_value=client,
):
with pytest.raises(ClientError):
agent._interruptible_streaming_api_call(
{"modelId": agent.model, "messages": []}
)
client.converse.assert_not_called()
assert getattr(agent, "_disable_streaming", False) is False