FastAPI
FastAPI 是一个现代、快速(高性能)的 Web 框架,基于标准 Python 类型提示,使用 Python 3.7+ 构建 API。它构建在 Starlette 和 Pydantic 之上。
简介
FastAPI 特性
FastAPI 核心特性:
- 快速:与 NodeJS 和 Go 相当的高性能
- 自动文档:自动生成交互式 API 文档(Swagger UI 和 ReDoc)
- 类型提示:基于 Python 类型提示进行参数声明和数据验证
- 依赖注入:强大且优雅的依赖注入系统
- 安全性:内置 OAuth2 等安全工具,并可结合 python-jose 等第三方库实现 JWT 认证
- 异步支持:原生支持异步编程
- 简单易用:设计简单,易于学习和使用
- 标准兼容:基于 OpenAPI 规范
适用场景:
- RESTful API 服务
- 微服务架构
- 实时应用(WebSocket)
- 高性能后端服务
- 需要自动文档的 API
安装 FastAPI
# 创建虚拟环境
python -m venv venv
# Windows 激活
venv\Scripts\activate
# Linux/Mac 激活
source venv/bin/activate
# 安装 FastAPI 和 ASGI 服务器
pip install fastapi
pip install uvicorn
# 安装常用扩展
pip install python-multipart # 表单数据
pip install python-jose # JWT 令牌
pip install passlib[bcrypt] # 密码哈希
pip install sqlalchemy # ORM
pip install databases # 数据库异步支持
pip install aiofiles # 异步文件操作
pip install websockets # WebSocket 支持
# 查看版本
python -c "import fastapi; print(fastapi.__version__)"
快速开始
第一个 API
# main.py
from fastapi import FastAPI
# 创建应用实例
app = FastAPI()
# 基本路由
@app.get('/')
def read_root():
return {'Hello': 'World'}
# 带路径参数
@app.get('/items/{item_id}')
def read_item(item_id: int):
return {'item_id': item_id}
# 运行应用
# uvicorn main:app --reload
# 运行开发服务器
uvicorn main:app --reload
# 指定端口
uvicorn main:app --port 8000
# 监听所有 IP
uvicorn main:app --host 0.0.0.0
# 生产环境
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
应用配置
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
title='My API',
description='My API Description',
version='1.0.0',
docs_url='/docs', # Swagger UI
redoc_url='/redoc', # ReDoc
openapi_url='/openapi.json' # OpenAPI schema
)
# CORS 中间件
app.add_middleware(
    CORSMiddleware,
    # Wildcard origins are only acceptable for local development; list the
    # exact front-end origins in production.
    allow_origins=['*'],
    # Credentialed requests (cookies, Authorization headers) cannot be combined
    # with wildcard origins, so credentials stay disabled until explicit
    # origins are configured.
    allow_credentials=False,
    allow_methods=['*'],
    allow_headers=['*'],
)
# 元数据
@app.get('/')
def root():
return {
'message': 'Welcome to the API',
'docs': '/docs',
'redoc': '/redoc'
}
路径操作
路径参数
from fastapi import FastAPI
from typing import Optional
app = FastAPI()
# 基本路径参数
@app.get('/items/{item_id}')
def read_item(item_id: int):
return {'item_id': item_id}
# 路径类型转换
@app.get('/users/{user_id}')
def read_user(user_id: int):
    """Return the converted path parameter together with its Python type name.

    The type is reported by name because a ``type`` object itself cannot be
    serialized to JSON.
    """
    return {'user_id': user_id, 'type': type(user_id).__name__}
# 路径参数 + 可选查询参数(q 未出现在路径中,因此是查询参数)
@app.get('/items/{item_id}')
def read_item(item_id: int, q: Optional[str] = None):
return {'item_id': item_id, 'q': q}
# 多个路径参数
@app.get('/users/{user_id}/items/{item_id}')
def read_user_item(user_id: int, item_id: int):
return {'user_id': user_id, 'item_id': item_id}
# 预设路径参数
@app.get('/models/{model_name}')
def read_model(model_name: str):
    """Echo the model name when it is one of the known models.

    Unknown names produce an ``error`` payload instead; no exception is raised.
    """
    known_models = ['foo', 'bar']
    if model_name in known_models:
        return {'model_name': model_name}
    return {'error': 'Model not found'}
