From e24c935cf39db371a900ec0588324ca947219b16 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 11 Jun 2026 07:15:30 -0700 Subject: [PATCH] fix(bedrock): fall back to non-streaming InvokeModel when IAM denies InvokeModelWithResponseStream (#44293) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IAM policies scoped to bedrock:InvokeModel only (a common least-privilege setup) reject converse_stream() with AccessDeniedException. The agent loop hard-prefers streaming and the denial never matched the 'stream not supported' auto-fallback, so InvokeModel-only users looped on AccessDenied forever. - agent/bedrock_adapter.py: new is_streaming_access_denied_error() detector (ClientError code check + wrapped-SDK message match); call_converse_stream() falls back to converse() on denial. - agent/chat_completion_helpers.py: bedrock_converse streaming branch retries inline via converse() and sets _disable_streaming so later turns skip the doomed stream attempt; the chat-completions retry block also recognizes the denial for the AnthropicBedrock SDK path (message pre-check avoids importing bedrock_adapter — and its lazy boto3 install — for unrelated providers). Both paths print a one-line notice telling the user which IAM action restores streaming. --- agent/bedrock_adapter.py | 45 ++++++++++ agent/chat_completion_helpers.py | 52 +++++++++++- tests/agent/test_bedrock_adapter.py | 124 ++++++++++++++++++++++++++++ tests/run_agent/test_streaming.py | 84 +++++++++++++++++++ 4 files changed, 304 insertions(+), 1 deletion(-) diff --git a/agent/bedrock_adapter.py b/agent/bedrock_adapter.py index 12c7afb8c..e3abba843 100644 --- a/agent/bedrock_adapter.py +++ b/agent/bedrock_adapter.py @@ -208,6 +208,41 @@ def is_stale_connection_error(exc: BaseException) -> bool: return False +def is_streaming_access_denied_error(exc: BaseException) -> bool: + """Return True when AWS denied the ``bedrock:InvokeModelWithResponseStream`` action. + + IAM policies scoped to ``bedrock:InvokeModel`` only (a common least-privilege + setup) reject ``converse_stream()`` with an ``AccessDeniedException`` whose + message names the streaming action, e.g.:: + + User: arn:aws:iam::123456789012:user/x is not authorized to perform: + bedrock:InvokeModelWithResponseStream on resource: ... + + This is permanent for the session — retrying the stream can never succeed — + so callers should flip to the non-streaming ``converse()`` path (which maps + to ``bedrock:InvokeModel``) instead of burning retries. + + Detection is deliberately message-based: boto3 surfaces this as a + ``ClientError`` with ``Error.Code == "AccessDeniedException"``, and the + AnthropicBedrock SDK wraps the same AWS response in its own exception + types, but both preserve the action name in the message. + """ + msg = str(exc).lower() + if "invokemodelwithresponsestream" not in msg: + return False + # ClientError with an explicit access-denied code is the canonical form. + try: + from botocore.exceptions import ClientError + except ImportError: # pragma: no cover — botocore always present with boto3 + ClientError = None # type: ignore[assignment] + if ClientError is not None and isinstance(exc, ClientError): + code = (getattr(exc, "response", None) or {}).get("Error", {}).get("Code", "") + return code in ("AccessDeniedException", "UnauthorizedException") + # Wrapped forms (e.g. AnthropicBedrock SDK PermissionDeniedError) — match + # on the authorization-failure phrasing AWS uses. + return "not authorized" in msg or "accessdenied" in msg + + # --------------------------------------------------------------------------- # AWS credential detection # --------------------------------------------------------------------------- @@ -1003,6 +1038,16 @@ def call_converse_stream( try: response = client.converse_stream(**kwargs) except Exception as exc: + if is_streaming_access_denied_error(exc): + # IAM allows bedrock:InvokeModel but not + # InvokeModelWithResponseStream — permanent for this session. + # Fall back to the non-streaming converse() path. + logger.info( + "bedrock: converse_stream denied by IAM on (region=%s, model=%s) — " + "falling back to non-streaming converse().", + region, model, + ) + return normalize_converse_response(client.converse(**kwargs)) if is_stale_connection_error(exc): logger.warning( "bedrock: stale-connection error on converse_stream(region=%s, " diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index dcf9e24fc..1ee1702b4 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -1615,6 +1615,8 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= _get_bedrock_runtime_client, invalidate_runtime_client, is_stale_connection_error, + is_streaming_access_denied_error, + normalize_converse_response, stream_converse_with_callbacks, ) region = api_kwargs.pop("__bedrock_region__", "us-east-1") @@ -1623,6 +1625,29 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= try: raw_response = client.converse_stream(**api_kwargs) except Exception as _bedrock_exc: + # IAM policies scoped to bedrock:InvokeModel only (no + # InvokeModelWithResponseStream) reject converse_stream() + # with AccessDeniedException. That denial is permanent for + # the session — fall back to the non-streaming converse() + # inline (it maps to bedrock:InvokeModel) and disable + # streaming for subsequent calls so we don't re-fail every + # turn. + if is_streaming_access_denied_error(_bedrock_exc): + agent._disable_streaming = True + agent._safe_print( + "\n⚠ AWS IAM denied bedrock:InvokeModelWithResponseStream — " + "falling back to non-streaming InvokeModel.\n" + " Grant that action to restore streaming output.\n" + ) + logger.info( + "bedrock: converse_stream denied by IAM (%s) — " + "using non-streaming converse() for this session.", + type(_bedrock_exc).__name__, + ) + result["response"] = normalize_converse_response( + client.converse(**api_kwargs) + ) + return # Evict the cached client on stale-connection failures # so the outer retry loop builds a fresh client/pool. if is_stale_connection_error(_bedrock_exc): @@ -2424,9 +2449,34 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= "stream" in _err_lower and "not supported" in _err_lower ) - if _is_stream_unsupported: + # AWS Bedrock (AnthropicBedrock SDK path): IAM policies + # with bedrock:InvokeModel but not + # InvokeModelWithResponseStream reject messages.stream() + # with a permission error naming the streaming action. + # Permanent for the session — flip to non-streaming + # (messages.create() maps to bedrock:InvokeModel). + _is_bedrock_stream_denied = False + if ( + not _is_stream_unsupported + and "invokemodelwithresponsestream" in _err_lower + ): + # Cheap message pre-check before importing the + # adapter — bedrock_adapter triggers a lazy boto3 + # install at import time, which must not run for + # unrelated providers' stream errors. + from agent.bedrock_adapter import ( + is_streaming_access_denied_error, + ) + _is_bedrock_stream_denied = ( + is_streaming_access_denied_error(e) + ) + if _is_stream_unsupported or _is_bedrock_stream_denied: agent._disable_streaming = True agent._safe_print( + "\n⚠ AWS IAM denied bedrock:InvokeModelWithResponseStream. " + "Switching to non-streaming.\n" + " Grant that action to restore streaming output.\n" + if _is_bedrock_stream_denied else "\n⚠ Streaming is not supported for this " "model/provider. Switching to non-streaming.\n" " To avoid this delay, set display.streaming: false " diff --git a/tests/agent/test_bedrock_adapter.py b/tests/agent/test_bedrock_adapter.py index 5f98fe5cf..e1112b12d 100644 --- a/tests/agent/test_bedrock_adapter.py +++ b/tests/agent/test_bedrock_adapter.py @@ -1471,3 +1471,127 @@ class TestCallConverseInvalidatesOnStaleError: ) assert _bedrock_runtime_client_cache.get("us-east-1") is live_client + + +class TestStreamingAccessDeniedDetection: + """is_streaming_access_denied_error() recognizes IAM denials of + bedrock:InvokeModelWithResponseStream (InvokeModel-only policies).""" + + def _denied_client_error(self): + from botocore.exceptions import ClientError + return ClientError( + error_response={ + "Error": { + "Code": "AccessDeniedException", + "Message": ( + "User: arn:aws:iam::123456789012:user/x is not " + "authorized to perform: " + "bedrock:InvokeModelWithResponseStream on resource: " + "arn:aws:bedrock:us-east-1::foundation-model/" + "anthropic.claude-3-sonnet-20240229-v1:0" + ), + } + }, + operation_name="ConverseStream", + ) + + def test_matches_access_denied_client_error(self): + pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests") + from agent.bedrock_adapter import is_streaming_access_denied_error + assert is_streaming_access_denied_error(self._denied_client_error()) is True + + def test_ignores_access_denied_for_other_actions(self): + """AccessDenied on InvokeModel itself is NOT a streaming-only denial.""" + pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests") + from agent.bedrock_adapter import is_streaming_access_denied_error + from botocore.exceptions import ClientError + exc = ClientError( + error_response={ + "Error": { + "Code": "AccessDeniedException", + "Message": ( + "User is not authorized to perform: bedrock:InvokeModel" + ), + } + }, + operation_name="Converse", + ) + assert is_streaming_access_denied_error(exc) is False + + def test_ignores_validation_error_mentioning_action(self): + """Non-authz ClientErrors don't match even if the action name appears.""" + pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests") + from agent.bedrock_adapter import is_streaming_access_denied_error + from botocore.exceptions import ClientError + exc = ClientError( + error_response={ + "Error": { + "Code": "ValidationException", + "Message": "InvokeModelWithResponseStream input malformed", + } + }, + operation_name="ConverseStream", + ) + assert is_streaming_access_denied_error(exc) is False + + def test_matches_wrapped_sdk_permission_error(self): + """Non-ClientError wrappers (AnthropicBedrock SDK) match on message.""" + from agent.bedrock_adapter import is_streaming_access_denied_error + exc = RuntimeError( + "PermissionDeniedError: user is not authorized to perform: " + "bedrock:InvokeModelWithResponseStream" + ) + assert is_streaming_access_denied_error(exc) is True + + def test_ignores_unrelated_errors(self): + from agent.bedrock_adapter import is_streaming_access_denied_error + assert is_streaming_access_denied_error(ValueError("boom")) is False + assert is_streaming_access_denied_error( + RuntimeError("stream not supported") + ) is False + + +class TestCallConverseStreamIamFallback: + """call_converse_stream() falls back to converse() when IAM denies the + streaming action — InvokeModel-only policies keep working.""" + + def test_falls_back_to_converse_on_streaming_denial(self): + pytest.importorskip("botocore", reason="botocore required for Bedrock exception tests") + from agent.bedrock_adapter import ( + _bedrock_runtime_client_cache, + call_converse_stream, + reset_client_cache, + ) + from botocore.exceptions import ClientError + + reset_client_cache() + client = MagicMock() + client.converse_stream.side_effect = ClientError( + error_response={ + "Error": { + "Code": "AccessDeniedException", + "Message": ( + "User is not authorized to perform: " + "bedrock:InvokeModelWithResponseStream" + ), + } + }, + operation_name="ConverseStream", + ) + client.converse.return_value = { + "output": {"message": {"role": "assistant", "content": [{"text": "hi"}]}}, + "stopReason": "end_turn", + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 2}, + } + _bedrock_runtime_client_cache["us-east-1"] = client + + result = call_converse_stream( + region="us-east-1", + model="anthropic.claude-3-sonnet-20240229-v1:0", + messages=[{"role": "user", "content": "hi"}], + ) + + client.converse.assert_called_once() + assert result.choices[0].message.content == "hi" + # Not a stale connection — client stays cached. + assert _bedrock_runtime_client_cache.get("us-east-1") is client diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index 5af349fa8..c78978047 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -1573,3 +1573,87 @@ class TestCopilotACPStreamingDecision: _use_streaming = False assert _use_streaming is True + + +class TestBedrockIamStreamingFallback: + """bedrock_converse streaming branch: IAM denial of + InvokeModelWithResponseStream falls back to converse() inline and sets + _disable_streaming for the rest of the session.""" + + def _make_bedrock_agent(self): + from run_agent import AIAgent + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="anthropic.claude-3-sonnet-20240229-v1:0", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "bedrock_converse" + agent._interrupt_requested = False + return agent + + def test_iam_denial_falls_back_inline_and_disables_streaming(self): + pytest.importorskip("botocore", reason="botocore required for Bedrock tests") + from botocore.exceptions import ClientError + + agent = self._make_bedrock_agent() + + client = MagicMock() + client.converse_stream.side_effect = ClientError( + error_response={ + "Error": { + "Code": "AccessDeniedException", + "Message": ( + "User is not authorized to perform: " + "bedrock:InvokeModelWithResponseStream" + ), + } + }, + operation_name="ConverseStream", + ) + client.converse.return_value = { + "output": {"message": {"role": "assistant", "content": [{"text": "hi"}]}}, + "stopReason": "end_turn", + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 2}, + } + + with patch( + "agent.bedrock_adapter._get_bedrock_runtime_client", + return_value=client, + ): + response = agent._interruptible_streaming_api_call( + {"modelId": agent.model, "messages": []} + ) + + client.converse.assert_called_once() + assert response.choices[0].message.content == "hi" + assert getattr(agent, "_disable_streaming", False) is True + + def test_other_bedrock_errors_still_propagate(self): + pytest.importorskip("botocore", reason="botocore required for Bedrock tests") + from botocore.exceptions import ClientError + + agent = self._make_bedrock_agent() + + client = MagicMock() + client.converse_stream.side_effect = ClientError( + error_response={ + "Error": {"Code": "ThrottlingException", "Message": "slow down"} + }, + operation_name="ConverseStream", + ) + + with patch( + "agent.bedrock_adapter._get_bedrock_runtime_client", + return_value=client, + ): + with pytest.raises(ClientError): + agent._interruptible_streaming_api_call( + {"modelId": agent.model, "messages": []} + ) + + client.converse.assert_not_called() + assert getattr(agent, "_disable_streaming", False) is False