FastAPI 完全指南:现代 Python Web 开发的终极选择

FastAPI 完全指南:现代 Python Web 开发的终极选择

目录

  1. 引言:为什么选择 FastAPI?
  2. 环境搭建与基础配置
  3. 核心概念深度解析
  4. 路由与请求处理
  5. 数据验证与序列化
  6. 依赖注入系统
  7. 数据库集成
  8. 认证与安全
  9. 中间件与后台任务
  10. 测试与部署
  11. 性能优化最佳实践

引言:为什么选择 FastAPI?

FastAPI 是由 Sebastián Ramírez 于 2018 年创建的现代、高性能 Web 框架。它基于 Starlette(ASGI 工具集)和 Pydantic(数据验证库),为 Python 开发者带来了革命性的开发体验。

核心优势

特性说明
极致性能与 Node.js 和 Go 相当,是 Python 最快的框架之一
自动文档内置 Swagger UI 和 ReDoc,零配置生成交互式 API 文档
类型安全基于 Python 3.6+ 类型提示,减少 40% 的人为错误
智能编辑器支持完整的自动补全和类型检查
异步原生原生支持 async/await,轻松处理高并发

环境搭建与基础配置

安装 FastAPI

# 基础安装 pip install fastapi ​ # 生产环境需要 ASGI 服务器 pip install "uvicorn[standard]" ​ # 完整安装(包含所有常用依赖) pip install fastapi[all]

最小可行应用(MVP)

创建一个 main.py