# 路径转换器
@app.get('/files/{file_path:path}')
def read_file(file_path: str):
return {'file_path': file_path}
查询参数
# Query is used by the list / validated query-parameter examples below.
from fastapi import FastAPI, Query
from typing import Optional, List
app = FastAPI()
# 基本查询参数
@app.get('/items/')
def read_items(skip: int = 0, limit: int = 10):
return {'skip': skip, 'limit': limit}
# 可选查询参数
@app.get('/items/{item_id}')
def read_item(item_id: int, q: Optional[str] = None):
if q:
return {'item_id': item_id, 'q': q}
return {'item_id': item_id}
# 多个查询参数
@app.get('/items/')
def read_items(
skip: int = 0,
limit: int = 10,
q: Optional[str] = None,
sort: Optional[str] = None
):
return {
'skip': skip,
'limit': limit,
'q': q,
'sort': sort
}
# 查询参数列表
@app.get('/items/')
def read_items(ids: List[int] = Query(None)):
return {'ids': ids}
# 必需查询参数
@app.get('/items/{item_id}')
def read_item(item_id: int, needy: str):
return {'item_id': item_id, 'needy': needy}
# 带验证的查询参数
from fastapi import Query
@app.get('/items/')
def read_items(
q: Optional[str] = Query(
None,
min_length=3,
max_length=50,
regex='^fixedquery$'
)
):
results = {'items': [{'item_id': 'Foo'}, {'item_id': 'Bar'}]}
if q:
results.update({'q': q})
return results
# 多个值查询参数
@app.get('/items/')
def read_items(
q: List[str] = Query(['foo', 'bar'], min_length=2)
):
query_items = {'q': q}
return query_items
请求体
from fastapi import FastAPI
from pydantic import BaseModel, Field
# List is used by the ``tags`` field of the Item model below.
from typing import List, Optional
app = FastAPI()
# 定义数据模型
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
class User(BaseModel):
username: str
full_name: Optional[str] = None
# PUT 请求体
@app.put('/items/{item_id}')
def update_item(item_id: int, item: Item):
return {'item_id': item_id, **item.dict()}
# 请求体 + 路径参数 + 查询参数
@app.put('/items/{item_id}')
def update_item(
item_id: int,
item: Item,
q: Optional[str] = None
):
result = {'item_id': item_id, **item.dict()}
if q:
result.update({'q': q})
return result
# 多个请求体
@app.put('/items/{item_id}')
def update_item(item_id: int, item: Item, user: User):
results = {
'item_id': item_id,
'item': item,
'user': user
}
return results
# 嵌入请求体
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
tags: List[str] = []
@app.put('/items/{item_id}')
def update_item(item_id: int, item: Item):
return {'item_id': item_id, 'item': item}
请求数据验证
from fastapi import FastAPI, Body, Query, Path
from pydantic import BaseModel, Field, HttpUrl
from typing import Optional, List
app = FastAPI()
class Item(BaseModel):
name: str = Field(..., min_length=3, max_length=50)
description: Optional[str] = Field(
None,
max_length=300,
regex='^\\w+$'
)
price: float = Field(..., gt=0, le=1000000)
tax: Optional[float] = Field(None, ge=0, le=100)
tags: List[str] = []
class Config:
schema_extra = {
'example': {
'name': 'Foo',
'description': 'The description',
'price': 35.4,
'tax': 3.2,
'tags': ['electronics', 'computers']
}
}
class Image(BaseModel):
url: HttpUrl
name: str
@app.put('/items/{item_id}')
def update_item(
item_id: int,
item: Item = Body(..., embed=True),
image: Image = Body(None, embed=True)
):
results = {'item_id': item_id, 'item': item}
if image:
results.update({'image': image})
return results
# 混合使用 Path, Query, Body
@app.put('/items/{item_id}')
def update_item(
item_id: int = Path(..., title='The ID of the item', ge=1),
item: Item = Body(..., embed=True),
q: Optional[str] = Query(None, max_length=50)
):
results = {'item_id': item_id, 'item': item}
if q:
results.update({'q': q})
return results
响应模型
响应结构
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse, RedirectResponse
from typing import Dict
app = FastAPI()
# 返回字典
@app.get('/items/')
def read_items() -> Dict[str, float]:
return {'item_id': 1.0}
# 返回响应对象
@app.get('/items/')
def get_items():
content = {'message': 'Hello World'}
return JSONResponse(content=content)
# 返回纯文本
@app.get('/items/')
def get_items():
data = 'Hello World'
return Response(content=data, media_type='text/plain')
# 重定向
@app.get('/typer')
async def redirect_typer():
return RedirectResponse(url='https://typer.tiangolo.com')
# 流式响应
from fastapi.responses import StreamingResponse
import io
@app.get('/items')
async def read_items():
def iterfile():
yield b'Hello'
yield b'World'
return StreamingResponse(iterfile(), media_type='text/plain')
状态码
from fastapi import FastAPI, status
app = FastAPI()
# 默认 200
@app.post('/items/')
def create_item(item: Item):
return item
# 指定状态码
@app.post('/items/', status_code=201)
def create_item(item: Item):
return item
# 使用 status 模块
@app.post('/items/', status_code=status.HTTP_201_CREATED)
def create_item(item: Item):
return item
# 常用状态码
@app.get('/items/{item_id}', status_code=200)
def read_item(item_id: int):
return {'item_id': item_id}
@app.post('/items/', status_code=201)
def create_item(item: Item):
return item
@app.put('/items/{item_id}', status_code=200)
def update_item(item_id: int, item: Item):
return {'item_id': item_id, **item.dict()}
@app.delete('/items/{item_id}', status_code=204)
def delete_item(item_id: int):
return None
响应模型
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Optional is used by the output models below.
from typing import Optional
app = FastAPI()
# 定义输出模型
class ItemOut(BaseModel):
name: str
price: float
description: Optional[str] = None
# 使用 response_model
@app.post('/items/', response_model=ItemOut)
def create_item(item: Item):
return item
# 使用独立的输入/输出模型过滤敏感字段(UserOut 不包含 password)
class UserIn(BaseModel):
username: str
password: str
email: str
full_name: Optional[str] = None
class UserOut(BaseModel):
username: str
email: str
full_name: Optional[str] = None
@app.post('/user/', response_model=UserOut, status_code=201)
def create_user(user: UserIn):
return user
# 排除未显式设置的字段(response_model_exclude_unset)
@app.post('/user/', response_model=UserOut, response_model_exclude_unset=True)
def create_user(user: UserIn):
return user
# 多个响应模型
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Union
class Item(BaseModel):
name: str
description: Optional[str] = None
class Message(BaseModel):
message: str
@app.post('/items/', response_model=Union[Item, Message])
def create_item(item: Item):
if not item.description:
return JSONResponse(
status_code=200,
content={'message': 'Description is required'}
)
return item
数据验证
Pydantic 模型
from fastapi import FastAPI
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
app = FastAPI()
class Item(BaseModel):
    """Item payload with declarative constraints and custom field validators.

    Uses the Pydantic v1 API (``validator``, ``Config.schema_extra``).
    """

    name: str = Field(..., min_length=1, max_length=100)
    description: Optional[str] = None
    price: float = Field(..., gt=0)
    tax: Optional[float] = Field(None, ge=0)
    tags: List[str] = []
    created_at: datetime = Field(default_factory=datetime.now)

    # Field-level validator for ``price``; it repeats the ``gt=0`` constraint
    # above to show the hand-written form.
    @validator('price')
    def price_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError('Price must be positive')
        return v

    # Field-level validator for ``name``.
    @validator('name')
    def name_must_not_contain_space(cls, v):
        if ' ' in v:
            raise ValueError('Name must not contain space')
        return v

    class Config:
        schema_extra = {
            'example': {
                # The documented example must pass name_must_not_contain_space.
                'name': 'ExampleItem',
                'description': 'A test item',
                'price': 99.99,
                'tax': 9.99,
                'tags': ['electronics', 'test']
            }
        }
