目录1、ORM 的核心优势2、主流 ORM 工具对比3、SQLAlchemy 异步版实战步骤步骤 1安装依赖步骤 2创建数据库提前准备步骤 3初始化 FastAPI 项目创建异步数据库引擎步骤 4定义 ORM 模型类映射数据库表步骤 5创建数据库会话依赖项注入到路由步骤 6ORM 核心操作增删改查CRUD1. 查询基础查询、条件查询2. 新增创建对象 → 添加到会话 → 提交事务3. 更新先查询 → 重新赋值 → 提交事务4. 删除先查询 → 删除对象 → 提交事务4、ORM 操作核心总结完整代码建环境项目初始化数据操作CRUD在 Web 开发中数据库操作是核心环节。原生 SQL 编写繁琐、易出错且存在 SQL 注入风险而ORM对象关系映射 技术可以让我们通过操作 Python 对象的方式与数据库交互无需直接编写 SQL大幅提升开发效率。FastAPI 中推荐使用SQLAlchemy异步版作为 ORM 工具它功能强大、灵活性高是企业级 Python 项目的首选本文将基于SQLAlchemy[asyncio] MySQL讲解异步 ORM 的实战使用。1、ORM 的核心优势告别原生 SQL通过 Python 对象操作数据库代码更简洁易读避免 SQL 注入ORM 自动做参数转义从根源上防止注入攻击代码复用性高减少重复的 SQL 语句统一数据库操作逻辑自动管理连接处理数据库连接、会话和事务无需手动管理跨数据库兼容修改数据库类型时无需大幅修改业务代码2、主流 ORM 工具对比排名ORM 工具特点适应场景1SQLAlchemy功能最强、最灵活企业级各类 API、微服务、数据应用2Django ORM封装好、上手快Django 项目、管理后台3Tortoise ORM全异步设计高并发异步 Web 服务3、SQLAlchemy 异步版实战步骤步骤 1安装依赖需要安装异步版 SQLAlchemy 和 MySQL 的异步驱动aiomysql# 安装异步SQLAlchemy pip install sqlalchemy[asyncio] -i https://mirrors.aliyun.com/pypi/simple/ # 安装MySQL异步驱动 pip install aiomysql -i https://mirrors.aliyun.com/pypi/simple/步骤 2创建数据库提前准备确保 MySQL 服务已启动连接 MySQL 并创建项目数据库按下 Win R输入 cmd 回车打开命令提示符# 假设 MySQL 安装在 C 盘 Program Files 下根据你的实际路径修改cd C:\Program Files\MySQL\MySQL Server 8.0\bin输入连接命令回车后输入MySQL 密码安装时设置的root密码输入密码后回车如下图所示说明连接成功接下来创建数据库-- 创建数据库指定字符集为utf8mb4支持emoji CREATE DATABASE IF NOT EXISTS fastapi_orm DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci;打开pycharm如下图就是连接成功了步骤 3初始化 FastAPI 项目创建异步数据库引擎数据库引擎是 SQLAlchemy 与数据库的连接核心异步版使用create_async_engine创建同时配置连接池参数提升数据库访问性能。from fastapi import FastAPI from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from contextlib import asynccontextmanager # 数据库配置 DB_URL mysqlaiomysql://root:你的密码localhost:3306/fastapi_orm?charsetutf8mb4 # 全局异步引擎避免重复创建 async_engine: AsyncEngine | None None # 全局会话工厂延迟初始化 AsyncSessionLocal: async_sessionmaker | None None # 项目生命周期管理器替代app.on_event优化资源释放 asynccontextmanager async def lifespan(app: FastAPI): global async_engine # 应用启动初始化异步引擎 async_engine create_async_engine( DB_URL, echoTrue, # 输出SQL日志开发阶段开启生产阶段关闭 pool_size10, # 连接池常驻连接数 max_overflow20, # 连接池允许的额外连接数 pool_pre_pingTrue # 检查连接有效性避免无效连接 ) print(应用启动数据库引擎初始化完成) yield # 应用关闭优雅释放引擎和连接池 if async_engine: await async_engine.dispose() print(应用关闭数据库连接已释放) # 初始化FastAPI绑定生命周期 app FastAPI(lifespanlifespan)步骤 4定义 ORM 模型类映射数据库表ORM 的核心是模型类与数据库表的一一映射先定义基础模型类包含所有表的公共字段如创建时间、更新时间再定义业务模型类如书籍表、用户表。需要使用DeclarativeBase作为基类Mapped约定属性类型mapped_column映射数据库字段。from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy import DateTime, func, String, Float from datetime import datetime # 基础模型类包含公共的创建时间、更新时间 class Base(DeclarativeBase): # 创建时间默认当前时间仅插入时生效 create_time: Mapped[datetime] mapped_column(DateTime, server_defaultfunc.now(), comment创建时间) # 更新时间默认当前时间更新时自动刷新 update_time: Mapped[datetime] mapped_column(DateTime, server_defaultfunc.now(), onupdatefunc.now(), comment更新时间) # 书籍模型类映射book表 class Book(Base): __tablename__ book # 数据库表名 __table_args__ {extend_existing: True} # 允许重复加载模型避免报错 # 字段映射主键id、书名、作者、价格、出版社 id: Mapped[int] mapped_column(primary_keyTrue, autoincrementTrue, comment书籍ID) name: Mapped[str] mapped_column(String(255), comment书名) author: Mapped[str] mapped_column(String(255), comment作者) price: Mapped[float] mapped_column(Float, comment价格) publisher: Mapped[str] mapped_column(String(255), comment出版社) # 建表函数基于模型类创建数据库表 async def create_tables(): if not async_engine: raise RuntimeError(数据库引擎未初始化) # 开启事务创建所有未存在的表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # 注意需要在lifespan的应用启动阶段调用建表函数 # 改造lifespan的yield上方代码 # await create_tables() # print(数据库表创建完成)改造后的lifespanasynccontextmanager async def lifespan(app: FastAPI): global async_engine async_engine create_async_engine(DB_URL, echoTrue, pool_size10, max_overflow20, pool_pre_pingTrue) await create_tables() # 调用建表函数 print(应用启动数据库引擎初始化完成表创建成功) yield if async_engine: await async_engine.dispose() print(应用关闭数据库连接已释放)步骤 5创建数据库会话依赖项注入到路由数据库会话Session是 ORM 操作数据库的核心对象通过依赖注入将会话注入到路由中实现会话的统一管理创建、使用、关闭、事务回滚。from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession from fastapi import Depends # 创建异步会话工厂绑定数据库引擎 AsyncSessionLocal async_sessionmaker( bindasync_engine, # 绑定数据库引擎 class_AsyncSession, # 指定会话类 expire_on_commitFalse # 提交后会话对象不过期避免重复查询 autoflushTrue # 开启自动刷新确保数据及时写入数据库 ) # 定义数据库会话依赖项路由中通过Depends调用 async def get_db(): if not AsyncSessionLocal: raise RuntimeError(数据库会话工厂未初始化) db AsyncSessionLocal() try: yield db # 将会话传递给路由 await db.commit() # 无异常则提交事务 except Exception as e: await db.rollback() # 有异常则回滚事务 raise e finally: await db.close() # 无论是否异常最终关闭会话改造后的lifespan# 项目生命周期管理器替代app.on_event优化资源释放 asynccontextmanager async def lifespan(app: FastAPI): global async_engine, AsyncSessionLocal # 应用启动初始化异步引擎 async_engine create_async_engine( DB_URL, echoTrue, # 输出SQL日志开发阶段开启生产阶段关闭 pool_size10, # 连接池常驻连接数 max_overflow20, # 连接池允许的额外连接数 pool_pre_pingTrue # 检查连接有效性避免无效连接 ) await create_tables() # 调用建表函数 # 引擎初始化完成后再创建会话工厂 AsyncSessionLocal async_sessionmaker( bindasync_engine, class_AsyncSession, expire_on_commitFalse, autoflushTrue # 开启自动刷新确保数据及时写入数据库 ) print(应用启动数据库引擎初始化完成表创建成功) yield # 应用关闭优雅释放引擎和连接池 if async_engine: await async_engine.dispose() print(应用关闭数据库连接已释放) # 初始化FastAPI绑定生命周期 app FastAPI(lifespanlifespan)步骤 6ORM 核心操作增删改查CRUD基于上述配置我们可以通过操作 Python 对象实现数据库的增删改查无需编写任何原生 SQL以下是书籍表的完整 CRUD 实战。前置定义 Pydantic 模型请求体校验FastAPI 中推荐使用 Pydantic 做请求参数校验定义与 Book 模型对应的 Pydantic 类接收前端传入的参数from pydantic import BaseModel # 书籍新增/更新的参数校验模型 class BookBase(BaseModel): name: str author: str price: float publisher: str # 支持Pydantic模型解析ORM对象 class BookResponse(BookBase): id: int create_time: datetime update_time: datetime # 查询数据库得到 ORM 对象后直接返回给前端无需手动转字典 class Config: orm_mode True # V1 写法V2 用 from_attributes True1. 查询基础查询、条件查询查询是数据库操作中最常用的场景SQLAlchemy 提供了丰富的查询语法核心是select()函数配合where()/offset()/limit()实现复杂查询。from fastapi import APIRouter from sqlalchemy import select, func from typing import List # 1. 查询所有书籍直接使用app定义路由 app.get(/book/all, response_modelList[BookResponse]) async def get_all_books(db: AsyncSession Depends(get_db)): result await db.execute(select(Book)) return result.scalars().all() # 2. 根据ID查询单本书籍直接使用app定义路由 app.get(/book/{book_id}, response_modelBookResponse | dict) async def get_book_by_id(book_id: int, db: AsyncSession Depends(get_db)): book await db.get(Book, book_id) if not book: return {code: 404, message: 书籍不存在} return book # 3. 条件查询价格小于指定值 作者模糊查询直接使用app定义路由 app.get(/book/search/condition, response_modelList[BookResponse]) async def search_book(price: float, author: str, db: AsyncSession Depends(get_db)): # 模糊匹配 like # result await db.execute(select(Book).where(Book.author.like(曹_))) # 逻辑与 ; , 作用同于 ; | 逻辑或 # result await db.execute(select(Book).where((Book.author.like(曹%))(Book.price100))) # result await db.execute(select(Book).where((Book.author.like(曹%)),(Book.price100))) # result await db.execute(select(Book).where((Book.author.like(曹%))|(Book.price100))) # 需求书籍id列表数据库里面的id在这个列表中 就返回 # id_list [1,3,5,7] # result await db.execute(select(Book).where(Book.id.in_(id_list))) result await db.execute( select(Book).where(Book.price price).where(Book.author.like(f%{author}%)) ) return result.scalars().all()2. 新增创建对象 → 添加到会话 → 提交事务核心步骤通过 Pydantic 模型创建 ORM 对象 → 用db.add()添加到会话 → 提交事务依赖项中自动提交。# 新增书籍 # 用户输入图书信息id、书名、作者、价格、出版社 - 新增 # 用户输入 - 参数 - 请求体 book_router.post(/add, response_modelBookResponse) async def add_book(book: BookBase, db: AsyncSession Depends(get_db)): # 获取 book 参数 创建图书对象__dict__ 返回 book 对象的属性字典 book_obj Book(**book.dict()) # 添加到数据库会话 db.add(book_obj) await db.commit() return book_obj3. 更新先查询 → 重新赋值 → 提交事务FastAPI 中 ORM 更新的核心是先查后改先根据主键查询到对象再对对象的属性重新赋值提交事务后自动同步到数据库。from fastapi import HTTPException # 7. 更新书籍信息 # 先查在改 # 设计思路路径参数 书籍id作用是查找请求体参数作用是新数据书名、作者、价格、出版社 app.put(/book/update/{book_id}, response_modelBookResponse) async def update_book(book_id: int, book: BookBase, db: AsyncSession Depends(get_db)): db_book await db.get(Book, book_id) # 未找到抛出异常 if not db_book: raise HTTPException(status_code404, detail书籍不存在) # 更新字段update_time会自动刷新 db_book.name book.name db_book.author book.author db_book.price book.price db_book.publisher book.publisher # 刷新会话确保字段更新 await db.flush() await db.refresh(db_book) return db_book4. 删除先查询 → 删除对象 → 提交事务核心步骤先查询对象 → 用db.delete()删除对象 → 提交事务完成数据库删除。# 8. 删除书籍直接使用app定义路由 app.delete(/book/delete/{book_id}) async def delete_book(book_id: int, db: AsyncSession Depends(get_db)): db_book await db.get(Book, book_id) # 未找到抛出异常 if not db_book: raise HTTPException(status_code404, detail书籍不存在) await db.delete(db_book) await db.commit() return {code: 200, message: 书籍删除成功}4、ORM 操作核心总结完整代码from fastapi import FastAPI, Depends, HTTPException from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, async_sessionmaker, AsyncSession from contextlib import asynccontextmanager from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy import DateTime, func, String, Float, select from datetime import datetime from pydantic import BaseModel from typing import List # 数据库配置 DB_URL mysqlaiomysql://root:458362localhost:3306/fastapi_orm?charsetutf8mb4 # 全局异步引擎和会话工厂 async_engine: AsyncEngine | None None AsyncSessionLocal: async_sessionmaker | None None # 基础模型类 class Base(DeclarativeBase): create_time: Mapped[datetime] mapped_column( DateTime, comment创建时间, defaultdatetime.now() ) update_time: Mapped[datetime] mapped_column( DateTime, comment更新时间, defaultdatetime.now(), onupdatedatetime.now() ) # 书籍模型类 class Book(Base): __tablename__ book __table_args__ {extend_existing: True} id: Mapped[int] mapped_column(primary_keyTrue, autoincrementTrue, comment书籍ID) name: Mapped[str] mapped_column(String(255), comment书名) author: Mapped[str] mapped_column(String(255), comment作者) price: Mapped[float] mapped_column(Float, comment价格) publisher: Mapped[str] mapped_column(String(255), comment出版社) # 建表函数 async def create_tables(): if not async_engine: raise RuntimeError(数据库引擎未初始化) async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # 项目生命周期管理器 asynccontextmanager async def lifespan(app: FastAPI): global async_engine, AsyncSessionLocal async_engine create_async_engine( DB_URL, echoTrue, pool_size10, max_overflow20, pool_pre_pingTrue ) await create_tables() AsyncSessionLocal async_sessionmaker( bindasync_engine, class_AsyncSession, expire_on_commitFalse, autoflushTrue ) print(应用启动数据库引擎初始化完成表创建成功) yield if async_engine: await async_engine.dispose() print(应用关闭数据库连接已释放) # 初始化FastAPI app FastAPI(lifespanlifespan) # 数据库会话依赖项 async def get_db(): if not AsyncSessionLocal: raise RuntimeError(数据库会话工厂未初始化) db AsyncSessionLocal() try: yield db await db.commit() except Exception as e: await db.rollback() raise e finally: await db.close() # Pydantic模型 class BookBase(BaseModel): name: str author: str price: float publisher: str class BookResponse(BookBase): id: int create_time: datetime update_time: datetime class Config: from_attributes True orm_mode True # 根路由 app.get(/) async def root(): return {message: FastAPI ORM项目运行成功, docs_url: /docs} # 查询所有书籍 app.get(/book/all, response_modelList[BookResponse]) async def get_all_books(db: AsyncSession Depends(get_db)): result await db.execute(select(Book)) return result.scalars().all() # 根据ID查询单本书籍 app.get(/book/{book_id}, response_modelBookResponse | dict) async def get_book_by_id(book_id: int, db: AsyncSession Depends(get_db)): book await db.get(Book, book_id) if not book: return {code: 404, message: 书籍不存在} return book # 条件查询价格小于指定值 作者模糊查询 app.get(/book/search/condition, response_modelList[BookResponse]) async def search_book(price: float, author: str, db: AsyncSession Depends(get_db)): result await db.execute( select(Book).where(Book.price price).where(Book.author.like(f%{author}%)) ) return result.scalars().all() # 新增书籍 app.post(/book/add, response_modelBookResponse) async def add_book(book: BookBase, db: AsyncSession Depends(get_db)): book_obj Book(**book.__dict__) db.add(book_obj) await db.commit() return book_obj # 更新书籍信息 app.put(/book/update/{book_id}, response_modelBookResponse) async def update_book(book_id: int, book: BookBase, db: AsyncSession Depends(get_db)): db_book await db.get(Book, book_id) if not db_book: raise HTTPException(status_code404, detail书籍不存在) db_book.name book.name db_book.author book.author db_book.price book.price db_book.publisher book.publisher await db.flush() await db.refresh(db_book) return db_book # 删除书籍 app.delete(/book/delete/{book_id}) async def delete_book(book_id: int, db: AsyncSession Depends(get_db)): db_book await db.get(Book, book_id) if not db_book: raise HTTPException(status_code404, detail书籍不存在) await db.delete(db_book) await db.commit() return {code: 200, message: 书籍删除成功}SQLAlchemy 异步版操作数据库的核心流程可概括为3 步建环境4 步做操作建环境项目初始化安装依赖sqlalchemy[asyncio] 对应数据库的异步驱动创建异步引擎create_async_engine配置连接池和日志定义模型类继承DeclarativeBase实现表与对象的映射数据操作CRUD创建会话工厂async_sessionmaker绑定引擎定义会话依赖项get_db()统一管理会话的创建和释放路由中注入会话db: AsyncSession Depends(get_db)操作对象通过select()/add()/delete() 对象属性赋值实现 CRUD。