FastAPI 核心概念
FastAPI 是一个现代、快速(高性能)的 Python Web 框架,用于构建 API。它基于标准 Python 类型提示,使用 Starlette 和 Pydantic 构建。
主要特点
- 高性能:与 NodeJS 和 Go 相当,是最快的 Python 框架之一
- 快速开发:开发速度提高约 200%-300%
- 减少错误:减少约 40% 的人为错误
- 直观:强大的编辑器支持,自动补全无处不在
- 简单:易于使用和学习,减少阅读文档的时间
ASGI 与异步编程
什么是 ASGI?
ASGI(Asynchronous Server Gateway Interface)是 Python 异步 Web 服务器和应用程序之间的标准接口。
WSGI vs ASGI 对比:
实例
# WSGI 应用(同步)- 传统 Flask 风格
def wsgi_app(environ, start_response):
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [b'Hello World']
# ASGI 应用(异步)- FastAPI 风格
async def asgi_app(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [(b'content-type', b'text/plain')],
})
await send({
'type': 'http.response.body',
'body': b'Hello World',
})
def wsgi_app(environ, start_response):
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [b'Hello World']
# ASGI 应用(异步)- FastAPI 风格
async def asgi_app(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [(b'content-type', b'text/plain')],
})
await send({
'type': 'http.response.body',
'body': b'Hello World',
})
异步编程核心概念
同步 vs 异步执行:
实例
import asyncio
import time
import httpx
# 同步方式 - 阻塞执行
def sync_fetch_data():
start_time = time.time()
# 模拟三个网络请求
time.sleep(1) # 第一个请求
time.sleep(1) # 第二个请求
time.sleep(1) # 第三个请求
print(f"同步执行耗时: {time.time() - start_time:.2f}秒") # 约3秒
# 异步方式 - 并发执行
async def async_fetch_data():
start_time = time.time()
# 三个请求并发执行
await asyncio.gather(
asyncio.sleep(1), # 第一个请求
asyncio.sleep(1), # 第二个请求
asyncio.sleep(1), # 第三个请求
)
print(f"异步执行耗时: {time.time() - start_time:.2f}秒") # 约1秒
# 运行示例
sync_fetch_data() # 输出: 同步执行耗时: 3.00秒
asyncio.run(async_fetch_data()) # 输出: 异步执行耗时: 1.00秒
import time
import httpx
# 同步方式 - 阻塞执行
def sync_fetch_data():
start_time = time.time()
# 模拟三个网络请求
time.sleep(1) # 第一个请求
time.sleep(1) # 第二个请求
time.sleep(1) # 第三个请求
print(f"同步执行耗时: {time.time() - start_time:.2f}秒") # 约3秒
# 异步方式 - 并发执行
async def async_fetch_data():
start_time = time.time()
# 三个请求并发执行
await asyncio.gather(
asyncio.sleep(1), # 第一个请求
asyncio.sleep(1), # 第二个请求
asyncio.sleep(1), # 第三个请求
)
print(f"异步执行耗时: {time.time() - start_time:.2f}秒") # 约1秒
# 运行示例
sync_fetch_data() # 输出: 同步执行耗时: 3.00秒
asyncio.run(async_fetch_data()) # 输出: 异步执行耗时: 1.00秒
FastAPI 中的异步路径操作:
实例
from fastapi import FastAPI
import asyncio
app = FastAPI()
# 同步路径操作
@app.get("/sync")
def sync_endpoint():
# 同步操作会阻塞整个应用
time.sleep(2)
return {"message": "同步响应"}
# 异步路径操作(推荐)
@app.get("/async")
async def async_endpoint():
# 异步操作不会阻塞其他请求
await asyncio.sleep(2)
return {"message": "异步响应"}
# 异步数据库操作示例
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# 异步数据库查询
user = await database.fetch_one(
"SELECT * FROM users WHERE id = :user_id",
{"user_id": user_id}
)
return user
import asyncio
app = FastAPI()
# 同步路径操作
@app.get("/sync")
def sync_endpoint():
# 同步操作会阻塞整个应用
time.sleep(2)
return {"message": "同步响应"}
# 异步路径操作(推荐)
@app.get("/async")
async def async_endpoint():
# 异步操作不会阻塞其他请求
await asyncio.sleep(2)
return {"message": "异步响应"}
# 异步数据库操作示例
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# 异步数据库查询
user = await database.fetch_one(
"SELECT * FROM users WHERE id = :user_id",
{"user_id": user_id}
)
return user
何时使用异步?
适合异步的场景:
- 数据库操作
- 网络请求(API 调用)
- 文件 I/O 操作
- 长时间等待的操作
不适合异步的场景:
- CPU 密集型计算
- 简单的数据处理
- 没有 I/O 等待的操作
实例
# 好的异步用法
@app.get("/weather")
async def get_weather():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.weather.com/data")
return response.json()
# 不必要的异步(没有 I/O 等待)
@app.get("/calculate")
def calculate_sync(): # 保持同步即可
result = sum(range(1000000))
return {"result": result}
@app.get("/weather")
async def get_weather():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.weather.com/data")
return response.json()
# 不必要的异步(没有 I/O 等待)
@app.get("/calculate")
def calculate_sync(): # 保持同步即可
result = sum(range(1000000))
return {"result": result}
REST API 设计原则
REST 基础概念
REST(Representational State Transfer)是一种 Web API 设计风格,强调:
- 资源导向:URL 表示资源
- 状态无关:每个请求都是独立的
- 统一接口:使用标准 HTTP 方法
- 分层系统:支持缓存、负载均衡等
RESTful URL 设计
良好的 URL 设计示例:
实例
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
# 资源集合和单个资源
@app.get("/users") # GET /users - 获取用户列表
async def get_users():
return users_db
@app.get("/users/{user_id}") # GET /users/123 - 获取特定用户
async def get_user(user_id: int):
return find_user(user_id)
@app.post("/users") # POST /users - 创建新用户
async def create_user(user: UserCreate):
return create_new_user(user)
@app.put("/users/{user_id}") # PUT /users/123 - 完整更新用户
async def update_user(user_id: int, user: UserUpdate):
return update_existing_user(user_id, user)
@app.patch("/users/{user_id}") # PATCH /users/123 - 部分更新用户
async def patch_user(user_id: int, user: UserPatch):
return patch_existing_user(user_id, user)
@app.delete("/users/{user_id}") # DELETE /users/123 - 删除用户
async def delete_user(user_id: int):
return delete_existing_user(user_id)
# 嵌套资源
@app.get("/users/{user_id}/posts") # 获取用户的所有文章
async def get_user_posts(user_id: int):
return get_posts_by_user(user_id)
@app.post("/users/{user_id}/posts") # 为用户创建新文章
async def create_user_post(user_id: int, post: PostCreate):
return create_post_for_user(user_id, post)
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
# 资源集合和单个资源
@app.get("/users") # GET /users - 获取用户列表
async def get_users():
return users_db
@app.get("/users/{user_id}") # GET /users/123 - 获取特定用户
async def get_user(user_id: int):
return find_user(user_id)
@app.post("/users") # POST /users - 创建新用户
async def create_user(user: UserCreate):
return create_new_user(user)
@app.put("/users/{user_id}") # PUT /users/123 - 完整更新用户
async def update_user(user_id: int, user: UserUpdate):
return update_existing_user(user_id, user)
@app.patch("/users/{user_id}") # PATCH /users/123 - 部分更新用户
async def patch_user(user_id: int, user: UserPatch):
return patch_existing_user(user_id, user)
@app.delete("/users/{user_id}") # DELETE /users/123 - 删除用户
async def delete_user(user_id: int):
return delete_existing_user(user_id)
# 嵌套资源
@app.get("/users/{user_id}/posts") # 获取用户的所有文章
async def get_user_posts(user_id: int):
return get_posts_by_user(user_id)
@app.post("/users/{user_id}/posts") # 为用户创建新文章
async def create_user_post(user_id: int, post: PostCreate):
return create_post_for_user(user_id, post)
URL 设计原则:
# ✅ 好的设计 GET /api/v1/users # 获取用户列表 GET /api/v1/users/123 # 获取用户 123 POST /api/v1/users # 创建用户 GET /api/v1/users/123/orders # 获取用户 123 的订单 # ❌ 不好的设计 GET /api/v1/getAllUsers # 动词不应该在 URL 中 GET /api/v1/user/123 # 集合名应该用复数 POST /api/v1/createUser # URL 中包含动作 GET /api/v1/users-orders-123 # 关系不清晰
资源的层次关系
实例
# 用户和文章的关系设计
class User(BaseModel):
id: int
name: str
email: str
class Post(BaseModel):
id: int
title: str
content: str
author_id: int
# 主资源操作
@app.get("/users")
async def list_users() -> List[User]:
return users
@app.get("/posts")
async def list_posts() -> List[Post]:
return posts
# 关联资源操作
@app.get("/users/{user_id}/posts")
async def get_user_posts(user_id: int) -> List[Post]:
"""获取特定用户的所有文章"""
return [post for post in posts if post.author_id == user_id]
@app.get("/posts/{post_id}/author")
async def get_post_author(post_id: int) -> User:
"""获取文章的作者信息"""
post = find_post(post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return find_user(post.author_id)
class User(BaseModel):
id: int
name: str
email: str
class Post(BaseModel):
id: int
title: str
content: str
author_id: int
# 主资源操作
@app.get("/users")
async def list_users() -> List[User]:
return users
@app.get("/posts")
async def list_posts() -> List[Post]:
return posts
# 关联资源操作
@app.get("/users/{user_id}/posts")
async def get_user_posts(user_id: int) -> List[Post]:
"""获取特定用户的所有文章"""
return [post for post in posts if post.author_id == user_id]
@app.get("/posts/{post_id}/author")
async def get_post_author(post_id: int) -> User:
"""获取文章的作者信息"""
post = find_post(post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return find_user(post.author_id)
HTTP 方法与状态码
HTTP 方法的语义
实例
from fastapi import FastAPI, status, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
# GET - 安全且幂等
@app.get("/users/{user_id}")
async def get_user(user_id: int):
"""获取资源,不修改服务器状态"""
user = find_user(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
# POST - 不安全且非幂等
@app.post("/users", status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
"""创建新资源"""
if user_exists(user.email):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User already exists"
)
new_user = create_new_user(user)
return new_user
# PUT - 不安全但幂等
@app.put("/users/{user_id}")
async def replace_user(user_id: int, user: UserUpdate):
"""完整替换资源"""
if not user_exists(user_id):
# PUT 可以创建资源
return create_user_with_id(user_id, user)
return replace_existing_user(user_id, user)
# PATCH - 不安全且通常非幂等
@app.patch("/users/{user_id}")
async def update_user(user_id: int, user: UserPatch):
"""部分更新资源"""
existing_user = find_user(user_id)
if not existing_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return patch_user_fields(existing_user, user)
# DELETE - 不安全但幂等
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):
"""删除资源"""
if not user_exists(user_id):
# 幂等性:删除不存在的资源仍返回成功
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
delete_existing_user(user_id)
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
from fastapi.responses import JSONResponse
app = FastAPI()
# GET - 安全且幂等
@app.get("/users/{user_id}")
async def get_user(user_id: int):
"""获取资源,不修改服务器状态"""
user = find_user(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
# POST - 不安全且非幂等
@app.post("/users", status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
"""创建新资源"""
if user_exists(user.email):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User already exists"
)
new_user = create_new_user(user)
return new_user
# PUT - 不安全但幂等
@app.put("/users/{user_id}")
async def replace_user(user_id: int, user: UserUpdate):
"""完整替换资源"""
if not user_exists(user_id):
# PUT 可以创建资源
return create_user_with_id(user_id, user)
return replace_existing_user(user_id, user)
# PATCH - 不安全且通常非幂等
@app.patch("/users/{user_id}")
async def update_user(user_id: int, user: UserPatch):
"""部分更新资源"""
existing_user = find_user(user_id)
if not existing_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return patch_user_fields(existing_user, user)
# DELETE - 不安全但幂等
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):
"""删除资源"""
if not user_exists(user_id):
# 幂等性:删除不存在的资源仍返回成功
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
delete_existing_user(user_id)
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
HTTP 状态码的使用
实例
from fastapi import status
# 2xx 成功响应
@app.post("/users", status_code=status.HTTP_201_CREATED) # 201 Created
@app.get("/users") # 200 OK(默认)
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) # 204 No Content
# 4xx 客户端错误
@app.get("/users/{user_id}")
async def get_user(user_id: int):
if user_id < 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, # 400 Bad Request
detail="User ID must be positive"
)
user = find_user(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, # 404 Not Found
detail="User not found"
)
return user
# 业务逻辑错误
@app.post("/users/{user_id}/follow")
async def follow_user(user_id: int, current_user_id: int):
if user_id == current_user_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, # 422 Unprocessable Entity
detail="Cannot follow yourself"
)
if is_already_following(current_user_id, user_id):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, # 409 Conflict
detail="Already following this user"
)
# 5xx 服务器错误(通常由异常处理器处理)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # 500 Internal Server Error
content={"detail": "Internal server error"}
)
# 2xx 成功响应
@app.post("/users", status_code=status.HTTP_201_CREATED) # 201 Created
@app.get("/users") # 200 OK(默认)
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) # 204 No Content
# 4xx 客户端错误
@app.get("/users/{user_id}")
async def get_user(user_id: int):
if user_id < 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, # 400 Bad Request
detail="User ID must be positive"
)
user = find_user(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, # 404 Not Found
detail="User not found"
)
return user
# 业务逻辑错误
@app.post("/users/{user_id}/follow")
async def follow_user(user_id: int, current_user_id: int):
if user_id == current_user_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, # 422 Unprocessable Entity
detail="Cannot follow yourself"
)
if is_already_following(current_user_id, user_id):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, # 409 Conflict
detail="Already following this user"
)
# 5xx 服务器错误(通常由异常处理器处理)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # 500 Internal Server Error
content={"detail": "Internal server error"}
)
常用状态码速查:
# 成功状态码 200 OK # 请求成功 201 Created # 资源创建成功 204 No Content # 成功但无内容返回 206 Partial Content # 部分内容(分页、断点续传) # 重定向 301 Moved Permanently # 永久重定向 302 Found # 临时重定向 304 Not Modified # 资源未修改(缓存) # 客户端错误 400 Bad Request # 请求格式错误 401 Unauthorized # 未认证 403 Forbidden # 已认证但无权限 404 Not Found # 资源不存在 405 Method Not Allowed # HTTP 方法不允许 409 Conflict # 资源冲突 422 Unprocessable Entity # 数据验证失败 429 Too Many Requests # 请求过于频繁 # 服务器错误 500 Internal Server Error # 服务器内部错误 502 Bad Gateway # 网关错误 503 Service Unavailable # 服务不可用
SON 数据格式
JSON 基础与最佳实践
实例
from datetime import datetime
from decimal import Decimal
from typing import Optional, List
from pydantic import BaseModel, Field
import json
# JSON 数据格式规范
class UserResponse(BaseModel):
id: int
username: str
email: str
full_name: Optional[str] = None
is_active: bool = True
created_at: datetime
updated_at: Optional[datetime] = None
# JSON 序列化配置
class Config:
# 允许使用字段别名
allow_population_by_field_name = True
# JSON 编码器
json_encoders = {
datetime: lambda v: v.isoformat(),
Decimal: lambda v: float(v)
}
# 响应格式标准化
class APIResponse(BaseModel):
"""标准 API 响应格式"""
success: bool = True
message: str = "操作成功"
data: Optional[dict] = None
errors: Optional[List[str]] = None
timestamp: datetime = Field(default_factory=datetime.now)
# 分页响应格式
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
page: int
size: int
pages: int
@app.get("/users", response_model=PaginatedResponse)
async def get_users(page: int = 1, size: int = 10):
"""返回分页的用户列表"""
users = get_users_paginated(page, size)
total = count_users()
return PaginatedResponse(
items=users,
total=total,
page=page,
size=size,
pages=(total + size - 1) // size
)
from decimal import Decimal
from typing import Optional, List
from pydantic import BaseModel, Field
import json
# JSON 数据格式规范
class UserResponse(BaseModel):
id: int
username: str
email: str
full_name: Optional[str] = None
is_active: bool = True
created_at: datetime
updated_at: Optional[datetime] = None
# JSON 序列化配置
class Config:
# 允许使用字段别名
allow_population_by_field_name = True
# JSON 编码器
json_encoders = {
datetime: lambda v: v.isoformat(),
Decimal: lambda v: float(v)
}
# 响应格式标准化
class APIResponse(BaseModel):
"""标准 API 响应格式"""
success: bool = True
message: str = "操作成功"
data: Optional[dict] = None
errors: Optional[List[str]] = None
timestamp: datetime = Field(default_factory=datetime.now)
# 分页响应格式
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
page: int
size: int
pages: int
@app.get("/users", response_model=PaginatedResponse)
async def get_users(page: int = 1, size: int = 10):
"""返回分页的用户列表"""
users = get_users_paginated(page, size)
total = count_users()
return PaginatedResponse(
items=users,
total=total,
page=page,
size=size,
pages=(total + size - 1) // size
)
JSON Schema 与数据验证
实例
from pydantic import BaseModel, validator, Field
from typing import Optional, List
from enum import Enum
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50, description="用户名")
email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$', description="邮箱地址")
password: str = Field(..., min_length=8, description="密码")
full_name: Optional[str] = Field(None, max_length=100, description="全名")
role: UserRole = Field(UserRole.USER, description="用户角色")
age: Optional[int] = Field(None, ge=0, le=150, description="年龄")
@validator('username')
def username_must_be_alphanumeric(cls, v):
if not v.isalnum():
raise ValueError('用户名只能包含字母和数字')
return v
@validator('password')
def password_strength(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含至少一个大写字母')
if not any(c.islower() for c in v):
raise ValueError('密码必须包含至少一个小写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含至少一个数字')
return v
class Config:
# 生成 JSON Schema 示例
schema_extra = {
"example": {
"username": "johndoe",
"email": "john@example.com",
"password": "SecurePass123",
"full_name": "John Doe",
"role": "user",
"age": 30
}
}
# 使用自定义验证器
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate):
"""创建新用户,包含数据验证"""
# Pydantic 自动验证输入数据
return await create_new_user(user)
from typing import Optional, List
from enum import Enum
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50, description="用户名")
email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$', description="邮箱地址")
password: str = Field(..., min_length=8, description="密码")
full_name: Optional[str] = Field(None, max_length=100, description="全名")
role: UserRole = Field(UserRole.USER, description="用户角色")
age: Optional[int] = Field(None, ge=0, le=150, description="年龄")
@validator('username')
def username_must_be_alphanumeric(cls, v):
if not v.isalnum():
raise ValueError('用户名只能包含字母和数字')
return v
@validator('password')
def password_strength(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含至少一个大写字母')
if not any(c.islower() for c in v):
raise ValueError('密码必须包含至少一个小写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含至少一个数字')
return v
class Config:
# 生成 JSON Schema 示例
schema_extra = {
"example": {
"username": "johndoe",
"email": "john@example.com",
"password": "SecurePass123",
"full_name": "John Doe",
"role": "user",
"age": 30
}
}
# 使用自定义验证器
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate):
"""创建新用户,包含数据验证"""
# Pydantic 自动验证输入数据
return await create_new_user(user)
错误响应格式
实例
class ErrorDetail(BaseModel):
field: str
message: str
code: str
class ErrorResponse(BaseModel):
error: str
message: str
details: Optional[List[ErrorDetail]] = None
timestamp: datetime = Field(default_factory=datetime.now)
# 自定义异常处理
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = []
for error in exc.errors():
errors.append(ErrorDetail(
field='.'.join(str(x) for x in error['loc']),
message=error['msg'],
code=error['type']
))
return JSONResponse(
status_code=422,
content=ErrorResponse(
error="Validation Error",
message="请求数据验证失败",
details=errors
).dict()
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
return JSONResponse(
status_code=exc.status_code,
content=ErrorResponse(
error=f"HTTP {exc.status_code}",
message=exc.detail
).dict()
)
field: str
message: str
code: str
class ErrorResponse(BaseModel):
error: str
message: str
details: Optional[List[ErrorDetail]] = None
timestamp: datetime = Field(default_factory=datetime.now)
# 自定义异常处理
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = []
for error in exc.errors():
errors.append(ErrorDetail(
field='.'.join(str(x) for x in error['loc']),
message=error['msg'],
code=error['type']
))
return JSONResponse(
status_code=422,
content=ErrorResponse(
error="Validation Error",
message="请求数据验证失败",
details=errors
).dict()
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
return JSONResponse(
status_code=exc.status_code,
content=ErrorResponse(
error=f"HTTP {exc.status_code}",
message=exc.detail
).dict()
)
FastAPI 特有概念
自动文档生成
实例
from fastapi import FastAPI, Query, Path, Body
from fastapi.openapi.utils import get_openapi
app = FastAPI(
title="我的 API",
description="这是一个示例 API,展示 FastAPI 的功能",
version="1.0.0",
terms_of_service="http://example.com/terms/",
contact={
"name": "开发者",
"url": "http://example.com/contact/",
"email": "developer@example.com",
},
license_info={
"name": "MIT",
"url": "https://opensource.org/licenses/MIT",
},
)
@app.get(
"/users/{user_id}",
summary="获取用户信息",
description="根据用户 ID 获取用户的详细信息",
response_description="用户信息对象",
tags=["用户管理"]
)
async def get_user(
user_id: int = Path(..., title="用户ID", description="要获取的用户ID", ge=1),
include_posts: bool = Query(False, title="包含文章", description="是否包含用户的文章列表")
):
"""
获取用户信息:
- **user_id**: 用户的唯一标识符
- **include_posts**: 是否在响应中包含用户的文章列表
返回用户的基本信息,如果 include_posts 为 True,还会包含文章列表。
"""
user = find_user(user_id)
if include_posts:
user.posts = get_user_posts(user_id)
return user
# 自定义 OpenAPI schema
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="自定义 API 文档",
version="2.5.0",
description="这是自定义的 OpenAPI schema",
routes=app.routes,
)
# 添加自定义信息
openapi_schema["info"]["x-logo"] = {
"url": "https://example.com/logo.png"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
from fastapi.openapi.utils import get_openapi
app = FastAPI(
title="我的 API",
description="这是一个示例 API,展示 FastAPI 的功能",
version="1.0.0",
terms_of_service="http://example.com/terms/",
contact={
"name": "开发者",
"url": "http://example.com/contact/",
"email": "developer@example.com",
},
license_info={
"name": "MIT",
"url": "https://opensource.org/licenses/MIT",
},
)
@app.get(
"/users/{user_id}",
summary="获取用户信息",
description="根据用户 ID 获取用户的详细信息",
response_description="用户信息对象",
tags=["用户管理"]
)
async def get_user(
user_id: int = Path(..., title="用户ID", description="要获取的用户ID", ge=1),
include_posts: bool = Query(False, title="包含文章", description="是否包含用户的文章列表")
):
"""
获取用户信息:
- **user_id**: 用户的唯一标识符
- **include_posts**: 是否在响应中包含用户的文章列表
返回用户的基本信息,如果 include_posts 为 True,还会包含文章列表。
"""
user = find_user(user_id)
if include_posts:
user.posts = get_user_posts(user_id)
return user
# 自定义 OpenAPI schema
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="自定义 API 文档",
version="2.5.0",
description="这是自定义的 OpenAPI schema",
routes=app.routes,
)
# 添加自定义信息
openapi_schema["info"]["x-logo"] = {
"url": "https://example.com/logo.png"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
依赖注入系统预览
实例
from fastapi import Depends
# 简单依赖
def get_current_user_id() -> int:
# 从 token 中解析用户 ID
return 123
def get_database_session():
# 创建数据库会话
db = DatabaseSession()
try:
yield db
finally:
db.close()
# 使用依赖
@app.get("/profile")
async def get_profile(
user_id: int = Depends(get_current_user_id),
db = Depends(get_database_session)
):
return get_user_profile(db, user_id)
# 依赖链
def get_current_user(
user_id: int = Depends(get_current_user_id),
db = Depends(get_database_session)
):
return db.query(User).filter(User.id == user_id).first()
@app.get("/dashboard")
async def get_dashboard(
current_user: User = Depends(get_current_user) # 依赖于其他依赖
):
return generate_dashboard(current_user)
# 简单依赖
def get_current_user_id() -> int:
# 从 token 中解析用户 ID
return 123
def get_database_session():
# 创建数据库会话
db = DatabaseSession()
try:
yield db
finally:
db.close()
# 使用依赖
@app.get("/profile")
async def get_profile(
user_id: int = Depends(get_current_user_id),
db = Depends(get_database_session)
):
return get_user_profile(db, user_id)
# 依赖链
def get_current_user(
user_id: int = Depends(get_current_user_id),
db = Depends(get_database_session)
):
return db.query(User).filter(User.id == user_id).first()
@app.get("/dashboard")
async def get_dashboard(
current_user: User = Depends(get_current_user) # 依赖于其他依赖
):
return generate_dashboard(current_user)
类型提示的重要性
实例
from typing import List, Optional, Union, Dict, Any
from pydantic import BaseModel
# FastAPI 完全依赖类型提示进行:
# 1. 数据验证
# 2. 文档生成
# 3. 编辑器支持
@app.get("/items/{item_id}")
async def get_item(
item_id: int, # 路径参数,自动转换为 int
q: Optional[str] = None, # 可选查询参数
limit: int = 10 # 有默认值的查询参数
) -> Dict[str, Any]: # 返回类型提示
"""类型提示告诉 FastAPI 如何处理参数和响应"""
return {"item_id": item_id, "q": q, "limit": limit}
# 复杂类型提示
@app.post("/items/")
async def create_items(
items: List[ItemCreate] # 接收 ItemCreate 对象列表
) -> List[ItemResponse]: # 返回 ItemResponse 对象列表
return [create_item(item) for item in items]
# Union 类型(多种可能的类型)
@app.get("/search")
async def search(
q: str,
result_type: str = "json"
) -> Union[List[Dict], str]: # 可以返回字典列表或字符串
if result_type == "json":
return [{"title": "结果1"}, {"title": "结果2"}]
else:
return "结果1, 结果2"
from pydantic import BaseModel
# FastAPI 完全依赖类型提示进行:
# 1. 数据验证
# 2. 文档生成
# 3. 编辑器支持
@app.get("/items/{item_id}")
async def get_item(
item_id: int, # 路径参数,自动转换为 int
q: Optional[str] = None, # 可选查询参数
limit: int = 10 # 有默认值的查询参数
) -> Dict[str, Any]: # 返回类型提示
"""类型提示告诉 FastAPI 如何处理参数和响应"""
return {"item_id": item_id, "q": q, "limit": limit}
# 复杂类型提示
@app.post("/items/")
async def create_items(
items: List[ItemCreate] # 接收 ItemCreate 对象列表
) -> List[ItemResponse]: # 返回 ItemResponse 对象列表
return [create_item(item) for item in items]
# Union 类型(多种可能的类型)
@app.get("/search")
async def search(
q: str,
result_type: str = "json"
) -> Union[List[Dict], str]: # 可以返回字典列表或字符串
if result_type == "json":
return [{"title": "结果1"}, {"title": "结果2"}]
else:
return "结果1, 结果2"
核心概念实践示例
实例
from fastapi import FastAPI, HTTPException, status, Depends, Query
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
import asyncio
# 综合示例:博客 API
app = FastAPI(
title="博客 API",
description="展示 FastAPI 核心概念的博客系统",
version="1.0.0"
)
# 数据模型
class PostBase(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
content: str = Field(..., min_length=1)
published: bool = Field(True)
class PostCreate(PostBase):
pass
class PostResponse(PostBase):
id: int
author_id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True
# 模拟数据库
posts_db = []
next_id = 1
# 依赖:获取当前用户(简化版)
async def get_current_user() -> int:
# 实际应用中会从 JWT token 解析
return 1
# 异步路径操作
@app.get("/posts", response_model=List[PostResponse], tags=["文章"])
async def list_posts(
skip: int = Query(0, ge=0, description="跳过的文章数"),
limit: int = Query(10, ge=1, le=100, description="返回的文章数"),
published_only: bool = Query(True, description="只返回已发布的文章")
):
"""
获取文章列表
支持分页和筛选功能
"""
# 模拟异步数据库查询
await asyncio.sleep(0.1)
filtered_posts = posts_db
if published_only:
filtered_posts = [p for p in posts_db if p["published"]]
return filtered_posts[skip:skip + limit]
@app.post("/posts", response_model=PostResponse, status_code=status.HTTP_201_CREATED, tags=["文章"])
async def create_post(
post: PostCreate,
current_user_id: int = Depends(get_current_user)
):
"""
创建新文章
需要用户认证
"""
global next_id
# 模拟异步数据库操作
await asyncio.sleep(0.1)
new_post = {
"id": next_id,
"title": post.title,
"content": post.content,
"published": post.published,
"author_id": current_user_id,
"created_at": datetime.now(),
"updated_at": None
}
posts_db.append(new_post)
next_id += 1
return new_post
@app.get("/posts/{post_id}", response_model=PostResponse, tags=["文章"])
async def get_post(post_id: int):
"""
获取特定文章
根据文章 ID 返回文章详情
"""
# 模拟异步数据库查询
await asyncio.sleep(0.1)
post = next((p for p in posts_db if p["id"] == post_id), None)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Post with id {post_id} not found"
)
return post
# 健康检查端点
@app.get("/health", tags=["系统"])
async def health_check():
"""系统健康检查"""
return {
"status": "healthy",
"timestamp": datetime.now(),
"posts_count": len(posts_db)
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
import asyncio
# 综合示例:博客 API
app = FastAPI(
title="博客 API",
description="展示 FastAPI 核心概念的博客系统",
version="1.0.0"
)
# 数据模型
class PostBase(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
content: str = Field(..., min_length=1)
published: bool = Field(True)
class PostCreate(PostBase):
pass
class PostResponse(PostBase):
id: int
author_id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True
# 模拟数据库
posts_db = []
next_id = 1
# 依赖:获取当前用户(简化版)
async def get_current_user() -> int:
# 实际应用中会从 JWT token 解析
return 1
# 异步路径操作
@app.get("/posts", response_model=List[PostResponse], tags=["文章"])
async def list_posts(
skip: int = Query(0, ge=0, description="跳过的文章数"),
limit: int = Query(10, ge=1, le=100, description="返回的文章数"),
published_only: bool = Query(True, description="只返回已发布的文章")
):
"""
获取文章列表
支持分页和筛选功能
"""
# 模拟异步数据库查询
await asyncio.sleep(0.1)
filtered_posts = posts_db
if published_only:
filtered_posts = [p for p in posts_db if p["published"]]
return filtered_posts[skip:skip + limit]
@app.post("/posts", response_model=PostResponse, status_code=status.HTTP_201_CREATED, tags=["文章"])
async def create_post(
post: PostCreate,
current_user_id: int = Depends(get_current_user)
):
"""
创建新文章
需要用户认证
"""
global next_id
# 模拟异步数据库操作
await asyncio.sleep(0.1)
new_post = {
"id": next_id,
"title": post.title,
"content": post.content,
"published": post.published,
"author_id": current_user_id,
"created_at": datetime.now(),
"updated_at": None
}
posts_db.append(new_post)
next_id += 1
return new_post
@app.get("/posts/{post_id}", response_model=PostResponse, tags=["文章"])
async def get_post(post_id: int):
"""
获取特定文章
根据文章 ID 返回文章详情
"""
# 模拟异步数据库查询
await asyncio.sleep(0.1)
post = next((p for p in posts_db if p["id"] == post_id), None)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Post with id {post_id} not found"
)
return post
# 健康检查端点
@app.get("/health", tags=["系统"])
async def health_check():
"""系统健康检查"""
return {
"status": "healthy",
"timestamp": datetime.now(),
"posts_count": len(posts_db)
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
点我分享笔记