@app.post('/items/')
def create_item(item: Item):
return item
字段验证
from pydantic import BaseModel, Field, EmailStr, HttpUrl
# List is used by ``User.tags`` below.
from typing import List, Optional
class User(BaseModel):
# 字符串验证
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr # 邮箱验证
# 数字验证
age: int = Field(..., ge=18, le=120) # 18-120岁
height: Optional[float] = Field(None, gt=0, le=3.0)
# URL 验证
website: Optional[HttpUrl] = None
# 正则验证
phone: str = Field(..., regex=r'^\+?\d{10,15}$')
# 选择验证
gender: str = Field(..., regex='^(male|female|other)$')
# 布尔值
is_active: bool = True
# 列表验证
tags: List[str] = Field(..., min_items=1, max_items=10)
from enum import Enum


class Status(str, Enum):
    """Closed set of product states.

    Deriving from ``Enum`` is what restricts the field to these values; a bare
    ``str`` subclass with class attributes would accept any string.
    """

    available = 'available'
    pending = 'pending'
    sold = 'sold'


class Product(BaseModel):
    """Product payload showing required, optional, nested and enum fields."""

    # Required field
    name: str = Field(..., min_length=1)
    # Optional fields with defaults
    description: Optional[str] = None
    price: Optional[float] = Field(None, ge=0)
    # Nested model
    owner: Optional[User] = None
    # Enum-constrained field
    status: Status = Status.pending
复杂类型
from pydantic import BaseModel, Field
# Optional and Union are used by the Person model below.
from typing import Dict, List, Optional, Set, Tuple, Union
class Address(BaseModel):
street: str
city: str
country: str
zip_code: str
# Imported at module level: an import statement inside the class body would
# add the name ``Union`` to the model's namespace.
from typing import Union


