Skip to content

Streamable HTTP - improve usability, fast mcp and auth #641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 106 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
682ff1e
wip
praboud-ant Mar 6, 2025
331d51e
Unwind changes
praboud-ant Mar 6, 2025
d283f56
wip
praboud-ant Mar 7, 2025
e96d280
Get tests passing
praboud-ant Mar 10, 2025
1e9dd4c
Clean up provider interface
praboud-ant Mar 10, 2025
d535089
Lint
praboud-ant Mar 10, 2025
031cadf
Clean up registration endpoint
praboud-ant Mar 10, 2025
765efb6
Lint
praboud-ant Mar 10, 2025
0637bc3
update token + revoke to use form data
praboud-ant Mar 10, 2025
b99633a
Adjust more things to fit spec
praboud-ant Mar 10, 2025
9ae1c21
Lint
praboud-ant Mar 10, 2025
50683b9
Remove dup
praboud-ant Mar 10, 2025
2c5f26a
Comment
praboud-ant Mar 10, 2025
e605994
Refactor back to authorize()
praboud-ant Mar 10, 2025
e7c5f87
Improve validation for /token
praboud-ant Mar 11, 2025
83c0c9f
Improve validation for registration
praboud-ant Mar 11, 2025
0c1aae9
Improve /authorize validation & add tests
praboud-ant Mar 11, 2025
038fb04
Hoist oauth token expiration check into bearer auth middleware
praboud-ant Mar 11, 2025
a4e17f3
Add tests for /revoke validation
praboud-ant Mar 11, 2025
5f11c60
Lint + typecheck
praboud-ant Mar 11, 2025
571913a
Clean up unused error classes
praboud-ant Mar 11, 2025
d43647f
Update to use Python 3.10 types
praboud-ant Mar 11, 2025
9d72c1e
Use classes for handlers
praboud-ant Mar 11, 2025
a5079af
Refactor
praboud-ant Mar 11, 2025
c4c2608
Simplify bearer auth logic
praboud-ant Mar 11, 2025
bc62d73
Avoid asyncio dependency in tests
praboud-ant Mar 11, 2025
3852179
Add comment
praboud-ant Mar 11, 2025
874838a
Lint
praboud-ant Mar 11, 2025
f788d79
Add json_response.py comment
praboud-ant Mar 11, 2025
152feb9
Format
praboud-ant Mar 11, 2025
f37ebc4
Move around the response models to be closer to the handlers
praboud-ant Mar 11, 2025
c2873fd
Get rid of silly TS comments
praboud-ant Mar 11, 2025
fe2c029
Remove ClientAuthRequest
praboud-ant Mar 11, 2025
3a13f5d
Reorganize AuthInfo
praboud-ant Mar 11, 2025
37c5fc4
Refactor client metadata endpoint
praboud-ant Mar 11, 2025
792d302
Make metadata more spec compliant
praboud-ant Mar 12, 2025
6c48b11
Use python 3.10 types everywhere
praboud-ant Mar 12, 2025
a437566
Add back authorization to the /revoke endpoint, simplify revoke
praboud-ant Mar 12, 2025
9fee929
Move around validation logic
praboud-ant Mar 12, 2025
d79be8f
Fixups while integrating new auth capabilities
praboud-ant Mar 19, 2025
8d637b4
Pull all auth settings out into a separate config
praboud-ant Mar 19, 2025
8c86bce
Move router file to be routes
praboud-ant Mar 19, 2025
31618c1
Add auth context middleware
praboud-ant Mar 19, 2025
5ebbc19
Validate scopes + provide default
praboud-ant Mar 19, 2025
50673c6
Validate grant_types on registration
praboud-ant Mar 19, 2025
02d76f3
auth: client implementation
dsp-ant Mar 12, 2025
88edddc
update lock
dsp-ant Mar 12, 2025
d774be7
fix
dsp-ant Mar 12, 2025
a09e958
foo
dsp-ant Mar 14, 2025
4e73552
Format
praboud-ant Mar 19, 2025
56f694e
Move StreamingASGITransport into the library code, so MCP integration…
praboud-ant Mar 19, 2025
60da682
Improved error handling, generic types for provider
praboud-ant Mar 21, 2025
374a0b4
Rename AuthInfo to AccessToken
praboud-ant Mar 21, 2025
fb5a568
Rename
praboud-ant Mar 22, 2025
76ddc65
Add docs
praboud-ant Mar 22, 2025
e42dbf5
Merge remote-tracking branch 'origin/main' into praboud/auth
praboud-ant Mar 22, 2025
10e00e7
Typecheck
praboud-ant Mar 22, 2025
87571d8
Return 401 on missing auth, not 403
praboud-ant Mar 25, 2025
c6f991b
Convert AuthContextMiddleware to plain ASGI middleware & add tests
praboud-ant Mar 25, 2025
482149e
Fix redirect_uri handling
praboud-ant Mar 25, 2025
5230180
Remove client for now
praboud-ant Mar 25, 2025
8e15abc
Add test for auth context middleware
praboud-ant Mar 25, 2025
0a1a408
Add CORS support
praboud-ant Mar 25, 2025
3069aa3
Comment
praboud-ant Mar 27, 2025
16f0688
Merge remote-tracking branch 'origin/main' into praboud/auth
praboud-ant Mar 27, 2025
5ecc7f0
Remove client tests
praboud-ant Mar 27, 2025
8c251c9
Add ignores
praboud-ant Mar 27, 2025
f46dcb1
Merge remote-tracking branch 'origin/main' into praboud/auth
praboud-ant Apr 18, 2025
d3725cf
Review feedback
praboud-ant Apr 18, 2025
07f4e3a
Fix stream resource leaks and upgrade Starlette
bhosmer-ant Apr 13, 2025
1237148
Lint
praboud-ant Apr 18, 2025
1ad1842
Review comments
praboud-ant Apr 18, 2025
fa068dd
Rename OAuthServerProvider to OAuthAuthorizationServerProvider
praboud-ant Apr 18, 2025
9b5709a
Merge branch 'main' into praboud/auth
ihrpr Apr 30, 2025
67d568b
revert starlette upgrade
ihrpr Apr 30, 2025
16a7efa
add python-multipart - was missing
ihrpr Apr 30, 2025
91c09a4
ruff
ihrpr Apr 30, 2025
0582bf5
try fixing test
ihrpr Apr 30, 2025
8194bce
increse timeout
ihrpr Apr 30, 2025
2c63020
fix test
ihrpr May 1, 2025
2ea68f2
test
ihrpr May 1, 2025
b0fe041
fix test
ihrpr May 1, 2025
ba366e3
test
ihrpr May 1, 2025
e1a9fec
test
ihrpr May 1, 2025
af4221f
skip test
ihrpr May 1, 2025
f2840fe
Test auth (#609)
ihrpr May 1, 2025
cda4401
remove pyright upgrade and ruff format
ihrpr May 1, 2025
f2cc6ee
uv lock
ihrpr May 1, 2025
9c78e5e
add an example
ihrpr May 1, 2025
59e6c64
comments
ihrpr May 1, 2025
cb7f0c4
comment
ihrpr May 1, 2025
8ddf7f7
ruff
ihrpr May 1, 2025
2c85c66
switch to localhost
pcarleton May 2, 2025
ed6dad2
add scopes to oauth metadata
pcarleton May 2, 2025
e9cee55
separate mcp scope from github scope
pcarleton May 2, 2025
0eef578
ruff
pcarleton May 2, 2025
01f3f05
Merge branch 'main' into ihrpr/auth-example
pcarleton May 2, 2025
ebb9151
Merge branch 'main' into ihrpr/auth-example
ihrpr May 6, 2025
fdab98f
fast mcp and auth
ihrpr May 6, 2025
c9f686f
ruff
ihrpr May 6, 2025
013a295
integration test for stateless sttp
ihrpr May 7, 2025
3d8ff7a
Merge branch 'main' into ihrpr/shttp
ihrpr May 7, 2025
0f7b167
expose session_manager in FastMCP
ihrpr May 8, 2025
97fe920
Merge branch 'main' into ihrpr/shttp
ihrpr May 8, 2025
8c8d80d
add doc and make sure session manager can only be run once
ihrpr May 8, 2025
ee70cb1
adding tests
ihrpr May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@
import contextlib
import logging
from collections.abc import AsyncIterator

import anyio
import click
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamableHttp import (
StreamableHTTPServerTransport,
)
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send

logger = logging.getLogger(__name__)
# Global task group that will be initialized in the lifespan
task_group = None


@contextlib.asynccontextmanager
async def lifespan(app):
"""Application lifespan context manager for managing task group."""
global task_group

async with anyio.create_task_group() as tg:
task_group = tg
logger.info("Application started, task group initialized!")
try:
yield
finally:
logger.info("Application shutting down, cleaning up resources...")
if task_group:
tg.cancel_scope.cancel()
task_group = None
logger.info("Resources cleaned up successfully.")


@click.command()
Expand Down Expand Up @@ -122,35 +102,28 @@ async def list_tools() -> list[types.Tool]:
)
]

