Initial Checks
- I confirm that I'm using the latest version of MCP Python SDK
- I confirm that I searched for my issue in https://github.com/modelcontextprotocol/python-sdk/issues before opening this issue
Description
tl;dr: Calls to an SSE server (such as Atlassian's) hang indefinitely during a request because of a deadlock. The deadlock occurs when token refresh yields a request in the middle of the client.stream method, which is trying to open and monitor an SSE event stream, not field additional requests.
Resulting behavior: after tokens expire, or after a new instance is rebuilt from storage, the SSE server will not work until the tokens are fully cleared and re-auth is performed.
As long as no token refresh attempt is made, the server works.
There are two circumstances when a refresh attempt is made, causing the SSE call to hang:
- Tokens expire during a working SSE session
- When a Client uses a stale token from storage
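Both circumstances come down to the same check. The sketch below models it with a hypothetical `TokenState` class; only the method names `is_token_valid()` and `can_refresh_token()` mirror the calls quoted from auth.py later in this issue, and the fields and expiry logic are assumptions made for illustration.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenState:
    """Hypothetical stand-in for the provider's token context."""

    access_token: Optional[str]
    refresh_token: Optional[str]
    expires_at: Optional[float]  # absolute UNIX timestamp; None means unknown

    def is_token_valid(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        if not self.access_token:
            return False
        return self.expires_at is None or now < self.expires_at

    def can_refresh_token(self) -> bool:
        return bool(self.refresh_token)

    def will_attempt_refresh(self, now: Optional[float] = None) -> bool:
        # Same condition as the refresh branch quoted in this issue.
        return not self.is_token_valid(now) and self.can_refresh_token()


NOW = 1_000.0
fresh = TokenState("a", "r", expires_at=NOW + 60)
expired_mid_session = TokenState("a", "r", expires_at=NOW - 1)
stale_from_storage = TokenState("a", "r", expires_at=NOW - 3600)
no_tokens = TokenState(None, None, None)
```

In this model only the expired and stale states enter the refresh branch. The `no_tokens` state does not, which would correspond to the 401 and full re-auth path described as working.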
Source of failure for each scenario
Failure Mode
- OAuthClientProvider lazy loads and is passed into aconnect_sse() without having checked refresh
- The OAuthClientProvider.async_auth_flow() generator enters the refresh logic
- It yields a refresh request and waits for a response (line ~544 in auth.py)
- aconnect_sse() uses client.stream(), which is trying to open an SSE connection
- There's no response mechanism - SSE expects to start streaming events, not handle a token refresh request
- DEADLOCK: Auth generator waiting for refresh response that will never come
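The sequence in the list above can be modeled with a plain async generator. This is a simplified, stdlib-only sketch: `auth_flow`, `drive`, and `fake_send` are hypothetical names and are not the SDK's or httpx's actual code. It only shows the hand-off that the generator protocol requires, namely that the refresh request must complete a full send/receive round trip before the caller's original request is released.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable, List


async def auth_flow(request: str, token_valid: bool) -> AsyncGenerator[str, str]:
    """Toy auth flow: yields requests and receives responses via asend()."""
    if not token_valid:
        # A refresh request is yielded *before* the real request.
        refresh_response = yield "POST /token (refresh)"
        assert refresh_response == "200 new-token"
    # Only now is the caller's original request released.
    yield request


async def drive(
    flow: AsyncGenerator[str, str],
    send: Callable[[str], Awaitable[str]],
) -> List[str]:
    """Toy driver: sends every yielded request and feeds the response back."""
    sent: List[str] = []
    try:
        request = await flow.asend(None)
        while True:
            sent.append(request)
            response = await send(request)
            request = await flow.asend(response)
    except StopAsyncIteration:
        pass
    return sent


async def fake_send(request: str) -> str:
    return "200 new-token" if "refresh" in request else "200 event-stream"


if __name__ == "__main__":
    print(asyncio.run(drive(auth_flow("GET /sse", token_valid=False), fake_send)))
    # ['POST /token (refresh)', 'GET /sse']
    print(asyncio.run(drive(auth_flow("GET /sse", token_valid=True), fake_send)))
    # ['GET /sse']
```

If the driver never feeds a response back for the first yielded request, the generator stays suspended at that `yield`, which is the state the DEADLOCK bullet describes.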
Temporary Success
- The first request in aconnect_sse() uses client.stream() and yields a 401, which sends everything back out of the SSE context
- OAuthClientProvider is able to perform full re-auth
- When next sent to aconnect_sse(), fresh auth headers are available and the SSE stream succeeds
- SSE is long-lived, so it works for as long as tokens don't need refresh
Root cause
In /mcp/client/sse.py, this call to aconnect_sse hangs indefinitely:
    async with anyio.create_task_group() as tg:
        try:
            logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
            async with httpx_client_factory(
                headers=headers, auth=auth, timeout=httpx.Timeout(timeout, read=sse_read_timeout)
            ) as client:
                async with aconnect_sse(  # <======================== This hangs indefinitely with a 200 response
                    client,
                    "GET",
                    url,
                ) as event_source:
                    event_source.response.raise_for_status()
This is because OAuthClientProvider, which is attached to the client, tries to yield a token refresh request while in the middle of client.stream, here:
    if not self.context.is_token_valid() and self.context.can_refresh_token():
        # Try to refresh token
        refresh_request = await self._refresh_token()
        refresh_response = yield refresh_request  # <======================== Causes client.stream() to hang
        if not await self._handle_refresh_response(refresh_response):
            # Refresh failed, need full re-authentication
            self._initialized = False
Other yields in the broader method occur after the stream got a 401 and so are not in the same context.
The Fix - Force token refresh check outside of the stream context
Any fix needs to remove the OAuthClientProvider from the client before calling aconnect_sse. That way it won't yield the refresh request inside the stream context.
There is probably a more elegant or direct approach, but the code below worked. I had to create a separate client for the refresh to avoid a lock conflict.
    async with anyio.create_task_group() as tg:
        try:
            logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
            async with httpx_client_factory(
                headers=headers, auth=auth, timeout=httpx.Timeout(timeout, read=sse_read_timeout)
            ) as client:
                #----- Start of fix -------
                auth_headers = {}
                if auth:
                    # Initialize auth to load stored tokens
                    if hasattr(auth, '_initialize') and not getattr(auth, '_initialized', False):
                        await auth._initialize()
                    # Check if tokens need refresh
                    if hasattr(auth, 'context') and auth.context:
                        # Check if token is expired or needs refresh
                        if not auth.context.is_token_valid() and auth.context.can_refresh_token():
                            logger.debug("Token needs refresh before SSE connection")
                            # Use a separate client for refresh to avoid lock issues
                            async with httpx.AsyncClient() as refresh_client:
                                # Create a dummy request to trigger the refresh flow
                                dummy_request = httpx.Request("GET", url)
                                # Run the auth flow to refresh tokens
                                auth_gen = auth.async_auth_flow(dummy_request)
                                try:
                                    # Start the generator
                                    refresh_request = await auth_gen.asend(None)
                                    # Execute the refresh request (content= is httpx's parameter for raw bytes)
                                    refresh_response = await refresh_client.request(
                                        refresh_request.method,
                                        refresh_request.url,
                                        content=refresh_request.content,
                                        headers=refresh_request.headers,
                                    )
                                    # Send response back to generator
                                    await auth_gen.asend(refresh_response)
                                    logger.debug("Token refreshed successfully before SSE connection")
                                except StopAsyncIteration:
                                    # Normal completion of auth flow
                                    pass
                                except Exception as e:
                                    logger.error(f"Failed to refresh token before SSE: {e}")
                        # Extract the token after potential refresh
                        if auth.context.current_tokens and auth.context.current_tokens.access_token:
                            token = auth.context.current_tokens.access_token
                            auth_headers['Authorization'] = f"Bearer {token}"
                    # CRITICAL: Remove auth from client to prevent generator interference
                    client.auth = None
                #----- Resume existing code -------
                async with aconnect_sse(
                    client,
                    "GET",
                    url,
                    headers=auth_headers,  # <============================== Must add this too
                ) as event_source:
                    event_source.response.raise_for_status()
Since you removed auth from the client, you have to insert the headers later for the POST call as well:
    async def post_writer(endpoint_url: str):
        try:
            async with write_stream_reader:
                async for session_message in write_stream_reader:
                    logger.debug(f"Sending client message: {session_message}")
                    response = await client.post(
                        endpoint_url,
                        json=session_message.message.model_dump(
                            by_alias=True,
                            mode="json",
                            exclude_none=True,
                        ),
                        headers=auth_headers,  # <======================== Must add the auth headers here too
                    )
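Stripped of SDK details, the workaround is: drive only the refresh step of the auth generator by hand, close the generator, then pass the resulting token as an explicit header. The following stdlib-only sketch shows that pattern under stated assumptions. `toy_auth_flow`, `refresh_before_connect`, and `fake_token_endpoint` are hypothetical names and do not exist in the SDK, and the explicit `aclose()` is an addition not present in the workaround above.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict

Message = Dict[str, Any]


async def toy_auth_flow(tokens: Dict[str, str]) -> AsyncGenerator[Message, Message]:
    """Hypothetical auth flow whose first yield is always a refresh request."""
    refresh_response = yield {"method": "POST", "url": "/token"}
    tokens["access_token"] = refresh_response["access_token"]
    # The original request is yielded next; the caller below discards it
    # and closes the generator instead of sending it.
    yield {"method": "GET", "url": "/sse"}


async def refresh_before_connect(
    flow: AsyncGenerator[Message, Message],
    send: Callable[[Message], Awaitable[Message]],
) -> None:
    """Run the refresh round trip outside any streaming context."""
    try:
        refresh_request = await flow.asend(None)   # start the generator
        refresh_response = await send(refresh_request)
        await flow.asend(refresh_response)         # let the flow store the token
    except StopAsyncIteration:
        pass  # flow finished on its own
    finally:
        await flow.aclose()  # do not leave the generator suspended


async def fake_token_endpoint(request: Message) -> Message:
    return {"access_token": "new-token"}


async def main() -> Dict[str, str]:
    tokens = {"access_token": "stale-token"}
    await refresh_before_connect(toy_auth_flow(tokens), fake_token_endpoint)
    # Headers that would be passed explicitly to the SSE GET and the POSTs.
    return {"Authorization": f"Bearer {tokens['access_token']}"}


if __name__ == "__main__":
    print(asyncio.run(main()))  # {'Authorization': 'Bearer new-token'}
```

Closing the generator in `finally` means that even a failed refresh leaves no suspended auth flow behind.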
Example Code
Python & MCP Python SDK
1.12.4 (but same code as current version)