Python3.x:SQLAlchemy操作数据库
前言
SQLAlchemy是一个ORM框架(Object Rational Mapping,对象关系映射),它可以帮助我们更加优雅、更加高效的实现数据库操作,而且还不限于mysql。
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] pyodbc sybase+pyodbc://<username>:<password>@<dsn>[/<database>] Python-Sybase sybase+pysybase://<username>:<password>@<dsn>/[database name] mxODBC sybase+mxodbc://<username>:<password>@<dsnname>
SQLAlchemy库安装
pip install sqlalchemy #安装mysql pip install pymysql #安装mysql-connector2.2.3版本会报错:Unable to find Protobuf include directory. #所以我们指定安装的版本 pip install mysql-connector==2.1.4
示例代码(mysql数据库)
# python3 # author lizm # datetime 2018-01-28 12:00:00 ''' Demo:sqlalchemy对mysql数据库的操作 ''' from sqlalchemy import Column,Integer, String, create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base import pymysql # 创建对象的基类: Base = declarative_base() # 定义Channel对象: class Channel(Base): # 表名 __tablename__ = 'playback' # 表结构 # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度 id = Column(Integer,primary_key=True,autoincrement=True) channel_name = Column(String(45),unique=True, nullable=False) address = Column(String(80),unique=True, nullable=False) service_name = Column(String(45),unique=True, nullable=False) def __init__(self,id,channel_name,address,service_name): self.id = id self.channel_name = channel_name self.address = address self.service_name = service_name # 初始化数据库连接, # 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 engine = create_engine('mysql+mysqlconnector://root:[email protected]:3306/pythondb',encoding='utf-8') # 创建表 Base.metadata.create_all(engine) # 删除表 # Base.metadata.drop_all(engine) # 创建DBSession类型: DBSession = sessionmaker(bind=engine) session = DBSession() try: # 增操作 item1 = Channel(id='1',channel_name='cctv8',address='http://10.10.10.1/cctv8',service_name='news') session.add(item1) item2 = Channel(id='2',channel_name='cctv10',address='http://10.10.10.1/cctv10',service_name='sports') session.add(item2) item3 = Channel(id='3',channel_name='cctv12',address='http://10.10.10.1/cctv12',service_name='economics') session.add(item3) #提交数据 session.commit() except Exception as e: session.rollback() finally: #关闭 session.close() # 查操作 session1 = DBSession() # 输出sql 语句 print("查询sql语句:%s"%session1.query(Channel).filter(Channel.id < '3')) # 返回的是一个类似列表的对象 channel = session1.query(Channel).filter(Channel.id < '3').all() for i in range(len(channel)): print(channel[i].id) print(channel[i].channel_name) print(channel[i].address) print(channel[i].service_name) session1.close() # 改操作 session2 = DBSession() session2.query(Channel).filter(Channel.id == '2').update({Channel.service_name: 'movie',Channel.address: '127.0.0.1'}, synchronize_session=False) session2.commit() session2.close() ## 查看修改结果 session3 = DBSession() print(session3.query(Channel).filter(Channel.id == '2').one().service_name) session3.close() # 删操作 session4 = DBSession() session4.query(Channel).filter(Channel.id == '3').delete() session4.commit() session4.close()
示例代码(sybase数据库:odbc连接方式需要先配置好odbc连接):
# python3 # author lizm # datetime 2018-02-28 12:00:00 ''' Demo:sqlalchemy对sybase数据库的操作 ''' from selenium import webdriver from sqlalchemy import Column,Integer,String,DateTime,create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import and_,func import pyodbc import time,datetime # 创建对象的基类: Base = declarative_base() # 定义Channel对象: class Channel(Base): # 表名 __tablename__ = 'shrjj' # 表结构 # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度 id = Column(Integer,primary_key=True,autoincrement=True) rpname = Column(String(500), nullable=False) rpdate = Column(String(50)) jjzt = Column(String(255)) fbsjj = Column(String(255)) etf = Column(String(255)) lof = Column(String(255)) fjlof = Column(String(255)) create_date = Column(DateTime,nullable=False) update_date = Column(DateTime,nullable=False) def __init__(self,rpname,rpdate,jjzt,fbsjj,etf,lof,fjlof,create_date,update_date): self.rpname = rpname self.rpdate = rpdate self.jjzt = jjzt self.fbsjj = fbsjj self.etf = etf self.lof = lof self.fjlof = fjlof self.create_date = create_date self.update_date = update_date # 初始化数据库连接, # 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 engine = create_engine('sybase+pyodbc://username:[email protected]') # 创建表 #Base.metadata.create_all(engine) # 创建DBSession类型: DBSession = sessionmaker(bind=engine) # 查操作 session1 = DBSession() # 输出sql 语句 print("查询sql语句:%s"%session1.query(Channel).filter(Channel.rpdate == '2018-02-12')) # 返回的是一个类似列表的对象 channel = session1.query(Channel).filter(Channel.rpdate == '2018-02-12').all() for i in range(len(channel)): print(channel[i].rpname) print(channel[i].rpdate) print(channel[i].jjzt) session1.close() # 改操作 session2 = DBSession() session2.query(Channel).filter(Channel.rpdate == '2018-02-12').update({Channel.jjzt: '0'}, synchronize_session=False) session2.commit() session2.close() # 查看修改结果 session3 = DBSession() print(session3.query(Channel).filter(Channel.rpdate == '2018-02-12').one().jjzt) session3.close() #新增数据操作 session4 = DBSession() # 增操作 item1 = Channel(rpname='市价总值',rpdate='2018-02-29',jjzt='1',fbsjj='1',etf='1',lof='1',fjlof='1',create_date=time.strftime('%Y-%m-%d %H:%M:%S'),update_date=time.strftime('%Y-%m-%d %H:%M:%S')) session4.add(item1) #提交数据 session4.commit() #关闭 session4.close()
SQLAlchemy中列类型.配置选项和关系选项
常见的列类型:
类型名称 python类型 描述
Integer int 常规整形,通常为32位
SmallInteger int 短整形,通常为16位
BigInteger int或long 精度不受限整形
Float float 浮点数
Numeric decimal.Decimal 定点数
String str 可变长度字符串
Text str 可变长度字符串,适合大量文本
Unicode unicode 可变长度Unicode字符串
Boolean bool 布尔型
Date datetime.date 日期类型
Time datetime.time 时间类型
Interval datetime.timedelta 时间间隔
Enum str 字符列表
PickleType 任意Python对象 自动Pickle序列化
LargeBinary str 二进制
常见的SQLALCHEMY列选项:
可选参数 描述
primary_key 如果设置为True,则为该列表的主键
unique 如果设置为True,该列不允许相同值
index 如果设置为True,为该列创建索引,查询效率会更高
nullable 如果设置为True,该列允许为空。如果设置为False,该列不允许空值
default 定义该列的默认值
常见sqlalchemy查询
#简单查询 print(session.query(User).all()) print(session.query(User.name, User.fullname).all()) print(session.query(User, User.name).all()) #带条件查询 print(session.query(User).filter_by(name='user1').all()) print(session.query(User).filter(User.name == "user").all()) print(session.query(User).filter(User.name.like("user%")).all()) #多条件查询 from sqlalchemy import and_ print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all()) from sqlalchemy import or_ print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all()) #sql过滤 print(session.query(User).filter("id>:id").params(id=1).all()) #关联查询 print(session.query(User, Address).filter(User.id == Address.user_id).all()) print(session.query(User).join(User.addresses).all()) print(session.query(User).outerjoin(User.addresses).all()) #聚合查询 print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all()) print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all()) #子查询 stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery() print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all()) #exists print(session.query(User).filter(exists().where(Address.user_id == User.id))) print(session.query(User).filter(User.addresses.any())) # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
注意:SQLAlchemy中的model必须要有主键,才可以使用;
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/16771.html