# ASGI handler for stateless HTTP connections
async def handle_streamable_http(scope, receive, send):
logger.debug("Creating new transport")
# Use lock to prevent race conditions when creating new sessions
http_transport = StreamableHTTPServerTransport(
mcp_session_id=None,
is_json_response_enabled=json_response,
)
async with http_transport.connect() as streams:
read_stream, write_stream = streams

if not task_group:
raise RuntimeError("Task group is not initialized")

async def run_server():
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
# Runs in standalone mode for stateless deployments
# where clients perform initialization with any node
standalone_mode=True,
)

# Start server task
task_group.start_soon(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)
# Create the session manager with true stateless mode
session_manager = StreamableHTTPSessionManager(
app=app,
event_store=None,
json_response=json_response,
stateless=True,
)

async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)

@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
"""Context manager for session manager."""
async with session_manager.run():
logger.info("Application started with StreamableHTTP session manager!")
try:
yield
finally:
logger.info("Application shutting down...")

# Create an ASGI application using the transport
starlette_app = Starlette(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,58 +1,22 @@
import contextlib
import logging
from http import HTTPStatus
from uuid import uuid4
from collections.abc import AsyncIterator

import anyio
import click
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamable_http import (
MCP_SESSION_ID_HEADER,
StreamableHTTPServerTransport,
)
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from pydantic import AnyUrl
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send

from .event_store import InMemoryEventStore

# Configure logging
logger = logging.getLogger(__name__)

# Global task group that will be initialized in the lifespan
task_group = None

# Event store for resumability
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
# It stores SSE events with unique IDs, allowing clients to:
# 1. Receive event IDs for each SSE message
# 2. Resume streams by sending Last-Event-ID in GET requests
# 3. Replay missed events after reconnection
# Note: This in-memory implementation is for demonstration ONLY.
# For production, use a persistent storage solution.
event_store = InMemoryEventStore()


@contextlib.asynccontextmanager
async def lifespan(app):
"""Application lifespan context manager for managing task group."""
global task_group

async with anyio.create_task_group() as tg:
task_group = tg
logger.info("Application started, task group initialized!")
try:
yield
finally:
logger.info("Application shutting down, cleaning up resources...")
if task_group:
tg.cancel_scope.cancel()
task_group = None
logger.info("Resources cleaned up successfully.")


@click.command()
@click.option("--port", default=3000, help="Port to listen on for HTTP")
Expand Down Expand Up @@ -156,60 +120,38 @@ async def list_tools() -> list[types.Tool]:
)
]

