-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Streamable HTTP - improve usability, fast mcp and auth #641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nice fastmcp glowup :) A couple comments inline and I wouldn't mind taking another pass in the morning if you're still working on it, but approving to unblock in the meantime
@@ -669,6 +700,80 @@ async def sse_endpoint(request: Request) -> None: | |||
debug=self.settings.debug, routes=routes, middleware=middleware | |||
) | |||
|
|||
def streamable_http_app(self) -> Starlette: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if you've thought about the use case, but we're hoping to be able to use one FastAPI app with multiple routes that all have separate stateless MCP servers.
On your branch this works:
"""Example Echo MCP Server."""
from fastapi import APIRouter
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
mcp = FastMCP(name="EchoServer")
@mcp.tool(description="A simple echo tool")
def echo(message: str) -> str:
return f"Echo: {message}"
# Get the Starlette app and convert to router
app: Starlette = mcp.streamable_http_app()
with a parent app like
from routers import echo
app = FastAPI(lifespan=echo.app.router.lifespan_context)
app.mount("/echo", echo.app)
Although that involves kind of digging into your implementation to know that the lifespan function sets up the session manager and that it ends up internally in the starlette router. It might be helpful to have this exposed on the FastMCP object to make that more clear/explicit? either way it'll take a bit of documentation
from routers import echo
app = FastAPI(lifespan=lambda app: echo.mcp.session_manager.run())
app.mount("/echo", echo.mcp.streamable_http_app())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@baxen, thank you, yeah, haven't considered that use case, would definitely add documentation!
Do you have any suggestions on how to make it more clear/explicit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i haven't thought of anything better than having the FastMCP object having the StreamableHTTPSessionManager as a member. The error it already raises when run hasn't been called is a good hint, and as a member on FastMCP it'll be easier to find it and call it's run method (like in that last snippet)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/mcp/server/fastmcp/server.py
Outdated
else: # transport == "streamable_http" | ||
anyio.run(self.run_streamable_http_async) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: it might be better to have another elif here and then throw an exception in the else block. If someone introduces a new transport but misses updating this code they might believe their code is working even though it is really using a different transport.
from starlette.middleware import Middleware | ||
from starlette.routing import Mount |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: what's the intention behind importing these things inside this method? Is it to not force dev to have these dependencies if they are not using this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's right, if one only uses run(transport="stdio"), they never need those dependencies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it -- makes sense and is pretty clean since this method is only invoked during server startup as far as i understand. we should just be careful that people dont start doing this in methods that are called after server startup since then import errors will only become visible in the runtime.
# Store the task group for later use | ||
self._task_group = tg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Does this mean that StreamableHTTPSessionManager is not thread safe (two threads using same manager instance will overwrite eachother's self._task_group)? Might be worth documenting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same, maybe not worth a fix but def worth a note in the comment header
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated, making only once instance being able to use run + docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
one concern around the .run()
function being called on the same instance across multiple tasks, happy for you to do another loop on that (either document, fix, accept as is)
# Store the task group for later use | ||
self._task_group = tg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread safety note on is prob worth it but new changes LGTM!
StreamableHTTPSessionManager
[on top of auth inspector changes + some changes to handle shttp]
