Skip to content

h0rn3t/fastapi-async-sqlalchemy

 
 

Repository files navigation

SQLAlchemy FastAPI middleware

ci ci codecov License: MIT pip Downloads Updates

Description

Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.

Install

      pip install fastapi-async-sqlalchemy

It also works with sqlmodel

Examples

Note that the session object provided by db.session is based on the Python3.7+ ContextVar. This means that each session is linked to the individual request context in which it was created.

from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db  # provide access to a database session
from sqlalchemy import column
from sqlalchemy import table

app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={              # engine arguments example
        "echo": True,          # print all SQL statements
        "pool_pre_ping": True, # feature will normally emit SQL equivalent to “SELECT 1” each time a connection is checked out from the pool
        "pool_size": 5,        # number of connections to keep open at a time
        "max_overflow": 10,    # number of connections to allow to be opened above pool_size
    },
)
# once the middleware is applied, any route can then access the database session
# from the global ``db``

foo = table("ms_files", column("id"))

# Usage inside of a route
@app.get("/")
async def get_files():
    result = await db.session.execute(foo.select())
    return result.fetchall()

async def get_db_fetch():
    # It uses the same ``db`` object and use it as a context manager:
    async with db():
        result = await db.session.execute(foo.select())
        return result.fetchall()

# Usage inside of a route using a db context
@app.get("/db_context")
async def db_context():
    return await get_db_fetch()

# Usage outside of a route using a db context
@app.on_event("startup")
async def on_startup():
    # We are outside of a request context, therefore we cannot rely on ``SQLAlchemyMiddleware``
    # to create a database session for us.
    result = await get_db_fetch()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

Usage of multiple databases

databases.py

from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy

FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()

main.py

from fastapi import FastAPI

from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router

app = FastAPI()

app.include_router(router)

app.add_middleware(
    FirstSQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)
app.add_middleware(
    SecondSQLAlchemyMiddleware,
    db_url="mysql+aiomysql://user:user@192.168.88.200:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)

routes.py

import asyncio

from fastapi import APIRouter
from sqlalchemy import column, table, text

from databases import first_db, second_db

router = APIRouter()

foo = table("ms_files", column("id"))

@router.get("/first-db-files")
async def get_files_from_first_db():
    result = await first_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/second-db-files")
async def get_files_from_second_db():
    result = await second_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/concurrent-queries")
async def parallel_select():
    async with first_db(multi_sessions=True):
        async def execute_query(query):
            return await first_db.session.execute(text(query))

        tasks = [
            asyncio.create_task(execute_query("SELECT 1")),
            asyncio.create_task(execute_query("SELECT 2")),
            asyncio.create_task(execute_query("SELECT 3")),
            asyncio.create_task(execute_query("SELECT 4")),
            asyncio.create_task(execute_query("SELECT 5")),
            asyncio.create_task(execute_query("SELECT 6")),
        ]

        await asyncio.gather(*tasks)

Using SQLAlchemy Events

SQLAlchemy events work seamlessly with this package. You can listen to various events like before_insert, after_insert, before_update, after_update, etc.

from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware, db
from sqlalchemy import Column, Integer, String, event
from sqlalchemy.orm import DeclarativeBase

app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# Event listeners
@event.listens_for(User, 'before_insert')
def before_insert_listener(mapper, connection, target):
    print(f"About to insert user: {target.name}")
    # You can modify the target object here
    target.name = target.name.title()  # Capitalize name

@event.listens_for(User, 'after_insert')
def after_insert_listener(mapper, connection, target):
    print(f"User {target.name} inserted with ID: {target.id}")

@event.listens_for(User, 'before_update')
def before_update_listener(mapper, connection, target):
    print(f"About to update user: {target.name}")

@event.listens_for(User, 'after_update')
def after_update_listener(mapper, connection, target):
    print(f"User {target.name} updated")

# Usage in routes
@app.post("/users")
async def create_user(name: str, email: str):
    user = User(name=name, email=email)
    db.session.add(user)
    await db.session.commit()
    return {"id": user.id, "name": user.name, "email": user.email}

@app.put("/users/{user_id}")
async def update_user(user_id: int, name: str):
    user = await db.session.get(User, user_id)
    if user:
        user.name = name
        await db.session.commit()
        return {"id": user.id, "name": user.name}
    return {"error": "User not found"}

Advanced Event Usage

You can also use events for more complex scenarios like auditing, validation, or triggering other actions:

from datetime import datetime
from sqlalchemy import Column, DateTime, event

class AuditMixin:
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class User(Base, AuditMixin):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# Validation event
@event.listens_for(User, 'before_insert')
@event.listens_for(User, 'before_update')
def validate_user(mapper, connection, target):
    if not target.email or '@' not in target.email:
        raise ValueError("Invalid email address")
    if not target.name or len(target.name.strip()) < 2:
        raise ValueError("Name must be at least 2 characters long")

# Audit logging event
@event.listens_for(User, 'after_insert')
@event.listens_for(User, 'after_update')
@event.listens_for(User, 'after_delete')
def audit_user_changes(mapper, connection, target):
    # Log changes to audit table or external system
    action = "INSERT" if mapper.class_.__name__ == "User" else "UPDATE"
    print(f"AUDIT: {action} on User {target.id} at {datetime.utcnow()}")

All SQLAlchemy event types are supported:

  • before_insert, after_insert
  • before_update, after_update
  • before_delete, after_delete
  • before_bulk_insert, after_bulk_insert
  • before_bulk_update, after_bulk_update
  • before_bulk_delete, after_bulk_delete

Events work with both single database setups and multiple database configurations.

About

FastAPI Async SQLAlchemy middleware

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 7

Languages