from fastapi import FastAPI ​ app = FastAPI(    title="My Awesome API",    description="这是一个展示 FastAPI 强大功能的示例 API",    version="1.0.0",    docs_url="/docs",      # Swagger UI 路径    redoc_url="/redoc"     # ReDoc 路径 ) ​ @app.get("/") async def root():    return {"message": "Hello FastAPI!", "framework": "FastAPI"} ​ @app.get("/items/{item_id}") async def read_item(item_id: int, q: str | None = None):    """   获取特定项目信息       - **item_id**: 项目的唯一标识符   - **q**: 可选的查询参数   """    return {"item_id": item_id, "q": q}

启动应用

# 开发模式(热重载) uvicorn main:app --reload --host 0.0.0.0 --port 8000 ​ # 生产模式(多 worker) uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000

启动后访问:

  • API 文档:http://localhost:8000/docs
  • 替代文档:http://localhost:8000/redoc

核心概念深度解析

1. 路径操作(Path Operations)

FastAPI 使用装饰器定义路由,支持所有 HTTP 方法:

from fastapi import FastAPI, HTTPException, status from enum import Enum ​ app = FastAPI() ​ class ModelName(str, Enum):    alexnet = "alexnet"    resnet = "resnet"    lenet = "lenet" ​ # GET 请求 @app.get("/models/{model_name}") async def get_model(model_name: ModelName):    if model_name == ModelName.alexnet:        return {"model_name": model_name, "message": "Deep Learning FTW!"}    if model_name.value == "lenet":        return {"model_name": model_name, "message": "LeCNN all the images"}    return {"model_name": model_name, "message": "Have some residuals"} ​ # POST 请求 - 创建资源 @app.post("/items/", status_code=status.HTTP_201_CREATED) async def create_item(item: dict):    return {"item": item, "message": "Item created successfully"} ​ # PUT 请求 - 完整更新 @app.put("/items/{item_id}") async def update_item(item_id: int, item: dict):    return {"item_id": item_id, **item} ​ # PATCH 请求 - 部分更新 @app.patch("/items/{item_id}") async def partial_update(item_id: int, item: dict):    return {"item_id": item_id, "updated_fields": list(item.keys())} ​ # DELETE 请求 @app.delete("/items/{item_id}") async def delete_item(item_id: int):    return {"message": f"Item {item_id} deleted"}

2. 请求参数详解

FastAPI 支持多种参数类型,自动进行验证和转换:

from fastapi import FastAPI, Query, Path, Body, File, UploadFile from typing import Annotated ​ app = FastAPI() ​ # 查询参数(Query Parameters) @app.get("/items/") async def read_items(    # 必填参数    q: str,    # 可选参数(带默认值)    skip: int = 0,    limit: int = 10,    # 复杂验证    price: Annotated[        float | None,        Query(            gt=0,           # 大于 0            le=1000,        # 小于等于 1000            description="价格范围必须在 0-1000 之间",            alias="item-price"  # 别名:?item-price=100       )   ] = None,    # 列表参数:?tags=foo&tags=bar    tags: Annotated[list[str] | None, Query()] = None,    # 布尔参数:?is_active=true    is_active: bool = True ):    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}], "skip": skip, "limit": limit}    if q:        results.update({"q": q})    return results ​ # 路径参数(Path Parameters)- 带验证 @app.get("/items/{item_id}") async def read_item(    item_id: Annotated[        int,        Path(            title="项目 ID",            description="项目的唯一标识符",            ge=1,  # 大于等于 1            le=1000       )   ] ):    return {"item_id": item_id} ​ # 请求体(Request Body) @app.post("/items/") async def create_item(    # 单个请求体    item: Annotated[dict, Body()],    # 嵌套请求体    user: Annotated[dict, Body(embed=True)],  # {"user": {...}}    # 额外数据    importance: Annotated[int, Body()] = 0 ):    return {        "item": item,        "user": user,        "importance": importance   } ​ # 文件上传 @app.post("/uploadfile/") async def create_upload_file(    file: UploadFile = File(..., description="上传的文件"),    # 多文件上传    files: list[UploadFile] = File(default=[], description="多个文件") ):    content = await file.read()    return {        "filename": file.filename,        "content_type": file.content_type,        "size": len(content),        "files_count": len(files)   }


数据验证与序列化

Pydantic 模型:FastAPI 的核心

from pydantic import BaseModel, Field, EmailStr, validator, root_validator from typing import Optional, List from datetime import datetime from enum import Enum ​ # 基础模型 class ItemBase(BaseModel):    title: str = Field(..., min_length=3, max_length=50, description="项目标题")    description: Optional[str] = Field(        None,        max_length=300,        title="项目描述",        example="这是一个示例描述"   )    price: float = Field(..., gt=0, description="必须大于 0")    tax: Optional[float] = None ​ # 创建时模型(无 ID) class ItemCreate(ItemBase):    pass ​ # 响应模型(包含 ID 和时间戳) class Item(ItemBase):    id: int    created_at: datetime    updated_at: Optional[datetime] = None        class Config:        orm_mode = True  # 支持 ORM 对象转换        schema_extra = {            "example": {                "title": "示例项目",                "description": "详细描述",                "price": 35.4,                "tax": 3.2,                "id": 1,                "created_at": "2024-01-01T00:00:00"           }       } ​ # 复杂验证示例 class UserCreate(BaseModel):    username: str = Field(..., min_length=3, max_length=20, regex="^[a-zA-Z0-9_]+$")    email: EmailStr    password: str = Field(..., min_length=8)    password_confirm: str    age: Optional[int] = Field(None, ge=18, le=120)        @validator('username')    def username_must_be_unique(cls, v):        # 模拟数据库检查        forbidden = ["admin", "root", "superuser"]        if v.lower() in forbidden:            raise ValueError(f'用户名 "{v}" 不可用')        return v        @root_validator    def passwords_match(cls, values):        pw1 = values.get('password')        pw2 = values.get('password_confirm')        if pw1 != pw2:            raise ValueError('两次输入的密码不匹配')        return values ​ # 使用模型 from fastapi import FastAPI ​ app = FastAPI() ​ @app.post("/items/", response_model=Item, status_code=201) async def create_item(item: ItemCreate):    # 模拟数据库操作    db_item = Item(        id=1,        created_at=datetime.now(),        **item.dict()   )    return db_item ​ @app.post("/users/", response_model=dict) async def create_user(user: UserCreate):    return {        "username": user.username,        "email": user.email,        "message": "用户创建成功"   }

嵌套模型与复杂数据结构

from pydantic import BaseModel, HttpUrl from typing import Set, List, Dict ​ class Image(BaseModel):    url: HttpUrl    name: str ​ class Item(BaseModel):    name: str    description: Optional[str] = None    price: float    tax: Optional[float] = None    tags: Set[str] = set()    images: Optional[List[Image]] = None        # 深度嵌套    metadata: Optional[Dict[str, str]] = None ​ # 使用示例 item_data = {    "name": "Foo",    "description": "The pretender",    "price": 42.0,    "tax": 3.2,    "tags": ["rock", "metal", "rock"],  # 自动去重为 Set    "images": [       {"url": "http://example.com/baz.jpg", "name": "The Foo live"},       {"url": "http://example.com/dave.jpg", "name": "The Baz"}   ],    "metadata": {"key1": "value1", "key2": "value2"} }


依赖注入系统

FastAPI 的依赖注入(Dependency Injection)是其最强大的特性之一,支持嵌套依赖、子依赖、路径操作装饰器依赖等。

from fastapi import Depends, HTTPException, status, Header from typing import Annotated from functools import lru_cache import jwt from datetime import datetime, timedelta ​ # 基础依赖 async def common_parameters(    q: str | None = None,    skip: int = 0,    limit: int = 100 ):    return {"q": q, "skip": skip, "limit": limit} ​ @app.get("/items/") async def read_items(commons: Annotated[dict, Depends(common_parameters)]):    return commons ​ # 类作为依赖(更复杂的场景) class CommonQueryParams:    def __init__(        self,        q: str | None = None,        skip: int = 0,        limit: int = 100   ):        self.q = q        self.skip = skip        self.limit = limit ​ @app.get("/users/") async def read_users(commons: Annotated[CommonQueryParams, Depends()]):    return commons ​ # 数据库依赖(实际应用模式) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session ​ SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) ​ def get_db():    db = SessionLocal()    try:        yield db    finally:        db.close() ​ @app.get("/users/{user_id}") async def read_user(user_id: int, db: Annotated[Session, Depends(get_db)]):    # 使用 db 进行查询    return {"user_id": user_id} ​ # 认证依赖 SECRET_KEY = "your-secret-key" ALGORITHM = "HS256" ​ async def get_current_user(token: Annotated[str, Header(alias="X-Token")]):    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},   )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception    except jwt.PyJWTError:        raise credentials_exception        # 模拟获取用户    user = {"username": username, "id": 1}    if user is None:        raise credentials_exception    return user ​ async def get_current_active_user(    current_user: Annotated[dict, Depends(get_current_user)] ):    if current_user.get("disabled"):        raise HTTPException(status_code=400, detail="Inactive user")    return current_user ​ @app.get("/users/me") async def read_users_me(    current_user: Annotated[dict, Depends(get_current_active_user)] ):    return current_user ​ # 路径操作装饰器依赖(适用于整个路由) async def verify_token(x_token: Annotated[str, Header()]):    if x_token != "fake-super-secret-token":        raise HTTPException(status_code=400, detail="X-Token header invalid") ​ async def verify_key(x_key: Annotated[str, Header()]):    if x_key != "fake-super-secret-key":        raise HTTPException(status_code=400, detail="X-Key header invalid")    return x_key ​ @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)]) async def read_items():    return [{"item": "Foo"}, {"item": "Bar"}] ​ # 带 yield 的依赖(用于资源管理) async def get_db_with_transaction():    db = SessionLocal()    try:        yield db        db.commit()  # 成功时提交    except Exception:        db.rollback()  # 失败时回滚        raise    finally:        db.close()


数据库集成

SQLAlchemy 2.0 + FastAPI 最佳实践

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey from sqlalchemy.orm import declarative_base, relationship, Session, joinedload from sqlalchemy.sql import func from fastapi import FastAPI, Depends, HTTPException from pydantic import BaseModel from typing import List, Optional from contextlib import asynccontextmanager ​ # 数据库配置 DATABASE_URL = "postgresql://user:password@localhost/dbname" engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_size=10, max_overflow=20) Base = declarative_base() ​ # 模型定义 class User(Base):    __tablename__ = "users"        id = Column(Integer, primary_key=True, index=True)    email = Column(String, unique=True, index=True, nullable=False)    hashed_password = Column(String, nullable=False)    full_name = Column(String)    is_active = Column(Integer, default=1)    created_at = Column(DateTime(timezone=True), server_default=func.now())        items = relationship("Item", back_populates="owner", cascade="all, delete-orphan") ​ class Item(Base):    __tablename__ = "items"        id = Column(Integer, primary_key=True, index=True)    title = Column(String, index=True)    description = Column(String)    price = Column(Float)    owner_id = Column(Integer, ForeignKey("users.id"))        owner = relationship("User", back_populates="items") ​ # Pydantic 模型 class UserBase(BaseModel):    email: str    full_name: Optional[str] = None ​ class UserCreate(UserBase):    password: str ​ class UserResponse(UserBase):    id: int    is_active: bool    created_at: Optional[str] = None        class Config:        from_attributes = True ​ class ItemBase(BaseModel):    title: str    description: Optional[str] = None    price: float ​ class ItemCreate(ItemBase):    pass ​ class ItemResponse(ItemBase):    id: int    owner_id: int        class Config:        from_attributes = True ​ class UserWithItems(UserResponse):    items: List[ItemResponse] = [] ​ # 数据库会话管理 def get_db():    db = Session(bind=engine)    try:        yield db    finally:        db.close() ​ # 生命周期管理 @asynccontextmanager async def lifespan(app: FastAPI):    # 启动时创建表(生产环境使用 Alembic 迁移)    Base.metadata.create_all(bind=engine)    yield    # 关闭时清理    engine.dispose() ​ app = FastAPI(lifespan=lifespan) ​ # CRUD 操作 @app.post("/users/", response_model=UserResponse, status_code=201) def create_user(user: UserCreate, db: Session = Depends(get_db)):    # 检查邮箱是否已存在    db_user = db.query(User).filter(User.email == user.email).first()    if db_user:        raise HTTPException(status_code=400, detail="Email already registered")        # 创建用户(实际应用需要哈希密码)    fake_hashed_password = user.password + "notreallyhashed"    db_user = User(        email=user.email,        hashed_password=fake_hashed_password,        full_name=user.full_name   )    db.add(db_user)    db.commit()    db.refresh(db_user)    return db_user ​ @app.get("/users/", response_model=List[UserResponse]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):    users = db.query(User).offset(skip).limit(limit).all()    return users ​ @app.get("/users/{user_id}", response_model=UserWithItems) def read_user(user_id: int, db: Session = Depends(get_db)):    # 使用 joinedload 避免 N+1 查询    user = db.query(User).options(joinedload(User.items)).filter(User.id == user_id).first()    if user is None:        raise HTTPException(status_code=404, detail="User not found")    return user ​ @app.post("/users/{user_id}/items/", response_model=ItemResponse) def create_item_for_user(    user_id: int,    item: ItemCreate,    db: Session = Depends(get_db) ):    db_user = db.query(User).filter(User.id == user_id).first()    if not db_user:        raise HTTPException(status_code=404, detail="User not found")        db_item = Item(**item.dict(), owner_id=user_id)    db.add(db_item)    db.commit()    db.refresh(db_item)    return db_item ​ @app.delete("/users/{user_id}") def delete_user(user_id: int, db: Session = Depends(get_db)):    user = db.query(User).filter(User.id == user_id).first()    if not user:        raise HTTPException(status_code=404, detail="User not found")        db.delete(user)    db.commit()    return {"message": "User deleted successfully"}

异步数据库(推荐用于高并发)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.future import select ​ # 使用异步驱动 ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname" async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True) AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False) ​ async def get_async_db():    async with AsyncSessionLocal() as session:        yield session ​ @app.get("/users/async", response_model=List[UserResponse]) async def read_users_async(    skip: int = 0,    limit: int = 100,    db: AsyncSession = Depends(get_async_db) ):    result = await db.execute(select(User).offset(skip).limit(limit))    users = result.scalars().all()    return users


认证与安全

JWT 认证完整实现

from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel ​ # 配置 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 ​ # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") ​ # OAuth2 方案 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") ​ # 模型 class Token(BaseModel):    access_token: str    token_type: str ​ class TokenData(BaseModel):    username: Optional[str] = None ​ class User(BaseModel):    username: str    email: Optional[str] = None    full_name: Optional[str] = None    disabled: Optional[bool] = None ​ class UserInDB(User):    hashed_password: str ​ # 模拟数据库 fake_users_db = {    "johndoe": {        "username": "johndoe",        "full_name": "John Doe",        "email": "[email protected]",        "hashed_password": pwd_context.hash("secret"),        "disabled": False,   } } ​ # 工具函数 def verify_password(plain_password, hashed_password):    return pwd_context.verify(plain_password, hashed_password) ​ def get_password_hash(password):    return pwd_context.hash(password) ​ def get_user(db, username: str):    if username in db:        user_dict = db[username]        return UserInDB(**user_dict) ​ def authenticate_user(fake_db, username: str, password: str):    user = get_user(fake_db, username)    if not user:        return False    if not verify_password(password, user.hashed_password):        return False    return user ​ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):    to_encode = data.copy()    if expires_delta:        expire = datetime.utcnow() + expires_delta    else:        expire = datetime.utcnow() + timedelta(minutes=15)    to_encode.update({"exp": expire})    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwt ​ # 依赖 async def get_current_user(token: str = Depends(oauth2_scheme)):    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},   )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception        token_data = TokenData(username=username)    except JWTError:        raise credentials_exception    user = get_user(fake_users_db, username=token_data.username)    if user is None:        raise credentials_exception    return user ​ async def get_current_active_user(current_user: User = Depends(get_current_user)):    if current_user.disabled:        raise HTTPException(status_code=400, detail="Inactive user")    return current_user ​ # 路由 @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):    user = authenticate_user(fake_users_db, form_data.username, form_data.password)    if not user:        raise HTTPException(            status_code=status.HTTP_401_UNAUTHORIZED,            detail="Incorrect username or password",            headers={"WWW-Authenticate": "Bearer"},       )    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    access_token = create_access_token(        data={"sub": user.username}, expires_delta=access_token_expires   )    return {"access_token": access_token, "token_type": "bearer"} ​ @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):    return current_user ​ @app.get("/users/me/items/") async def read_own_items(current_user: User = Depends(get_current_active_user)):    return [{"item_id": "Foo", "owner": current_user.username}]

CORS 配置与安全头

from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.middleware.gzip import GZipMiddleware ​ app = FastAPI() ​ # CORS 配置 app.add_middleware(    CORSMiddleware,    allow_origins=["https://example.com", "https://www.example.com"],    allow_credentials=True,    allow_methods=["GET", "POST", "PUT", "DELETE"],    allow_headers=["*"],    expose_headers=["X-Custom-Header"],    max_age=600,  # 预检请求缓存时间 ) ​ # 受信任主机 app.add_middleware(    TrustedHostMiddleware,    allowed_hosts=["example.com", "*.example.com"] ) ​ # Gzip 压缩 app.add_middleware(GZipMiddleware, minimum_size=1000) ​ # 安全头中间件 @app.middleware("http") async def add_security_headers(request, call_next):    response = await call_next(request)    response.headers["X-Content-Type-Options"] = "nosniff"    response.headers["X-Frame-Options"] = "DENY"    response.headers["X-XSS-Protection"] = "1; mode=block"    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"    return response


中间件与后台任务

自定义中间件

import time import logging from fastapi import Request from starlette.middleware.base import BaseHTTPMiddleware ​ # 日志配置 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) ​ class TimingMiddleware(BaseHTTPMiddleware):    async def dispatch(self, request: Request, call_next):        start_time = time.time()                # 记录请求信息        logger.info(f"Request: {request.method} {request.url}")                response = await call_next(request)                process_time = time.time() - start_time        response.headers["X-Process-Time"] = str(process_time)                logger.info(f"Response time: {process_time:.4f}s - Status: {response.status_code}")        return response ​ class RateLimitMiddleware(BaseHTTPMiddleware):    def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):        super().__init__(app)        self.max_requests = max_requests        self.window_seconds = window_seconds        self.requests = {}        async def dispatch(self, request: Request, call_next):        client_ip = request.client.host        current_time = time.time()                # 清理过期记录        self.requests = {            ip: [t for t in times if current_time - t < self.window_seconds]            for ip, times in self.requests.items()       }                # 检查限制        if len(self.requests.get(client_ip, [])) >= self.max_requests:            from fastapi.responses import JSONResponse            return JSONResponse(                status_code=429,                content={"detail": "Rate limit exceeded"}           )                # 记录请求        if client_ip not in self.requests:            self.requests[client_ip] = []        self.requests[client_ip].append(current_time)                return await call_next(request) ​ app.add_middleware(TimingMiddleware) app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)

后台任务

from fastapi import BackgroundTasks, Depends from typing import Annotated import asyncio import smtplib from email.mime.text import MIMEText ​ def send_email(email_to: str, subject: str, body: str):    """模拟发送邮件(实际应用中配置 SMTP)"""    print(f"Sending email to {email_to}")    # smtp_server = smtplib.SMTP("smtp.gmail.com", 587)    # ... ​ async def process_large_file(file_path: str):    """异步处理大文件"""    await asyncio.sleep(10)  # 模拟长时间处理    print(f"File {file_path} processed") ​ @app.post("/send-notification/{email}") async def send_notification(    email: str,    background_tasks: BackgroundTasks,    message: str = "Hello from FastAPI" ):    background_tasks.add_task(send_email, email, "Notification", message)    return {"message": "Notification sent in the background"} ​ @app.post("/upload/") async def upload_file(    background_tasks: BackgroundTasks,    file: UploadFile = File(...) ):    file_path = f"/tmp/{file.filename}"    with open(file_path, "wb") as f:        content = await file.read()        f.write(content)        # 添加后台任务    background_tasks.add_task(process_large_file, file_path)        return {        "filename": file.filename,        "status": "File uploaded, processing in background"   } ​ # 使用 Celery 进行更复杂的后台任务(生产环境推荐) """ from celery import Celery ​ celery_app = Celery(   "tasks",   broker="redis://localhost:6379/0",   backend="redis://localhost:6379/0" ) ​ @celery_app.task def heavy_computation(data: dict):   import time   time.sleep(30)   return {"result": "completed", "data": data} ​ @app.post("/heavy-task/") async def trigger_heavy_task(data: dict):   task = heavy_computation.delay(data)   return {"task_id": task.id, "status": "processing"} """


测试与部署

测试 FastAPI 应用

# test_main.py import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool ​ from main import app, get_db, Base ​ # 内存数据库用于测试 SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" ​ engine = create_engine(    SQLALCHEMY_DATABASE_URL,    connect_args={"check_same_thread": False},    poolclass=StaticPool, ) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) ​ def override_get_db():    try:        db = TestingSessionLocal()        yield db    finally:        db.close() ​ app.dependency_overrides[get_db] = override_get_db client = TestClient(app) ​ @pytest.fixture(scope="function") def setup_database():    Base.metadata.create_all(bind=engine)    yield    Base.metadata.drop_all(bind=engine) ​ def test_create_user(setup_database):    response = client.post(        "/users/",        json={"email": "[email protected]", "password": "testpass123", "full_name": "Test User"}   )    assert response.status_code == 201    data = response.json()    assert data["email"] == "[email protected]"    assert "id" in data ​ def test_read_user(setup_database):    # 先创建用户    client.post(        "/users/",        json={"email": "[email protected]", "password": "testpass123"}   )        response = client.get("/users/1")    assert response.status_code == 200    assert response.json()["email"] == "[email protected]" ​ def test_read_nonexistent_user(setup_database):    response = client.get("/users/999")    assert response.status_code == 404 ​ def test_invalid_user_data(setup_database):    response = client.post(        "/users/",        json={"email": "invalid-email", "password": "short"}   )    assert response.status_code == 422  # 验证错误 ​ # 异步测试 import httpx import pytest_asyncio ​ @pytest_asyncio.fixture async def async_client():    async with httpx.AsyncClient(app=app, base_url="http://test") as ac:        yield ac ​ @pytest.mark.asyncio async def test_async_endpoint(async_client):    response = await async_client.get("/")    assert response.status_code == 200

Docker 部署

# Dockerfile FROM python:3.11-slim ​ WORKDIR /app ​ # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt ​ # 复制应用代码 COPY . . ​ # 非 root 用户运行 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser ​ # 暴露端口 EXPOSE 8000 ​ # 启动命令(生产环境使用 gunicorn + uvicorn) CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"] # docker-compose.yml version: '3.8' ​ services: web:   build: .   ports:     - "8000:8000"   environment:     - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db     - SECRET_KEY=${SECRET_KEY}   depends_on:     - db     - redis   volumes:     - ./:/app   command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload ​ db:   image: postgres:15   environment:     - POSTGRES_USER=postgres     - POSTGRES_PASSWORD=password     - POSTGRES_DB=fastapi_db   volumes:     - postgres_data:/var/lib/postgresql/data   ports:     - "5432:5432" ​ redis:   image: redis:7-alpine   ports:     - "6379:6379" ​ volumes: postgres_data:

Kubernetes 部署配置

# k8s-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: fastapi-app spec: replicas: 3 selector:   matchLabels:     app: fastapi template:   metadata:     labels:       app: fastapi   spec:     containers:     - name: fastapi       image: your-registry/fastapi-app:latest       ports:       - containerPort: 8000       env:       - name: DATABASE_URL         valueFrom:           secretKeyRef:             name: db-secret             key: url       resources:         requests:           memory: "256Mi"           cpu: "250m"         limits:           memory: "512Mi"           cpu: "500m"       livenessProbe:         httpGet:           path: /health           port: 8000         initialDelaySeconds: 30         periodSeconds: 10       readinessProbe:         httpGet:           path: /ready           port: 8000         initialDelaySeconds: 5         periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: fastapi-service spec: selector:   app: fastapi ports: - port: 80   targetPort: 8000 type: LoadBalancer


性能优化最佳实践

1. 异步数据库连接池

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker import aioredis ​ # 优化连接池配置 async_engine = create_async_engine(    "postgresql+asyncpg://user:pass@localhost/db",    pool_size=20,              # 基础连接数    max_overflow=10,           # 最大溢出连接    pool_pre_ping=True,        # 连接前 ping 检查    pool_recycle=3600,         # 连接回收时间    echo=False                 # 生产环境关闭 SQL 日志 ) ​ # Redis 连接池 redis_pool = aioredis.ConnectionPool.from_url(    "redis://localhost",    max_connections=100,    decode_responses=True )

2. 缓存策略

from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend from fastapi_cache.decorator import cache from redis import asyncio as aioredis ​ @app.on_event("startup") async def startup():    redis = aioredis.from_url("redis://localhost", encoding="utf8", decode_responses=True)    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache") ​ @app.get("/heavy-computation") @cache(expire=3600)  # 缓存 1 小时 async def heavy_computation():    # 模拟耗时操作    await asyncio.sleep(2)    return {"result": "expensive data"} ​ # 手动缓存控制 from fastapi_cache.coder import JsonCoder ​ @app.get("/users/{user_id}") async def get_user(user_id: int):    cache_key = f"user:{user_id}"        # 尝试从缓存获取    cached = await FastAPICache.get_backend().get(cache_key)    if cached:        return json.loads(cached)        # 查询数据库    user = await fetch_user_from_db(user_id)        # 写入缓存    await FastAPICache.get_backend().set(        cache_key,        json.dumps(user),        expire=300   )    return user

3. 响应模型优化

from fastapi import Response from fastapi.responses import ORJSONResponse, StreamingResponse import orjson ​ # 使用更快的 JSON 库 @app.get("/items/", response_class=ORJSONResponse) async def get_items():    return [{"id": i, "data": "value"} for i in range(1000)] ​ # 流式响应(大文件) @app.get("/large-file/") async def get_large_file():    def file_generator():        with open("large_file.zip", "rb") as f:            while chunk := f.read(8192):                yield chunk        return StreamingResponse(        file_generator(),        media_type="application/zip",        headers={"Content-Disposition": "attachment; filename=large_file.zip"}   ) ​ # 分页优化 from fastapi import Query from sqlalchemy import func ​ @app.get("/items/paginated/") async def get_items_paginated(    page: int = Query(1, ge=1),    size: int = Query(20, ge=1, le=100),    db: AsyncSession = Depends(get_async_db) ):    offset = (page - 1) * size        # 使用 count(*) OVER() 优化总数查询    result = await db.execute(        select(            Item,            func.count().over().label("total")       )       .offset(offset)       .limit(size)   )        rows = result.all()    total = rows[0].total if rows else 0        return {        "items": [row.Item for row in rows],        "total": total,        "page": page,        "size": size,        "pages": (total + size - 1) // size   }

4. 性能监控

from prometheus_client import Counter, Histogram, generate_latest from starlette.middleware.base import BaseHTTPMiddleware ​ # Prometheus 指标 REQUEST_COUNT = Counter(    'http_requests_total',    'Total HTTP requests',   ['method', 'endpoint', 'status'] ) ​ REQUEST_LATENCY = Histogram(    'http_request_duration_seconds',    'HTTP request latency',   ['method', 'endpoint'] ) ​ class PrometheusMiddleware(BaseHTTPMiddleware):    async def dispatch(self, request, call_next):        start_time = time.time()        response = await call_next(request)        duration = time.time() - start_time                REQUEST_COUNT.labels(            method=request.method,            endpoint=request.url.path,            status=response.status_code       ).inc()                REQUEST_LATENCY.labels(            method=request.method,            endpoint=request.url.path       ).observe(duration)                return response ​ app.add_middleware(PrometheusMiddleware) ​ @app.get("/metrics") async def metrics():    return Response(generate_latest(), media_type="text/plain")


总结

FastAPI 代表了 Python Web 开发的现代化方向,它通过以下特性成为构建高性能 API 的首选框架:

  1. 开发效率:类型提示带来的智能补全和自动文档生成
  2. 运行时性能:异步原生设计,媲美 Node.js 和 Go
  3. 工程规范:内置数据验证、依赖注入、认证授权等生产级功能
  4. 生态兼容:与 SQLAlchemy、Pydantic、Celery 等主流库无缝集成

学习路径建议

  1. 入门:掌握基础路由、请求参数、Pydantic 模型
  2. 进阶:深入依赖注入系统、数据库集成、异步编程
  3. 生产:学习 Docker 部署、Kubernetes 编排、监控告警
  4. 架构:微服务拆分、事件驱动架构、CQRS 模式

FastAPI 不仅是一个框架,更是一种现代 Python 工程实践的体现。无论你是构建小型微服务还是大型分布式系统,FastAPI 都能提供坚实的技术支撑。


本文代码示例基于 FastAPI 0.104+ 和 Python 3.11+,建议在实际项目中使用最新稳定版本。

Read more

假网站排全网第二,真官网翻五页都找不到!NanoClaw创始人破防:SEO之战,我快要输了

假网站排全网第二,真官网翻五页都找不到!NanoClaw创始人破防:SEO之战,我快要输了

整理 | 苏宓 出品 | ZEEKLOG(ID:ZEEKLOGnews) 自从 OpenClaw 爆火之后,各种“Claw”项目接连出现,其中以安全优化版 NanoClaw 最为知名。它的核心代码仅有 4000 行,却获得了 AI 大牛 Andrej Karpathy 的点赞。 可谁也没想到,这款口碑极佳的开源项目,近来竟被一个仿冒网站抢了风头。 投诉无门之下,NanoClaw 创始人 Gavriel Cohen 在 X 社交平台上无奈发文怒斥:谷歌搜索错误地将假网站排在真官网前面,不仅破坏了项目声誉,还埋下了严重的安全隐患,而他费尽心力,却只能哀叹一句——“我正在为自己的开源项目打 SEO 战,但我快要输了。” 那么,NanoClaw 究竟发生了什么?又是怎么走红的?事情还要从 OpenClaw

By Ne0inhk
曝Windows 12将于今年发布?以AI为核心、NPU成「硬件门槛」,网友吐槽:“不想要的全塞进来了”

曝Windows 12将于今年发布?以AI为核心、NPU成「硬件门槛」,网友吐槽:“不想要的全塞进来了”

整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 当年,微软一句“Windows 10 将是最后一个版本”的表态,让不少用户以为 Windows 进入了“只更新、不换代”的时代。但几年过去,现实却完全不同。 在 Windows 11 发布之后,如今关于 Windows 12 的传闻再次密集出现。从内部代号、代码片段,到硬件厂商的暗示与 OEM 预热标签,种种线索拼在一起,勾勒出一个明显的趋势——这不会只是一次常规升级,而更像是一次围绕 AI 的平台级重构。 更关键的是,这次争议,可能远比当年 TPM 2.0 更大。 精准卡位 Windows 10 退场的时间?

By Ne0inhk
Python热度下滑、AI能取代搜索引擎?TIOBE最新榜单揭晓!

Python热度下滑、AI能取代搜索引擎?TIOBE最新榜单揭晓!

整理 | 屠敏 出品 | ZEEKLOG(ID:ZEEKLOGnews) 日前,TIOBE 发布了最新的 3 月编程语言榜单。整体来看,本月排名变化不算大,但榜单中仍然出现了一些值得关注的小波动。  AI 工具能帮大家秒懂最新编程语言趋势? 由于 2 月天数较少,3 月的榜单整体变化有限。借着这次发布,TIOBE CEO Paul Jansen 也回应了一个最近被频繁讨论的问题:为什么 TIOBE 指数仍然依赖搜索引擎统计结果?在大语言模型流行的今天,直接询问 AI 哪些编程语言最流行,是不是更简单? 对此,Jansen 的回答是否定的。 他解释称,TIOBE 指数本质上统计的是互联网上关于某种编程语言的网页数量。而大语言模型的训练数据同样来自这些网页内容,因此从信息来源来看,两者并没有本质区别。换句话说,LLM 的判断,本质上也是建立在这些网页数据之上的。 Python 活跃度仍在下降

By Ne0inhk
“裸奔龙虾”数量已达27万只,业内人士警告;AI浪潮下,中传“砍掉”翻译等16个专业;薪资谈判破裂,三星电子8.9万人要罢工 | 极客头条

“裸奔龙虾”数量已达27万只,业内人士警告;AI浪潮下,中传“砍掉”翻译等16个专业;薪资谈判破裂,三星电子8.9万人要罢工 | 极客头条

「极客头条」—— 技术人员的新闻圈! ZEEKLOG 的读者朋友们好,「极客头条」来啦,快来看今天都有哪些值得我们技术人关注的重要新闻吧。(投稿或寻求报道:[email protected]) 整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 一分钟速览新闻点! * “裸奔龙虾”已高达27万只!业内人士警告:一旦黑客入侵,敏感信息一秒搬空 * 阿里云 CTO 周靖人代管千问模型一号位,刘大一恒管理更多团队 * 中国传媒大学砍掉翻译、摄影等 16 个本科专业,直言教育要面向人机分工时代 * 雷军放话:小米将很快推出 L3、L4 的驾驶 * 消息称原理想汽车智驾一号位郎咸朋具身智能赛道创业 * vivo 前产品经理宋紫薇创业,瞄准 AI 时尚Agent,获亿元融资 * MiniMax 发布龙虾新技能,股价暴涨超 23% * 薪资谈判破裂,三星电子

By Ne0inhk