from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # 创建数据库连接引擎 # SQLite示例 engine = create_engine('sqlite:///example.db', echo=True) # PostgreSQL示例 # engine = create_engine('postgresql://username:password@localhost:5432/mydatabase') # MySQL示例 # engine = create_engine('mysql+mysqlconnector://username:password@localhost:3306/mydatabase') # 创建会话工厂 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 创建会话实例 session = SessionLocal()
from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship, declarative_base # 创建基类 Base = declarative_base() class User(Base): tablename = 'users' id = Column(Integer, primary_key=True, index=True) name = Column(String(50), nullable=False) email = Column(String(100), unique=True, index=True) # 定义一对多关系 posts = relationship("Post", back_populates="author") class Post(Base): tablename = 'posts' id = Column(Integer, primary_key=True, index=True) title = Column(String(100), nullable=False) content = Column(String(500)) author_id = Column(Integer, ForeignKey('users.id')) # 定义多对一关系 author = relationship("User", back_populates="posts") # 定义多对多关系(通过关联表) tags = relationship("Tag", secondary="post_tags", back_populates="posts") class Tag(Base): tablename = 'tags' id = Column(Integer, primary_key=True, index=True) name = Column(String(30), unique=True, nullable=False) posts = relationship("Post", secondary="post_tags", back_populates="tags") # 关联表(用于多对多关系) class PostTag(Base): tablename = 'post_tags' post_id = Column(Integer, ForeignKey('posts.id'), primary_key=True) tag_id = Column(Integer, ForeignKey('tags.id'), primary_key=True)
from sqlalchemy import or_ # 等值过滤 user = session.query(User).filter(User.name == "张三").first() # 模糊查询 users = session.query(User).filter(User.name.like("张%")).all() # IN查询 users = session.query(User).filter(User.name.in_( ["张三", "李四"] )).all() # 多条件查询 users = session.query(User).filter( User.name == "张三", User.email.like("%@example.com") ).all() # 或条件 users = session.query(User).filter( or_(User.name == "张三", User.name == "李四") ).all() # 不等于 users = session.query(User).filter(User.name != "张三").all()
from sqlalchemy import func # 计数 count = session.query(User).count() # 分组计数 user_post_count = session.query( User.name, func.count(Post.id) ).join(Post).group_by(User.name).all() # 求和、平均值等 avg_id = session.query(func.avg(User.id)).scalar()