Python3.x:SQLAlchemy操作数据库详解编程语言

Python3.x:SQLAlchemy操作数据库

前言

SQLAlchemy是一个ORM框架(Object Rational Mapping,对象关系映射),它可以帮助我们更加优雅、更加高效的实现数据库操作,而且还不限于mysql。

SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

MySQL-Python 
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> 
   
pymysql 
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] 
   
MySQL-Connector 
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> 
   
cx_Oracle 
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 
 
pyodbc 
    sybase+pyodbc://<username>:<password>@<dsn>[/<database>] 
 
Python-Sybase 
    sybase+pysybase://<username>:<password>@<dsn>/[database name] 
 
mxODBC 
    sybase+mxodbc://<username>:<password>@<dsnname>

SQLAlchemy库安装

pip install sqlalchemy 
#安装mysql 
pip install pymysql 
#安装mysql-connector2.2.3版本会报错:Unable to find Protobuf include directory. 
#所以我们指定安装的版本 
pip install mysql-connector==2.1.4

示例代码(mysql数据库)

# python3 
# author lizm 
# datetime 2018-01-28 12:00:00 
''' 
    Demo:sqlalchemy对mysql数据库的操作 
''' 
from sqlalchemy import Column,Integer, String, create_engine 
from sqlalchemy.orm import sessionmaker 
from sqlalchemy.ext.declarative import declarative_base 
import pymysql 
 
# 创建对象的基类: 
Base = declarative_base() 
 
# 定义Channel对象: 
class Channel(Base): 
 
    # 表名 
    __tablename__ = 'playback' 
 
    # 表结构 
    # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度  
    id = Column(Integer,primary_key=True,autoincrement=True) 
    channel_name = Column(String(45),unique=True, nullable=False) 
    address = Column(String(80),unique=True, nullable=False) 
    service_name = Column(String(45),unique=True, nullable=False) 
 
    def __init__(self,id,channel_name,address,service_name): 
        self.id = id 
        self.channel_name = channel_name 
        self.address = address 
        self.service_name = service_name 
 
# 初始化数据库连接, 
# 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 
engine = create_engine('mysql+mysqlconnector://root:[email protected]:3306/pythondb',encoding='utf-8') 
 
# 创建表 
Base.metadata.create_all(engine) 
# 删除表 
# Base.metadata.drop_all(engine)  
 
# 创建DBSession类型: 
DBSession = sessionmaker(bind=engine) 
session = DBSession() 
 
try: 
    # 增操作 
    item1 = Channel(id='1',channel_name='cctv8',address='http://10.10.10.1/cctv8',service_name='news') 
    session.add(item1) 
    item2 = Channel(id='2',channel_name='cctv10',address='http://10.10.10.1/cctv10',service_name='sports') 
    session.add(item2) 
    item3 = Channel(id='3',channel_name='cctv12',address='http://10.10.10.1/cctv12',service_name='economics') 
    session.add(item3) 
    #提交数据 
    session.commit() 
except Exception as e: 
    session.rollback() 
finally: 
    #关闭 
    session.close() 
 
# 查操作 
session1 = DBSession() 
 
# 输出sql 语句 
print("查询sql语句:%s"%session1.query(Channel).filter(Channel.id < '3')) 
# 返回的是一个类似列表的对象 
channel = session1.query(Channel).filter(Channel.id < '3').all() 
 
for i in range(len(channel)): 
    print(channel[i].id) 
    print(channel[i].channel_name) 
    print(channel[i].address) 
    print(channel[i].service_name) 
session1.close() 
 
# 改操作 
session2 = DBSession() 
session2.query(Channel).filter(Channel.id == '2').update({Channel.service_name: 'movie',Channel.address: '127.0.0.1'}, synchronize_session=False) 
session2.commit() 
session2.close() 
 
## 查看修改结果 
session3 = DBSession() 
print(session3.query(Channel).filter(Channel.id == '2').one().service_name) 
session3.close() 
 
# 删操作 
session4 = DBSession() 
session4.query(Channel).filter(Channel.id == '3').delete() 
session4.commit() 
session4.close()

 

示例代码(sybase数据库:odbc连接方式需要先配置好odbc连接):

# python3 
# author lizm 
# datetime 2018-02-28 12:00:00 
''' 
    Demo:sqlalchemy对sybase数据库的操作 
''' 
from selenium import webdriver 
from sqlalchemy import Column,Integer,String,DateTime,create_engine 
from sqlalchemy.orm import sessionmaker 
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import and_,func 
import pyodbc 
import time,datetime 
 
# 创建对象的基类: 
Base = declarative_base() 
 
# 定义Channel对象: 
class Channel(Base): 
 
    # 表名     
    __tablename__ = 'shrjj' 
 
    # 表结构 
    # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度  
    id = Column(Integer,primary_key=True,autoincrement=True) 
    rpname = Column(String(500), nullable=False) 
    rpdate = Column(String(50)) 
    jjzt = Column(String(255)) 
    fbsjj = Column(String(255)) 
    etf = Column(String(255)) 
    lof = Column(String(255)) 
    fjlof = Column(String(255)) 
    create_date = Column(DateTime,nullable=False) 
    update_date = Column(DateTime,nullable=False) 
 
    def __init__(self,rpname,rpdate,jjzt,fbsjj,etf,lof,fjlof,create_date,update_date): 
        self.rpname = rpname 
        self.rpdate = rpdate 
        self.jjzt = jjzt 
        self.fbsjj = fbsjj 
        self.etf = etf 
        self.lof = lof 
        self.fjlof = fjlof 
        self.create_date = create_date 
        self.update_date = update_date 
 
