Clients Using Storage Face Deadlock on Token Refresh for SSE #1326

@Norcim133

Description

tl;dr: Calls to an SSE server (such as Atlassian's) hang indefinitely during a request because of a deadlock. The deadlock occurs when a token refresh yields a request in the middle of the client.stream() call, which is monitoring SSE events and cannot field requests.

Resulting behavior: after tokens expire, or after a new client instance is rebuilt from storage, the SSE server does not work until the tokens are fully cleared and the client re-authenticates.

As long as no token refresh is attempted, the server works.

There are two circumstances when a refresh attempt is made, causing the SSE call to hang:

  1. Tokens expire during a working SSE session
  2. A client loads a stale token from storage
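Both triggers end in the same check before a request is sent: the access token is no longer valid, but a refresh token exists. The sketch below models that condition with a hypothetical TokenState class. The class, its fields, and needs_refresh() are illustrative assumptions that only mirror the is_token_valid()/can_refresh_token() check quoted in the root-cause section; they are not the SDK's actual context object.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenState:
    """Hypothetical stand-in for the provider's token context (not the SDK's class)."""
    access_token: Optional[str]
    refresh_token: Optional[str]
    expires_at: Optional[float]  # Unix timestamp; None means "no known expiry"

    def is_token_valid(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        return bool(self.access_token) and (self.expires_at is None or now < self.expires_at)

    def can_refresh_token(self) -> bool:
        return bool(self.refresh_token)

    def needs_refresh(self, now: Optional[float] = None) -> bool:
        # Same shape as the condition that sends the auth flow into its refresh branch
        return not self.is_token_valid(now) and self.can_refresh_token()


# Scenario 1: the token expires while an SSE session is running
live = TokenState("abc", "r1", expires_at=1_000.0)
print(live.needs_refresh(now=999.0), live.needs_refresh(now=1_001.0))  # False True

# Scenario 2: a stale token is loaded from storage (already expired when read back)
stored = TokenState("old", "r2", expires_at=500.0)
print(stored.needs_refresh(now=1_000.0))  # True
```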

Source of failure for each scenario

Failure Mode

  1. OAuthClientProvider loads its tokens lazily and is passed into aconnect_sse() without first checking whether a refresh is needed
  2. The OAuthClientProvider.async_auth_flow() generator enters the refresh logic
  3. It yields a refresh request and waits for a response (line ~544 in auth.py)
  4. aconnect_sse() uses client.stream() which is trying to open an SSE connection
  5. There's no response mechanism - SSE expects to start streaming events, not handle a token refresh request
  6. DEADLOCK: Auth generator waiting for refresh response that will never come
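The wait cycle in the failure mode above can be reproduced in miniature with plain asyncio. This is a toy model, not httpx or SDK code: auth_flow and naive_sse_open are hypothetical stand-ins for the provider's generator and the SSE caller, and a timeout replaces the real infinite hang.

```python
import asyncio
from typing import AsyncGenerator


async def auth_flow(request: str) -> AsyncGenerator[str, str]:
    """Hypothetical auth generator whose stored token needs a refresh."""
    token = yield "POST /token"          # yields the refresh request, then waits
    yield f"{request} (Bearer {token})"  # reached only once a response is sent back


async def naive_sse_open(flow: AsyncGenerator[str, str], events: "asyncio.Queue[str]") -> str:
    """Hypothetical caller that expects the flow's first request to be the SSE GET."""
    await flow.__anext__()  # actually the refresh request; nothing ever answers it
    # The real GET was never issued, so no SSE event can arrive either.
    return await events.get()


async def main() -> str:
    flow = auth_flow("GET /sse")
    try:
        return await asyncio.wait_for(naive_sse_open(flow, asyncio.Queue()), timeout=0.1)
    except asyncio.TimeoutError:
        return "deadlock: both sides are waiting (timed out)"
    finally:
        await flow.aclose()


print(asyncio.run(main()))  # deadlock: both sides are waiting (timed out)
```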

Temporary Success

  • The first request in aconnect_sse(), sent via client.stream(), gets a 401, which sends control back out of the SSE context
  • OAuthClientProvider is then able to perform a full re-auth
  • When the request is next sent through aconnect_sse(), fresh auth headers are available and the SSE stream succeeds
  • Because SSE connections are long-lived, this works for as long as the tokens don't need a refresh
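The 401 path recovers because every request the auth generator yields is actually sent, and each response is fed back with asend(). Below is a minimal sketch of that driving loop. drive_auth_flow, toy_flow, and fake_send are hypothetical names, plain dicts stand in for requests and responses, and the loop only loosely resembles how httpx drives an auth flow.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable, Dict, List

Request = Dict[str, str]
Response = Dict[str, int]


async def drive_auth_flow(
    flow: AsyncGenerator[Request, Response],
    send: Callable[[Request], Awaitable[Response]],
) -> Response:
    """Send every request the flow yields and feed each response back into it."""
    request = await flow.__anext__()
    while True:
        response = await send(request)
        try:
            request = await flow.asend(response)
        except StopAsyncIteration:
            return response  # flow finished: the last response is the final one


async def toy_flow(url: str, tokens: Dict[str, str]) -> AsyncGenerator[Request, Response]:
    # Hypothetical flow: try the stored token, fall back to full re-auth on 401
    response = yield {"url": url, "auth": tokens["access"]}
    if response["status"] == 401:
        yield {"url": "/oauth/token", "auth": ""}  # stands in for the re-auth exchange
        tokens["access"] = "fresh"                 # pretend re-auth issued a new token
        yield {"url": url, "auth": tokens["access"]}


sent: List[Request] = []


async def fake_send(request: Request) -> Response:
    # Fake server: the SSE endpoint accepts only the "fresh" token
    sent.append(request)
    if request["url"] == "/oauth/token":
        return {"status": 200}
    return {"status": 200 if request["auth"] == "fresh" else 401}


final = asyncio.run(drive_auth_flow(toy_flow("/sse", {"access": "stale"}), fake_send))
print(final["status"], [r["auth"] for r in sent])  # 200 ['stale', '', 'fresh']
```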

Root cause

In /mcp/client/sse.py, the aconnect_sse() call below hangs indefinitely:

async with anyio.create_task_group() as tg:
    try:
        logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
        async with httpx_client_factory(
            headers=headers, auth=auth, timeout=httpx.Timeout(timeout, read=sse_read_timeout)
        ) as client:
            async with aconnect_sse(  #<======================== This hangs indefinitely with a 200 response
                client,
                "GET",
                url,
            ) as event_source:
                event_source.response.raise_for_status()

This happens because the OAuthClientProvider attached to the client tries to yield a refresh request while client.stream() is still in progress, here in async_auth_flow():

            if not self.context.is_token_valid() and self.context.can_refresh_token():
                # Try to refresh token
                refresh_request = await self._refresh_token()
                refresh_response = yield refresh_request  #<======================== Causes client.stream() to hang

                if not await self._handle_refresh_response(refresh_response):
                    # Refresh failed, need full re-authentication
                    self._initialized = False

The other yields in async_auth_flow() only happen after the stream has received a 401, so they do not run inside the same streaming context.
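Because async_auth_flow() is an async generator, anything it holds while suspended at a yield, such as a lock acquired around the flow, stays held until the generator runs to completion or is closed. The toy below assumes a flow that holds an asyncio.Lock across its yields (locked_flow is hypothetical) and shows a second acquirer blocking until aclose() is called.

```python
import asyncio
from typing import AsyncGenerator, List


async def locked_flow(lock: asyncio.Lock, request: str) -> AsyncGenerator[str, str]:
    """Hypothetical auth flow that keeps `lock` held while suspended at each yield."""
    async with lock:
        token_response = yield f"refresh for {request}"
        yield f"{request} using {token_response}"


async def main() -> List[str]:
    lock = asyncio.Lock()
    results: List[str] = []
    flow = locked_flow(lock, "GET /sse")
    await flow.__anext__()         # suspended inside `async with lock`
    await flow.asend("new-token")  # returns the authorized request; still suspended
    try:
        await asyncio.wait_for(lock.acquire(), timeout=0.1)
        results.append("acquired")
    except asyncio.TimeoutError:
        results.append("blocked while flow is suspended")
    await flow.aclose()            # GeneratorExit unwinds `async with`, releasing the lock
    await asyncio.wait_for(lock.acquire(), timeout=0.1)
    results.append("acquired after aclose()")
    lock.release()
    return results


print(asyncio.run(main()))
# ['blocked while flow is suspended', 'acquired after aclose()']
```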

The Fix - Force token refresh check outside of the stream context

Any fix needs to detach the OAuthClientProvider from the client before calling aconnect_sse(), so that it never yields the refresh request mid-stream.

There is probably a more elegant or direct approach, but the code below worked. I had to create a separate client for the refresh to avoid a lock conflict.

async with anyio.create_task_group() as tg:
    try:
        logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
        async with httpx_client_factory(
            headers=headers, auth=auth, timeout=httpx.Timeout(timeout, read=sse_read_timeout)
        ) as client:
            
            #----- Start of fix -------
            auth_headers = {}
            if auth:
                # Initialize auth to load stored tokens
                if hasattr(auth, '_initialize') and not getattr(auth, '_initialized', False):
                    await auth._initialize()

                # Check if tokens need refresh
                if hasattr(auth, 'context') and auth.context:
                    # Check if token is expired or needs refresh
                    if not auth.context.is_token_valid() and auth.context.can_refresh_token():
                        logger.debug("Token needs refresh before SSE connection")

                        # Use a separate client for refresh to avoid lock issues
                        async with httpx.AsyncClient() as refresh_client:
                            # Dummy request: only used to drive the auth flow into its refresh branch
                            dummy_request = httpx.Request("GET", url)

                            # Run the auth flow manually to refresh tokens
                            auth_gen = auth.async_auth_flow(dummy_request)
                            try:
                                # Start the generator; it yields the token refresh request
                                refresh_request = await auth_gen.asend(None)

                                # Execute the refresh request as-is on the auth-free client
                                refresh_response = await refresh_client.send(refresh_request)

                                # Send the response back; the generator stores the new tokens
                                # and then yields the (now authorized) dummy request
                                await auth_gen.asend(refresh_response)
                                logger.debug("Token refreshed successfully before SSE connection")
                            except StopAsyncIteration:
                                # The flow finished without yielding another request
                                pass
                            except Exception as e:
                                logger.error(f"Failed to refresh token before SSE: {e}")
                            finally:
                                # Close the generator so it is not left suspended,
                                # which also releases anything it holds (e.g. a lock)
                                await auth_gen.aclose()

                    # Extract the token after potential refresh
                    if auth.context.current_tokens and auth.context.current_tokens.access_token:
                        token = auth.context.current_tokens.access_token
                        auth_headers['Authorization'] = f"Bearer {token}"

            # CRITICAL: Remove auth from client to prevent generator interference
            client.auth = None
            #----- Resume existing code -------
            async with aconnect_sse(
                client,
                "GET",
                url,
                headers=auth_headers,    #<============================== Must add this too
            ) as event_source:
                event_source.response.raise_for_status()
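The workaround amounts to: drive the auth generator only as far as the refresh, using a separate client with no auth attached, copy the resulting Authorization header, and close the generator. Here is a framework-free sketch of that step, with hypothetical names (prime_auth_headers, toy_flow, fake_refresh) and plain dicts in place of httpx requests and responses.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict

Request = Dict[str, Any]


async def prime_auth_headers(
    flow: AsyncGenerator[Request, Any],
    send_refresh: Callable[[Request], Awaitable[Any]],
    is_refresh: Callable[[Request], bool],
) -> Dict[str, str]:
    """Drive only the refresh exchanges of `flow`, then copy the final Authorization header.

    `send_refresh` stands in for the separate, auth-free refresh client.
    The generator is always closed, so it is not left suspended.
    """
    try:
        request = await flow.__anext__()
        while is_refresh(request):
            request = await flow.asend(await send_refresh(request))
        auth = request["headers"].get("Authorization")
        return {"Authorization": auth} if auth else {}
    except StopAsyncIteration:
        return {}
    finally:
        await flow.aclose()


async def toy_flow(url: str) -> AsyncGenerator[Request, Any]:
    # Hypothetical flow whose stored token is stale, so it refreshes first
    token_response = yield {"url": "/oauth/token", "headers": {}}
    bearer = f"Bearer {token_response['access_token']}"
    yield {"url": url, "headers": {"Authorization": bearer}}


async def fake_refresh(request: Request) -> Dict[str, str]:
    # Pretend the token endpoint issued a new access token
    return {"access_token": "new-token"}


headers = asyncio.run(
    prime_auth_headers(toy_flow("/sse"), fake_refresh, lambda r: r["url"] == "/oauth/token")
)
print(headers)  # {'Authorization': 'Bearer new-token'}
```

The returned dict plays the role of auth_headers in the workaround, which is passed as headers= to aconnect_sse() and to client.post().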

Because auth has been removed from the client, the headers also have to be passed explicitly to the POST call in post_writer:

                    async def post_writer(endpoint_url: str):
                        try:
                            async with write_stream_reader:
                                async for session_message in write_stream_reader:
                                    logger.debug(f"Sending client message: {session_message}")
                                    response = await client.post(
                                        endpoint_url,
                                        json=session_message.message.model_dump(
                                            by_alias=True,
                                            mode="json",
                                            exclude_none=True,
                                        ),
                                        headers=auth_headers,  #<======================== Must add this too
                                    )
                                    response.raise_for_status()

Python & MCP Python SDK

1.12.4 (the relevant code is unchanged in the current version)
