SQLAlchemy FastAPI middleware
Description
Provides SQLAlchemy middleware for FastAPI using AsyncSession and an async engine.
Install
pip install fastapi-async-sqlalchemy
Important !!!
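If you use sqlmodel, install a SQLAlchemy version no newer than 1.4.41 (quote the requirement so the shell does not treat `<` as a redirection):
pip install "sqlalchemy<=1.4.41"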
Examples
Note that the session object provided by db.session is based on the Python 3.7+ ContextVar. This means that each session is linked to the individual request context in which it was created.
from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db
from sqlalchemy import column
from sqlalchemy import table
# Create the application and register the SQLAlchemy middleware; the
# middleware is what makes ``db.session`` available to the handlers below.
app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    # NOTE(review): presumably forwarded to the async engine -- confirm
    # against the library documentation.
    engine_args={
        "echo": True,
        "pool_pre_ping": True,
        "pool_size": 5,
        "max_overflow": 10,
    },
)

# Lightweight table construct (name plus a single ``id`` column) queried by
# the example handlers.
foo = table("ms_files", column("id"))
@app.get("/")
async def get_files():
    """Return all rows selected from ``foo`` via ``db.session``."""
    query = foo.select()
    rows = await db.session.execute(query)
    return rows.fetchall()
async def get_db_fetch():
    """Return all rows selected from ``foo`` inside an explicit ``db()`` context.

    The session is obtained by entering ``async with db()`` in this function,
    which is why the startup hook in this example can call it as well as a
    request handler.
    """
    async with db():
        query = foo.select()
        rows = await db.session.execute(query)
        return rows.fetchall()
@app.get("/db_context")
async def db_context():
    """Return the rows produced by ``get_db_fetch``."""
    rows = await get_db_fetch()
    return rows
@app.on_event("startup")
async def on_startup():
    """Run one query through ``get_db_fetch`` when the application starts."""
    # The fetched rows are not used here, so the return value is deliberately
    # not bound to a local name.
    await get_db_fetch()
# Start a uvicorn server only when this file is executed as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)
Usage of multiple databases
databases.py
from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy
# Each call is unpacked into a (middleware class, session proxy) pair; the
# example creates one pair per database. main.py registers the middleware
# classes and routes.py queries through the proxies.
FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()
main.py
from fastapi import FastAPI
from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router
app = FastAPI()
app.include_router(router)

# Middleware for the first database (PostgreSQL through asyncpg).
app.add_middleware(
    FirstSQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={"pool_size": 5, "max_overflow": 10},
)
# Middleware for the second database (MySQL through aiomysql).
# The URL uses MySQL's default port 3306 rather than 5432, the port already
# used by the PostgreSQL URL on the same host in this example.
app.add_middleware(
    SecondSQLAlchemyMiddleware,
    db_url="mysql+aiomysql://user:user@192.168.88.200:3306/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)
routes.py
from fastapi import APIRouter
from sqlalchemy import column
from sqlalchemy import table
from databases import first_db, second_db
# Router holding the example endpoints; main.py attaches it to the app.
router = APIRouter()
# Lightweight table construct (name plus a single ``id`` column) queried by
# both endpoints.
foo = table("ms_files", column("id"))
@router.get("/first-db-files")
async def get_files_from_first_db():
    """Return all rows selected from ``foo`` via ``first_db.session``."""
    query = foo.select()
    rows = await first_db.session.execute(query)
    return rows.fetchall()
@router.get("/second-db-files")
async def get_files_from_second_db():
    """Return all rows selected from ``foo`` via ``second_db.session``."""
    query = foo.select()
    rows = await second_db.session.execute(query)
    return rows.fetchall()