class Person(BaseModel):
    """Model demonstrating container, nested, self-referencing and union fields."""

    # List
    tags: List[str] = []
    # Set (duplicates are removed)
    unique_tags: Set[str] = set()
    # Dict
    metadata: Dict[str, str] = {}
    # Fixed-length tuple
    location: Tuple[float, float]  # (latitude, longitude)
    # Nested model
    address: Optional[Address] = None
    # List of nested models (self-reference by name)
    friends: List['Person'] = []
    # Union type
    identifier: Union[int, str]
class Product(BaseModel):
    """Model whose fields are populated from camelCase aliases."""

    # Populated from the "displayName" key
    display_name: str = Field(alias='displayName')
    # ``alias`` takes a single string, not a list of alternatives
    name: str = Field(..., alias='productName')

    class Config:
        # Also accept the Python field names (display_name, name) as input keys
        allow_population_by_field_name = True
依赖注入
Depends
# Cookie is used by query_or_cookie_extractor below.
from fastapi import FastAPI, Depends, Header, HTTPException, status, Cookie
from typing import Optional
app = FastAPI()
# 简单依赖
def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):
    """Bundle the shared search and paging parameters into a single dict."""
    return dict(q=q, skip=skip, limit=limit)
@app.get('/items/')
def read_items(commons: dict = Depends(common_parameters)):
return commons
@app.get('/users/')
def read_users(commons: dict = Depends(common_parameters)):
return commons
# 类作为依赖
class CommonParams:
    """Class-based dependency holding the shared search and paging parameters."""

    def __init__(self, q: Optional[str] = None, skip: int = 0, limit: int = 100):
        # Store each declared parameter under an attribute of the same name.
        self.q, self.skip, self.limit = q, skip, limit
@app.get('/items/')
def read_items(commons: CommonParams = Depends(CommonParams)):
response = {}
if commons.q:
response.update({'q': commons.q})
items = fake_items_db[commons.skip: commons.skip + commons.limit]
response.update({'items': items})
return response
# 子依赖
def query_extractor(q: Optional[str] = None):
return q
def query_or_cookie_extractor(
q: str = Depends(query_extractor),
last_query: Optional[str] = Cookie(None)
):
if not q:
return last_query
return q
@app.get('/items/')
def read_query(query: str = Depends(query_or_cookie_extractor)):
return {'query': query}
类依赖
from fastapi import FastAPI, Depends
from typing import Optional
app = FastAPI()
class Paginator:
    """Class-based dependency that turns page / per_page into a slice offset."""

    def __init__(self, page: int = 1, per_page: int = 10):
        self.page, self.per_page = page, per_page
        # Page 1 starts at offset 0, so skip (page - 1) full pages.
        pages_to_skip = page - 1
        self.offset = pages_to_skip * per_page
