Async Patterns in FastAPI: Beyond the Basics
Advanced async programming patterns for FastAPI applications, including background tasks, streaming, and connection pooling.
Introduction
FastAPI's async support is one of its greatest strengths, but using it effectively requires understanding Python's asyncio ecosystem. This guide covers advanced patterns for building high-performance async applications.
Understanding Async in FastAPI
FastAPI can run both sync and async endpoints:
# Sync - runs in thread pool
@app.get("/sync")
def sync_endpoint():
return {"type": "sync"}
# Async - runs in event loop
@app.get("/async")
async def async_endpoint():
return {"type": "async"}
Rule of thumb: Use async def when doing I/O operations (database, HTTP calls). Use def for CPU-bound operations.
Pattern 1: Efficient Database Access
Connection Pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
)
Session Management
async def get_db():
async with AsyncSession(engine) as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Pattern 2: Background Tasks
For fire-and-forget operations:
from fastapi import BackgroundTasks
@app.post("/send-email")
async def send_email(
email: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_email_task, email)
return {"status": "queued"}
async def send_email_task(email: str):
# This runs after response is sent
await email_service.send(email)
Pattern 3: Streaming Responses
For large responses or real-time data:
from fastapi.responses import StreamingResponse
import asyncio
@app.get("/stream")
async def stream_data():
async def generate():
for i in range(100):
yield f"data: {i}\n\n"
await asyncio.sleep(0.1)
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
Pattern 4: Concurrent Requests
When you need multiple external calls:
import asyncio
import httpx
@app.get("/aggregate")
async def aggregate_data():
async with httpx.AsyncClient() as client:
# Run all requests concurrently
results = await asyncio.gather(
client.get("https://api1.example.com/data"),
client.get("https://api2.example.com/data"),
client.get("https://api3.example.com/data"),
)
return {
"api1": results[0].json(),
"api2": results[1].json(),
"api3": results[2].json(),
}
Pattern 5: Rate Limiting with Semaphores
Limit concurrent operations:
import asyncio
# Allow max 10 concurrent database operations
db_semaphore = asyncio.Semaphore(10)
async def rate_limited_query(query: str):
async with db_semaphore:
return await database.execute(query)
Pattern 6: Timeout Handling
Prevent hanging requests:
import asyncio
@app.get("/with-timeout")
async def endpoint_with_timeout():
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0
)
return result
except asyncio.TimeoutError:
raise HTTPException(408, "Request timeout")
Pattern 7: Graceful Shutdown
Handle in-flight requests during shutdown:
from contextlib import asynccontextmanager
import asyncio
active_tasks: set = set()
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
# Wait for active tasks on shutdown
if active_tasks:
await asyncio.gather(*active_tasks)
app = FastAPI(lifespan=lifespan)
Common Pitfalls
Performance Tips
Conclusion
Async patterns in FastAPI can significantly improve performance when used correctly. Focus on understanding when async helps and always measure the impact of your optimizations.