Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.
pip install fastapi-async-sqlalchemy
It also works with sqlmodel
Note that the session object provided by db.session
is based on the Python3.7+ ContextVar
. This means that
each session is linked to the individual request context in which it was created.
from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db # provide access to a database session
from sqlalchemy import column
from sqlalchemy import table
app = FastAPI()
app.add_middleware(
SQLAlchemyMiddleware,
db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
engine_args={ # engine arguments example
"echo": True, # print all SQL statements
"pool_pre_ping": True, # feature will normally emit SQL equivalent to “SELECT 1” each time a connection is checked out from the pool
"pool_size": 5, # number of connections to keep open at a time
"max_overflow": 10, # number of connections to allow to be opened above pool_size
},
)
# once the middleware is applied, any route can then access the database session
# from the global ``db``
foo = table("ms_files", column("id"))
# Usage inside of a route
@app.get("/")
async def get_files():
result = await db.session.execute(foo.select())
return result.fetchall()
async def get_db_fetch():
# It uses the same ``db`` object and use it as a context manager:
async with db():
result = await db.session.execute(foo.select())
return result.fetchall()
# Usage inside of a route using a db context
@app.get("/db_context")
async def db_context():
return await get_db_fetch()
# Usage outside of a route using a db context
@app.on_event("startup")
async def on_startup():
# We are outside of a request context, therefore we cannot rely on ``SQLAlchemyMiddleware``
# to create a database session for us.
result = await get_db_fetch()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8002)
databases.py
from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy
FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()
main.py
from fastapi import FastAPI
from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router
app = FastAPI()
app.include_router(router)
app.add_middleware(
FirstSQLAlchemyMiddleware,
db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
engine_args={
"pool_size": 5,
"max_overflow": 10,
},
)
app.add_middleware(
SecondSQLAlchemyMiddleware,
db_url="mysql+aiomysql://user:user@192.168.88.200:5432/primary_db",
engine_args={
"pool_size": 5,
"max_overflow": 10,
},
)
routes.py
import asyncio
from fastapi import APIRouter
from sqlalchemy import column, table, text
from databases import first_db, second_db
router = APIRouter()
foo = table("ms_files", column("id"))
@router.get("/first-db-files")
async def get_files_from_first_db():
result = await first_db.session.execute(foo.select())
return result.fetchall()
@router.get("/second-db-files")
async def get_files_from_second_db():
result = await second_db.session.execute(foo.select())
return result.fetchall()
@router.get("/concurrent-queries")
async def parallel_select():
async with first_db(multi_sessions=True):
async def execute_query(query):
return await first_db.session.execute(text(query))
tasks = [
asyncio.create_task(execute_query("SELECT 1")),
asyncio.create_task(execute_query("SELECT 2")),
asyncio.create_task(execute_query("SELECT 3")),
asyncio.create_task(execute_query("SELECT 4")),
asyncio.create_task(execute_query("SELECT 5")),
asyncio.create_task(execute_query("SELECT 6")),
]
await asyncio.gather(*tasks)
SQLAlchemy events work seamlessly with this package. You can listen to various events like before_insert
, after_insert
, before_update
, after_update
, etc.
from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware, db
from sqlalchemy import Column, Integer, String, event
from sqlalchemy.orm import DeclarativeBase
app = FastAPI()
app.add_middleware(
SQLAlchemyMiddleware,
db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
)
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
# Event listeners
@event.listens_for(User, 'before_insert')
def before_insert_listener(mapper, connection, target):
print(f"About to insert user: {target.name}")
# You can modify the target object here
target.name = target.name.title() # Capitalize name
@event.listens_for(User, 'after_insert')
def after_insert_listener(mapper, connection, target):
print(f"User {target.name} inserted with ID: {target.id}")
@event.listens_for(User, 'before_update')
def before_update_listener(mapper, connection, target):
print(f"About to update user: {target.name}")
@event.listens_for(User, 'after_update')
def after_update_listener(mapper, connection, target):
print(f"User {target.name} updated")
# Usage in routes
@app.post("/users")
async def create_user(name: str, email: str):
user = User(name=name, email=email)
db.session.add(user)
await db.session.commit()
return {"id": user.id, "name": user.name, "email": user.email}
@app.put("/users/{user_id}")
async def update_user(user_id: int, name: str):
user = await db.session.get(User, user_id)
if user:
user.name = name
await db.session.commit()
return {"id": user.id, "name": user.name}
return {"error": "User not found"}
You can also use events for more complex scenarios like auditing, validation, or triggering other actions:
from datetime import datetime
from sqlalchemy import Column, DateTime, event
class AuditMixin:
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class User(Base, AuditMixin):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
# Validation event
@event.listens_for(User, 'before_insert')
@event.listens_for(User, 'before_update')
def validate_user(mapper, connection, target):
if not target.email or '@' not in target.email:
raise ValueError("Invalid email address")
if not target.name or len(target.name.strip()) < 2:
raise ValueError("Name must be at least 2 characters long")
# Audit logging event
@event.listens_for(User, 'after_insert')
@event.listens_for(User, 'after_update')
@event.listens_for(User, 'after_delete')
def audit_user_changes(mapper, connection, target):
# Log changes to audit table or external system
action = "INSERT" if mapper.class_.__name__ == "User" else "UPDATE"
print(f"AUDIT: {action} on User {target.id} at {datetime.utcnow()}")
All SQLAlchemy event types are supported:
before_insert
,after_insert
before_update
,after_update
before_delete
,after_delete
before_bulk_insert
,after_bulk_insert
before_bulk_update
,after_bulk_update
before_bulk_delete
,after_bulk_delete
Events work with both single database setups and multiple database configurations.