@app.get('/items/')
def get_items(paginator: Paginator = Depends(Paginator)):
items = fake_items_db[
paginator.offset: paginator.offset + paginator.per_page
]
return {'items': items, 'page': paginator.page}
# 带方法的依赖类
class UserService:
def __init__(self, db_session):
self.db = db_session
def get_user(self, user_id: int):
return self.db.query(User).filter(User.id == user_id).first()
@app.get('/users/{user_id}')
def get_user(
user_id: int,
user_service: UserService = Depends(UserService)
):
user = user_service.get_user(user_id)
return user
依赖层级
# Header and HTTPException are used by get_user, Optional by get_query below.
from fastapi import FastAPI, Depends, Header, HTTPException
from typing import Optional
app = FastAPI()
def get_query(q: Optional[str] = None):
return q
def get_query_extractor(
query: str = Depends(get_query),
last_query: Optional[str] = None
):
if not query:
return last_query
return query
class User:
def __init__(self, name: str):
self.name = name
def get_user(user_name: str = Header(None)):
if not user_name:
raise HTTPException(status_code=400, detail='No user name')
return User(name=user_name)
@app.get('/items/')
def read_items(
query: str = Depends(get_query_extractor),
user: User = Depends(get_user)
):
return {'query': query, 'user_name': user.name}
# 路径装饰器中的依赖
@app.get('/items/', dependencies=[Depends(get_query_extractor)])
def read_items():
return fake_items_db
安全
OAuth2
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
@app.post('/token')
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Check the submitted form credentials and return a bearer token.

    Raises:
        HTTPException: 400 when the username is unknown or the password hash
            does not match.
    """
    # One shared error for both failure modes, so the response does not reveal
    # whether a username exists.
    login_error = HTTPException(
        status_code=400, detail='Incorrect username or password'
    )
    user = fake_users_db.get(form_data.username)
    if not user:
        raise login_error
    hashed_password = fake_hash_password(form_data.password)
    if hashed_password != user.hashed_password:
        raise login_error
    # The username itself is returned as the access token in this example.
    return {'access_token': user.username, 'token_type': 'bearer'}
# 使用令牌
@app.get('/users/me')
def read_users_me(token: str = Depends(oauth2_scheme)):
return {'token': token, 'user': fake_decode_token(token)}
JWT 令牌
from fastapi import FastAPI, Depends, HTTPException, status
# OAuth2PasswordRequestForm is used by the /token endpoint below.
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
# Optional is used by TokenData and create_access_token below.
from typing import Optional
app = FastAPI()
# 密码哈希
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
# JWT 配置
SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
# 模型
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# 辅助函数
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode ``data`` plus an ``exp`` claim as a signed JWT.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. Only ``None`` falls back to 15 minutes,
            so an explicit zero lifetime is honored.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import keeps this snippet independent of the file-level imports.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is naive and deprecated since
    # Python 3.12.
    to_encode['exp'] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 令牌端点
@app.post('/token', response_model=Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Incorrect username or password',
headers={'WWW-Authenticate': 'Bearer'},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={'sub': user.username}, expires_delta=access_token_expires
)
return {'access_token': access_token, 'token_type': 'bearer'}
# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Could not validate credentials',
headers={'WWW-Authenticate': 'Bearer'},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get('sub')
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# 受保护的路由
@app.get('/users/me')
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
API 密钥
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import APIKeyHeader
app = FastAPI()
api_key_header = APIKeyHeader(name='X-API-Key')
async def get_api_key(api_key: str = Depends(api_key_header)):
    """Validate the key taken from the ``api_key_header`` dependency.

    Returns:
        The key, unchanged, when it matches the expected value.

    Raises:
        HTTPException: 403 when the key does not match.
    """
    import secrets

    # Constant-time comparison avoids leaking how much of the key matched.
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    supplied = api_key.encode('utf-8')
    if not secrets.compare_digest(supplied, b'expected-api-key'):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail='Could not validate API Key'
        )
    return api_key
@app.get('/protected')
async def protected_route(api_key: str = Depends(get_api_key)):
return {'message': 'Access granted', 'api_key': api_key}
异常处理
HTTPException
from fastapi import FastAPI, HTTPException
app = FastAPI()
items = {'foo': 'The Foo wrestlers'}
@app.get('/items/{item_id}')
def read_item(item_id: str):
if item_id not in items:
raise HTTPException(
status_code=404,
detail='Item not found',
headers={'X-Error': 'There goes my error'}
)
return {'item': items[item_id]}
自定义异常
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
app = FastAPI()
class UnicornException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
return JSONResponse(
status_code=418,
content={'message': f'Oops! {exc.name} did something. There goes a rainbow...'},
)
@app.get('/unicorns/{name}')
def read_unicorn(name: str):
if name == 'yolo':
raise UnicornException(name=name)
return {'unicorn_name': name}
全局异常处理
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any unhandled exception into a generic 500 JSON response.

    The exception text is logged rather than returned, because ``str(exc)``
    can expose internal details (SQL, file paths) to the client.
    """
    import logging

    logging.getLogger(__name__).error(
        'Unhandled error on %s %s', request.method, request.url.path,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={'detail': 'Internal Server Error'}
    )