# We need to store the server instances between requests
server_instances = {}
# Lock to prevent race conditions when creating new sessions
session_creation_lock = anyio.Lock()
# Create event store for resumability
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
# It stores SSE events with unique IDs, allowing clients to:
# 1. Receive event IDs for each SSE message
# 2. Resume streams by sending Last-Event-ID in GET requests
# 3. Replay missed events after reconnection
# Note: This in-memory implementation is for demonstration ONLY.
# For production, use a persistent storage solution.
event_store = InMemoryEventStore()

# Create the session manager with our app and event store
session_manager = StreamableHTTPSessionManager(
app=app,
event_store=event_store, # Enable resumability
json_response=json_response,
)

# ASGI handler for streamable HTTP connections
async def handle_streamable_http(scope, receive, send):
request = Request(scope, receive)
request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER)
if (
request_mcp_session_id is not None
and request_mcp_session_id in server_instances
):
transport = server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
await transport.handle_request(scope, receive, send)
elif request_mcp_session_id is None:
# try to establish new session
logger.debug("Creating new transport")
# Use lock to prevent race conditions when creating new sessions
async with session_creation_lock:
new_session_id = uuid4().hex
http_transport = StreamableHTTPServerTransport(
mcp_session_id=new_session_id,
is_json_response_enabled=json_response,
event_store=event_store, # Enable resumability
)
server_instances[http_transport.mcp_session_id] = http_transport
logger.info(f"Created new transport with session ID: {new_session_id}")

async def run_server(task_status=None):
async with http_transport.connect() as streams:
read_stream, write_stream = streams
if task_status:
task_status.started()
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)

if not task_group:
raise RuntimeError("Task group is not initialized")

await task_group.start(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)
else:
response = Response(
"Bad Request: No valid session ID provided",
status_code=HTTPStatus.BAD_REQUEST,
)
await response(scope, receive, send)
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)

@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
"""Context manager for managing session manager lifecycle."""
async with session_manager.run():
logger.info("Application started with StreamableHTTP session manager!")
try:
yield
finally:
logger.info("Application shutting down...")

# Create an ASGI application using the transport
starlette_app = Starlette(
Expand Down
Loading
Loading