# 初始化数据库连接, 
# 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 
engine = create_engine('sybase+pyodbc://username:[email protected]') 
 
# 创建表 
#Base.metadata.create_all(engine) 
 
# 创建DBSession类型: 
DBSession = sessionmaker(bind=engine) 
 
# 查操作 
session1 = DBSession() 
# 输出sql 语句 
print("查询sql语句:%s"%session1.query(Channel).filter(Channel.rpdate == '2018-02-12')) 
# 返回的是一个类似列表的对象 
channel = session1.query(Channel).filter(Channel.rpdate == '2018-02-12').all() 
for i in range(len(channel)): 
    print(channel[i].rpname) 
    print(channel[i].rpdate) 
    print(channel[i].jjzt) 
session1.close() 
 
# 改操作 
session2 = DBSession() 
session2.query(Channel).filter(Channel.rpdate == '2018-02-12').update({Channel.jjzt: '0'}, synchronize_session=False) 
session2.commit() 
session2.close() 
# 查看修改结果 
session3 = DBSession() 
print(session3.query(Channel).filter(Channel.rpdate == '2018-02-12').one().jjzt) 
session3.close() 
 
#新增数据操作 
session4 = DBSession() 
# 增操作 
item1 = Channel(rpname='市价总值',rpdate='2018-02-29',jjzt='1',fbsjj='1',etf='1',lof='1',fjlof='1',create_date=time.strftime('%Y-%m-%d %H:%M:%S'),update_date=time.strftime('%Y-%m-%d %H:%M:%S')) 
session4.add(item1) 
#提交数据 
session4.commit() 
#关闭 
session4.close()

SQLAlchemy中列类型.配置选项和关系选项

常见的列类型:
类型名称 python类型 描述
Integer int 常规整形,通常为32位 SmallInteger int 短整形,通常为16位 BigInteger int或long 精度不受限整形 Float float 浮点数 Numeric decimal.Decimal 定点数 String str 可变长度字符串 Text str 可变长度字符串,适合大量文本 Unicode unicode 可变长度Unicode字符串 Boolean bool 布尔型 Date datetime.date 日期类型 Time datetime.time 时间类型 Interval datetime.timedelta 时间间隔 Enum str 字符列表 PickleType 任意Python对象 自动Pickle序列化 LargeBinary str 二进制
常见的SQLALCHEMY列选项: 可选参数 描述 primary_key 如果设置为True,则为该列表的主键 unique 如果设置为True,该列不允许相同值 index 如果设置为True,为该列创建索引,查询效率会更高 nullable 如果设置为True,该列允许为空。如果设置为False,该列不允许空值 default 定义该列的默认值

 常见sqlalchemy查询

#简单查询 
print(session.query(User).all()) 
print(session.query(User.name, User.fullname).all()) 
print(session.query(User, User.name).all()) 
     
#带条件查询 
print(session.query(User).filter_by(name='user1').all()) 
print(session.query(User).filter(User.name == "user").all()) 
print(session.query(User).filter(User.name.like("user%")).all()) 
     
#多条件查询 
from sqlalchemy import and_ 
print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all()) 
 
from sqlalchemy import or_ 
print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all()) 
     
#sql过滤 
print(session.query(User).filter("id>:id").params(id=1).all()) 
     
#关联查询  
print(session.query(User, Address).filter(User.id == Address.user_id).all()) 
print(session.query(User).join(User.addresses).all()) 
print(session.query(User).outerjoin(User.addresses).all()) 
     
#聚合查询 
print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all()) 
print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all()) 
     
#子查询 
stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery() 
print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all()) 
     
#exists 
print(session.query(User).filter(exists().where(Address.user_id == User.id))) 
print(session.query(User).filter(User.addresses.any())) 
 
# 限制 
ret = session.query(Users)[1:2] 
 
# 排序 
ret = session.query(Users).order_by(Users.name.desc()).all() 
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() 
 
# 分组 
from sqlalchemy.sql import func 
 
ret = session.query(Users).group_by(Users.extra).all() 
ret = session.query( 
    func.max(Users.id), 
    func.sum(Users.id), 
    func.min(Users.id)).group_by(Users.name).all() 
ret = session.query( 
    func.max(Users.id), 
    func.sum(Users.id), 
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() 
 
# 连表 
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() 
ret = session.query(Person).join(Favor).all() 
ret = session.query(Person).join(Favor, isouter=True).all() 
 
# 组合 
q1 = session.query(Users.name).filter(Users.id > 2) 
q2 = session.query(Favor.caption).filter(Favor.nid < 2) 
ret = q1.union(q2).all() 
 
q1 = session.query(Users.name).filter(Users.id > 2) 
q2 = session.query(Favor.caption).filter(Favor.nid < 2) 
ret = q1.union_all(q2).all()

 注意:SQLAlchemy中的model必须要有主键,才可以使用;

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/16771.html

(0)
上一篇 2021年7月19日 19:37
下一篇 2021年7月19日 19:37

相关推荐

发表回复

登录后才能评论