# 或自定义全局异常处理器
class AppException(Exception):
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
return JSONResponse(
status_code=exc.status_code,
content={'detail': exc.detail}
)
中间件
CORS
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
origins = [
'http://localhost',
'http://localhost:8080',
'https://example.com',
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
自定义中间件
# time is used by add_process_time_header below.
import time

from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers['X-Process-Time'] = str(process_time)
return response
# 中间件执行顺序:后注册的中间件位于外层,请求阶段先执行、响应阶段后执行
# (此处输出顺序为:2-Before、1-Before、1-After、2-After)
@app.middleware('http')
async def middleware_one(request: Request, call_next):
print('Middleware 1 - Before')
response = await call_next(request)
print('Middleware 1 - After')
return response
@app.middleware('http')
async def middleware_two(request: Request, call_next):
print('Middleware 2 - Before')
response = await call_next(request)
print('Middleware 2 - After')
return response
WebSocket
基本用法
# WebSocketDisconnect is caught by websocket_endpoint below.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
    """Track accepted WebSocket connections and send text messages to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking the connection; untracked connections are ignored."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every tracked connection."""
        # Iterate over a snapshot: the list can change while a send is awaited.
        for connection in list(self.active_connections):
            await connection.send_text(message)
manager = ConnectionManager()
@app.websocket('/ws/{client_id}')
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f'Client {client_id}: {data}', websocket)
await manager.broadcast(f'Client {client_id}: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f'Client #{client_id} left')
WebSocket 认证
from fastapi import FastAPI, WebSocket, WebSocketException, status, Query, Depends
from typing import Optional
app = FastAPI()
async def get_token(
    websocket: WebSocket,
    token: Optional[str] = Query(None)
):
    """Return the ``token`` query parameter of a WebSocket handshake.

    Raises:
        WebSocketException: policy-violation close code (1008) when no token
            was supplied.
    """
    if token is None:
        # Raising is enough; the socket is not also closed by hand, which
        # would attempt to close it twice.
        raise WebSocketException(
            code=status.WS_1008_POLICY_VIOLATION, reason='Token not provided'
        )
    return token
@app.websocket('/ws')
async def websocket_endpoint(
websocket: WebSocket,
token: str = Depends(get_token)
):
await websocket.accept()
await websocket.send_text('Connected')
await websocket.close()
数据库集成
SQLAlchemy
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from pydantic import BaseModel
app = FastAPI()
# 数据库配置
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = 'sqlite:///./test.db'
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={'check_same_thread': False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 模型
class UserModel(Base):
    """SQLAlchemy ORM mapping for the ``users`` table.

    Named ``UserModel`` so the Pydantic ``User`` schema declared below does
    not shadow it; every ``db.query`` call must target this class.
    """

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)


Base.metadata.create_all(bind=engine)


# Pydantic schemas
class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int

    class Config:
        # Lets the schema be populated from ORM object attributes
        orm_mode = True


# Dependency
def get_db():
    """Yield a database session and close it once the caller is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Routes
@app.post('/users/', response_model=User, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a user; respond with 400 when the email is already registered."""
    # Check whether the email already exists
    db_user = db.query(UserModel).filter(UserModel.email == user.email).first()
    if db_user:
        raise HTTPException(status_code=400, detail='Email already registered')
    # Create the new user (placeholder hashing, for demonstration only)
    fake_hashed_password = user.password + 'notreallyhashed'
    db_user = UserModel(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


@app.get('/users/{user_id}', response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with the given id, or respond with 404."""
    db_user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail='User not found')
    return db_user
Tortoise ORM
from fastapi import FastAPI
from tortoise import fields, models
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
# 模型
class Users(models.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100)
class Meta:
table = 'users'
# 注册 Tortoise
register_tortoise(
app,
db_url='sqlite://:memory:',
modules={'models': ['__main__']},
generate_schemas=True
)
# 路由
@app.post('/users')
async def create_user(user: UserCreate):
user_obj = await Users.create(**user.dict())
return user_obj
@app.get('/users')
async def get_users():
users = await Users.all()
return users
后台任务
BackgroundTasks
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import time
app = FastAPI()
# 后台任务函数
def write_notification(email: str, message=''):
    """Append one notification line for ``email`` to ``log.txt``.

    The file is opened with an explicit UTF-8 encoding so that non-ASCII
    messages are written the same way on every platform.
    """
    with open('log.txt', mode='a', encoding='utf-8') as email_file:
        content = f'notification for {email}: {message}\n'
        email_file.write(content)
def send_email(email: str, message=''):
time.sleep(3) # 模拟发送邮件的耗时操作
with open('log.txt', mode='a') as email_file:
content = f'send email to {email}: {message}\n'
email_file.write(content)
class UserSchema(BaseModel):
email: str
message: str
@app.post('/send-notification/{email}')
def send_notification(
email: str,
background_tasks: BackgroundTasks,
message: str = 'Hello'
):
background_tasks.add_task(write_notification, email, message)
return {'message': 'Notification sent in the background'}
@app.post('/send-email/{email}')
def send_email_endpoint(
email: str,
background_tasks: BackgroundTasks,
user_schema: UserSchema
):
background_tasks.add_task(send_email, email, user_schema.message)
return {'message': 'Email sent in the background'}
自动文档
Swagger UI
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
app = FastAPI()
# 自定义 OpenAPI schema
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title='Custom Title',
version='1.0.0',
description='This is a very custom OpenAPI schema',
routes=app.routes,
)
openapi_schema['info']['x-logo'] = {
'url': 'https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png'
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# 文档标签
@app.get('/items/', tags=['items'])
def read_items():
return [{'name': 'Foo'}, {'name': 'Bar'}]
@app.get('/users/', tags=['users'])
def read_users():
return [{'username': 'johndoe'}]
# 文档摘要和描述
@app.post(
'/items/',
summary='Create an item',
description='Create an item with all the information',
response_description='The created item'
)
def create_item(item: Item):
return item
# 文档标记为已弃用
@app.get('/elements/', tags=['items'], deprecated=True)
def read_elements():
return [{'item_id': 'Foo'}]
ReDoc
from fastapi import FastAPI
app = FastAPI(
title='My API',
description='This is the API description',
version='1.0.0',
docs_url='/swagger', # Swagger UI URL
redoc_url='/redoc' # ReDoc URL
)
@app.get('/')
def root():
return {
'message': 'Hello World',
'docs': '/swagger',
'redoc': '/redoc'
}
项目结构
推荐的项目结构
myproject/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用实例
│ ├── dependencies.py # 依赖项
│ ├── routers/ # 路由
│ │ ├── __init__.py
│ │ ├── items.py
│ │ ├── users.py
│ │ └── auth.py
│ ├── models/ # 数据库模型
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── schemas/ # Pydantic 模型
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── core/ # 核心配置
│ │ ├── __init__.py
│ │ ├── config.py
│ │ └── security.py
│ ├── services/ # 业务逻辑
│ │ ├── __init__.py
│ │ └── auth.py
│ └── utils/ # 工具函数
│ ├── __init__.py
│ └── helpers.py
├── tests/ # 测试
│ ├── __init__.py
│ ├── test_items.py
│ └── test_users.py
├── requirements.txt
└── .env
实际应用示例
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import items, users, auth
app = FastAPI(
title='My API',
description='My API Description',
version='1.0.0'
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=['http://localhost:3000'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
# 注册路由
app.include_router(auth.router, prefix='/auth', tags=['auth'])
app.include_router(users.router, prefix='/users', tags=['users'])
app.include_router(items.router, prefix='/items', tags=['items'])
# app/routers/items.py
from fastapi import APIRouter, Depends, HTTPException
from app.schemas.item import Item, ItemCreate, ItemUpdate
# Database queries need the ORM model, not the Pydantic schema.
# NOTE(review): assumes app/models/item.py names its ORM class ``Item`` - confirm.
from app.models.item import Item as ItemModel
from app.dependencies import get_db
from sqlalchemy.orm import Session

router = APIRouter()


@router.get('/', response_model=list[Item])
def get_items(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """Return one page of items selected by ``skip`` and ``limit``."""
    items = db.query(ItemModel).offset(skip).limit(limit).all()
    return items


@router.post('/', response_model=Item, status_code=201)
def create_item(
    item: ItemCreate,
    db: Session = Depends(get_db)
):
    """Persist a new item and return the refreshed row."""
    db_item = ItemModel(**item.dict())
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


@router.get('/{item_id}', response_model=Item)
def get_item(item_id: int, db: Session = Depends(get_db)):
    """Return one item by id, or respond with 404."""
    item = db.query(ItemModel).filter(ItemModel.id == item_id).first()
    if not item:
        raise HTTPException(status_code=404, detail='Item not found')
    return item


@router.put('/{item_id}', response_model=Item)
def update_item(
    item_id: int,
    item: ItemUpdate,
    db: Session = Depends(get_db)
):
    """Apply the fields present in the request to an existing item."""
    db_item = db.query(ItemModel).filter(ItemModel.id == item_id).first()
    if not db_item:
        raise HTTPException(status_code=404, detail='Item not found')
    # exclude_unset keeps fields the client did not send untouched
    for field, value in item.dict(exclude_unset=True).items():
        setattr(db_item, field, value)
    db.commit()
    db.refresh(db_item)
    return db_item
部署
Uvicorn
# 开发环境
uvicorn app.main:app --reload
# 生产环境
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
# 使用 Gunicorn 管理 Uvicorn worker(需先 pip install gunicorn)
# gunicorn app.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker
Docker
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用
COPY ./app ./app
# 暴露端口
EXPOSE 8000
# 启动命令
# Exec form is parsed as a JSON array, so the strings must use double quotes.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- '8000:8000'
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/mydb
depends_on:
- db
db:
image: postgres:15
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
Nginx 反向代理
server {
listen 80;
server_name api.example.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# WebSocket 支持
location /ws/ {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
}
}
性能优化
异步数据库
from fastapi import FastAPI
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String
app = FastAPI()
# 数据库配置
DATABASE_URL = 'postgresql://user:password@localhost/dbname'
database = Database(DATABASE_URL)
metadata = MetaData()
# 表定义
users = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('email', String)
)
# 启动和关闭事件
@app.on_event('startup')
async def startup():
await database.connect()
@app.on_event('shutdown')
async def shutdown():
await database.disconnect()
# 路由
@app.get('/users/')
async def read_users():
query = users.select()
return await database.fetch_all(query)
@app.post('/users/')
async def create_user(user: UserCreate):
query = users.insert().values(**user.dict())
await database.execute(query)
return user
缓存
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
# aioredis is used by the startup hook below.
from redis import asyncio as aioredis
app = FastAPI()
# 初始化缓存
@app.on_event('startup')
async def startup():
    """Configure the Redis cache backend once when the application starts."""
    redis = aioredis.from_url('redis://localhost')
    # FastAPICache.init is a plain (non-async) call, so it is not awaited.
    FastAPICache.init(RedisBackend(redis), prefix='fastapi-cache')
@app.get('/items/{item_id}')
@cache(expire=60)
async def read_item(item_id: int):
return {'item_id': item_id, 'name': 'Item Name'}
测试
测试客户端
from fastapi.testclient import TestClient
from fastapi import FastAPI
app = FastAPI()
@app.get('/')
def read_root():
    return {'hello': 'world'}


# Route exercised by test_read_item below; without it the request returns 404.
@app.get('/items/{item_id}')
def read_item(item_id: int):
    return {'item_id': item_id}


# Tests
client = TestClient(app)


def test_read_root():
    response = client.get('/')
    assert response.status_code == 200
    assert response.json() == {'hello': 'world'}


def test_read_item():
    response = client.get('/items/5')
    assert response.status_code == 200
    assert response.json() == {'item_id': 5}
Pytest
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_read_items():
async with AsyncClient(app=app, base_url='http://test') as ac:
response = await ac.get('/items/')
assert response.status_code == 200
assert response.json()[0]['name'] == 'Item 1'
@pytest.mark.asyncio
async def test_create_item():
async with AsyncClient(app=app, base_url='http://test') as ac:
response = await ac.post(
'/items/',
json={'name': 'Test Item', 'price': 10.5}
)
assert response.status_code == 200
assert response.json()['name'] == 'Test Item'
最佳实践
项目组织
# 使用 APIRouter
from fastapi import APIRouter
router = APIRouter(
prefix='/items',
tags=['items'],
responses={404: {'description': 'Not found'}},
)
@router.get('/')
def get_items():
return ['item1', 'item2']
# 在 main.py 中导入
from app.routers import items
app.include_router(items.router)
类型提示
# 始终使用类型提示
from typing import List, Optional
@app.get('/items/')
def get_items(q: Optional[str] = None) -> List[dict]:
results = [{'name': 'Item 1'}, {'name': 'Item 2'}]
if q:
results = [r for r in results if q.lower() in r['name'].lower()]
return results
# 使用 Pydantic 模型
from pydantic import BaseModel, EmailStr, HttpUrl
class UserBase(BaseModel):
email: EmailStr
full_name: Optional[str] = None
class User(UserBase):
id: int
错误处理
from fastapi import FastAPI, HTTPException, status
# JSONResponse is used by app_exception_handler below.
from fastapi.responses import JSONResponse
app = FastAPI()
# 使用状态码常量
@app.get('/items/{item_id}')
def read_item(item_id: int):
if item_id < 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Item ID must be positive'
)
return {'item_id': item_id}
# 自定义异常处理器
class AppException(Exception):
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
@app.exception_handler(AppException)
async def app_exception_handler(request, exc: AppException):
return JSONResponse(
status_code=exc.status_code,
content={'detail': exc.detail}
)
依赖管理
# 集中管理依赖
# dependencies.py
from fastapi import Depends, Header, HTTPException, status
from typing import Optional
async def get_db():
# 数据库连接逻辑
pass
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
):
# 获取当前用户逻辑
pass
# 在路由中使用
@app.get('/users/me')
def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
总结
FastAPI 是一个现代、高性能的 Web 框架:
核心概念
- 类型提示:基于 Python 类型提示自动验证和转换数据
- 自动文档:自动生成 Swagger UI 和 ReDoc 文档
- 依赖注入:强大而优雅的依赖注入系统
- 异步支持:原生支持异步编程,高性能
- 数据验证:Pydantic 提供强大的数据验证
- 安全性:内置 OAuth2 等安全工具,并可结合 python-jose 等第三方库实现 JWT 认证
关键特性
- 高性能:与 NodeJS 和 Go 相当
- 快速开发:开发效率提升 200-300%
- 减少错误:减少约 40% 的人为(开发者)错误
- 简单易用:直观易学,代码简短
- 标准兼容:基于 OpenAPI 规范
最佳实践
- 使用 Pydantic 模型进行数据验证
- 利用依赖注入管理共享逻辑
- 使用 APIRouter 组织代码
- 编写完整的类型提示
- 实现完善的错误处理
- 使用异步数据库操作
- 添加缓存优化性能
- 使用 Docker 容器化部署
FastAPI 是构建现代 API 的理想选择,特别适合需要高性能和自动文档的微服务架构。